Tasks within an async subflow wait for other subflows to finish before starting

Hello,

I’m trying to run multiple subflows at the same time, with each subflow running an extract, a transform and a load task (see test code below). I expected each subflow to run its three tasks back to back, independently of the other subflows. From the UI (see attachment), it seems that ‘subflow / 4’ does exactly that.

But then ‘subflow / 3’ waits for ‘subflow / 4’ to finish before starting its transform and load tasks.

Then ‘subflow / 2’ waits for ‘subflow / 3’ to finish before running its transform and load.

And finally ‘subflow / 1’ runs its transform and load only after ‘subflow / 2’ has completed.

Are my expectations misplaced? How could I improve my code to have all subflows run their respective tasks as soon as possible?

Many thanks.

import asyncio

from prefect import flow, task
from prefect.runtime import flow_run


def flow_run_name():
    return f"{flow_run.parameters['flow_index']}"

@task(name='extract')
async def extract():
    await asyncio.sleep(2)
    return 'extracted'

@task(name='transform')
async def transform(extracted):
    await asyncio.sleep(2)
    return 'transformed'

@task(name='load')
async def load(transformed):
    await asyncio.sleep(2)
    return 'loaded'

@flow(name='subflow', flow_run_name=flow_run_name, log_prints=True)
async def subflow(flow_index):
    extracted = await extract()
    transformed = await transform(extracted=extracted)
    loaded = await load(transformed=transformed)
    print(f'flow index: {flow_index} {extracted} + {transformed} + {loaded}')

@flow(name='main_flow')
async def main_flow():
    # Create one coroutine per subflow, then await them all concurrently.
    subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*subflows)

if __name__ == '__main__':
    asyncio.run(main_flow())
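
For comparison, here is a minimal sketch of the interleaving I expected, written with plain asyncio and no Prefect. The names `step`, `pipeline` and `run_all` are made up for illustration, and the sleeps are shortened to 0.1 s. It only shows how `asyncio.gather` behaves on bare coroutines; it does not explain what Prefect does internally.

```python
import asyncio
import time


async def step(name, index, log, start):
    # Hypothetical stand-in for a task: sleep briefly, then record
    # which pipeline finished which step and roughly when.
    await asyncio.sleep(0.1)
    log.append((index, name, round(time.monotonic() - start, 1)))
    return name


async def pipeline(index, log, start):
    # Same extract -> transform -> load chain as subflow(), minus Prefect.
    for name in ("extract", "transform", "load"):
        await step(name, index, log, start)


async def run_all():
    log = []
    start = time.monotonic()
    # Schedule four pipelines concurrently, like main_flow() does.
    await asyncio.gather(*(pipeline(i, log, start) for i in range(1, 5)))
    return log, time.monotonic() - start


if __name__ == "__main__":
    log, elapsed = asyncio.run(run_all())
    for entry in log:
        print(entry)
    print(f"total: {elapsed:.1f}s")
```

With bare coroutines, no pipeline waits for another: all four finish their extract step before any transform completes, and the whole run takes roughly as long as a single pipeline (three sleeps) rather than four of them in sequence.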


I am also trying to understand why this happens and how to work around it.

Maybe reporting this as a bug on GitHub would help?