Can Prefect run each task in a different Docker container?

Packaging code dependencies can be challenging, especially if some of your tasks require different Python library versions than other tasks.

There are two patterns supported by Prefect to separate code dependencies at a per-task level:

1. Flow-of-flows orchestrator pattern

Prefect allows you to manage dependencies not only between tasks in a single data pipeline but also to manage dependencies between individual data pipelines.

Prefect can provide this high degree of flexibility with respect to how you trigger your flow runs because running flows is decoupled from the scheduler.

To provide different images for separate tasks, you can have several child flows and one parent flow (the “orchestrator” flow). Then, within the parent flow, you can specify the image you wish to use for each child flow run through its run configuration:

# Parent ("orchestrator") flow excerpt: the run_config passed to create_flow_run
# selects the Docker image used for that particular child flow run.
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    normal_non_subflow_task = hello_world()
    # Trigger the child flow "first_flow_docker" with a Python 3.9 Prefect image.
    first_child_flow_run_id = create_flow_run(
        flow_name="first_flow_docker",
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input="first child flow"),
        run_config=DockerRun(
            labels=[AGENT_LABEL], image="prefecthq/prefect:1.0.0-python3.9"
        ),
        # The regular task above is declared as an upstream dependency of this one.
        upstream_tasks=[normal_non_subflow_task],
    )

Let’s look at a full example.

1.1 Child flows

First subflow:

import platform
import prefect
from prefect import Flow, Parameter, task
from prefect.client.secrets import Secret
from prefect.storage import S3
from prefect.run_configs import DockerRun


# Name of this child flow; also used to build the S3 key and the local script path.
FLOW_NAME = "first_flow_docker"
# Label attached to the run configuration below.
AGENT_LABEL = "docker"
# Read from a Prefect Secret when the module is loaded; not referenced again in this script.
AWS_ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
# Script-based S3 storage for this flow.
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # setting local_script_path ensures the flow script is uploaded to S3 during registration
    local_script_path=f"flows/different_images_per_subflows/{FLOW_NAME}.py",
)

# No image is set here; the parent flow passes its own DockerRun (with an image)
# when it triggers this flow.
RUN_CONFIG = DockerRun(labels=[AGENT_LABEL],)


@task(log_stdout=True)
def hello_world(x: str) -> None:
    """Print a greeting for ``x`` and the Prefect/Python versions running this task."""
    greeting = f"Hello {x} from {FLOW_NAME}!"
    print(greeting)
    # Reporting both versions makes it visible which Docker image executed the task.
    runtime_info = f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    print(runtime_info)


# Child flow with a single task; "user_input" falls back to "Marvin" unless a
# value is supplied (the parent flow passes one via `parameters`).
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    user_input = Parameter("user_input", default="Marvin")
    hw = hello_world(user_input)

Second subflow:

import platform
import prefect
from prefect import Flow, Parameter, task
from prefect.client.secrets import Secret
from prefect.storage import S3
from prefect.run_configs import DockerRun


# Name of this child flow; also used to build the S3 key and the local script path.
FLOW_NAME = "second_flow_docker"
# Label attached to the run configuration below.
AGENT_LABEL = "docker"
# Read from a Prefect Secret when the module is loaded; not referenced again in this script.
AWS_ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
# Script-based S3 storage for this flow.
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # setting local_script_path ensures the flow script is uploaded to S3 during registration
    local_script_path=f"flows/different_images_per_subflows/{FLOW_NAME}.py",
)

# No image is set here; the parent flow passes its own DockerRun (with an image)
# when it triggers this flow.
RUN_CONFIG = DockerRun(labels=[AGENT_LABEL],)


@task(log_stdout=True)
def hello_world(x: str) -> None:
    """Greet ``x`` from this flow, then report the Prefect and Python versions in use."""
    hello_line = f"Hello {x} from {FLOW_NAME}!"
    print(hello_line)
    # The version line shows which image this child flow run was executed in.
    version_line = f"Running this task with Prefect: {prefect.__version__} and Python {platform.python_version()}"
    print(version_line)


# Child flow with a single task; "user_input" falls back to "World" unless a
# value is supplied (the parent flow passes one via `parameters`).
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG,) as flow:
    user_input = Parameter("user_input", default="World")
    hw = hello_world(user_input)

