How can I run multiple subflows (or child flows) in parallel?

Prefect 2.0

Asynchronous subflows can be run in parallel using AnyIO task groups or asyncio.gather. Here is an example using asyncio.gather:

import asyncio
from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)

@flow
async def subflow_2():
    print("Subflow 2 started!")
    await asyncio.sleep(1)

@flow
async def subflow_3():
    print("Subflow 3 started!")
    await asyncio.sleep(1)

@flow
async def subflow_4():
    print("Subflow 4 started!")
    await asyncio.sleep(1)

@flow
async def main_flow():
    # Calling each async subflow builds a coroutine; gather runs them concurrently
    parallel_subflows = [subflow_1(), subflow_2(), subflow_3(), subflow_4()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
The output of such a flow run looks like this:
13:24:16.962 | Beginning flow run 'friendly-scorpion' for flow 'main-flow'...
13:24:16.962 | Starting task runner `SequentialTaskRunner`...
13:24:17.121 | Beginning subflow run 'apricot-weasel' for flow 'subflow-1'...
13:24:17.121 | Starting task runner `SequentialTaskRunner`...
13:24:17.151 | Beginning subflow run 'spotted-coot' for flow 'subflow-4'...
13:24:17.151 | Starting task runner `SequentialTaskRunner`...
Subflow 1 started!
Subflow 4 started!
13:24:17.355 | Beginning subflow run 'masked-jackrabbit' for flow 'subflow-3'...
13:24:17.355 | Starting task runner `SequentialTaskRunner`...
Subflow 3 started!
13:24:17.560 | Beginning subflow run 'ivory-kiwi' for flow 'subflow-2'...
13:24:17.560 | Starting task runner `SequentialTaskRunner`...
Subflow 2 started!
13:24:18.190 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.277 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.314 | Subflow run 'apricot-weasel' finished in state Completed(message=None, type=COMPLETED)
13:24:18.337 | Subflow run 'spotted-coot' finished in state Completed(message=None, type=COMPLETED)
13:24:18.394 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.451 | Subflow run 'masked-jackrabbit' finished in state Completed(message=None, type=COMPLETED)
13:24:18.599 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.653 | Subflow run 'ivory-kiwi' finished in state Completed(message=None, type=COMPLETED)
13:24:18.655 | Shutting down task runner `SequentialTaskRunner`...
13:24:18.677 | Flow run 'friendly-scorpion' finished in state Completed(message='All states completed.', type=COMPLETED)
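
The same fan-out can be written with an AnyIO task group instead of asyncio.gather. A minimal sketch, reusing the subflows defined above (Prefect 2 itself is built on AnyIO, so anyio is already installed):

import anyio
from prefect import flow

@flow
async def main_flow():
    # the task group only exits once every subflow started
    # via start_soon has finished
    async with anyio.create_task_group() as tg:
        tg.start_soon(subflow_1)
        tg.start_soon(subflow_2)
        tg.start_soon(subflow_3)
        tg.start_soon(subflow_4)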

We’ll likely add an interface for running subflows in parallel without using async in the future.

Prefect 1.0

To create multiple parallel subflow runs in Prefect 1.0, you need to leverage mapping with the create_flow_run task. Here is an example mapping over three subflows from the same project:

from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
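
If the parent flow should also block until those child flow runs finish, you can map the wait_for_flow_run task over the returned run IDs. A sketch extending the example above (same assumptions about flow and project names):

from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
    # block until each child flow run reaches a terminal state
    wait_for_flow_run.map(mapped_flow_run_ids)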

Based on this page (Executors | Prefect Docs), I think the example for Prefect 1.0 uses multi-threading?

What about Prefect 2.0? Does it use multi-threading / multi-processing, or does it just use a single thread asynchronously?

You’re spot on. By default, LocalDaskExecutor uses threads, while Orion uses a single thread asynchronously in the default ConcurrentTaskRunner.
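
To make that concrete, here is a minimal sketch (not from the original answer) of setting a task runner explicitly; DaskTaskRunner assumes the prefect-dask collection is installed (in the earliest 2.0 releases it shipped in prefect.task_runners instead):

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner
from prefect_dask import DaskTaskRunner  # assumes `pip install prefect-dask`

@flow(task_runner=ConcurrentTaskRunner())  # the default: async concurrency on a single thread
async def io_bound_flow():
    ...

@flow(task_runner=DaskTaskRunner())  # spins up a local Dask cluster for thread/process parallelism
def cpu_bound_flow():
    ...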


related issue


Hi! I would like to ask if there are any new best practices on how to run subflows in parallel in Prefect 2.0. I think a good implementation would be to support .submit() of flows to a task_runner like Dask


Yup, there’s a GitHub issue for it somewhere and it’s definitely coming! Stay tuned; I’d expect it within a couple of weeks.


Any update on this? Do we have an interface to run subflows in parallel?


No, not yet. You can follow up directly on the GitHub issue.

This issue has been added to the backlog.

The current workaround seems to be to build a separate flow object per concurrent run, so that each run gets its own task runner:

import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    # build a uniquely named flow per run so each gets its own task runner
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

Hi Khuyên,

My use case is slightly different from the example. I need to build a subflow from a subflow rather than a task:

import asyncio
from prefect import flow, task, get_run_logger

@flow
async def my_flow(n: int):
    # sub_subflow_1/2/3: async flows defined elsewhere (omitted here)
    sub_subflow_1(n)
    sub_subflow_2(n)
    sub_subflow_3(n)
    await asyncio.sleep(1)

async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_flow(x)

    await subflow(n)

    await asyncio.sleep(1)

@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

I expected it to work, but instead it throws the same RuntimeError (“The task runner is already started!”). Where did I go wrong? How can I fix it?

Thanks

So this is just a workaround; we don’t have a solution for this yet.

You can run multiple flows from deployments concurrently with no issues - this is something I’d recommend trying for now:

from prefect.deployments import run_deployment


# timeout=0 returns immediately instead of waiting for the flow run to finish
for _ in range(1, 100):
    run_deployment(name="flow/deployment", timeout=0)

Adding this here for future queries:

import time

import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment

@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):

    logger = get_run_logger()

    api_key = "XX"  # your Prefect API key
    api_url = "xx"  # your Prefect API URL

    while True:

        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")

        state = flow_run["state_type"]

        if state in ["COMPLETED", "CANCELLED"]:
            break
        if state == "FAILED":
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")

        time.sleep(log_polling_interval)


def request_prefect_api(path: str, api_key: str, api_url: str):
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    return requests.get(url, headers=headers).json()

# now, from within a flow (wait_for_deployment is a task), we can do:
run_ids = []
for _ in range(1, 100):
    run_ids.append(run_deployment(name="flow/deployment", timeout=0).id)

[wait_for_deployment(r) for r in run_ids]

I am following the Prefect 1.0 solution and got the following errors:
“Specify a flow name to run”
“Found multiple flows at ‘batch_feature/create_batch_flow.py’”
The code loops over a bunch of config files under the same folder to create multiple flows.
Any clue why it doesn’t work?

thanks

Most likely the flows are not registered. I’d recommend using Prefect 2 instead; v1 will be deprecated this year.

New to Prefect, but does the following recipe also offer an approach to running something similar to parallel subflows?

hi @daniel-oh-sunpower!

yes! that recipe uses run_deployment to run a pre-existing deployment as a subflow of the calling flow.

this is effectively the same as calling the flow object itself in a parent flow with something like asyncio.gather.

the big difference is that when you use run_deployment, the subflows will run according to the infrastructure associated with that deployment, whereas when you call the subflow() object itself, it will necessarily run on the calling (parent) flow’s infrastructure, as a process.
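
to make that difference concrete, here is a minimal sketch (the deployment name child/deployment is a placeholder; run_deployment is sync-compatible, so it can be awaited inside an async flow):

import asyncio
from prefect import flow
from prefect.deployments import run_deployment

@flow
async def parent():
    # without timeout=0, run_deployment polls each child run until it
    # finishes, so gather waits for all of them to complete concurrently
    await asyncio.gather(
        *[run_deployment(name="child/deployment") for _ in range(4)]
    )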

Has anyone run into this issue where a subflow returns the following error?

Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at (some_address)>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.

We started getting these errors after upgrading to 2.10.20 from 2.10.3.
Note: we’re calling the subflow function directly instead of using run_deployment.

hi @jlam - is it possible for you to share your code? It’s hard to understand what might be happening without the code and/or the full trace

Hi @nate

Here is some code we’re using (a bit stripped down). There is also a library package that we maintain that this code references, but it’s just API calls to our own internal services.

Just to note, we’re using a Kubernetes infrastructure block to spawn Jobs in our cluster.
I have only been able to replicate this issue once locally.

import asyncio
import paramiko

from prefect import flow, task
from prefect.context import FlowRunContext
from commonLibrary import jobqueue_api_call, get_host_config, update_job, send_email

###
### Child Flow
###
@task
def get_parameters(job):
    expected_checksum = job["parameters"]["expected_checksum"]
    file_to_checksum = job["parameters"]["file_to_checksum"]

    return expected_checksum, file_to_checksum

@task
def get_tool_server():
    # returns a remote server to perform CRC commands on
    host = get_host_config("CRC Server")
    return host


@task
def run_md5sum_command(host, file):
    command = f"md5sum {file}"

    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host, username="<username>", password="<password>")
    _, stdout, stderr = client.exec_command(command)
    err = stderr.readlines()
    output = stdout.readlines()
    client.close()
    if len(err):
        raise Exception(" ".join(err))

    return output

@task
def compare_checksums(expected, actual):
    if expected != actual:
        raise ValueError(f"Expected checksum {expected=} does not match {actual=}")


@flow
async def crc_file(job):
    try:
        expected_checksum, file = get_parameters(job)
        host = get_tool_server()
        file_checksum = run_md5sum_command(host, file)
        compare_checksums(expected_checksum, file_checksum)
        job["status"] = "SUCCESS"
    except Exception as err:
        job["status"] = "READY"
        job["retries"] -= 1

        if job["retries"] <= 0:
            send_email(err)
    finally:
        update_job(job)


###
### Parent Flow
###
@task
def checkout_jobs(workflow_id, limit):
    payload = [{"workflowId": workflow_id, "maxCountToReturn": limit}]
    jobs = jobqueue_api_call("jobs/checkout", "POST", payload)
    return jobs

@flow
async def crc_job_watcher(workflow_id, limit=20):
    jobs = checkout_jobs(workflow_id, limit)

    if jobs:
        # give each concurrent subflow run its own fresh task runner instance
        task_runner_type = type(FlowRunContext.get().task_runner)
        futures = [
            crc_file.with_options(
                flow_run_name=job["name"], task_runner=task_runner_type()
            )(job)
            for job in jobs
        ]

        await asyncio.gather(*futures)

Here’s the full trace from the flow run:

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 825, in orchestrate_flow_run
    from_async.call_soon_in_waiting_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 125, in call_soon_in_waiting_thread
    waiter.submit(call)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 110, in submit
    raise RuntimeError(f"The call {self._call} is already done.")
RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.

Finished in state Failed('Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.')