How can I run multiple subflows (or child flows) in parallel?

Prefect 2.0

Asynchronous subflows can be run in parallel by using AnyIO task groups or asyncio.gather. Here is an example:

import asyncio
from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow
async def subflow_2():
    print("Subflow 2 started!")
    await asyncio.sleep(1)

@flow
async def subflow_3():
    print("Subflow 3 started!")
    await asyncio.sleep(1)

@flow
async def subflow_4():
    print("Subflow 4 started!")
    await asyncio.sleep(1)

@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
The output of such flow run
13:24:16.962 | Beginning flow run 'friendly-scorpion' for flow 'main-flow'...
13:24:16.962 | Starting task runner `SequentialTaskRunner`...
13:24:17.121 | Beginning subflow run 'apricot-weasel' for flow 'subflow-1'...
13:24:17.121 | Starting task runner `SequentialTaskRunner`...
13:24:17.151 | Beginning subflow run 'spotted-coot' for flow 'subflow-4'...
13:24:17.151 | Starting task runner `SequentialTaskRunner`...
Subflow 1 started!
Subflow 4 started!
13:24:17.355 | Beginning subflow run 'masked-jackrabbit' for flow 'subflow-3'...
13:24:17.355 | Starting task runner `SequentialTaskRunner`...
Subflow 3 started!
13:24:17.560 | Beginning subflow run 'ivory-kiwi' for flow 'subflow-2'...
13:24:17.560 | Starting task runner `SequentialTaskRunner`...
Subflow 2 started!
13:24:18.190 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.277 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.314 | Subflow run 'apricot-weasel' finished in state Completed(message=None, type=COMPLETED)
13:24:18.337 | Subflow run 'spotted-coot' finished in state Completed(message=None, type=COMPLETED)
13:24:18.394 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.451 | Subflow run 'masked-jackrabbit' finished in state Completed(message=None, type=COMPLETED)
13:24:18.599 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.653 | Subflow run 'ivory-kiwi' finished in state Completed(message=None, type=COMPLETED)
13:24:18.655 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.677 | Flow run 'friendly-scorpion' finished in state Completed(message='All states completed.', type=COMPLETED)

We’ll likely add an interface for running subflows in parallel without using async in the future.

Prefect 1.0

To create multiple parallel subflow runs in Prefect 1.0, you would need to leverage mapping. Here is an example mapping over three subflows from the same project:

from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )

Based on this, Executors | Prefect Docs, I think the example for Prefect 1.0 uses multi-threading?

What about Prefect 2.0? Does it use multi-threading / multi-processing, or does it just use a single thread asynchronously?

You’re spot on. By default, LocalDaskExecutor uses threads, while Orion uses a single thread asynchronously in the default ConcurrentTaskRunner.

1 Like

related issue