Running a worker as a kubernetes pod

Hello,
I’m sharing my work on running workers as Kubernetes pods. It took some time to get right, and it will possibly break soon since workers are still in beta, but maybe it will save you some trouble today :slight_smile:

I needed to run my worker as a Kubernetes pod and configure auto-restarting of unhealthy workers.
The current issues, at the time of writing, are:

  • the worker process does not exit with an error automatically when it enters an unhealthy state, so the pod keeps running
  • unhealthy workers still send heartbeats, so heartbeats can’t be used to determine health status
  • /work_pools/{work_pool_name}/queues/{name} does not return the unhealthy status shown in the UI, but it does expose the is_paused attribute, which I used to trigger the pod’s restart
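The restart decision therefore reduces to checking the is_paused field of the queue payload returned by that endpoint. A minimal sketch of that logic (the queue_info dict stands in for the parsed JSON response; any field other than is_paused is illustrative):

```python
def should_restart(queue_info: dict) -> bool:
    """Return True when the work queue has been paused.

    The automation pauses the queue when it turns unhealthy, so a
    paused queue is our signal to restart the pod.
    """
    return bool(queue_info.get("is_paused", False))


# A paused queue triggers a restart; a running queue does not.
print(should_restart({"name": "default", "is_paused": True}))   # True
print(should_restart({"name": "default", "is_paused": False}))  # False
```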

Here’s the configuration, based on the agent deployment:

  • A custom docker image for our worker, Dockerfile:
FROM prefecthq/prefect:2.10.1-python3.10

RUN apt-get update && apt-get install procps -y
RUN pip install prefect-kubernetes

COPY health_check.py health_check.py
RUN chmod +x health_check.py
COPY start-worker.sh start-worker.sh
RUN chmod +x start-worker.sh

with files :
start-worker.sh

#!/bin/bash
set -e

prefect cloud login --key "$PREFECT_API_KEY" --workspace "$WORKSPACE"
prefect work-queue resume default
prefect worker start --name my-worker --pool my-pool --work-queue default --type kubernetes

health_check.py

import os
import sys

import requests

api_key = os.getenv("PREFECT_API_KEY")

headers = {
    "Authorization": f"Bearer {api_key}",
    "Content-Type": "application/json",
}

query = os.getenv("PREFECT_API_URL") + "/work_pools/my-pool/queues/default"

response = requests.get(query, headers=headers, timeout=10)
response.raise_for_status()
data = response.json()

# The automation pauses the queue when it becomes unhealthy; a non-zero
# exit here makes the liveness probe fail, which restarts the pod.
if data["is_paused"]:
    sys.exit("Queue paused")
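One subtlety worth noting: a Kubernetes exec liveness probe fails on any non-zero exit code, and sys.exit() with a string argument prints the message to stderr and exits with status 1, which is what makes the probe above work. A quick check of that behavior:

```python
import subprocess
import sys

# Run the same sys.exit("Queue paused") call in a child interpreter and
# inspect its exit status, as the kubelet would for an exec probe.
proc = subprocess.run(
    [sys.executable, "-c", "import sys; sys.exit('Queue paused')"],
    capture_output=True,
    text=True,
)
print(proc.returncode)      # 1 -> the liveness probe would fail
print(proc.stderr.strip())  # Queue paused
```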
  • worker.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prefect-worker
  namespace: prefect
  labels:
    app: prefect-worker
spec:
  selector:
    matchLabels:
      app: prefect-worker
  replicas: 1
  template:
    metadata:
      labels:
        app: prefect-worker
    spec:
      containers:
        - name: worker
          image: custom-image/worker
          command: ["/usr/bin/tini", "-g", "--", "/opt/prefect/start-worker.sh"]
          imagePullPolicy: "Always"
          env:
            - name: PREFECT_API_URL
              value: https://api.prefect.cloud/api/accounts/####/workspaces/####
            - name: PREFECT_API_KEY
              value: ####
            - name: WORKSPACE
              value: ####
          livenessProbe:
            exec:
              command:
              - python
              - health_check.py
            initialDelaySeconds: 10
            periodSeconds: 10
      imagePullSecrets:
      - name: docker-hub
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: prefect-worker
  namespace: prefect
rules:
  - apiGroups: [""]
    resources: ["pods", "pods/log", "pods/status"]
    verbs: ["get", "watch", "list"]
  - apiGroups: ["batch"]
    resources: ["jobs"]
    verbs: [ "get", "list", "watch", "create", "update", "patch", "delete" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: prefect-worker-role-binding
  namespace: prefect
subjects:
  - kind: ServiceAccount
    name: default
    namespace: prefect
roleRef:
  kind: Role
  name: prefect-worker
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: prefect-worker
rules:
  - apiGroups: [""]
    resources: ["namespaces"]
    verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: prefect-worker-cluster-role-binding
subjects:
  - kind: ServiceAccount
    name: default
    namespace: prefect
roleRef:
  kind: ClusterRole
  name: prefect-worker
  apiGroup: rbac.authorization.k8s.io
  • Automation rule: since the health check relies on the “pause” status, we need an automation that pauses the queue when it enters an unhealthy state.

The queue is resumed when the worker restarts, via the start-worker.sh script.

This is probably not the best way to achieve it; I’d be happy to see other methods!