How can I run multiple subflows (or child flows) in parallel?

related issue


Hi! I would like to ask if there are any new best practices on how to run subflows in parallel in Prefect 2.0. I think a good implementation would be to support .submit() of flows to a task_runner like Dask


Yup, there’s a GitHub issue for it somewhere and it’s definitely coming! Stay tuned, I’d expect it within a couple of weeks


Any update on this? Do we have an interface to run subflows in parallel?


No, not yet. You can follow up directly on the GitHub issue.

This issue has been added to the backlog.

The current workaround for this seems to be:

import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    # build a new, uniquely named flow object for each call and run it
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    # run the four subflows concurrently on the same event loop
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

Hi Khuyên,

My use case is slightly different from the example. I need to build a subflow from a subflow rather than a task:

import asyncio
from prefect import flow, task, get_run_logger

@flow
async def my_flow(n: int):
    sub_subflow_1(n)
    sub_subflow_2(n)
    sub_subflow_3(n)
    await asyncio.sleep(1)

async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_flow(x)

    await subflow(n)

    await asyncio.sleep(1)

@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

I expected it to work, but instead it throws the same RuntimeError(“The task runner is already started!”). Where did I go wrong? How can I fix it?

Thanks

So this is just a workaround; we don’t have a solution for this yet.

You can run multiple flows from deployments concurrently with no issues - this is something I’d recommend trying for now:

from prefect.deployments import run_deployment


# timeout=0 returns immediately instead of waiting for each flow run to finish
for _ in range(1, 100):
    run_deployment(name="flow/deployment", timeout=0)

Adding this here for future queries:

import time

import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment

@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):

    logger = get_run_logger()

    api_key = "XX"
    api_url = "xx"

    # poll until the flow run reaches a terminal state
    while True:

        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")

        state = flow_run["state_type"]

        if state in ["COMPLETED", "CANCELLED"]:
            break
        if state == "FAILED":
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")

        time.sleep(log_polling_interval)


def request_prefect_api(path: str, api_key: str, api_url: str):
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    return requests.get(url, headers=headers).json()

# now we can do 
run_ids = []
for _ in range(1, 100):
    run_ids.append(run_deployment(name="flow/deployment", timeout=0).id)

[wait_for_deployment(r) for r in run_ids]
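
For reference, here's a minimal sketch of how the two snippets above could be combined inside a parent flow. The orchestrator flow name, deployment name, and run count are placeholders, and it assumes the wait_for_deployment task defined above is in scope:

from prefect import flow
from prefect.deployments import run_deployment


@flow
def orchestrator(n_runs: int = 10):
    # kick off the deployment runs without waiting (timeout=0 returns immediately)
    flow_runs = [run_deployment(name="flow/deployment", timeout=0) for _ in range(n_runs)]

    # then block until each run reaches a terminal state, reusing wait_for_deployment from above
    for fr in flow_runs:
        wait_for_deployment(fr.id)

If the waits themselves should overlap, they could also be submitted to the flow's task runner with wait_for_deployment.submit(fr.id).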

I am following the Prefect 1.0 solution and got the following errors:
“Specify a flow name to run”
“Found multiple flows at ‘batch_feature/create_batch_flow.py’”
The code loops over a bunch of config files in the same folder to create multiple flows.
Any clue why it doesn’t work?

Thanks

Most likely the flows are not registered. I’d recommend using Prefect 2 instead; v1 will be deprecated this year.

New to Prefect, but does the following recipe also offer an approach to running something similar to parallel subflows?

hi @daniel-oh-sunpower!

yes! that recipe uses run_deployment to run a pre-existing deployment as a subflow of the calling flow.

this is effectively the same as calling the flow object itself in a parent flow with something like asyncio.gather.

the big difference is that when you use run_deployment, the subflows will run on the infrastructure associated with that deployment, whereas when you call the subflow() object itself, it will necessarily run as a process on the calling (parent) flow’s infrastructure.
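
As a rough illustration of the difference between the two approaches (the child flow, deployment name, and run counts below are placeholders, and this assumes a Prefect 2.x release where concurrent runs of the same subflow object are supported):

import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def child(n: int):
    await asyncio.sleep(1)


@flow
async def parent_in_process():
    # calling the flow object directly: the children run as subflows
    # inside this parent flow's process, on its infrastructure
    await asyncio.gather(*[child(n) for n in range(3)])


@flow
async def parent_via_deployments():
    # run_deployment: each child runs on the infrastructure of its own,
    # pre-existing deployment ("child/my-deployment" here is a placeholder);
    # timeout=0 returns without waiting for the runs to finish
    await asyncio.gather(
        *[run_deployment(name="child/my-deployment", timeout=0) for _ in range(3)]
    )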

Has anyone run into this issue where a subflow returns the following error?

Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at (some_address)>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.

We started getting these errors after upgrading from 2.10.3 to 2.10.20.
Note: we’re calling the subflow function directly instead of using run_deployment.

hi @jlam - is it possible for you to share your code? It’s hard to understand what might be happening without the code and/or the full trace

Hi @nate

Here is some code that we’re using (a bit stripped down). There is also a library package that we maintain that this code references, but it’s just API calls to our own internal services.

Just to note, we’re using a Kubernetes infrastructure block to spawn Jobs in our cluster.
I have only been able to replicate this issue once locally.

import asyncio
import paramiko

from prefect import flow, task
from prefect.context import FlowRunContext
from commonLibrary import jobqueue_api_call, get_host_config, update_job, send_email

###
### Child Flow
###
@task
def get_parameters(job):
    expected_checksum = job["parameters"]["expected_checksum"]
    file_to_checksum = job["parameters"]["file_to_checksum"]

    return expected_checksum, file_to_checksum

@task
def get_tool_server():
    # returns a remote server to perform CRC commands on
    host = get_host_config("CRC Server")
    return host


@task
def run_md5sum_command(host, file):
    command = f"md5sum {file}"

    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host, username="<username>", password="<password>")
    _, stdout, stderr = client.exec_command(command)
    err = stderr.readlines()
    output = stdout.readlines()
    client.close()
    if len(err):
        raise Exception(" ".join(err))

    return output

@task
def compare_checksums(expected, actual):
    if expected != actual:
        raise ValueError(f"Expected checksum {expected=} does not match {actual=}")


@flow
async def crc_file(job):
    try:
        expected_checksum, file = get_parameters(job)
        host = get_tool_server()
        file_checksum = run_md5sum_command(host, file)
        compare_checksums(expected_checksum, file_checksum)
        job["status"] = "SUCCESS"
    except Exception as err:
        job["status"] = "READY"
        job["retries"] -= 1

        if job["retries"] <= 0:
            send_email(err)
    finally:
        update_job(job)


###
### Parent Flow
###
@task
def checkout_jobs(id, limit):
    payload = [{"workflowId": id, "maxCountToReturn": limit}]
    jobs = jobqueue_api_call("jobs/checkout", "POST", payload)
    return jobs

@flow
async def crc_job_watcher(limit=20):
    jobs = checkout_jobs(limit)

    if jobs:
        # give each child flow run its own task runner instance, matching the parent's task runner type
        task_runner_type = type(FlowRunContext.get().task_runner)
        futures = [
            crc_file.with_options(
                flow_run_name=job["name"], task_runner=task_runner_type()
            )(job)
            for job in jobs
        ]

        await asyncio.gather(*futures)

Here’s the full trace from the flow

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 825, in orchestrate_flow_run
    from_async.call_soon_in_waiting_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 125, in call_soon_in_waiting_thread
    waiter.submit(call)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 110, in submit
    raise RuntimeError(f"The call {self._call} is already done.")
RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.

Finished in state Failed('Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.')

@anna_geller How can I get the flow run ID of each flow that was triggered here? Also, my parent flow has some subflows that need to run in parallel and some sequentially, based on dependencies on other flows that were triggered in parallel.

Did you find a solution for this? How is it implemented in your project, @Bal_Raj?

I am wondering what I am missing. In my case, my subflow has two tasks, and the second task does not seem to run asynchronously:

import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


@task
async def my_task2(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


def build_subflow(n):

    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)
        await my_task2(x)

    return subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

As you can see in the attached graph, the second task is not run concurrently.

Any workaround?