I’m using prefect with kubernetes infrastructure to scale out some large data / ML tasks. unfortunately, some of the tasks in my pipelines involve running containers parameterized with some params (e.g. docker run my_image:latest --param 1).
my question is: how can i orchestrate tasks like this with prefect? these tasks aren’t python programs, so i can’t define them in a @task.
approaches i have looked into:
- i see the prefect-kubernetes package. Should I be using something from this? Is there something here where I can create a kubernetes job that runs my container with the arguments it needs, orchestrated by prefect?
- i looked into a “docker in docker” approach: maybe i can (1) build an image that includes a docker engine / daemon and (2) in my prefect tasks, use the python docker API to run things. but this seems maybe overly complicated? (a rough sketch of what i mean is below, after this list.)
- should I use Cloud Run somehow?
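for concreteness, here is roughly what i had in mind for the docker-in-docker idea from the second bullet. this is only a sketch: it assumes a docker daemon is reachable from inside the task (e.g. a mounted /var/run/docker.sock or a DinD sidecar), and the image / parameter names are made up from my example above.

import docker
from prefect import task

@task
def run_my_container(param: int) -> str:
    # equivalent to: docker run --rm my_image:latest --param 1
    client = docker.from_env()
    logs = client.containers.run(
        "my_image:latest",
        command=["--param", str(param)],
        remove=True,
    )
    return logs.decode()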
my kubernetes and prefect experience is limited, so i apologize if i’m missing something obvious! i’m pretty comfortable with docker / containerization, but i’m still figuring out how to use tools like prefect and kubernetes to parallelize my compute tasks.
to be more concrete, i want to make a flow like the following:
@task
def run_container(foo: str = "x"):
    # do something equivalent to
    # docker run --rm myimage:latest --foo {foo}
    # or
    # kubernetes client.BatchV1Api().create_namespaced_job(...)
    ...

@task
def other_task(foo: str = "x"):
    ...

@task
def post_process_result(result):
    ...

@flow
def my_flow():
    for x in list("abcd"):
        thing = other_task.submit(x)
        result = run_container(x, wait_for=[thing])
        post_process_result(result)
thank you SO much for any help!
i think i figured out a solution with prefect_kubernetes! sharing a minimal self-contained example below. would love feedback if anyone can take a look. (my example assumes an unauthenticated kubernetes cluster on your host; e.g. i am running KinD for development purposes.)
outline:
- use the python kubernetes API to define a V1Job; this specifies the container’s image and the command to execute
- use prefect_kubernetes.run_namespaced_job, which (if i understand correctly) is itself a prefect flow that wraps the kubernetes job
- call the flow inside another flow
this is so nice. it automatically cleans up (deletes) the kubernetes job after execution.
from kubernetes import client
from prefect import flow
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials
container = client.V1Container(
    name="number-printer",
    image="ubuntu",
    command=[
        "/bin/bash",
        "-c",
        "for i in {1..10}; do echo $i; done",
    ],
)

job = client.V1Job(
    metadata=client.V1ObjectMeta(name="number-printer-job"),
    spec=client.V1JobSpec(
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[container], restart_policy="Never")
        )
    ),
)

# convert the typed V1Job object into a plain dict for the KubernetesJob block
v1_job_dict = client.ApiClient().sanitize_for_serialization(job)

kubernetes_job = KubernetesJob(
    v1_job=v1_job_dict,
    credentials=KubernetesCredentials(),
)

@flow
def my_flow():
    job_run_result = run_namespaced_job(kubernetes_job)
    print(job_run_result)

if __name__ == "__main__":
    my_flow()
to see the pod being created, running, and being deleted in real time, you can do
$ watch -n 0.1 'kubectl get pods -A --sort-by=.metadata.creationTimestamp | tail'
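to tie this back to my original question (parameterized containers), here is a sketch of how i’d build the job per parameter, reusing the same pieces as above. this is a hedged example only: the image name and the --param flag are the placeholders from my question, not anything real.

from kubernetes import client
from prefect import flow
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials

def make_job(param: str) -> dict:
    # build a V1Job whose container args carry the parameter,
    # i.e. the equivalent of: docker run my_image:latest --param {param}
    container = client.V1Container(
        name=f"my-image-{param}",
        image="my_image:latest",
        args=["--param", param],
    )
    job = client.V1Job(
        metadata=client.V1ObjectMeta(name=f"my-image-job-{param}"),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(containers=[container], restart_policy="Never")
            )
        ),
    )
    return client.ApiClient().sanitize_for_serialization(job)

@flow
def parameterized_flow():
    for param in list("abcd"):
        kubernetes_job = KubernetesJob(
            v1_job=make_job(param),
            credentials=KubernetesCredentials(),
        )
        run_namespaced_job(kubernetes_job)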
Update: learned that a prefect_gcp.cloud_run.CloudRunJob is probably ideal for my use case.
Sharing my code here in case it’s useful to anyone, and in case prefeccionistas have any feedback 
import asyncio
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob
GCP_CREDS_BLOCK_NAME = "gcp-credentials"
with open("sa-private-key.json") as f:
    gcp_creds_block = GcpCredentials(
        project="MY-GCP-PROJECT",
        service_account_info=f.read(),
    )

gcp_creds_block.save(GCP_CREDS_BLOCK_NAME, overwrite=True)

@flow
async def cloud_run_job_flow(i: int, sleep_s: int = 5):
    creds_block = await GcpCredentials.load(GCP_CREDS_BLOCK_NAME)
    full_command = f'echo "hello from {i}!"; sleep {sleep_s}; echo "goodbye from {i}!"'
    cloud_run_job = CloudRunJob(
        image="us-docker.pkg.dev/cloudrun/container/job:latest",
        credentials=creds_block.dict(),
        region="us-central1",
        command=["/bin/sh", "-c", full_command],
        keep_job=True,
    )
    return await cloud_run_job.run()

@flow
async def a_few_sleeping_flows(n: int):
    sleep_seconds = 10
    parallel_subflows = [cloud_run_job_flow(i, sleep_seconds) for i in range(n)]
    results = await asyncio.gather(*parallel_subflows)
    print(results)

if __name__ == "__main__":
    n_runs = 3
    state = asyncio.run(a_few_sleeping_flows(n_runs))
    # state = asyncio.run(cloud_run_job_flow(i=0))
    print(state)