The following manifest is a simple example that runs in the default namespace to get you started.
Create a file called agent.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8agent
spec:
  selector:
    matchLabels:
      app: k8agent
  replicas: 1
  template:
    metadata:
      labels:
        app: k8agent
    spec:
      containers:
        - name: agent
          image: prefecthq/prefect:2.0b6-python3.9
          command: ["prefect", "agent", "start", "k8s"]
          imagePullPolicy: "IfNotPresent"
          env:
            - name: PREFECT_API_URL
              value: https://api-beta.prefect.io/api/accounts/xxxxx/workspaces/xxxxx
            - name: PREFECT_API_KEY
              value: xxxxxxxx
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: flow-runner
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/status"]
    verbs: ["get", "watch", "list"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flow-runner-role-binding
subjects:
  - kind: ServiceAccount
    name: default
roleRef:
  kind: Role
  name: flow-runner
  apiGroup: rbac.authorization.k8s.io
Run the following command to create the work queue referenced in the manifest’s start command (here, the queue is named k8s):
prefect work-queue create k8s
Deploy your agent:
kubectl apply -f agent.yaml
Create an example flow with a KubernetesFlowRunner definition (here, in a file called hello_k8y.py):
import platform

from prefect import flow, task, get_run_logger
from prefect.deployments import DeploymentSpec
from prefect.flow_runners import KubernetesFlowRunner


@task
def say_hi():
    logger = get_run_logger()
    logger.info("Hello from K8s!")


@task
def print_platform_info():
    logger = get_run_logger()
    # platform.node() returns the machine's network name (hostname), not an IP address
    logger.info(
        "Platform information: Node = %s, Python = %s, Platform type = %s, OS Version = %s",
        platform.node(),
        platform.python_version(),
        platform.platform(),
        platform.version(),
    )


@flow
def hello():
    hi = say_hi()
    # wait_for makes the second task run only after say_hi has finished
    print_platform_info(wait_for=[hi])


# Register a deployment named "k8s" that runs the flow as a Kubernetes job
DeploymentSpec(
    name="k8s",
    flow=hello,
    flow_runner=KubernetesFlowRunner(
        env=dict(
            AWS_ACCESS_KEY_ID="xxxx",
            AWS_SECRET_ACCESS_KEY="xxxx",
        ),
    ),
)
The AWS credentials are required if you use S3 as your default storage. They can be provided as environment variables, but you need to supply them to either:
- the KubernetesFlowRunner, as shown here
- your execution layer, e.g. via IAM roles for service accounts on AWS
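If you go with the first option, one way to avoid hard-coding the credentials in hello_k8y.py is to read them from the environment of the machine that creates the deployment. The sketch below is only an illustration: the helper name aws_env_from_environment and the AWS_VARS tuple are hypothetical and not part of Prefect, and it assumes both variables are already exported in your shell.

```python
import os

# Variable names taken from the example above; the flow run needs both to reach S3.
AWS_VARS = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def aws_env_from_environment(environ=None):
    """Hypothetical helper: collect the AWS credentials from environment variables.

    Raises RuntimeError if any of the expected variables is missing or empty,
    so the problem surfaces before the deployment is created.
    """
    environ = os.environ if environ is None else environ
    missing = [name for name in AWS_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    # Return only the AWS variables, ignoring everything else in the environment.
    return {name: environ[name] for name in AWS_VARS}
```

The returned dictionary has the same shape as the dict(...) passed to KubernetesFlowRunner above, so it could presumably be used as KubernetesFlowRunner(env=aws_env_from_environment()) in the deployment specification.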
To test that everything is working, create and run a deployment:
prefect deployment create hello_k8y.py
prefect deployment run hello/k8s