Hello,
I'm sharing my work on running workers as Kubernetes pods. It took some time to achieve, and it will possibly break soon since workers are still in beta, but maybe it will save you some trouble for today.
I needed to run my worker as a Kubernetes Deployment and configure auto-restarting of unhealthy workers.
The current issues, at the time of writing, are:
- the worker process does not exit with an error automatically when it enters an unhealthy state, so the pod keeps running
- unhealthy workers still send heartbeats, so heartbeats can't be used to determine the health status
- the /work_pools/{work_pool_name}/queues/{name} endpoint does not return the unhealthy status shown in the UI, but it does expose the is_paused attribute, which I used to trigger the pod's restart (see the example request after this list)
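For reference, here is a minimal sketch of querying that endpoint by hand with curl, reusing the PREFECT_API_URL / PREFECT_API_KEY environment variables and the my-pool / default names used throughout this post; beyond is_paused I'm not describing the rest of the response:

# Inspect the work queue directly; the JSON response exposes "is_paused"
# but not the "unhealthy" status shown in the UI.
curl -s \
  -H "Authorization: Bearer $PREFECT_API_KEY" \
  -H "Content-Type: application/json" \
  "$PREFECT_API_URL/work_pools/my-pool/queues/default"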
Here's the configuration, based on the agent deployment:
- A custom Docker image for our worker. Dockerfile:
FROM prefecthq/prefect:2.10.1-python3.10
RUN apt-get update && apt-get install procps -y
RUN pip install prefect-kubernetes
COPY health_check.py health_check.py
RUN chmod +x health_check.py
COPY start-worker.sh start-worker.sh
RUN chmod +x start-worker.sh
with the following files:
start-worker.sh
#!/bin/bash
# Log in to Prefect Cloud, resume the work queue (it may have been paused by the
# automation when the previous worker became unhealthy), then start the worker.
prefect cloud login --key "$PREFECT_API_KEY" --workspace "$WORKSPACE"
prefect work-queue resume default
prefect worker start --name my-worker --pool my-pool --work-queue default --type kubernetes
health_check.py
import os
import sys

import requests

# Liveness check: the automation pauses the work queue when the worker becomes
# unhealthy, so a paused queue means the pod should be restarted.
api_key = os.getenv("PREFECT_API_KEY")
headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json",
}
query = os.getenv("PREFECT_API_URL") + "/work_pools/my-pool/queues/default"
response = requests.get(query, headers=headers)
data = response.json()
if data["is_paused"]:
    sys.exit("Queue paused")
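To make this image available to the cluster, it has to be built and pushed; a minimal sketch, assuming the custom-image/worker tag referenced in worker.yaml below and a registry that matches the docker-hub pull secret:

# Build the worker image from the Dockerfile above and push it to the registry
docker build -t custom-image/worker .
docker push custom-image/worker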
- worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-worker
  namespace: prefect
  labels:
    app: prefect-worker
spec:
  selector:
    matchLabels:
      app: prefect-worker
  replicas: 1
  template:
    metadata:
      labels:
        app: prefect-worker
    spec:
      containers:
        - name: worker
          image: custom-image/worker
          command: ["/usr/bin/tini", "-g", "--", "/opt/prefect/start-worker.sh"]
          imagePullPolicy: "Always"
          env:
            - name: PREFECT_API_URL
              value: https://api.prefect.cloud/api/accounts/####/workspaces/####
            - name: PREFECT_API_KEY
              value: ####
            - name: WORKSPACE
              value: ####
          livenessProbe:
            exec:
              command:
                - python
                - health_check.py
            initialDelaySeconds: 10
            periodSeconds: 10
      imagePullSecrets:
        - name: docker-hub
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: prefect-worker
  namespace: prefect
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/status"]
    verbs: ["get", "watch", "list"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: prefect-worker-role-binding
  namespace: prefect
subjects:
  - kind: ServiceAccount
    name: default
    namespace: prefect
roleRef:
  kind: Role
  name: prefect-worker
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: prefect-worker
rules:
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: prefect-worker-cluster-role-binding
subjects:
  - kind: ServiceAccount
    name: default
    namespace: prefect
roleRef:
  kind: ClusterRole
  name: prefect-worker
  apiGroup: rbac.authorization.k8s.io
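Deploying is then a standard kubectl apply; a minimal sketch, assuming the manifest above is saved as worker.yaml and the prefect namespace already exists:

# Apply the Deployment, Role/RoleBinding and ClusterRole/ClusterRoleBinding
kubectl apply -f worker.yaml
# Verify the worker pod is running (liveness restarts show up in the RESTARTS column)
kubectl -n prefect get pods -l app=prefect-worker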
- Automation rule: since the health check relies on the "paused" status, we need to add an automation that pauses the queue when it enters an unhealthy state.
The queue is resumed when the worker restarts, via the start-worker.sh script (a quick way to test this loop by hand is sketched below).
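To verify the whole loop without waiting for a real unhealthy state, the queue can be paused by hand from a machine logged in to the same workspace, mirroring the resume command in start-worker.sh; this is only a sketch of one way to test the behaviour:

# Pause the queue manually to simulate what the automation does
prefect work-queue pause default
# The liveness probe should start failing; Kubernetes restarts the pod and
# start-worker.sh resumes the queue on startup
kubectl -n prefect get pods -l app=prefect-worker -w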
This is probably not the best way to achieve it, and I'd be happy to see other methods!