1.2 Parent flow

"""
python flows/different_images_per_subflows/parent_docker_different_images_per_subflow.py
"""
import uuid
from prefect import Flow, task
from prefect.client.secrets import Secret
from prefect.storage import S3
from prefect.run_configs import DockerRun
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
import subprocess

# Name of the parent (orchestrator) flow; also used to build the S3 key and local script path.
FLOW_NAME = "parent_docker_different_images_per_subflow"
# Label shared by the run configurations and by the agent started at the bottom of this script.
AGENT_LABEL = "docker"
# Prefect project used for registration and for looking up the child flows.
PREFECT_PROJECT_NAME = "community"
# Read from a Prefect Secret when the module is loaded; not referenced again in this script.
AWS_ACCOUNT_ID = Secret("AWS_ACCOUNT_ID").get()
# Script-based S3 storage for this flow.
STORAGE = S3(
    bucket="prefectdata",
    key=f"flows/{FLOW_NAME}.py",
    stored_as_script=True,
    # setting local_script_path ensures the flow script is uploaded to S3 during registration
    local_script_path=f"flows/different_images_per_subflows/{FLOW_NAME}.py",
)

# Run configuration of the parent flow itself; no explicit image is set here.
RUN_CONFIG = DockerRun(labels=[AGENT_LABEL],)


@task(log_stdout=True)
def hello_world() -> str:
    """Print a greeting that names this flow and return the flow name."""
    message = f"Hello from {FLOW_NAME}!"
    print(message)
    return FLOW_NAME


# Orchestrator flow: one regular task followed by three child flow runs.
# Each create_flow_run passes its own DockerRun, so every child flow run uses a
# different image. Each wait_for_flow_run result is the upstream task of the
# next create_flow_run, which chains the child runs one after another.
with Flow(FLOW_NAME, storage=STORAGE, run_config=RUN_CONFIG) as flow:
    normal_non_subflow_task = hello_world()
    # === FIRST FLOW WITH PYTHON 3.9 DOCKER IMAGE ===
    first_child_flow_run_id = create_flow_run(
        flow_name="first_flow_docker",
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input="first child flow"),
        run_config=DockerRun(
            labels=[AGENT_LABEL], image="prefecthq/prefect:1.0.0-python3.9"
        ),
        task_args=dict(name="First subflow with Python 3.9 image"),
        upstream_tasks=[normal_non_subflow_task],
    )
    # stream_logs=True surfaces the child flow run's logs in the parent flow run;
    # raise_final_state=True makes this task reflect the child run's final state.
    first_child_flowrunview = wait_for_flow_run(
        first_child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for the first subflow"),
    )
    # === SECOND FLOW WITH PYTHON 3.8 DOCKER IMAGE  ===
    second_child_flow_run_id = create_flow_run(
        flow_name="second_flow_docker",
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input="second child flow"),
        run_config=DockerRun(
            labels=[AGENT_LABEL], image="prefecthq/prefect:1.0.0-python3.8"
        ),
        task_args=dict(name="Second subflow with Python 3.8 image"),
        upstream_tasks=[first_child_flowrunview],
    )
    second_child_flowrunview = wait_for_flow_run(
        second_child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for the second subflow"),
    )
    # === FIRST FLOW AGAIN BUT WITH PYTHON 3.7 DOCKER IMAGE  ===
    again_first_child_flow_run_id = create_flow_run(
        flow_name="first_flow_docker",
        project_name=PREFECT_PROJECT_NAME,
        parameters=dict(user_input="first child flow"),
        run_config=DockerRun(
            labels=[AGENT_LABEL], image="prefecthq/prefect:1.0.0-python3.7"
        ),
        # idempotency_key=str(uuid.uuid4()),
        task_args=dict(name="First subflow with Python 3.7 image"),
        upstream_tasks=[second_child_flowrunview],
    )
    # Bound to its own name so the handle of the first wait task is not
    # overwritten by this third one.
    again_first_child_flowrunview = wait_for_flow_run(
        again_first_child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for the first subflow with 3.7 image"),
    )
