Summary
Prefect can schedule flows as Kubernetes Jobs, which accept a number of modifiable parameters.
In Prefect 1.0, this was configured and executed through the “Run Config” concept.
In Prefect 2.0, this syntax and concept have been updated to allow more flexibility.
Infrastructure Blocks allow you to define the flow's execution environment; the KubernetesJob block is the focal point of this article.
References and documentation are all linked below.
Audience
This article applies to you if you are importing and using the following module in your environment:
from prefect.run_configs import KubernetesRun
What is Staying the Same
A KubernetesRun and a KubernetesJob provide the same functionality: deploying flows into a specified execution environment.
In both cases, local storage is not supported for Kubernetes Jobs. Remote storage is required (AWS, GCP, Azure) and, by extension, Secrets configured to connect to it.
Additionally, kubectl and a valid KUBECONFIG are required so that jobs can be submitted to the cluster.
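As a quick sanity check (assuming kubectl is installed and a KUBECONFIG is in place; the namespace below is a placeholder for your own), you can confirm that the machine running the agent can reach the cluster and is allowed to create Jobs:

```shell
# Confirm the active context points at the intended cluster
kubectl config current-context

# Confirm the current credentials can create Jobs (the resource Prefect submits)
kubectl auth can-i create jobs --namespace default
```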
In Prefect 1.0, an example of this configuration might be:
# Use a specific image for the flows to execute.
flow.run_config = KubernetesRun(image="prefecthq/prefect:1.2.4-python3.9")
In Prefect 2.0, this same configuration is created as part of the deployment build step, and is no longer contained within the flow itself.
The optional parameters (namespace, image, pullPolicy, etc.) can be edited in deployment.yaml after the deployment has been built.
What is Different
While the core requirements (Storage, Secrets) are still the same, the implementations for those have also changed. At a high level:
1. Storage Blocks must be configured.
2. A Deployment must be configured.
3. A KubernetesJob infrastructure must be designated within the Deployment.
Notably, #3 is the most significant deviation.
In order to use the KubernetesJob for flows, a Deployment must be created that specifies the operational “Infrastructure” to be used.
This article assumes that your Storage block has already been configured; reference links are included below.
How to Convert from 1.0 to 2.0
An example successful configuration for Prefect 1.0 is shown below.
Notably, we have "MYSECRET", a Secret containing the connection string for my Azure storage container.
Additionally, we have run_config=KubernetesRun, to which I am passing two parameters:
#1 is labels=['test']: my local agent, which is running with the label 'test', will pick up these flow runs from the API.
#2 is the image: I have specified an image that matches my local Python version (3.9).
# prefect 1.0
import prefect
from prefect import task, Flow, Parameter
from prefect.run_configs import KubernetesRun
from prefect.storage import Azure

STORAGE = Azure(
    container="prefect-logs",
    connection_string_secret="MYSECRET"
)

@task
def hello_world():
    # Retrieve the logger from the Prefect context at task run time
    logger = prefect.context.get("logger")
    text = "hello from my_kubernetes_flow!"
    logger.info(text)
    return text

with Flow(
    "my_kubernetes_flow",
    storage=STORAGE,
    run_config=KubernetesRun(labels=['test'], image="prefecthq/prefect:1.2.4-python3.9"),
) as flow:
    hw = hello_world()

flow.register(project_name="Apollo")
How would we refactor this existing implementation to work with 2.0?
First, we would need to configure our storage block; the assumption here is that this has already been done. If not, references are included below.
Note that I am using S3: at the time of writing, there is no Azure storage block available. The choice of storage block has no bearing on the rest of the process, as long as it is not Local.
from prefect.filesystems import S3
s3_block = S3.load("aws-s3")
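If the block does not exist yet, it can be created once from the UI or from Python. A minimal sketch, assuming Prefect 2.0 is installed; the bucket path and credential values below are placeholders for your own:

```python
from prefect.filesystems import S3

# One-time setup: persist an S3 storage block named "aws-s3".
# bucket_path and the credentials are placeholders - substitute your own values.
s3_block = S3(
    bucket_path="my-bucket/flows",
    aws_access_key_id="REPLACE_ME",
    aws_secret_access_key="REPLACE_ME",
)
s3_block.save("aws-s3")
```

After saving, the block is visible in the UI and can be loaded by name, as shown above.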
Next we will address syntax changes - bulleted here, with the final results listed below.
- "Flow" is now "flow".
- The import of "KubernetesRun" is removed.
- The entire "with Flow ... as flow:" line is replaced with an @flow decorator and a name parameter.
- The flow is now defined as a function.
- The logger has changed from prefect.context.get("logger") to get_run_logger().
- The flow does not need to be registered - instead we will build and apply a Deployment.
# prefect 2.0
from prefect import task, flow, get_run_logger
from prefect.filesystems import S3

s3_block = S3.load("aws-s3")

@task
def hello_world():
    logger = get_run_logger()
    text = "hello from orion_flow!"
    logger.info(text)
    return text

@flow(name="orion_flow")
def orion_flow():
    logger = get_run_logger()
    logger.info("Hello from Kubernetes!")
    hw = hello_world()
    return
Create the deployment.yaml - this functionally replaces the run_config and flow registration concepts:
# Switch to the directory containing my flow.
cd ~/virtualenvs/prefect2
prefect deployment build ./backup_my_apollo_k8s_flow.py:orion_flow -n orion_flow_demo -t kubernetes -i kubernetes-job --storage-block s3/aws-s3
# prefect deployment build is the command to execute
# ./backup_my_apollo_k8s_flow.py:orion_flow is the file containing the code above, plus the flow name (name="orion_flow")
# -n orion_flow_demo is the name of the Deployment - this is arbitrary, and visible in the UI
# -t kubernetes is a tag provided for searching purposes, and is used by the agent to pick up matching runs
# -i kubernetes-job is the key piece: it specifies a KubernetesJob infrastructure block
# --storage-block s3/aws-s3 is the name of the storage block created earlier ("aws-s3") - you can retrieve its details from the UI, or override the configuration from the CLI
Once the deployment has been built, it can be modified by editing deployment.yaml for any parameters you would like to specify, such as the image, labels, or namespace, per the KubernetesJob Parameters.
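As an illustration, the infrastructure section of a built deployment.yaml might look roughly like the fragment below. The field names follow the KubernetesJob block, but the exact keys and values can differ between 2.0 releases, so treat this as a sketch rather than a verbatim excerpt:

```yaml
# Excerpt of a built deployment.yaml (keys may vary by Prefect 2.0 release)
infrastructure:
  type: kubernetes-job
  image: prefecthq/prefect:2-python3.9   # placeholder image tag
  namespace: default
  image_pull_policy: IfNotPresent
  labels: {}
```

Edit these values before running prefect deployment apply so the Job lands in the right namespace with the right image.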
prefect deployment apply deployment.yaml
Successfully loaded 'orion_flow_demo'
Deployment 'orion_flow/orion_flow_demo' successfully created with id '51e20a03-ee1c-4766-97b8-8d1bda286c70'.
Lastly, execute the deployment from the UI, or through the CLI using the UUID of the deployment:
prefect deployment run --id "51e20a03-ee1c-4766-97b8-8d1bda286c70"
Created flow run 'deft-newt' ("a5339b34-ab5d-4311-84ff-d2ac148087d8")
For simplicity, this article used a local Kubernetes agent (an agent that runs locally on your system and sends the jobs to the cluster).
A running agent that is configured with the appropriate tags will pick up the flow-runs, and create an execution environment that is described by the Deployment. This consequently requires a valid KUBECONFIG to access the Kubernetes cluster from your local agent.
To run your Prefect Agent in-cluster, see the article “Creating a Kubernetes Agent and Orion in Kubernetes” in Links and Documentation below.
Links and Documentation
2.0 - Infrastructure - Infrastructure - Prefect Docs
2.0 - Deployments - Deployments - Prefect Docs
2.0 - Storage - Storage - Prefect Docs
2.0 - Blocks - Blocks - Prefect Docs
2.0 - Secrets - Blocks - Prefect Docs
2.0 - KubernetesJob Parameters - prefect.infrastructure - Prefect Docs
Creating a Kubernetes Agent and Orion in Kubernetes - Deploying Prefect Agents on Kubernetes
1.0 - Kubernetes Run Configuration - Run Configuration | Prefect Docs
Troubleshooting
Question: When I run the deployment, I get an API Exception error (403):
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'c395af06-9eea-4be3-84b5-66d6e27ff586', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'X-Kubernetes-Pf-Flowschema-Uid': '0c0a6c58-9c99-4c3a-acee-f92b60d7b69f', 'X-Kubernetes-Pf-Prioritylevel-Uid': 'd487a522-9823-458f-88ff-3cb352c0b141', 'Date': 'Thu, 28 Jul 2022 18:43:15 GMT', 'Content-Length': '309'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"jobs.batch is forbidden: User \"system:serviceaccount:orion:default\" cannot create resource \"jobs\" in API group \"batch\" in the namespace \"default\"","reason":"Forbidden","details":{"group":"batch","kind":"jobs"},"code":403}
Answer: The deployment.yaml contains the namespace where jobs will be created, which is "default" by default.
This error typically occurs because your agent runs in a namespace other than "default", and its service account therefore does not have role permissions to create Jobs there.
The resolution is to update "namespace: default" in your deployment.yaml to a namespace that the Prefect agent has access to.
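The edit itself is a one-line change. A sketch using sed on a stand-in deployment.yaml (the file content and the "orion" namespace below are placeholders; GNU sed syntax is assumed):

```shell
# Create a minimal stand-in for the built deployment.yaml (placeholder content)
cat > deployment.yaml <<'EOF'
infrastructure:
  type: kubernetes-job
  namespace: default
EOF

# Point the Job at the namespace the agent's service account can access
# (replace "orion" with your agent's namespace)
sed -i 's/namespace: default/namespace: orion/' deployment.yaml

grep 'namespace' deployment.yaml
```

Re-run prefect deployment apply deployment.yaml after editing so the change takes effect.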
Question: When I run the deployment, I get the error “Flow could not be retrieved”:
19:11:12.897 | ERROR | Flow run 'deft-newt' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 246, in retrieve_flow_then_begin_flow_run
flow = await load_flow_from_flow_run(flow_run, client=client)
File "/usr/local/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 325, in load_flow_from_flow_run
await storage_block.get_directory(from_path=None, local_path=".")
File "/usr/local/lib/python3.9/site-packages/prefect/filesystems.py", line 97, in get_directory
shutil.copytree(from_path, local_path, dirs_exist_ok=True)
File "/usr/local/lib/python3.9/shutil.py", line 566, in copytree
with os.scandir(src) as itr:
FileNotFoundError: [Errno 2] No such file or directory: '/Users/testuser/virtualenvs/prefect2/flows'
Answer: The KubernetesJob infrastructure requires remote storage, but the Deployment was created without a storage block. Consequently, Prefect is looking for a local file path, which does not exist within the image or deployment.
To resolve this issue, ensure that you have created a storage block, that it is loaded in your flow file, and that --storage-block <block/name> is included in your deployment build command.