Best practices for using async in Prefect 2.0

When writing async flows and tasks, all the standard rules of async Python apply:

https://docs.python.org/3/library/asyncio-task.html
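The most important of those rules here is that a blocking call stalls every coroutine on the same event loop. A minimal stdlib-only sketch (Python 3.9+, for asyncio.to_thread) makes this visible: a hypothetical heartbeat coroutine records a tick about every 10 ms, and we count how many ticks it manages while 0.2 s of blocking work runs either inline or in a worker thread. The names heartbeat and ticks_during_work are illustrative only, and asyncio.to_thread is the standard library's way of offloading to a thread, not Prefect's helper.

```python
import asyncio
import time


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Appends a tick roughly every 10 ms, but only while the event loop is free.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def ticks_during_work(offload: bool, seconds: float = 0.2) -> int:
    """Count heartbeat ticks recorded while `seconds` of blocking work runs."""
    ticks: list = []
    stop = asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # yield once so the heartbeat records its first tick
    before = len(ticks)
    if offload:
        # Runs time.sleep in a worker thread; the loop keeps serving the heartbeat.
        await asyncio.to_thread(time.sleep, seconds)
    else:
        # Blocks the event loop itself; the heartbeat cannot run meanwhile.
        time.sleep(seconds)
    after = len(ticks)
    stop.set()
    await hb
    return after - before
```

asyncio.run(ticks_during_work(offload=False)) returns 0, because nothing else runs while time.sleep holds the loop; with offload=True the heartbeat keeps ticking and the count should land somewhere around 20.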

It’s good practice to run any blocking code in a separate thread. For example, offloading time.sleep to a worker thread works well here:

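import asyncio
import random
import time

from prefect.utilities.asyncutils import run_sync_in_worker_thread


async def async_func():
    if random.random() > 0.3:
        # Run the blocking call in a worker thread so the event loop stays free
        await run_sync_in_worker_thread(time.sleep, 1)
    await asyncio.sleep(random.randint(1, 5))
    return 1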

But calling time.sleep directly in the async function instead, like this:

import asyncio
import random
import time

from prefect import flow, task


async def async_func():
    if random.random() > 0.3:
        time.sleep(1)  # blocking call: stalls the main event loop
    await asyncio.sleep(random.randint(1, 5))
    return 1


@task
async def my_task():
    result = await async_func()
    return result


@flow()
async def my_flow():
    tasks = []
    for _ in range(500):
        t = my_task()
        tasks.append(t)

    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(my_flow())

would result in a timeout error:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out

When the API runs in “ephemeral mode”, its async API calls are made in the same process, from the main event loop. An async task inside an async flow is also called from the main event loop, so a time.sleep in the task blocks those API calls as well. While the loop is blocked, database transactions that are already open cannot finish and return their connections to the pool, so other calls likely time out trying to acquire a connection (the pool in the error above allows 5 connections plus 10 overflow).
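To make that mechanism concrete, here is a minimal stdlib-only simulation. It is not Prefect or SQLAlchemy code, and every name and duration in it is a hypothetical stand-in: a one-slot asyncio.Semaphore plays the connection pool, transaction holds a connection across an await like an open database transaction, waiting_call gives up after a timeout the way a pool checkout does, and asyncio.to_thread (Python 3.9+) plays the role of the worker-thread offload.

```python
import asyncio
import time

POOL_SIZE = 1            # hypothetical pool with a single connection
ACQUIRE_TIMEOUT = 0.2    # seconds a caller waits for a free connection
TRANSACTION_TIME = 0.01  # seconds a connection is held by an open transaction
BLOCKING_TIME = 0.5      # seconds of blocking work in user code


async def transaction(pool: asyncio.Semaphore) -> str:
    # Holds a connection across an await, like an open database transaction.
    async with pool:
        await asyncio.sleep(TRANSACTION_TIME)
    return "committed"


async def waiting_call(pool: asyncio.Semaphore) -> str:
    # Waits for a free connection, giving up after ACQUIRE_TIMEOUT seconds.
    try:
        await asyncio.wait_for(pool.acquire(), timeout=ACQUIRE_TIMEOUT)
    except asyncio.TimeoutError:
        return "timed out"
    pool.release()
    return "acquired"


async def user_code(offload: bool) -> None:
    if offload:
        # The event loop keeps running other coroutines while the thread sleeps.
        await asyncio.to_thread(time.sleep, BLOCKING_TIME)
    else:
        # Blocks the event loop: no other coroutine can run until this returns.
        time.sleep(BLOCKING_TIME)


async def simulate(offload: bool) -> list:
    pool = asyncio.Semaphore(POOL_SIZE)
    # gather starts the tasks in this order: the transaction takes the only
    # connection, the second call starts waiting for it, then user code runs.
    return await asyncio.gather(
        transaction(pool), waiting_call(pool), user_code(offload)
    )
```

With offload=False, both the 10 ms transaction and the 200 ms acquire deadline expire during the 500 ms time.sleep, and the timeout fires before the transaction gets a chance to release its connection, so asyncio.run(simulate(offload=False)) should report the second call as "timed out". With offload=True the connection is released after about 10 ms and the waiting call reports "acquired".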

Even when running against a hosted API, blocking the main event loop of your async flow causes problems, because the async client calls to the API are also made from the main event loop.

More docs on async in Prefect 2.0: