How can I run multiple subflows (or child flows) in parallel?

Prefect 2.0

Asynchronous subflows can be run in parallel by using AnyIO task groups or asyncio.gather. Here is an example:

import asyncio
from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow
async def subflow_2():
    print("Subflow 2 started!")
    await asyncio.sleep(1)

@flow
async def subflow_3():
    print("Subflow 3 started!")
    await asyncio.sleep(1)

@flow
async def subflow_4():
    print("Subflow 4 started!")
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
The output of such a flow run:
13:24:16.962 | Beginning flow run 'friendly-scorpion' for flow 'main-flow'...
13:24:16.962 | Starting task runner `SequentialTaskRunner`...
13:24:17.121 | Beginning subflow run 'apricot-weasel' for flow 'subflow-1'...
13:24:17.121 | Starting task runner `SequentialTaskRunner`...
13:24:17.151 | Beginning subflow run 'spotted-coot' for flow 'subflow-4'...
13:24:17.151 | Starting task runner `SequentialTaskRunner`...
Subflow 1 started!
Subflow 4 started!
13:24:17.355 | Beginning subflow run 'masked-jackrabbit' for flow 'subflow-3'...
13:24:17.355 | Starting task runner `SequentialTaskRunner`...
Subflow 3 started!
13:24:17.560 | Beginning subflow run 'ivory-kiwi' for flow 'subflow-2'...
13:24:17.560 | Starting task runner `SequentialTaskRunner`...
Subflow 2 started!
13:24:18.190 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.277 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.314 | Subflow run 'apricot-weasel' finished in state Completed(message=None, type=COMPLETED)
13:24:18.337 | Subflow run 'spotted-coot' finished in state Completed(message=None, type=COMPLETED)
13:24:18.394 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.451 | Subflow run 'masked-jackrabbit' finished in state Completed(message=None, type=COMPLETED)
13:24:18.599 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.653 | Subflow run 'ivory-kiwi' finished in state Completed(message=None, type=COMPLETED)
13:24:18.655 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.677 | Flow run 'friendly-scorpion' finished in state Completed(message='All states completed.', type=COMPLETED)
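The speed-up comes from asyncio.gather scheduling all four coroutines on the same event loop, so their one-second sleeps overlap. The stdlib-only sketch below leaves Prefect out entirely. `fake_subflow` is a hypothetical stand-in for an async subflow, and the 0.2-second sleep is an arbitrary value chosen to keep it quick. It runs the same four coroutines once with sequential awaits and once with asyncio.gather, recording the order of events and the elapsed time.

```python
import asyncio
import time


async def fake_subflow(n: int, log: list) -> int:
    # Hypothetical stand-in for an async subflow: log the start, yield to the event loop, log the end
    log.append(f"start {n}")
    await asyncio.sleep(0.2)
    log.append(f"end {n}")
    return n


async def run_sequentially(log: list) -> list:
    # Each coroutine is awaited to completion before the next one is created
    return [await fake_subflow(n, log) for n in range(1, 5)]


async def run_concurrently(log: list) -> list:
    # gather wraps every coroutine in a task up front, so all four sleeps overlap;
    # results come back in argument order regardless of finishing order
    return await asyncio.gather(*(fake_subflow(n, log) for n in range(1, 5)))


def timed(runner):
    # Run one strategy on a fresh event loop and return (results, event log, seconds elapsed)
    log: list = []
    start = time.perf_counter()
    results = asyncio.run(runner(log))
    return results, log, time.perf_counter() - start


if __name__ == "__main__":
    for runner in (run_sequentially, run_concurrently):
        results, log, elapsed = timed(runner)
        print(f"{runner.__name__}: {elapsed:.2f}s, events: {log}")
```

This matches the log above, where all four subflows start within about half a second of each other and the parent flow finishes less than two seconds after it began, rather than the four-plus seconds that sequential execution would take.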

We’ll likely add an interface for running subflows in parallel without using async in the future.

Prefect 1.0

To create multiple parallel subflow runs in Prefect 1.0, you would need to leverage mapping. Here is an example mapping over three subflows from the same project:

from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
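Conceptually, `.map` creates one task run per element of the mapped argument, while `unmapped(...)` passes the same value to every one of those runs. As a rough stdlib analogy only (no Prefect, no API calls), `fake_create_flow_run` below is a hypothetical stand-in that returns a fake run ID, a thread pool plays the role of the executor, and `itertools.repeat` plays the role of `unmapped`:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import repeat


def fake_create_flow_run(flow_name: str, project_name: str) -> str:
    # Hypothetical stand-in for create_flow_run: builds a fake run ID instead of calling the API
    return f"{project_name}/{flow_name}"


flow_names = ["flow_name_1", "flow_name_2", "flow_name_3"]  # "mapped": one call per element
project = "your_project_name"  # "unmapped": the same value is passed to every call

with ThreadPoolExecutor(max_workers=3) as pool:
    # Executor.map zips its iterables, so the infinite repeat() stops after the three flow names
    run_ids = list(pool.map(fake_create_flow_run, flow_names, repeat(project)))
```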

Based on the Executors page of the Prefect docs, I think the Prefect 1.0 example uses multithreading?

What about Prefect 2.0? Does it use multi-threading / multi-processing, or does it just use a single thread asynchronously?

You’re spot on. By default, LocalDaskExecutor uses threads, while Orion uses a single thread asynchronously in the default ConcurrentTaskRunner.
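The difference is easy to see outside Prefect. In this stdlib sketch, a `ThreadPoolExecutor` is only an analogy for a thread-based executor such as `LocalDaskExecutor` with its default threaded scheduler, and `asyncio.gather` stands in for the single-threaded async approach. `blocking_work` and `async_work` are hypothetical names; each unit of work reports the ID of the thread it ran on.

```python
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_work(n: int) -> int:
    # A blocking sleep keeps each worker busy, so the pool spreads the calls across threads
    time.sleep(0.1)
    return threading.get_ident()


async def async_work(n: int) -> int:
    # A non-blocking sleep hands control back to the event loop, which runs on one thread
    await asyncio.sleep(0.1)
    return threading.get_ident()


def thread_ids_with_threads() -> list:
    with ThreadPoolExecutor(max_workers=4) as pool:
        return list(pool.map(blocking_work, range(4)))


def thread_ids_with_asyncio() -> list:
    async def main() -> list:
        return await asyncio.gather(*(async_work(n) for n in range(4)))

    # asyncio.run drives the event loop in the calling thread
    return asyncio.run(main())
```

Because everything on the event loop shares one thread, a subflow that blocks (for example with `time.sleep` instead of `await asyncio.sleep`) would stall the others; only code that awaits gets the concurrency.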


related issue


Hi! I would like to ask if there are any new best practices for running subflows in parallel in Prefect 2.0. I think a good implementation would be to support submitting flows with .submit() to a task runner such as Dask.


Yup, there’s a GitHub issue for it somewhere and it’s definitely coming! Stay tuned; I’d expect it within a couple of weeks.


Any update on this? Do we have an interface to run subflows in parallel?

No, not yet. You can follow up directly on the GitHub issue.

This issue has been added to the backlog.

The current workaround for this seems to be:

import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
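Why does building a new flow per call help? One plausible reading of the "The task runner is already started!" error is that a single flow object owns a single task runner, so calling that same flow concurrently tries to start the runner again while it is still running. The toy model below does not use Prefect; `ToyRunner` and its error message are hypothetical. It reproduces that pattern: sharing one runner across concurrent calls fails, while creating a fresh runner per call, as `build_subflow` does with a fresh flow, succeeds.

```python
import asyncio


class ToyRunner:
    """Hypothetical stand-in for a task runner that refuses to start while already running."""

    def __init__(self) -> None:
        self.started = False

    async def run(self, n: int) -> int:
        if self.started:
            raise RuntimeError("ToyRunner is already started!")
        self.started = True
        try:
            await asyncio.sleep(0.05)  # simulated work; other coroutines run during this await
            return n
        finally:
            self.started = False


async def shared_runner_calls() -> list:
    # One runner reused by every concurrent call, like calling the same flow object four times at once
    runner = ToyRunner()
    return await asyncio.gather(*(runner.run(n) for n in range(1, 5)), return_exceptions=True)


async def fresh_runner_calls() -> list:
    # A new runner per call, mirroring how build_subflow defines a new flow each time
    async def build_and_run(n: int) -> int:
        return await ToyRunner().run(n)

    return await asyncio.gather(*(build_and_run(n) for n in range(1, 5)))
```

With `return_exceptions=True`, the shared case returns the first call's result followed by three `RuntimeError` instances instead of raising, which makes the failure pattern easy to inspect.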

Hi Khuyên,

My use case is slightly different from the example. I need to build a subflow from a subflow rather than a task:

import asyncio
from prefect import flow, task, get_run_logger

@flow
async def my_flow(n: int):
    # sub_subflow_1/2/3 are not shown in this snippet
    sub_subflow_1(n)
    sub_subflow_2(n)
    sub_subflow_3(n)
    await asyncio.sleep(1)

async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_flow(x)

    await subflow(n)

    await asyncio.sleep(1)

@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

I expected it to work, but instead it throws the same RuntimeError("The task runner is already started!"). What am I doing wrong, and how can I fix it?

Thanks

So this is just a workaround; we don’t have a solution for this yet.

You can run multiple flows from deployments concurrently with no issues - this is something I’d recommend trying for now:

from prefect.deployments import run_deployment


for _ in range(1, 100):
    run_deployment(name="flow/deployment", timeout=0)

Adding this here for future queries:

import time

import requests
from prefect import flow, get_run_logger, task
from prefect.deployments import run_deployment


def request_prefect_api(path: str, api_key: str, api_url: str) -> dict:
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()  # fail loudly on HTTP errors instead of parsing an error body
    return response.json()


@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):
    logger = get_run_logger()

    api_key = "XX"  # placeholder: your Prefect API key
    api_url = "xx"  # placeholder: your Prefect API URL

    while True:
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        state = flow_run["state_type"]
        logger.info(f"{flow_run['name']} : {state}")

        if state in ["COMPLETED", "CANCELLED"]:
            break
        # Treat CRASHED like FAILED so a crashed run does not keep the loop polling
        if state in ["FAILED", "CRASHED"]:
            raise Exception(f"Deployment: {flow_run['name']}, Flow run {state}: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")

        time.sleep(log_polling_interval)


# Tasks and get_run_logger need a flow run context, so call them from inside a flow
# (launch_and_wait is a hypothetical name for the parent flow)
@flow
def launch_and_wait():
    run_ids = []
    for _ in range(1, 100):
        # timeout=0 creates the flow run and returns immediately without waiting for it
        run_ids.append(run_deployment(name="flow/deployment", timeout=0).id)

    for run_id in run_ids:
        wait_for_deployment(run_id)
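The snippet above waits for the runs one at a time, each with its own `max_run_time` check. A variation is to poll every pending run in one loop with a single overall deadline. The sketch below keeps HTTP out of the loop: `wait_for_all` is a hypothetical helper, `fetch_state` is a hypothetical callable you would implement on top of something like `request_prefect_api` (returning the `state_type` string), and the set of terminal states is an assumption based on Prefect 2's state types. `sleep` and `clock` are injectable only so the loop can be tested without real waiting.

```python
import time
from typing import Callable, Dict, Iterable

# Assumed terminal state types; adjust to the states your Prefect version reports
TERMINAL_STATES = {"COMPLETED", "CANCELLED", "FAILED", "CRASHED"}


def wait_for_all(
    run_ids: Iterable[str],
    fetch_state: Callable[[str], str],
    poll_interval: float = 25.0,
    max_wait: float = 1000.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, str]:
    """Poll every run until all reach a terminal state; return {run_id: final_state}."""
    pending = list(run_ids)
    final: Dict[str, str] = {}
    deadline = clock() + max_wait
    while pending:
        still_pending = []
        for run_id in pending:
            state = fetch_state(run_id)
            if state in TERMINAL_STATES:
                final[run_id] = state
            else:
                still_pending.append(run_id)
        pending = still_pending
        if pending and clock() >= deadline:
            raise TimeoutError(f"{len(pending)} run(s) still not finished: {pending}")
        if pending:
            sleep(poll_interval)
    return final
```

Returning the final states instead of raising on the first `FAILED` lets the caller decide, once every run has finished, whether a single failure should fail the parent flow.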