How to customize Kubernetes jobs with KubernetesFlowRunner's customizations?

Getting the default Kubernetes job template

First, run:

prefect kubernetes manifest flow-run-job

This will generate:

apiVersion: batch/v1
kind: Job
metadata:
  # labels are required, even if empty
  labels: {}
spec:
  template:
    spec:
      containers:  # the first container is required
      - env: []  # env is required, even if empty
        name: prefect-job

Then, you can provide inline customizations, a list of JSON Patch (RFC 6902) operations applied to this base manifest, as shown in the examples below.

Customizations

Auto-clean-up K8s jobs

KubernetesFlowRunner(
    namespace="prefect",
    customizations=[
        # ttlSecondsAfterFinished is set on the Job spec itself (not the pod
        # template) so Kubernetes can clean up the Job once it has finished.
        {"op": "add", "path": "/spec/ttlSecondsAfterFinished", "value": 10},
    ],
)

Secrets

To expose a value from a Kubernetes Secret as an environment variable, append an entry to the first container's env list. The trailing "-" in the path …/env/- means "append to the end of the array":

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
from prefect import flow


@flow
def my_flow():
    ...


# Add MY_API_TOKEN as an environment variable sourced from a Kubernetes Secret.
# The patch below appends the following entry to the first container's env list:
#
# - name: MY_API_TOKEN
#   valueFrom:
#     secretKeyRef:
#       name: the-secret-name
#       key: api-token

DeploymentSpec(
    name="my-flow",
    flow=my_flow,
    flow_runner=KubernetesFlowRunner(
        namespace="my-world",
        image="my/image:mytag",
        customizations=[
            {
                "op": "add",
                # The trailing "-" addresses the end of the env array (append).
                "path": "/spec/template/spec/containers/0/env/-",
                "value": {
                    "name": "MY_API_TOKEN",
                    "valueFrom": {
                        "secretKeyRef": {
                            "name": "the-secret-name",
                            "key": "api-token",
                        }
                    },
                },
            }
        ],
    ),
)

Resources - CPU and memory

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
from prefect import flow


@flow
def my_flow():
    ...


# Set CPU and memory limits on the flow-run container:
#
# containers:
# - name: prefect-job
#   resources:
#     limits:
#       memory: 8Gi
#       cpu: 4000m

DeploymentSpec(
    name="my-flow",
    flow=my_flow,
    flow_runner=KubernetesFlowRunner(
        namespace="my-world",
        image="my/image:mytag",
        customizations=[
            {
                "op": "add",
                # Resource limits are a per-container field, so the patch must
                # target the first container rather than the pod spec.
                "path": "/spec/template/spec/containers/0/resources",
                "value": {"limits": {"memory": "8Gi", "cpu": "4000m"}},
            }
        ],
    ),
)

Scheduling a flow to run on a GPU-enabled node in GCP

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
from prefect import flow


@flow
def my_flow():
    ...


# Request a GPU node on GCP:
# https://cloud.google.com/kubernetes-engine/docs/how-to/gpus
#
# spec:
#   containers:
#   - ...
#     resources:
#       limits:
#         nvidia.com/gpu: 2
#   nodeSelector:
#     cloud.google.com/gke-accelerator: nvidia-tesla-k80

DeploymentSpec(
    name="my-flow",
    flow=my_flow,
    flow_runner=KubernetesFlowRunner(
        namespace="my-world",
        image="my/image:mytag",
        customizations=[
            # The base manifest has no "resources" key on the container, so
            # create the parent object before adding to it.
            {
                "op": "add",
                "path": "/spec/template/spec/containers/0/resources",
                "value": {"limits": {}},
            },
            # GPU limits are a per-container setting, hence containers/0.
            {
                "op": "add",
                "path": "/spec/template/spec/containers/0/resources/limits",
                "value": {"nvidia.com/gpu": 2},
            },
            # nodeSelector is a pod-level field, so it stays on the pod spec.
            {
                "op": "add",
                "path": "/spec/template/spec/nodeSelector",
                "value": {"cloud.google.com/gke-accelerator": "nvidia-tesla-k80"},
            },
        ],
    ),
)

Helper utilities for reading customizations or job definitions from files

from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
from prefect import flow


@flow
def my_flow():
    ...


# Read the list of customizations from a file instead of writing it inline.
patched_flow_runner = KubernetesFlowRunner(
    namespace="my-world",
    image="my/image:mytag",
    customizations=KubernetesFlowRunner.customize_from_file("customization.yaml"),
)

DeploymentSpec(name="my-flow", flow=my_flow, flow_runner=patched_flow_runner)

...

# Supply the Job definition from a file through the `job` argument.
file_job_flow_runner = KubernetesFlowRunner(
    namespace="my-world",
    image="my/image:mytag",
    job=KubernetesFlowRunner.job_from_file("my-world-job.yaml"),
)

DeploymentSpec(name="my-flow", flow=my_flow, flow_runner=file_job_flow_runner)