How to run a stand-alone container as a subflow or task?

I’m using Prefect with Kubernetes infrastructure to scale out some large data / ML tasks. Unfortunately, some of the tasks in my pipelines involve running containers with parameters (e.g. docker run my_image:latest --param 1).

My question is: how can I orchestrate tasks like this with Prefect? These tasks aren’t Python programs, so I can’t just write them as a @task.
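A Python task does not have to be the container; it only has to start one. If the machine running the task has a Docker daemon (usually not the case inside Kubernetes worker pods, which is why docker-in-docker comes up below), the task body can simply shell out to docker run. A minimal stdlib sketch, assuming the --key value argument style from the example above; build_docker_run_cmd and run_container are hypothetical names.

```python
import subprocess
from typing import Mapping


def build_docker_run_cmd(image: str, params: Mapping[str, object]) -> list[str]:
    """Turn {"param": 1} into ["docker", "run", "--rm", image, "--param", "1"]."""
    cmd = ["docker", "run", "--rm", image]
    for key, value in params.items():
        cmd += [f"--{key}", str(value)]
    return cmd


def run_container(image: str, params: Mapping[str, object]) -> str:
    # In a Prefect pipeline this function would be decorated with @task.
    # It needs a Docker daemon reachable from wherever the task runs.
    completed = subprocess.run(
        build_docker_run_cmd(image, params),
        capture_output=True,
        text=True,
        check=True,  # raise CalledProcessError if the container exits non-zero
    )
    return completed.stdout  # the container's stdout becomes the task result
```

Because check=True raises on a non-zero exit code, a failing container shows up as a failed task rather than a silent success.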

Approaches I have looked into:

  • I see the prefect-kubernetes package. Should I be using something from it? Is there something there that lets me create a Kubernetes job that runs my container with the arguments it needs, orchestrated by Prefect?
  • I looked into a “docker in docker” approach: (1) build an image that includes a Docker engine / daemon, and (2) in my Prefect tasks, use the Python Docker API to run things. But this seems overly complicated?
  • Should I use Cloud Run somehow?
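For the first approach, the part of a Kubernetes Job that corresponds to docker run my_image:latest --param 1 is the container spec. A small sketch of that mapping as a plain dict (container_spec is a hypothetical helper name). It sets only args: in Kubernetes, args replaces the image's CMD and keeps its ENTRYPOINT, which is how arguments after the image name behave in docker run, whereas command would replace the ENTRYPOINT.

```python
from typing import Mapping


def container_spec(name: str, image: str, params: Mapping[str, object]) -> dict:
    """Kubernetes container spec equivalent to `docker run image --k v ...`."""
    args: list[str] = []
    for key, value in params.items():
        args += [f"--{key}", str(value)]
    # Only "args" is set, so the image's ENTRYPOINT still runs, receiving these args.
    return {"name": name, "image": image, "args": args}


if __name__ == "__main__":
    print(container_spec("worker", "my_image:latest", {"param": 1}))
```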

My Kubernetes and Prefect experience is limited, so I apologize if I’m missing something obvious! I’m pretty comfortable with Docker / containerization, but I’m trying to use tools like Prefect and Kubernetes to parallelize my compute tasks.

To be more concrete, I want to make a flow like the following:

from prefect import flow, task

@task
def run_container(foo: str = "x"):
    # do something equivalent to
    # docker run --rm myimage:latest --x {foo}
    # or
    # kubernetes client.BatchV1Api().create_namespaced_job(...)
    ...

@task
def other_task(foo: str = "x"):
    ...

@task
def post_process_result(result):
    ...

@flow
def my_flow():
    for x in list("abcd"):
        thing = other_task.submit(x)
        result = run_container.submit(x, wait_for=[thing])
        post_process_result.submit(result)  # passing the future already implies the dependency
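The dependency graph this flow is aiming for is: within each value of x, other_task, then run_container, then post_process_result, strictly in order; across different values of x, the four chains are independent and can run in parallel. A plain-Python analogue using concurrent.futures, with trivial stand-in bodies, just to make that structure explicit; it is not Prefect's API, where submit and wait_for play these roles.

```python
from concurrent.futures import ThreadPoolExecutor


def other_task(x: str) -> str:
    return f"prepared-{x}"


def run_container(x: str) -> str:
    # Stand-in for `docker run --rm myimage:latest --x {x}`.
    return f"container-output-{x}"


def post_process_result(result: str) -> str:
    return result.upper()


def chain(x: str) -> str:
    # Within one chain the steps run strictly in order,
    # mirroring wait_for=[thing] in the Prefect version.
    other_task(x)
    return post_process_result(run_container(x))


def my_flow() -> list[str]:
    # Chains for different x are independent, so they run concurrently.
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(chain, "abcd"))  # map keeps input order


if __name__ == "__main__":
    print(my_flow())
```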

Thank you SO much for any help!

I think I figured out a solution with prefect_kubernetes! Sharing a minimal self-contained example below; I'd love feedback if anyone can take a look. (My example assumes an unauthenticated Kubernetes cluster on your host, e.g. I am running KinD for development purposes.)


  • Use the Python kubernetes API to define a V1Job. This specifies the container’s image and the command to execute.
  • Use prefect_kubernetes.run_namespaced_job, which (IIUC) is itself a Prefect flow that runs the Kubernetes job and waits for it to finish.
  • Call that flow inside another flow, i.e. as a subflow.

This is so nice: it automatically cleans up (deletes) the Kubernetes job after execution.

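from kubernetes import client
from prefect import flow
from prefect_kubernetes import KubernetesJob, run_namespaced_job
from prefect_kubernetes.credentials import KubernetesCredentials

# Placeholder name/image; bash brace expansion needs an image with /bin/bash.
container = client.V1Container(
    name="counter",
    image="ubuntu:22.04",
    command=["/bin/bash", "-c", "for i in {1..10}; do echo $i; done"],
)
job = client.V1Job(
    api_version="batch/v1",
    kind="Job",
    metadata=client.V1ObjectMeta(name="count-to-ten"),
    spec=client.V1JobSpec(
        template=client.V1PodTemplateSpec(
            spec=client.V1PodSpec(containers=[container], restart_policy="Never")
        )
    ),
)

# Typed V1Job -> plain camelCase dict, which KubernetesJob takes as v1_job.
v1_job_dict = client.ApiClient().sanitize_for_serialization(job)

# No cluster_config: should fall back to the local kubeconfig (e.g. KinD).
kubernetes_job = KubernetesJob(
    v1_job=v1_job_dict, credentials=KubernetesCredentials()
)

@flow
def my_flow():
    # Creates the Job, waits for it to finish, then deletes it (the default).
    job_run_result = run_namespaced_job(kubernetes_job)
    return job_run_result

if __name__ == "__main__":
    my_flow()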
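The v1_job_dict in this example comes from sanitize_for_serialization, which turns the typed V1Job into JSON-ready data with camelCase keys (restartPolicy rather than restart_policy). If you prefer, you can write that dict by hand. A minimal sketch with a hypothetical helper, assuming KubernetesJob accepts such a dict as v1_job (the code above passes the output of sanitize_for_serialization there). backoffLimit=0 is my choice so a failing container is not retried; Kubernetes' own default is 6.

```python
import json


def make_v1_job_dict(job_name: str, container: dict, backoff_limit: int = 0) -> dict:
    """A batch/v1 Job manifest as a plain dict with camelCase keys."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": job_name},
        "spec": {
            # Number of retries before the Job is marked Failed.
            "backoffLimit": backoff_limit,
            "template": {
                "spec": {
                    "containers": [container],
                    # Job pods must use "Never" or "OnFailure".
                    "restartPolicy": "Never",
                }
            },
        },
    }


if __name__ == "__main__":
    # Placeholder container, equivalent in spirit to the V1Container above.
    counter = {
        "name": "counter",
        "image": "ubuntu:22.04",
        "command": ["/bin/bash", "-c", "for i in {1..10}; do echo $i; done"],
    }
    print(json.dumps(make_v1_job_dict("count-to-ten", counter), indent=2))
```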

To watch the pod being created, running, and being deleted in real time, you can run:

$ watch -n 0.1 'kubectl get pods -A --sort-by=.metadata.creationTimestamp | tail'
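What the watch output shows (create, wait, delete) is conceptually a poll-until-terminal loop with cleanup in a finally block. A hypothetical stdlib sketch with the Kubernetes calls injected as callables; it illustrates the idea and is not prefect-kubernetes' actual implementation (there, as far as I can tell, cleanup is governed by the KubernetesJob block's delete_after_completion option, which defaults to on). "Complete" and "Failed" are the Kubernetes Job condition types.

```python
import time
from typing import Callable, Optional

TERMINAL_CONDITIONS = ("Complete", "Failed")  # Kubernetes Job condition types


def wait_then_delete(
    get_condition: Callable[[], Optional[str]],
    delete_job: Callable[[], None],
    interval_seconds: float = 5.0,
    timeout_seconds: Optional[float] = None,
) -> str:
    """Poll a Job until it is Complete or Failed, then always delete it."""
    start = time.monotonic()
    try:
        while True:
            condition = get_condition()  # None while the Job is still running
            if condition in TERMINAL_CONDITIONS:
                return condition
            if timeout_seconds is not None and time.monotonic() - start > timeout_seconds:
                raise TimeoutError(f"Job not finished after {timeout_seconds}s")
            time.sleep(interval_seconds)
    finally:
        # Runs on success, failure, and timeout alike.
        delete_job()
```

The finally block is the design choice that matters: the Job is removed even when waiting raises or times out, so nothing is left behind in the cluster.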

Update: I learned that a prefect_gcp.cloud_run.CloudRunJob is probably ideal for my use case.

Sharing my code here in case it’s useful to anyone, and in case prefeccionistas have any feedback 🙂

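import asyncio
import json

from prefect import flow
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_run import CloudRunJob

GCP_CREDS_BLOCK_NAME = "gcp-credentials"

# One-time setup: store the service-account key as a GcpCredentials block.
with open("sa-private-key.json") as f:
    gcp_creds_block = GcpCredentials(service_account_info=json.load(f))
gcp_creds_block.save(GCP_CREDS_BLOCK_NAME, overwrite=True)

@flow
async def cloud_run_job_flow(i: int, sleep_s: int = 5):
    creds_block = await GcpCredentials.load(GCP_CREDS_BLOCK_NAME)
    full_command = f'echo "hello from {i}!"; sleep {sleep_s}; echo "goodbye from {i}!"'
    cloud_run_job = CloudRunJob(
        image="docker.io/library/alpine:3.19",  # placeholder: any image with /bin/sh
        region="us-central1",  # placeholder region
        credentials=creds_block,
        command=["/bin/sh", "-c", full_command],
    )
    # Runs the job on Cloud Run and waits for it to finish.
    return await cloud_run_job.run()

@flow
async def a_few_sleeping_flows(n: int):
    sleep_seconds = 10
    parallel_subflows = [cloud_run_job_flow(i, sleep_seconds) for i in range(n)]
    # The subflow coroutines run concurrently, so n jobs take about one job's time.
    results = await asyncio.gather(*parallel_subflows)
    return results

if __name__ == "__main__":
    n_runs = 3
    state = asyncio.run(a_few_sleeping_flows(n_runs, return_state=True))
    # state = asyncio.run(cloud_run_job_flow(0, return_state=True))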
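The parallelism in a_few_sleeping_flows comes from asyncio.gather: calling an async flow returns a coroutine, and gather awaits all of them concurrently, returning results in input order. A Prefect-free sketch of the same pattern, where fake_job is a hypothetical stand-in for cloud_run_job_flow and asyncio.sleep stands in for waiting on the remote job:

```python
import asyncio
import time


async def fake_job(i: int, sleep_s: float) -> str:
    # Stand-in for cloud_run_job_flow: awaits without blocking the event loop.
    await asyncio.sleep(sleep_s)
    return f"goodbye from {i}!"


async def a_few_sleeping_jobs(n: int, sleep_s: float) -> list[str]:
    # gather schedules all n coroutines at once; results keep input order.
    return await asyncio.gather(*(fake_job(i, sleep_s) for i in range(n)))


if __name__ == "__main__":
    start = time.perf_counter()
    results = asyncio.run(a_few_sleeping_jobs(3, 0.2))
    # Should take about 0.2 s in total rather than 0.6 s.
    print(results, f"{time.perf_counter() - start:.2f}s")
```

This only holds while each job is awaited without blocking; a synchronous call inside one of the coroutines would serialize the whole batch.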