How do I deploy a Prefect 2.0 agent to a local Kubernetes cluster and connect it to the Cloud 2.0 backend?

The following manifest is a simple example that runs in the default namespace to get you started.

Create a file called agent.yaml:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8agent
spec:
  selector:
    matchLabels:
      app: k8agent
  replicas: 1
  template:
    metadata:
      labels:
        app: k8agent
    spec:
      containers:
        - name: agent
          image: prefecthq/prefect:2.0b6-python3.9
          command: ["prefect", "agent", "start", "k8s"]
          imagePullPolicy: "IfNotPresent"
          env:
            - name: PREFECT_API_URL
              value: https://api-beta.prefect.io/api/accounts/xxxxx/workspaces/xxxxx
            - name: PREFECT_API_KEY
              value: xxxxxxxx
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flow-runner
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/status"]
    verbs: ["get", "watch", "list"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flow-runner-role-binding
subjects:
  - kind: ServiceAccount
    name: default
roleRef:
  kind: Role
  name: flow-runner
  apiGroup: rbac.authorization.k8s.io

Run the following command to create the work queue referenced in the manifest's start command (here, the queue is named k8s):

prefect work-queue create k8s

Deploy your agent:

kubectl apply -f agent.yaml

Create an example flow that uses a KubernetesFlowRunner. Here, the file is named hello_k8y.py:

from prefect import flow, task, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner
import platform


@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello from K8s!")


@task
def print_platform_info():
    logger = get_run_logger()
    logger.info(
        "Platform information: Node = %s, Python = %s, Platform type = %s, OS Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def hello():
    hi = say_hi()
    print_platform_info(wait_for=[hi])


DeploymentSpec(
    name="k8s",
    flow=hello,
    flow_runner=KubernetesFlowRunner(
        env=dict(
            AWS_ACCESS_KEY_ID="xxxx",
            AWS_SECRET_ACCESS_KEY="xxxx",
        ),
    ),
)

The AWS credentials are required if you use S3 as your default storage. Those credentials can be provided as environment variables but you need to supply those to either:

To test that everything is working, create and run a deployment:

prefect deployment create hello_k8y.py
prefect deployment run hello/k8s

To save others some time: if you run into prefect.exceptions.PrefectHTTPStatusError: Client error '422 Unprocessable Entity', make sure PREFECT_API_URL contains the UUIDs of the account and workspace, not their names.

You can get the correct full PREFECT_API_URL by running prefect profile inspect default after you have authenticated locally with your client :slight_smile:
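
If you want to catch this before deploying the agent, a quick local check along these lines might help. This is only a sketch: check_api_url and is_uuid are hypothetical helper names (not part of Prefect), the IDs in the usage example are made up, and it assumes the URL follows the api/accounts/<id>/workspaces/<id> shape shown in the manifest above.

```python
from urllib.parse import urlparse
from uuid import UUID


def is_uuid(value: str) -> bool:
    """Return True if the string parses as a UUID."""
    try:
        UUID(value)
        return True
    except ValueError:
        return False


def check_api_url(url: str) -> list:
    """Return a list of problems found in a PREFECT_API_URL (empty if none).

    Assumes the path looks like api/accounts/<id>/workspaces/<id>.
    """
    parts = urlparse(url).path.strip("/").split("/")
    problems = []
    for key in ("accounts", "workspaces"):
        # The ID is expected in the path segment right after the keyword.
        if key not in parts or parts.index(key) + 1 >= len(parts):
            problems.append(f"missing {key} segment")
            continue
        value = parts[parts.index(key) + 1]
        if not is_uuid(value):
            problems.append(f"{key} ID is not a UUID: {value}")
    return problems


if __name__ == "__main__":
    # Made-up example: names instead of UUIDs, the case that leads to the 422.
    url = "https://api-beta.prefect.io/api/accounts/my-account/workspaces/my-workspace"
    for problem in check_api_url(url):
        print(problem)
```

Paste in the value you put in agent.yaml. An empty result only means the IDs are well-formed UUIDs, not that they point to an account or workspace that actually exists.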

Good point, thanks for sharing :raised_hands: