When writing async flows and tasks, all the standard rules of async Python apply.
It's good practice to run any blocking code in a separate thread. For example, running
time.sleep in a separate worker thread works well:
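```python
import asyncio
import random
import time

from prefect.utilities.asyncutils import run_sync_in_worker_thread


async def async_func():
    if random.random() > 0.3:
        # Offload the blocking call to a worker thread.
        # The helper is a coroutine function, so it must be awaited.
        await run_sync_in_worker_thread(time.sleep, 1)
    await asyncio.sleep(random.randint(1, 5))
    return 1
```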
But doing this instead:
```python
import asyncio
import random
import time

from prefect import flow, task


async def async_func():
    if random.random() > 0.3:
        # Blocking call made directly on the event loop
        time.sleep(1)
    await asyncio.sleep(random.randint(1, 5))
    return 1


@task
async def my_task():
    result = await async_func()
    return result


@flow()
async def my_flow():
    tasks = []
    for _ in range(500):
        t = my_task()
        tasks.append(t)
    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(my_flow())
```
would result in a timeout error:
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out
When running the API in “ephemeral mode”, async API calls are made in-process, from the main event loop. An async task within an async flow is also called from the main event loop, so it shares that loop with those API calls. In this case, the timeouts likely occur while trying to acquire a database connection from the connection pool, because database transactions stay open while the main event loop is blocked by time.sleep.
Even when running against a hosted API, blocking the main event loop of your async flow will cause issues, because the async client calls are also made from the main event loop.
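The effect of a blocked event loop can be seen without Prefect at all. The sketch below uses only the standard library and is an illustration, not Prefect's implementation: `heartbeat`, `max_gap_during`, `blocking_work`, `offloaded_work`, and `compare` are hypothetical names, and the heartbeat coroutine merely stands in for whatever else shares the loop (such as API client calls). It assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import time


async def heartbeat(ticks: list, interval: float = 0.01) -> None:
    """Stand-in for other work sharing the event loop (e.g. API calls)."""
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)


async def max_gap_during(work) -> float:
    """Run `work` next to a heartbeat; return the longest pause between ticks."""
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0.03)  # let the heartbeat record a few ticks first
    await work()
    await asyncio.sleep(0.03)  # and a few more afterwards
    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


async def blocking_work() -> None:
    # Blocks the whole event loop: the heartbeat cannot run during the sleep.
    time.sleep(0.3)


async def offloaded_work() -> None:
    # Runs the blocking call in a worker thread; the loop stays responsive.
    await asyncio.to_thread(time.sleep, 0.3)


def compare() -> tuple:
    """Return (longest gap with blocking call, longest gap when offloaded)."""
    blocked = asyncio.run(max_gap_during(blocking_work))
    offloaded = asyncio.run(max_gap_during(offloaded_work))
    return blocked, offloaded


if __name__ == "__main__":
    blocked, offloaded = compare()
    print(f"longest heartbeat gap, blocking call:  {blocked:.3f}s")
    print(f"longest heartbeat gap, offloaded call: {offloaded:.3f}s")
```

With the blocking call, the heartbeat stalls for roughly the full duration of the sleep; with the offloaded call, it keeps ticking at close to its normal interval. In a flow, the stalled heartbeat corresponds to client or database work that cannot make progress while time.sleep holds the loop.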
More docs on async in Prefect 2.0: