How to run a stand-alone container as a subflow or task?

I’m using Prefect with Kubernetes infrastructure to scale out some large data / ML tasks. Unfortunately, some of the tasks in my pipelines involve running stand-alone containers parameterized by command-line arguments (e.g. docker run my_image:latest --param 1).

My question is: how can I orchestrate tasks like this with Prefect? These tasks aren’t Python programs, so I can’t simply wrap them in a @task.

Approaches I have looked into:

  • I see the prefect-kubernetes package. Should I be using something from this? Is there a way to create a Kubernetes Job that runs my container with the arguments it needs, orchestrated by Prefect?
  • I looked into a “Docker in Docker” approach: (1) build an image that includes a Docker engine / daemon, and (2) in my Prefect tasks, use the Python Docker SDK to run things (see the sketch just after this list). But this seems overly complicated?
  • Should I use Cloud Run somehow?
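
For concreteness, here is roughly what I mean by approach (2). This is only a sketch: it assumes the Docker Python SDK (pip install docker) plus a reachable Docker daemon, and my_image:latest / --param are placeholders from my example above. Getting a daemon inside a Kubernetes pod is exactly the part that seems complicated.

import docker


def run_with_docker_sdk(param: str) -> str:
    # equivalent of: docker run --rm my_image:latest --param <param>
    client = docker.from_env()  # requires a Docker daemon to be reachable
    logs = client.containers.run(
        "my_image:latest",
        command=["--param", param],
        remove=True,  # delete the container once it exits
    )
    return logs.decode()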

My Kubernetes and Prefect experience is limited, so I apologize if I’m missing something obvious! I’m pretty comfortable with Docker / containerization, but I’m still learning how to use tools like Prefect and Kubernetes to parallelize my compute tasks.

To be more concrete, I want to make a flow like the following:

from prefect import flow, task


@task
def run_container(x: str):
    # do something equivalent to
    # docker run --rm my_image:latest --param {x}
    # or
    # kubernetes client.BatchV1Api().create_namespaced_job(...) (sketched below)
    ...

@task
def other_task(x: str):
    ...

@task
def post_process_result(result):
    ...

@flow
def my_flow():
    for x in list("abcd"):
        thing = other_task.submit(x)
        result = run_container(x, wait_for=[thing])
        post_process_result(result)
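
And here is a rough sketch of the raw kubernetes-client variant from the comment above, just to show what I’m imagining (the job name, image, and default namespace are placeholders, and it assumes a kubeconfig or in-cluster config is available):

from kubernetes import client, config


def run_container_as_k8s_job(x: str) -> None:
    config.load_kube_config()  # or config.load_incluster_config() inside a pod
    container = client.V1Container(
        name=f"my-task-{x}",
        image="my_image:latest",
        args=["--param", x],  # arguments appended to the image's entrypoint
    )
    job = client.V1Job(
        metadata=client.V1ObjectMeta(name=f"my-task-{x}"),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(containers=[container], restart_policy="Never")
            )
        ),
    )
    client.BatchV1Api().create_namespaced_job(namespace="default", body=job)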

thank you SO much for any help!

I think I figured out a solution with prefect_kubernetes! Sharing a minimal self-contained example below; I would love feedback if anyone can take a look. (My example assumes an unauthenticated Kubernetes cluster on your host, e.g. I am running KinD for development purposes.)

Outline:

  • Use the Python Kubernetes client to define a V1Job; this specifies the container’s image and the command to execute.
  • Use prefect_kubernetes.run_namespaced_job, which (if I understand correctly) wraps the Kubernetes Job in a Prefect flow.
  • Call that flow inside another flow (i.e. as a subflow).

This is so nice: it automatically cleans up (deletes) the Kubernetes Job after execution.

from kubernetes import client
from prefect import flow
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials

# the container to run: just prints the numbers 1..10
container = client.V1Container(
    name="number-printer",
    image="ubuntu",
    command=[
        "/bin/bash",
        "-c",
        "for i in {1..10}; do echo $i; done",
    ],
)
# wrap the container in a Kubernetes Job spec
job = client.V1Job(
    metadata=client.V1ObjectMeta(name="number-printer-job"),
    spec=client.V1JobSpec(
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[container], restart_policy="Never")
        )
    ),
)

# convert the V1Job object into the plain dict that KubernetesJob expects
v1_job_dict = client.ApiClient().sanitize_for_serialization(job)

kubernetes_job = KubernetesJob(
    v1_job=v1_job_dict,
    # no cluster config given, so this falls back to the local / in-cluster config
    # (matching the unauthenticated KinD setup mentioned above)
    credentials=KubernetesCredentials(),
)


@flow
def my_flow():
    job_run_result = run_namespaced_job(kubernetes_job)
    print(job_run_result)


if __name__ == "__main__":
    my_flow()

To see the pod being created, running, and deleted in real time, you can run:

$ watch -n 0.1 'kubectl get pods -A --sort-by=.metadata.creationTimestamp | tail'
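
To connect this back to my original question (containers parameterized with arguments), the same pattern can be parameterized inside a flow. A sketch only, where my_image:latest and the --param flag stand in for my real container:

from kubernetes import client
from prefect import flow
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials


def make_job(param: str) -> KubernetesJob:
    # build one V1Job per parameter value, exactly like the example above
    container = client.V1Container(
        name=f"my-task-{param}",
        image="my_image:latest",
        args=["--param", param],
    )
    job = client.V1Job(
        metadata=client.V1ObjectMeta(name=f"my-task-{param}"),
        spec=client.V1JobSpec(
            template=client.V1PodTemplateSpec(
                spec=client.V1PodSpec(containers=[container], restart_policy="Never")
            )
        ),
    )
    return KubernetesJob(
        v1_job=client.ApiClient().sanitize_for_serialization(job),
        credentials=KubernetesCredentials(),
    )


@flow
def parameterized_flow():
    for param in list("abcd"):
        run_namespaced_job(make_job(param))

Note that called this way the run_namespaced_job subflows run one after another; the Cloud Run example below uses async subflows and asyncio.gather to run them concurrently.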

Update: I learned that a prefect_gcp.cloud_run.CloudRunJob is probably ideal for my use case.

Sharing my code here in case it’s useful to anyone, and in case prefeccionistas have any feedback :slight_smile:


import asyncio
from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob


GCP_CREDS_BLOCK_NAME = "gcp-credentials"

with open("sa-private-key.json") as f:
    gcp_creds_block = GcpCredentials(
        project="MY-GCP-PROJECT",
        service_account_info=f.read(),
    )
gcp_creds_block.save(GCP_CREDS_BLOCK_NAME, overwrite=True)


@flow
async def cloud_run_job_flow(i: int, sleep_s: int = 5):
    creds_block = await GcpCredentials.load(GCP_CREDS_BLOCK_NAME)
    full_command = f'echo "hello from {i}!"; sleep {sleep_s}; echo "goodbye from {i}!"'
    cloud_run_job = CloudRunJob(
        image="us-docker.pkg.dev/cloudrun/container/job:latest",
        credentials=creds_block,
        region="us-central1",
        command=["/bin/sh", "-c", full_command],
        keep_job=True,  # keep the Cloud Run job around after it finishes
    )
    return await cloud_run_job.run()


@flow
async def a_few_sleeping_flows(n: int):
    sleep_seconds = 10
    parallel_subflows = [cloud_run_job_flow(i, sleep_seconds) for i in range(n)]
    results = await asyncio.gather(*parallel_subflows)
    print(results)


if __name__ == "__main__":
    n_runs = 3
    asyncio.run(a_few_sleeping_flows(n_runs))
    # asyncio.run(cloud_run_job_flow(i=0))