Hello,
I’m trying to run multiple subflows concurrently, with each subflow running an extract, a transform and a load task (see the test code below). I expected each subflow to run its three tasks one right after another, independently of the other subflows. From the UI (see attachment), ‘subflow / 4’ does seem to do that.
But ‘subflow / 3’ then waits for ‘subflow / 4’ to finish before starting its transform and load tasks.
Then ‘subflow / 2’ waits for ‘subflow / 3’ to finish before running its transform and load tasks,
and finally ‘subflow / 1’ runs its transform and load tasks only after ‘subflow / 2’ has completed.
Are my expectations misplaced? How could I improve my code to have all subflows run their respective tasks as soon as possible?
Many thanks.
import asyncio
import random

from prefect import task, flow
from prefect.runtime import flow_run


def flow_run_name():
    # Name each subflow run after its flow_index parameter.
    return f"{flow_run.parameters['flow_index']}"


@task(name='extract')
async def extract():
    await asyncio.sleep(2)
    return 'extracted'


@task(name='transform')
async def transform(extracted):
    await asyncio.sleep(2)
    return 'transformed'


@task(name='load')
async def load(transformed):
    await asyncio.sleep(2)
    return 'loaded'


@flow(name='subflow', flow_run_name=flow_run_name, log_prints=True)
async def subflow(flow_index):
    # Each subflow awaits its three tasks in sequence.
    extracted = await extract()
    transformed = await transform(extracted=extracted)
    loaded = await load(transformed=transformed)
    print('flow index: ' + str(flow_index) + ' ' + extracted + ' +' + transformed + '+' + loaded)


@flow(name='main_flow')
async def main_flow():
    # Start all four subflows and wait for them together.
    subflows = [subflow(1), subflow(2), subflow(3), subflow(4)]
    await asyncio.gather(*subflows)


if __name__ == '__main__':
    asyncio.run(main_flow())
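
To make my expectation concrete, here is a minimal sketch of the interleaving I had in mind, written in plain asyncio with no Prefect involved. The names `step`, `pipeline`, `run_all` and `STEP_SECONDS` are made up for this illustration, and the sleep is shortened from the 2 seconds used in the test code. It only shows how `asyncio.gather` schedules bare coroutines; it says nothing about how Prefect schedules tasks inside subflows.

```python
import asyncio
import time

STEP_SECONDS = 0.1  # hypothetical value, shortened from the 2 s sleeps above


async def step(name):
    # Stand-in for an extract/transform/load task: it only sleeps.
    await asyncio.sleep(STEP_SECONDS)
    return name


async def pipeline(index, events):
    # Run the three steps back to back, recording each one as it finishes.
    for name in ("extract", "transform", "load"):
        await step(name)
        events.append((index, name))


async def run_all():
    events = []
    start = time.perf_counter()
    # Four pipelines run concurrently, like the four subflows in main_flow.
    await asyncio.gather(*(pipeline(i, events) for i in range(1, 5)))
    elapsed = time.perf_counter() - start
    return events, elapsed


if __name__ == "__main__":
    events, elapsed = asyncio.run(run_all())
    print(events)
    print(f"elapsed: {elapsed:.2f}s")
```

With bare coroutines, every pipeline finishes its extract step before any transform step completes, and the total time is roughly three steps rather than twelve. That is the behaviour I was hoping to see across the four subflows.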