if __name__ == "__main__":
    # Step 1: register the flows found under the given directory.
    # check=True aborts the script if registration fails, instead of going on
    # to trigger a flow that may not exist.
    subprocess.run(
        f"prefect register --project {PREFECT_PROJECT_NAME} -p flows/different_images_per_subflows/",
        shell=True,
        check=True,
    )
    # Step 2: create a run of the parent flow; abort if the run cannot be created.
    subprocess.run(
        f"prefect run --name {FLOW_NAME} --project {PREFECT_PROJECT_NAME}",
        shell=True,
        check=True,
    )
    # Step 3: start a Docker agent with the matching label so it can pick up the
    # run created above. It comes last because the agent keeps waiting for flow
    # runs. The shell is needed here so that `~` in the volume path is expanded.
    subprocess.run(
        f"prefect agent docker start --label {AGENT_LABEL} --volume ~/.aws:/root/.aws",
        shell=True,
    )

Here is the flow structure when we visualize it with flow.visualize():

If you want to run this full example, clone this repository:

And run this command in your terminal:

python flows/different_images_per_subflows/parent_docker_different_images_per_subflow.py

If we inspect the agent logs, we can see that all Docker images that were required for each flow run were dynamically pulled at flow runtime:

Creating run for flow 'parent_docker_different_images_per_subflow'... Done
└── Name: peridot-chicken
└── UUID: 1b5d2fe0-0d1b-499d-b9fe-dcc78b8530fc
└── Labels: ['docker']
└── Parameters: {}
└── Context: {}
└── URL: https://cloud.prefect.io/anna-prefect/flow-run/1b5d2fe0-0d1b-499d-b9fe-dcc78b8530fc
[2022-03-05 16:31:07,386] INFO - agent | Registering agent...
[2022-03-05 16:31:07,667] INFO - agent | Registration successful!

 ____            __           _        _                    _
