How can I run multiple subflows (or child flows) in parallel?

Hi! I would like to ask if there are any new best practices for running subflows in parallel in Prefect 2.0. I think a good implementation would be to support .submit() of flows to a task runner like Dask.


Yup, there’s a GitHub issue for it somewhere and it’s definitely coming! Stay tuned, I’d expect it within a couple of weeks


Any update on this? Do we have an interface to run subflows in parallel?


No, not yet. You can follow up directly on the GitHub issue

This issue has been added to the backlog.

The current workaround for this seems to be:

import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


async def build_subflow(n):
    # build a uniquely named subflow per input so each run shows up as its own flow
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)

    await subflow(n)


@flow
async def main_flow():
    # run all the subflows concurrently on the event loop
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

Hi Khuyên,

My use case is slightly different from the example. I need to build a subflow from a subflow rather than a task:

import asyncio
from prefect import flow, task, get_run_logger

@flow
async def my_flow(n: int):
    sub_subflow_1(n)
    sub_subflow_2(n)
    sub_subflow_3(n)
    await asyncio.sleep(1)

async def build_subflow(n):
    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_flow(x)

    await subflow(n)

    await asyncio.sleep(1)

@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

I expected it to work, but it throws the same RuntimeError (“The task runner is already started!”). Where did I go wrong? How can I fix it?

Thanks

So this is just a workaround; we don’t have a solution for this yet.

You can run multiple flows from deployments concurrently with no issues - this is something I’d recommend trying for now:

from prefect.deployments import run_deployment


for _ in range(1, 100):
    # timeout=0 returns immediately instead of waiting for the flow run to finish
    run_deployment(name="flow/deployment", timeout=0)

Adding this here for future queries:

import time

import requests
from prefect import get_run_logger, task
from prefect.deployments import run_deployment

@task
def wait_for_deployment(flow_run_id: str, log_polling_interval: int = 25, max_run_time: int = 1000):

    logger = get_run_logger()

    api_key = "XX"
    api_url = "xx"

    while True:
        # poll the Prefect REST API until the flow run reaches a terminal state
        flow_run = request_prefect_api(path=f"/flow_runs/{flow_run_id}", api_key=api_key, api_url=api_url)
        logger.info(f"{flow_run['name']} : {flow_run['state_type']}")

        state = flow_run["state_type"]

        if state in ["COMPLETED", "CANCELLED"]:
            break
        if state == "FAILED":
            raise Exception(f"Deployment: {flow_run['name']}, Flow run failed: {flow_run['state']['message']}")
        if flow_run["total_run_time"] > max_run_time:
            raise Exception(f"Flow run exceeded max run time of {max_run_time} seconds: marking as failed")

        time.sleep(log_polling_interval)


def request_prefect_api(path: str, api_key: str, api_url: str):
    url = f"{api_url}{path}"
    headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
    return requests.get(url, headers=headers).json()

# now we can do 
run_ids = []
for _ in range(1, 100):
    run_ids.append(run_deployment(name="flow/deployment", timeout=0).id)

[wait_for_deployment(r) for r in run_ids]

I am following the Prefect 1.0 solution and got the following errors:
“Specify a flow name to run”
“Found multiple flows at ‘batch_feature/create_batch_flow.py’”
The code loops over a bunch of config files in the same folder to create multiple flows.
Any clue why it doesn’t work?

Thanks

Most likely the flows were not registered. I’d recommend using Prefect 2 instead; v1 will be deprecated this year.
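For anyone hitting this in Prefect 1: a minimal sketch of giving each generated flow a distinct name and registering it. The config list, the build_flow_from_config helper, and the project name are hypothetical placeholders, not from the original post.

# Prefect 1.x sketch -- config files, helper, and project name are placeholders
from prefect import Flow, task


@task
def process(config_path: str):
    print(f"processing {config_path}")


def build_flow_from_config(config_path: str) -> Flow:
    # give every generated flow a distinct name so they can be told apart
    with Flow(name=f"batch-feature-{config_path}") as flow:
        process(config_path)
    return flow


for path in ["a.yaml", "b.yaml"]:  # placeholder config files
    flow = build_flow_from_config(path)
    flow.register(project_name="my-project")  # each flow must be registered before it can be run

If the errors come from the CLI, they suggest the file defines several flows, so passing an explicit flow name should also resolve the ambiguity.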

New to Prefect, but does the following recipe also offer an approach to running something similar to parallel subflows?

hi @daniel-oh-sunpower !

yes! that recipe uses run_deployment to run a pre-existing deployment as a subflow of the calling flow

this is effectively the same as calling the flow object itself in a parent flow with something like asyncio.gather

the big difference is that when you use run_deployment, the subflows will run according to the infrastructure associated with that deployment, whereas when you call the subflow() object itself, it will necessarily run on the calling (parent) flow’s infrastructure as a process
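To make that concrete, here is a minimal sketch contrasting the two patterns; the child flow and the deployment name "my-flow/my-deployment" are placeholders:

import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def child(n: int):
    return n * 2


@flow
async def parent():
    # Option 1: call the subflow object directly -- each child runs as a subflow
    # inside the parent flow run's own process/infrastructure
    await asyncio.gather(*[child(n) for n in range(3)])

    # Option 2: trigger a pre-existing deployment -- each run executes on the
    # infrastructure attached to that deployment ("my-flow/my-deployment" is a placeholder)
    await asyncio.gather(
        *[run_deployment(name="my-flow/my-deployment", parameters={"n": n}) for n in range(3)]
    )


if __name__ == "__main__":
    asyncio.run(parent())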

Has anyone run into this issue where a subflow returns the following error?

Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at (some_address)>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.

We started getting these errors after upgrading to 2.10.20 from 2.10.3.
Note: we’re calling the subflow function instead of using run_deployment

hi @jlam - is it possible for you to share your code? It’s hard to understand what might be happening without the code and/or the full trace

Hi @nate

Here is some code that we’re using (a bit stripped down). It also references a library package that we maintain, but those are just API calls to our own internal services.

Just to note, we’re using a Kubernetes infrastructure block to spawn Jobs in our cluster.
I have only been able to replicate this issue once locally.

import asyncio
import paramiko

from prefect import flow, task
from prefect.context import FlowRunContext
from commonLibrary import jobqueue_api_call, get_host_config, update_job, send_email

###
### Child Flow
###
@task
def get_parameters(job):
    expected_checksum = job["parameters"]["expected_checksum"]
    file_to_checksum = job["parameters"]["file_to_checksum"]

    return expected_checksum, file_to_checksum

@task
def get_tool_server():
    # returns a remote server to perform CRC commands on
    host = get_host_config("CRC Server")
    return host


@task
def run_md5sum_command(host, file):
    command = f"md5sum {file}"

    client = paramiko.client.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(host, username="<username>", password="<password>")
    _, stdout, stderr = client.exec_command(command)
    err = stderr.readlines()
    output = stdout.readlines()
    client.close()
    if len(err):
        raise Exception(" ".join(err))

    return output

@task
def compare_checksums(expected, actual):
    if expected != actual:
        raise ValueError(f"Expected checksum {expected=} does not match {actual=}")


@flow
async def crc_file(job):
    try:
        expected_checksum, file = get_parameters(job)
        host = get_tool_server()
        file_checksum = run_md5sum_command(host, file)
        compare_checksums(expected_checksum, file_checksum)
        job["status"] = "SUCCESS"
    except Exception as err:
        job["status"] = "READY"
        job["retries"] -= 1

        if job["retries"] <= 0:
            send_email(err)
    finally:
        update_job(job)


###
### Parent Flow
###
@task
def checkout_jobs(id, limit):
    payload = [{"workflowId": id, "maxCountToReturn": limit}]
    jobs = jobqueue_api_call("jobs/checkout", "POST", payload)
    return jobs

@flow
async def crc_job_watcher(limit=20):
    jobs = checkout_jobs(limit)

    if jobs:
        task_runner_type = type(FlowRunContext.get().task_runner)
        futures = [
            crc_file.with_options(
                flow_run_name=job["name"], task_runner=task_runner_type()
            )(job)
            for job in jobs
        ]

        await asyncio.gather(*futures)

Here’s the full trace from the flow

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 825, in orchestrate_flow_run
    from_async.call_soon_in_waiting_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 125, in call_soon_in_waiting_thread
    waiter.submit(call)
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/concurrency/waiters.py", line 110, in submit
    raise RuntimeError(f"The call {self._call} is already done.")
RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.

Finished in state Failed('Flow run encountered an exception. RuntimeError: The call get_task_call_return_value(task=<prefect.tasks.Task object at 0x7fe0101b1e70>, flow_run_context=FlowRunContext(start_time=DateT...) is already done.')

@anna_geller How can I get the flow run ID of each flow triggered here? Also, my parent flow has some subflows that need to run in parallel and some sequentially, based on dependencies on other flows that were triggered in parallel.
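For future readers: run_deployment returns a FlowRun object, so its .id gives you the flow run ID, and parallel vs. sequential ordering can be handled with asyncio. A rough sketch with placeholder deployment names ("extract/prod", "load/prod"):

import asyncio

from prefect import flow
from prefect.deployments import run_deployment


@flow
async def orchestrator():
    # run_deployment returns a FlowRun object; .id is the flow run ID
    # ("extract/prod" and "load/prod" are placeholder deployment names)
    parallel_runs = await asyncio.gather(
        *[run_deployment(name="extract/prod", parameters={"source": s}) for s in ("a", "b", "c")]
    )
    run_ids = [flow_run.id for flow_run in parallel_runs]
    print(f"parallel flow run ids: {run_ids}")

    # with the default timeout, run_deployment waits for each run to finish,
    # so this dependent deployment only starts after the parallel group is done
    await run_deployment(name="load/prod", parameters={"upstream_ids": [str(i) for i in run_ids]})


if __name__ == "__main__":
    asyncio.run(orchestrator())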

Did you find a solution for this? How is it implemented in your project, @Bal_Raj?

I am wondering what I am missing. In my case my subflow has two tasks, and the second task does not seem to be running concurrently:

import asyncio
from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


@task
async def my_task2(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


def build_subflow(n):

    @flow(name=f"subflow:{n}")
    async def subflow(x):
        await my_task(x)
        await my_task2(x)

    return subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

As you can see in this graph, the second task is not run concurrently.

Any workaround?
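One note on the two tasks inside each subflow: awaiting them one after another runs them sequentially by design. If they are meant to overlap, a minimal sketch of the same code with the task coroutines gathered inside the subflow:

import asyncio

from prefect import flow, task, get_run_logger


@task
async def my_task(n: int):
    get_run_logger().info(f"Task {n} started!")
    await asyncio.sleep(1)


@task
async def my_task2(n: int):
    get_run_logger().info(f"Task {n} (second) started!")
    await asyncio.sleep(1)


def build_subflow(n):

    @flow(name=f"subflow:{n}")
    async def subflow(x):
        # gather the two task coroutines so they overlap instead of
        # running one after the other inside the subflow
        await asyncio.gather(my_task(x), my_task2(x))

    return subflow(n)


@flow
async def main_flow():
    await asyncio.gather(*[build_subflow(n + 1) for n in range(4)])


if __name__ == "__main__":
    asyncio.run(main_flow())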

I think there is new documentation that helps with parallel child flows (or maybe I’m thinking of an open issue), but I went this way using the async run_deployment function. The difference from what you are asking is that the ‘subflow’ in your example needs to be a deployed flow to take advantage of the run_deployment function.

So in this example I have a flow, backfill, that can be deployed or run ad hoc; this flow runs many parallel flows of a DDL job backfilling partitions of a large table.

This example uses the newer asyncio.TaskGroup paradigm. It calls a deployed flow called ddl/sqlfile which just sends SQL from a file to my database engine. The details of that are not important here. In this case I call the same deployment many times with various parameters, but you could adjust this pattern to call a list of many different deployments by name.

I hope this helps!

import asyncio
import pendulum
from prefect import flow, get_run_logger
from prefect.deployments import run_deployment
from src.utility import dataset_name_from_file
from src.utility import check_bulk_states

@flow(retries=1)
async def backfill(
        ddl_filepath
        , begin_dataset_date
        , end_dataset_date
        , sql_params={}
        , flow_timeout_seconds=14400
        ):
    """
    Run N instances of ddl job, once for each date in [begin_dataset_date, end_dataset_date].
    Args:
    ddl_filepath: str -- path from project root to parametrized ddl sql file.
    begin_dataset_date: str -- start date of interval to backfill
    end_dataset_date: str -- end date of interval to backfill
    sql_params: dict -- all the parameters passed to the SQL string, including dataset_date.
    vpn_scan_minutes: Int -- minutes to wait polling on a VPN connection (this is for local laptop usage.)
    """
    
    logger = get_run_logger()
    dataset_name = dataset_name_from_file(ddl_filepath)
    logger.info(f"Backfill job file: {ddl_filepath}")
    logger.info(f"dataset_name: {dataset_name}")
    logger.info(f"begin_dataset_date: {begin_dataset_date}")
    logger.info(f"end_dataset_date: {end_dataset_date}")

    start = pendulum.parse(begin_dataset_date)
    end = pendulum.parse(end_dataset_date)

    if not start <= end:
        raise ValueError("End date must be equal to or after begin date.")
    
    period = pendulum.period(start, end)
    interval_date_list = [x.format('YYYY-MM-DD') for x in period]

    # this loop overwrites and increments the dataset_date parameter;
    #   all other parameters remain constant
    flow_runs = []
    async with asyncio.TaskGroup() as tg:
        for day_string in interval_date_list:
            day_param = {"dataset_date": day_string}
            interval_sql_params = sql_params | day_param
            logger.info(f"passing parameters to next taskgroup task: {interval_sql_params}")

            flow = tg.create_task(
                run_deployment(
                    name="ddl/sqlfile"
                    , tags=["ddl", "backfill", "limit", dataset_name, day_param["dataset_date"]]
                    , parameters={
                        "ddl_filepath": ddl_filepath
                        , "sql_params": interval_sql_params
                        , "vpn_scan_minutes": 20
                        }
                    , timeout=flow_timeout_seconds
                    )
                )
            flow_runs.append(flow)

    final_states = [x.result().state.type.value for x in flow_runs]
    logger.info(f"Final states: {final_states}")

    return check_bulk_states(final_states)

# src/utility.py
from prefect import task, get_run_logger
from prefect.states import Failed


@task
def check_bulk_states(states_list):
    """
    TaskGroup flows will succeed regardless of end state for subflow async deployment runs. This function returns a Failed state to be passed back from the flow.
    """
    logger = get_run_logger()

    if not all(x == "COMPLETED" for x in states_list):
        logger.error(f"Check final states: {states_list}")
        return Failed(message="State check FAILED: one or more jobs failed.")
    else:
        logger.info(f"Check final states: {states_list}")
        logger.info("SUCCESS: all jobs complete.")
        return states_list

# running the backfill() flow ad hoc:
asyncio.run(backfill(...kwarg parameters here...))

Lastly, I’ll add that this works great; I routinely call this and backfill hundreds of days, spawning hundreds of parallel flows.
It’s important to set your max concurrency at the work-pool or task (with tag) level, because it’s likely your database has a limit on concurrent jobs.
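For example, a tag-based task concurrency limit can be set from Python via the client; a minimal sketch assuming the "ddl" tag used above and an arbitrary limit of 10:

import asyncio

from prefect import get_client


async def set_tag_limit():
    async with get_client() as client:
        # allow at most 10 concurrently running tasks that carry the "ddl" tag
        # (the tag name and limit are examples, not values from the post above)
        await client.create_concurrency_limit(tag="ddl", concurrency_limit=10)


if __name__ == "__main__":
    asyncio.run(set_tag_limit())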

Also, there is some extra stuff relating to the backfill() kwargs (flow parameters) that isn’t pertinent here, but I didn’t take the time to remove it for clarity.