How to run async flows in parallel

I need to run the same async flow many times with different parameters. When I try this, even with only two runs, only the first run finishes successfully; the second run crashes.

Here is a sample script that reproduces the issue:

import asyncio
from prefect import flow, task

@task
async def create_partitions(num):
    """Return the partition indices ``0 .. num - 1`` as a list.

    Prints a trace line, builds the list, then sleeps for ``num`` seconds
    before returning it.
    """
    print('create_partitions', num)
    indices = list(range(num))
    # Pause for `num` seconds (presumably a stand-in for real work).
    await asyncio.sleep(num)
    return indices

@task()
async def load_data(partition, num):
    """Return ``partition`` unchanged after sleeping for ``num`` seconds.

    Prints a trace line with both arguments before sleeping.
    """
    print('load_data', partition, num)
    delay_seconds = num
    await asyncio.sleep(delay_seconds)
    return partition

@task()
async def collect_data(results):
    """Print ``results`` with a trace label and return it unchanged."""
    print('collect_data', results)
    return results

@flow(name='TestFlow')
async def test_flow(num):
    """Chain the three tasks for a single value of ``num``.

    Steps, each awaited in turn:
      1. submit ``create_partitions`` with ``num``;
      2. map ``load_data`` over the value from step 1, also passing ``num``;
      3. submit ``collect_data`` with the value from step 2.

    Returns whatever awaiting ``collect_data.submit(...)`` yields.
    NOTE(review): presumably a Prefect future rather than the raw list;
    confirm against the installed Prefect version.
    """
    partition_ids = await create_partitions.submit(num)
    loaded = await load_data.map(partition_ids, num)
    return await collect_data.submit(loaded)

async def test_async():
    """Await two ``test_flow`` runs (``num=3`` and ``num=2``) via ``asyncio.gather``.

    Returns the list produced by ``asyncio.gather``, in argument order.
    """
    runs = (test_flow(3), test_flow(2))
    return await asyncio.gather(*runs)

# Script entry point: run both flow invocations on a fresh asyncio event loop.
if __name__ == "__main__":
    asyncio.run(test_async())
1 Like