I need to run the same async flow many times with different parameters. When I try to do that, even with only 2 concurrent runs, only the first one finishes successfully; the second run crashes.
Here is a sample test to reproduce this issue.
import asyncio
from prefect import flow, task
@task
async def create_partitions(num):
    """Build the partition indices ``0 .. num - 1`` after a simulated delay.

    Prints a trace line, sleeps for ``num`` seconds, and returns the indices
    as a list.
    """
    print("create_partitions", num)
    partition_ids = list(range(num))
    # The sleep stands in for real work; its length scales with ``num``.
    await asyncio.sleep(num)
    return partition_ids
@task()
async def load_data(partition, num):
    """Simulate loading one partition and hand ``partition`` back unchanged.

    Prints a trace line, then sleeps for ``num`` seconds before returning.
    """
    print("load_data", partition, num)
    delay_seconds = num
    await asyncio.sleep(delay_seconds)
    return partition
@task()
async def collect_data(results):
    """Print the gathered ``results`` and return them unchanged."""
    collected = results
    print("collect_data", collected)
    return collected
@flow(name="TestFlow")
async def test_flow(num):
    """Three-stage pipeline: create partitions, load each one, collect all.

    Each stage is awaited before its output is passed to the next stage:
    ``create_partitions`` is submitted with ``num``, ``load_data`` is mapped
    over that output together with ``num``, and ``collect_data`` is submitted
    with the mapped output. The awaited value of the last stage is returned.

    NOTE(review): what ``.submit`` / ``.map`` hand back when awaited (results
    vs. future objects) depends on the installed Prefect version -- confirm.
    """
    partition_source = await create_partitions.submit(num)
    loaded = await load_data.map(partition_source, num)
    return await collect_data.submit(loaded)
async def test_async():
    """Start ``test_flow(3)`` and ``test_flow(2)`` together and await both.

    Returns what ``asyncio.gather`` produces for the two calls, ordered the
    same way the calls are listed.
    """
    flow_runs = (test_flow(3), test_flow(2))
    return await asyncio.gather(*flow_runs)
if __name__ == "__main__":
    # Script entry point: run the two-flow reproduction to completion.
    entry_coro = test_async()
    asyncio.run(entry_coro)