Creating and Deploying a custom Kubernetes Infrastructure Block

Purpose

You’ve created a deployment using -i kubernetes-job, but now you want to create a custom, reusable infrastructure block.
For example, you might want to use image: prefecthq/prefect:2.3.0-python3.9 in dev and testing, but image: privaterepo/custom-prefect-image:latest in production.
Or perhaps you want to set CPU and memory resource limits on your jobs.
Read on, Marvin!

High Level Overview

Creating, deploying, and using a custom Kubernetes infrastructure block takes five steps, ending with a deployment that references the block.

  1. Generate a base job definition
  2. Customize the job template
  3. Create the infrastructure block
  4. Create a deployment referencing the infrastructure block
  5. Apply and run the deployment

Let's get started!

1 - Create a Base Job Template

Run the following command to generate a new job template (using > rather than >> so that an existing file is overwritten instead of appended to): prefect kubernetes manifest flow-run-job > base_run_job.yaml
An example of this base job template is shown below.

apiVersion: batch/v1
kind: Job
metadata:
  # labels are required, even if empty
  labels: {}
spec:
  template:
    spec:
      completions: 1
      containers:  # the first container is required
      - env: []  # env is required, even if empty
        name: prefect-job
      parallelism: 1
      restartPolicy: Never

2 - Customize the Template

In this step, we will modify the template based on parameters we’d like to apply in our environment.
Create a copy of your template: cp -p base_run_job.yaml modified_run_job.yaml
Let’s make changes to the following items: namespace, image, imagePullPolicy, resources.
The changes made are just yaml modifications to the job specification.
The modified job template now looks like this.

apiVersion: batch/v1
kind: Job
metadata:
  namespace: prefect2
  labels:
    purpose: prefect
spec:
  template:
    spec:
      completions: 1
      containers: # the first container is required
        - env: []
          name: prefect-job
          image: prefecthq/prefect:2.3.0-python3.9
          imagePullPolicy: "IfNotPresent"
          resources:
            requests:
              memory: "64Mi"
              cpu: "250m"
            limits:
              memory: "128Mi"
              cpu: "500m"
      parallelism: 1
      restartPolicy: Never
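
The same edits can also be scripted rather than made by hand. The sketch below is only an illustration using the Python standard library: it starts from a dict that mirrors the base template from step 1 and returns a customized copy. The function name customize_manifest and its parameters are made up for this example and are not part of Prefect.

```python
import copy

# Dict form of the base job template generated in step 1.
BASE_MANIFEST = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"labels": {}},
    "spec": {
        "template": {
            "spec": {
                "completions": 1,
                "containers": [{"env": [], "name": "prefect-job"}],
                "parallelism": 1,
                "restartPolicy": "Never",
            }
        }
    },
}


def customize_manifest(base, namespace, image, requests, limits, labels=None):
    """Return a customized copy of `base`; the input dict is left untouched."""
    manifest = copy.deepcopy(base)
    manifest["metadata"]["namespace"] = namespace
    manifest["metadata"]["labels"].update(labels or {})
    # The template marks the first container as required, so customize that one.
    container = manifest["spec"]["template"]["spec"]["containers"][0]
    container["image"] = image
    container["imagePullPolicy"] = "IfNotPresent"
    container["resources"] = {"requests": dict(requests), "limits": dict(limits)}
    return manifest


modified = customize_manifest(
    BASE_MANIFEST,
    namespace="prefect2",
    image="prefecthq/prefect:2.3.0-python3.9",
    requests={"memory": "64Mi", "cpu": "250m"},
    limits={"memory": "128Mi", "cpu": "500m"},
    labels={"purpose": "prefect"},
)
```

The resulting dict has the same content as the modified template above. Whether you keep the manifest as a YAML file or build it in code is a matter of taste; the file is easier to review, while the function is easier to reuse across environments.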

3 - Create the Infrastructure Block

Next, we can create a custom infrastructure block that uses this job template.

A block contains high-level infrastructure details, such as which namespace to run in,
which image to pull, and the name of the job.
There is also an option to specify the “job” manifest, which we will read in from modified_run_job.yaml.

Create a custom infrastructure block (either through the UI or in Python).
I am testing and demoing this in Azure, and “adlfs” is not pre-packaged in this image, so we include it through the env parameter by setting EXTRA_PIP_PACKAGES.
The code snippet below sets the namespace, image, and env variables for the infrastructure block, as well as the job definition in which we specified the resource limits.

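from prefect.infrastructure import KubernetesJob

# Build the block from high-level settings plus the customized job manifest.
k8s_job = KubernetesJob(
    namespace="prefect2",
    image="prefecthq/prefect:2.3.0-python3.9",
    # Extra packages to pip install when the container starts
    env={"EXTRA_PIP_PACKAGES": "adlfs"},
    job=KubernetesJob.job_from_file("modified_run_job.yaml"),
)

# Save the block under the name "k8sdev" so deployments can reference it.
k8s_job.save("k8sdev")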

This creates an infrastructure block with the slug “kubernetes-job/k8sdev”.
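
To cover the dev versus production scenario from the Purpose section, one option is to keep the per-environment settings in one place and save one block per environment. This is a rough sketch, not the only way to do it: the block name “k8sprod”, the helper names, and the choice to reuse the prefect2 namespace and the same job manifest for production are all assumptions made for illustration.

```python
# Per-environment settings. The images come from the scenario in the Purpose
# section; the "k8sprod" block name is a hypothetical example.
ENVIRONMENTS = {
    "dev": {"block_name": "k8sdev", "image": "prefecthq/prefect:2.3.0-python3.9"},
    "prod": {"block_name": "k8sprod", "image": "privaterepo/custom-prefect-image:latest"},
}


def block_settings(environment, namespace="prefect2"):
    """Return the block name and KubernetesJob keyword arguments for an environment."""
    settings = ENVIRONMENTS[environment]
    kwargs = {"namespace": namespace, "image": settings["image"]}
    return settings["block_name"], kwargs


def save_block(environment, manifest_path="modified_run_job.yaml"):
    """Create and save the block; needs Prefect installed and an API to save to."""
    # Imported here so the helpers above work without Prefect installed.
    from prefect.infrastructure import KubernetesJob

    name, kwargs = block_settings(environment)
    block = KubernetesJob(job=KubernetesJob.job_from_file(manifest_path), **kwargs)
    block.save(name)
    return f"kubernetes-job/{name}"
```

Calling save_block("prod") would then give you a second slug to pass to -ib when building the production deployment.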

4 - Create a deployment

Next, we can create a deployment to use this custom infrastructure block.
The following command uses a simple pre-built flow, healthcheck.py, the tag kubernetes for my agent, and the infrastructure block we just created (-ib kubernetes-job/k8sdev). It also uses a pre-existing storage block (-sb azure/boydblock).

prefect deployment build ./healthcheck.py:healthcheck -n flowrida_deploy -t kubernetes -ib kubernetes-job/k8sdev -sb azure/boydblock

With the deployment built, we can apply it.

prefect deployment apply healthcheck-deployment.yaml

5 - Run the deployment

Lastly, we can run the deployment and verify that the limits we passed in were applied and that the adlfs package was installed.
If the package had not been installed, the job would still trigger, but we would see a “Module not found” error because “adlfs” is not pre-built into this image.
If our flow and template are set up correctly, we will see the output of healthcheck.py.

prefect deployment run healthcheck/flowrida_deploy

Success!
If we review the spec of the completed job, we can also see that our resource settings and pip packages were included. The flow run logs look like this:

17:36:53.961 | INFO    | Flow run 'tireless-chipmunk' - Created task run 'log_platform_info-afea9710-0' for task 'log_platform_info'
17:36:53.962 | INFO    | Flow run 'tireless-chipmunk' - Executing 'log_platform_info-afea9710-0' immediately...
/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py:613: UserWarning: Block document has schema checksum sha256:ed12920df72ae97d0b896193634023971f7039486ef85bac6ff596fe246ea507 which does not match the schema checksum for class 'Azure'. This indicates the schema has changed and this block may not load.
  return cls._from_block_document(block_document)
17:36:54.165 | INFO    | Task run 'log_platform_info-afea9710-0' - Host's network name = tireless-chipmunkp2mbs-qjnjp
17:36:54.166 | INFO    | Task run 'log_platform_info-afea9710-0' - Python version = 3.9.13
17:36:54.166 | INFO    | Task run 'log_platform_info-afea9710-0' - Platform information (instance type) = Linux-5.4.0-1085-azure-x86_64-with-glibc2.31 
17:36:54.166 | INFO    | Task run 'log_platform_info-afea9710-0' - OS/Arch = linux/x86_64
17:36:54.166 | INFO    | Task run 'log_platform_info-afea9710-0' - Prefect Version = 2.3.0 🚀
17:36:54.167 | INFO    | Task run 'log_platform_info-afea9710-0' - Prefect API Version = 0.8.0
17:36:54.228 | INFO    | Task run 'log_platform_info-afea9710-0' - Finished in state Completed()
17:36:54.291 | INFO    | Flow run 'tireless-chipmunk' - Finished in state Completed('All states completed.')
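
One way to check the resource settings is to dump the job with kubectl get job <job-name> -n prefect2 -o json and look at the container spec. The helper below is a small sketch of that check using only the standard library. The function name container_resources is hypothetical, and the sample JSON is hand-written to mirror the template from step 2; it is not real cluster output.

```python
import json


def container_resources(job, container_name="prefect-job"):
    """Return the `resources` section of the named container in a Job object."""
    containers = job["spec"]["template"]["spec"]["containers"]
    for container in containers:
        if container.get("name") == container_name:
            return container.get("resources", {})
    raise KeyError(f"no container named {container_name!r} in job spec")


# Hand-written sample shaped like `kubectl get job ... -o json` output.
sample_job = json.loads("""
{
  "kind": "Job",
  "spec": {"template": {"spec": {"containers": [
    {"name": "prefect-job",
     "resources": {"requests": {"memory": "64Mi", "cpu": "250m"},
                   "limits": {"memory": "128Mi", "cpu": "500m"}}}
  ]}}}
}
""")

resources = container_resources(sample_job)
print(resources["limits"])
```

In practice you would replace sample_job with the parsed output of the kubectl command and compare the result against the values in modified_run_job.yaml.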

just wanted to say this post was super helpful for me recently! thank you for this!