Creating and Deploying a Custom Kubernetes Infrastructure Block
Purpose
You’ve created a deployment using -i kubernetes-job, but now you want to create a custom, reusable infrastructure block.
For example, you might want to use image: prefecthq/prefect:2.3.0-python3.9 in dev and testing, but image: privaterepo/custom-prefect-image:latest in production.
Perhaps you also want to set resource limits on your jobs for cpu and memory.
Read on Marvin!
High Level Overview
Creating, deploying, and using a custom Kubernetes infrastructure block involves the following steps:
- Generate a base job definition
- Customize the job template
- Create the infrastructure block
- Create a deployment referencing the infrastructure block
- Apply and run the deployment
Let’s get started!
1 - Create a Base Job Template
Run the following command to create a new job template: prefect kubernetes manifest flow-run-job >> base_run_job.yaml
An example of this base job template is listed below.
apiVersion: batch/v1
kind: Job
metadata:
  # labels are required, even if empty
  labels: {}
spec:
  template:
    spec:
      completions: 1
      containers: # the first container is required
      - env: [] # env is required, even if empty
        name: prefect-job
      parallelism: 1
      restartPolicy: Never
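The comments in the template mark which fields must be present. As a rough illustration, the same manifest can be written as a Python dictionary and checked for those fields. The helper name missing_required_fields is made up for this sketch and is not part of Prefect.

```python
# Base job manifest from above, written as a plain Python dictionary.
base_job = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"labels": {}},
    "spec": {
        "template": {
            "spec": {
                "completions": 1,
                "containers": [{"env": [], "name": "prefect-job"}],
                "parallelism": 1,
                "restartPolicy": "Never",
            }
        }
    },
}


def missing_required_fields(manifest: dict) -> list:
    """Return the fields marked as required in the template that are absent."""
    problems = []
    if "labels" not in manifest.get("metadata", {}):
        problems.append("metadata.labels")
    pod_spec = manifest.get("spec", {}).get("template", {}).get("spec", {})
    containers = pod_spec.get("containers") or []
    if not containers:
        problems.append("spec.template.spec.containers[0]")
    elif "env" not in containers[0]:
        problems.append("spec.template.spec.containers[0].env")
    return problems


print(missing_required_fields(base_job))  # prints []
```

An empty list means the three fields flagged by the template comments (labels, a first container, and its env) are all present.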
2 - Customize the Template
In this step, we will modify the template based on parameters we’d like to apply in our environment.
Create a copy of your template: cp -p base_run_job.yaml modified_run_job.yaml
Let’s change the following items: namespace, image, imagePullPolicy, and resources.
These changes are plain YAML modifications to the job specification.
The modified job template now looks like this.
apiVersion: batch/v1
kind: Job
metadata:
  namespace: prefect2
  labels:
    purpose: prefect
spec:
  template:
    spec:
      completions: 1
      containers: # the first container is required
      - env: []
        name: prefect-job
        image: prefecthq/prefect:2.3.0-python3.9
        imagePullPolicy: "IfNotPresent"
        resources:
          requests:
            memory: "64Mi"
            cpu: "250m"
          limits:
            memory: "128Mi"
            cpu: "500m"
      parallelism: 1
      restartPolicy: Never
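The same edits can also be expressed programmatically. The sketch below applies the changes to the base manifest held as a Python dictionary. The function name customize_job and its parameters are illustrative only and are not a Prefect API.

```python
import copy

# Base job manifest from step 1, as a plain Python dictionary.
base_job = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"labels": {}},
    "spec": {
        "template": {
            "spec": {
                "completions": 1,
                "containers": [{"env": [], "name": "prefect-job"}],
                "parallelism": 1,
                "restartPolicy": "Never",
            }
        }
    },
}


def customize_job(manifest, namespace, image, resources, pull_policy="IfNotPresent"):
    """Return a copy of the manifest with namespace, image, pull policy, and resources set."""
    job = copy.deepcopy(manifest)  # leave the base template untouched
    job["metadata"]["namespace"] = namespace
    container = job["spec"]["template"]["spec"]["containers"][0]
    container["image"] = image
    container["imagePullPolicy"] = pull_policy
    container["resources"] = resources
    return job


modified_job = customize_job(
    base_job,
    namespace="prefect2",
    image="prefecthq/prefect:2.3.0-python3.9",
    resources={
        "requests": {"memory": "64Mi", "cpu": "250m"},
        "limits": {"memory": "128Mi", "cpu": "500m"},
    },
)
modified_job["metadata"]["labels"]["purpose"] = "prefect"
print(modified_job["spec"]["template"]["spec"]["containers"][0]["resources"])
```

Working on a deep copy keeps the base dictionary unchanged, which makes it easy to derive a second variant later, for example one with a different image for production.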
3 - Create the Infrastructure Block
Next, we can create a custom infrastructure block that uses this job.
A block contains high-level infrastructure details, such as the namespace to run in, the image to pull, and the name of the job.
There is also an option to specify the job manifest, which we will read in from modified_run_job.yaml.
You can create a custom infrastructure block either through the UI or in Python.
Because I am testing and demoing this in Azure, and “adlfs” is not pre-packaged in this image, we can include it with the env parameter.
The code snippet below sets the namespace, image, and env values for the infrastructure block, as well as the job definition in which we specified the resource limits.
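from prefect.infrastructure import KubernetesJob

k8s_job = KubernetesJob(
    namespace="prefect2",
    image="prefecthq/prefect:2.3.0-python3.9",
    # install adlfs when the container starts, since the image does not ship with it
    env={"EXTRA_PIP_PACKAGES": "adlfs"},
    # load the customized job manifest created in step 2
    job=KubernetesJob.job_from_file("modified_run_job.yaml"),
)
# save the block under the name "k8sdev"
k8s_job.save("k8sdev")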
This creates an infrastructure block with the slug “kubernetes-job/k8sdev”.
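To cover the dev versus production scenario from the Purpose section, one block per environment could be saved. The sketch below only assembles the keyword arguments for each block. The block name k8sprod and the helper block_settings are hypothetical, and the actual save call is left as a comment so the snippet runs without a Prefect installation.

```python
# Images taken from the Purpose section; "k8sprod" is a made-up block name.
IMAGES = {
    "k8sdev": "prefecthq/prefect:2.3.0-python3.9",
    "k8sprod": "privaterepo/custom-prefect-image:latest",
}


def block_settings(block_name: str, namespace: str = "prefect2") -> dict:
    """Build the keyword arguments for one environment's infrastructure block."""
    settings = {"namespace": namespace, "image": IMAGES[block_name]}
    if block_name == "k8sdev":
        # Only the stock image needs adlfs installed at startup; the custom
        # production image is ASSUMED to ship with its own dependencies.
        settings["env"] = {"EXTRA_PIP_PACKAGES": "adlfs"}
    return settings


for name in IMAGES:
    kwargs = block_settings(name)
    print(name, kwargs)
    # With Prefect installed, each block could then be saved as in the snippet above:
    # KubernetesJob(job=KubernetesJob.job_from_file("modified_run_job.yaml"), **kwargs).save(name)
```

A deployment would then pick its environment simply by referencing kubernetes-job/k8sdev or the hypothetical kubernetes-job/k8sprod.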
4 - Create a deployment
Next, we can create a deployment to use this custom infrastructure block.
The following command uses a pre-built simple flow, healthcheck.py, the tag kubernetes for my agent, and the infrastructure block that we just created, passed as -ib kubernetes-job/k8sdev. It also uses a pre-existing storage block, azure/boydblock.
prefect deployment build ./healthcheck.py:healthcheck -n flowrida_deploy -t kubernetes -ib kubernetes-job/k8sdev -sb azure/boydblock
With the deployment built, we can apply the deployment.
prefect deployment apply healthcheck-deployment.yaml
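If these two steps need to be scripted, for instance in a CI job, the commands can be assembled as argument lists. This is a minimal sketch using only the standard library. The function build_and_apply and its run flag are assumptions added for this example, so that the snippet prints the commands by default instead of executing them.

```python
import shlex
import subprocess

build_cmd = [
    "prefect", "deployment", "build", "./healthcheck.py:healthcheck",
    "-n", "flowrida_deploy",         # deployment name
    "-t", "kubernetes",              # tag picked up by the agent
    "-ib", "kubernetes-job/k8sdev",  # infrastructure block slug
    "-sb", "azure/boydblock",        # storage block slug
]
apply_cmd = ["prefect", "deployment", "apply", "healthcheck-deployment.yaml"]


def build_and_apply(run: bool = False) -> list:
    """Print each command; execute it only when run=True."""
    rendered = []
    for cmd in (build_cmd, apply_cmd):
        rendered.append(shlex.join(cmd))
        print(rendered[-1])
        if run:
            subprocess.run(cmd, check=True)  # stop at the first failure
    return rendered


build_and_apply()
```

Passing the commands as lists avoids shell quoting issues, and check=True makes the apply step run only if the build step succeeded.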
5 - Run the deployment
Lastly, we can run the deployment and verify that the limits we passed in were applied and that the adlfs package was installed successfully.
If the package installation had failed, the job would still trigger, but we would see a “Module not found” error because “adlfs” is not pre-built into this image.
If our flow and template are successful, we will see the output of healthcheck.py.
prefect deployment run healthcheck/flowrida_deploy
Success!
If we review the spec of the completed job, we can also see that our resources and pip packages were included.
17:36:53.961 | INFO | Flow run 'tireless-chipmunk' - Created task run 'log_platform_info-afea9710-0' for task 'log_platform_info'
17:36:53.962 | INFO | Flow run 'tireless-chipmunk' - Executing 'log_platform_info-afea9710-0' immediately...
/usr/local/lib/python3.9/site-packages/prefect/blocks/core.py:613: UserWarning: Block document has schema checksum sha256:ed12920df72ae97d0b896193634023971f7039486ef85bac6ff596fe246ea507 which does not match the schema checksum for class 'Azure'. This indicates the schema has changed and this block may not load.
return cls._from_block_document(block_document)
17:36:54.165 | INFO | Task run 'log_platform_info-afea9710-0' - Host's network name = tireless-chipmunkp2mbs-qjnjp
17:36:54.166 | INFO | Task run 'log_platform_info-afea9710-0' - Python version = 3.9.13
17:36:54.166 | INFO | Task run 'log_platform_info-afea9710-0' - Platform information (instance type) = Linux-5.4.0-1085-azure-x86_64-with-glibc2.31
17:36:54.166 | INFO | Task run 'log_platform_info-afea9710-0' - OS/Arch = linux/x86_64
17:36:54.166 | INFO | Task run 'log_platform_info-afea9710-0' - Prefect Version = 2.3.0 🚀
17:36:54.167 | INFO | Task run 'log_platform_info-afea9710-0' - Prefect API Version = 0.8.0
17:36:54.228 | INFO | Task run 'log_platform_info-afea9710-0' - Finished in state Completed()
17:36:54.291 | INFO | Flow run 'tireless-chipmunk' - Finished in state Completed('All states completed.')
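The source of healthcheck.py is not shown in this article. Judging only by the log lines above, its task reports host and interpreter details. A stdlib-only approximation of that information gathering might look like the following, where platform_info is a name chosen for this sketch and the Prefect-specific version fields are left out.

```python
import platform
import sys


def platform_info() -> dict:
    """Collect details similar to those reported in the log output above."""
    return {
        "network_name": platform.node(),
        "python_version": platform.python_version(),
        "platform": platform.platform(),
        "os_arch": f"{sys.platform}/{platform.machine()}",
    }


for key, value in platform_info().items():
    print(f"{key} = {value}")
```

Running something like this inside the job's container is a quick way to confirm which image and Python version the flow run actually used.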