Getting the default Kubernetes job template
First, run:
prefect kubernetes manifest flow-run-job
This will generate:
apiVersion: batch/v1
kind: Job
metadata:
# labels are required, even if empty
labels: {}
spec:
template:
spec:
containers: # the first container is required
- env: [] # env is required, even if empty
name: prefect-job
Then, you can provide custom inline customizations as shown in the examples below.
Customizations
Auto-clean-up K8s jobs
KubernetesFlowRunner(namespace="prefect", customizations=[
{ "op": "add", "path": "/spec/ttlSecondsAfterFinished","value": 10}
])
Secrets
Environment variable from a Secret which requires appending to the container’s env list with …/env/-:
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
@flow
def my_flow(...):
...
# Add MY_API_TOKEN as an environment variable
#
# - name: MY_API_TOKEN
# valueFrom:
# secretKeyRef:
# name: the-secret-name
# key: api-token
DeploymentSpec(
name="my-flow",
flow=my_flow,
flow_runner=KubernetesFlowRunner(
namespace="my-world",
image="my/image:mytag",
customizations=[
{
"op": "add",
"path": "/spec/template/spec/containers/0/env/-",
"value": {
"name": "MY_API_TOKEN",
"valueFrom": {
"secretKeyRef": {
"name": "the-secret-name", "key": "api-token",
}
},
},
}
],
),
)
Resources - CPU and memory
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
from prefect import flow
@flow
def my_flow():
...
DeploymentSpec(
name="my-flow",
flow=my_flow,
flow_runner=KubernetesFlowRunner(
namespace="my-world",
image="my/image:mytag",
customizations=[
{
"op": "add",
"path": "/spec/template/spec/resources",
"value": {"limits": {"memory": "8Gi", "cpu": "4000m"}},
}
],
),
)
Scheduling a flow to run on a GPU-enabled node in GCP
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
from prefect import flow
@flow
def my_flow():
...
# Request a GPU node on GCP:
# https://cloud.google.com/kubernetes-engine/docs/how-to/gpus
# ...
# resources:
# limits:
# nvidia.com/gpu: 2
# nodeSelector:
# cloud.google.com/gke-accelerator: nvidia-tesla-k80
DeploymentSpec(
name="my-flow",
flow=my_flow,
flow_runner=KubernetesFlowRunner(
namespace="my-world",
image="my/image:mytag",
customizations=[
{
"op": "add",
"path": "/spec/template/spec/resources",
"value": {"limits": {}},
},
{
"op": "add",
"path": "/spec/template/spec/resources/limits",
"value": {"nvidia.com/gpu": 2},
},
{
"op": "add",
"path": "/spec/template/spec/nodeSelector",
"value": {"cloud.google.com/gke-accelerator": "nvidia-tesla-k80"},
},
],
),
)
Helper utilities for reading customizations or job definitions from files
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
from prefect import flow
@flow
def my_flow():
...
DeploymentSpec(
name="my-flow",
flow=my_flow,
flow_runner=KubernetesFlowRunner(
namespace="my-world",
image="my/image:mytag",
customizations=KubernetesFlowRunner.customize_from_file("customization.yaml"),
),
)
...
DeploymentSpec(
name="my-flow",
flow=my_flow,
flow_runner=KubernetesFlowRunner(
namespace="my-world",
image="my/image:mytag",
job=KubernetesFlowRunner.job_from_file("my-world-job.yaml"),
),
)