|  _ \ _ __ ___ / _| ___  ___| |_     / \   __ _  ___ _ __ | |_
| |_) | '__/ _ \ |_ / _ \/ __| __|   / _ \ / _` |/ _ \ '_ \| __|
|  __/| | |  __/  _|  __/ (__| |_   / ___ \ (_| |  __/ | | | |_
|_|   |_|  \___|_|  \___|\___|\__| /_/   \_\__, |\___|_| |_|\__|
                                           |___/

[2022-03-05 16:31:08,882] INFO - agent | Starting DockerAgent with labels ['docker']
[2022-03-05 16:31:08,883] INFO - agent | Agent documentation can be found at https://docs.prefect.io/orchestration/
[2022-03-05 16:31:08,883] INFO - agent | Waiting for flow runs...
[2022-03-05 16:31:09,668] INFO - agent | Deploying flow run 1b5d2fe0-0d1b-499d-b9fe-dcc78b8530fc to execution environment...
[2022-03-05 16:31:09,997] INFO - agent | Pulling image prefecthq/prefect:1.0.0...
[2022-03-05 16:31:11,866] INFO - agent | Successfully pulled image prefecthq/prefect:1.0.0
[2022-03-05 16:31:12,504] INFO - agent | Completed deployment of flow run 1b5d2fe0-0d1b-499d-b9fe-dcc78b8530fc
[2022-03-05 16:31:27,478] INFO - agent | Deploying flow run d51b224b-ca8c-4613-b764-91e351073697 to execution environment...
[2022-03-05 16:31:27,763] INFO - agent | Pulling image prefecthq/prefect:1.0.0-python3.9...
[2022-03-05 16:31:29,360] INFO - agent | Successfully pulled image prefecthq/prefect:1.0.0-python3.9
[2022-03-05 16:31:30,137] INFO - agent | Completed deployment of flow run d51b224b-ca8c-4613-b764-91e351073697
[2022-03-05 16:31:45,609] INFO - agent | Deploying flow run 22b8504d-9ca7-4220-af77-c9a73da7cf8d to execution environment...
[2022-03-05 16:31:45,992] INFO - agent | Pulling image prefecthq/prefect:1.0.0-python3.8...
[2022-03-05 16:31:47,839] INFO - agent | Successfully pulled image prefecthq/prefect:1.0.0-python3.8
[2022-03-05 16:31:48,758] INFO - agent | Completed deployment of flow run 22b8504d-9ca7-4220-af77-c9a73da7cf8d
[2022-03-05 16:32:13,501] INFO - agent | Deploying flow run cfd0f236-98ad-478b-b70d-d719a66d7893 to execution environment...
[2022-03-05 16:32:13,827] INFO - agent | Pulling image prefecthq/prefect:1.0.0-python3.7...
[2022-03-05 16:32:15,629] INFO - agent | Successfully pulled image prefecthq/prefect:1.0.0-python3.7
[2022-03-05 16:32:16,458] INFO - agent | Completed deployment of flow run cfd0f236-98ad-478b-b70d-d719a66d7893

And when we inspect the flow run logs in the Prefect Cloud UI, we can see the output of all subflows directly in the parent flow, confirming that a different Docker image was used for each subflow:

1.3 Extending this use case to incorporate custom conditions

If you would like to use a different image based on a specific condition, check out this topic:

2. Using Docker tasks from the task library

An alternative to subflows is using various Docker tasks from the Prefect task library.

This way, you can start various dockerized tasks in your flow and run those containers either directly on your VM or on a Kubernetes cluster.

This documentation page shows how to use the Docker tasks with a Kubernetes agent rather than a Docker agent:

A simplified flow example from that page:

from prefect import Flow
from prefect.storage import Docker
from prefect.run_configs import KubernetesRun
from prefect.tasks.docker import (
    PullImage,
    CreateContainer,
    StartContainer,
    GetContainerLogs,
    WaitOnContainer,
)
from prefect.triggers import always_run

# Address of the Docker daemon used by every Docker task below; defined once so
# that the host can be changed in a single place.
DOCKER_SERVER_URL = "tcp://localhost:2375"

# Initialize the tasks from the task library with all constant parameters
# Note that we pass the host of the Docker daemon to each task
image = PullImage(
    docker_server_url=DOCKER_SERVER_URL,
    repository="prefecthq/prefect",
    tag="latest",
)
# The container runs an empty Prefect flow as a minimal example command.
create_container = CreateContainer(
    docker_server_url=DOCKER_SERVER_URL,
    image_name="prefecthq/prefect:latest",
    command='''python -c "from prefect import Flow; f = Flow('empty'); f.run()"''',
)
start_container = StartContainer(docker_server_url=DOCKER_SERVER_URL)
wait_on_container = WaitOnContainer(docker_server_url=DOCKER_SERVER_URL)
# We pass `trigger=always_run` here so the logs will always be retrieved, even
# if upstream tasks fail
get_logs = GetContainerLogs(
    docker_server_url=DOCKER_SERVER_URL, trigger=always_run
)

# Wire the pre-configured Docker tasks into a flow: create -> start -> wait -> logs.
with Flow("Docker sidecar example") as flow:
    # Create and start the docker container
    container_id = create_container(image)
    started = start_container(container_id=container_id)
    # Once the docker container has started, wait until it's completed and get the status
    status_code = wait_on_container(container_id=container_id, upstream_tasks=[started])
    # Once the status code has been retrieved, retrieve the logs
    logs = get_logs(container_id=container_id, upstream_tasks=[status_code])

# Use a Kubernetes run configuration for the flow, and store the flow in a
# Docker image identified by the registry URL, image name and tag below.
flow.run_config = KubernetesRun()
flow.storage = Docker(
    registry_url="gcr.io/dev/", image_name="docker-on-k8s-flow", image_tag="0.1.0"
)

But note that it’s also possible to do the same using other Prefect agents.

The same pattern for Prefect 2.0

Note about pros and cons of this approach

Pros

  • higher isolation for dependency management

Cons

Running each task in a separate container may be less beneficial than it may initially seem to be:

  • it introduces a lot of overhead in the build process,
  • it adds overhead at runtime,
  • it increases storage requirements and thus costs,
  • it makes it harder to pass data between tasks/processes.

Conclusion

Running everything in Docker is not a silver bullet, so make sure to carefully evaluate your use case and consider the trade-offs mentioned above.

How do you do this in Prefect 2? That is, how do you run each task in a flow in a different Docker container in Prefect 2? The above examples are using Prefect 1, if I understand correctly.

You can use a DockerContainer block – you can invoke it from anywhere: from a task, a flow, or a subflow.

For running on Kubernetes, would you use KubernetesJob blocks? How would you configure it so that each task uses a separate KubernetesJob block and runs in a different pod/container?

Would you need to create a separate Deployment per task if you wanted to do this?