Best practices for using async in Prefect 2.0

When writing async flows/tasks, all the standard rules of async python apply.

It’s a good practice to run any blocking code in a separate thread, e.g. running time.sleep in a separate worker thread here works well:

from prefect.utilities.asyncutils import run_sync_in_worker_thread

async def async_func():
    if random.random() > 0.3:
        run_sync_in_worker_thread(time.sleep, 1)
    await asyncio.sleep(random.randint(1, 5))
    return 1

But doing this instead:

import asyncio
import random
import time

from prefect import flow, task, get_run_logger


async def async_func():
    if random.random() > 0.3:
        time.sleep(1)
    await asyncio.sleep(random.randint(1, 5))
    return 1


@task
async def my_task():
    result = await async_func()
    return result


@flow()
async def my_flow():
    tasks = []
    for _ in range(500):
        t = my_task()
        tasks.append(t)

    await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(my_flow())

would result in a timeout error:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out

When running the API in “ephemeral mode”, async API calls are made in the process from the main event loop. An async task within an async flow is called from the main event loop, which is also where those async API calls are made. In this case, we likely see timeouts trying to acquire a database connection from the connection pool because database transactions are open and the main event loop is blocked by time.sleep.

Even when running against a hosted API, blocking the main event loop of your async flow will cause issues because async client calls will be made from the main event loop.

More docs on async in Prefect 2.0:

1 Like

Hi @anna_geller, I was wondering how this could be implemented in async tasks like a callback, such as a websocket session? Here’s what I have. It works (sort-of), but I don’t think it is fully operational because I get a runtime warning on run_sync_in_worker_thread after a few runs.

import websockets, asyncio, ast, json
from prefect import task, flow, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from prefect.utilities.asyncutils import run_sync_in_worker_thread

@task
async def createAccountEventStream(base_url=None, token = None):
    logger = get_run_logger()
    logger.info(f"Creating Account Event Stream...")

    url = base_url + '/accounts/events/session'
    r = requests.post(f'{url}',
                      data={},
                      params={'stream': 'true'},
                      headers={'Authorization': f'Bearer {token}',
                               'Accept': 'application/json',
                               'Content-length': '0'})
    if r.status_code == 200:
        logger.info(f"Account Event Stream created.")
        return r.json()['stream']
    else:
        raise Exception(f"Error creating Account Event Stream: {r.status_code} - {r.text}")

async def connect_and_consume(rss=None):
    logger = get_run_logger()

    uri = "wss://sandbox-ws.tradier.com/v1/accounts/events"
    payload = json.dumps({"events": ["order"], "sessionid": f"{rss.get('sessionid')}", "excludeAccounts": []})
    async with websockets.connect(uri) as websocket:

        await websocket.send(payload)
        logger.info(f"Sent: {payload}")
        while True:
            response = await websocket.recv()
            logger.info(f"< {response}")
            # await asyncio.sleep(10)
            return response

@flow(name = "Put Spread Algorithm ASYNC", task_runner=ConcurrentTaskRunner())
async def AsyncFlowOptionSpreadTrading(*args, **kwargs):

        # create account event stream
        account_stream = await createAccountEventStream(api_url, api_token)

        while True:
            # return response message when you get one
            res = run_sync_in_worker_thread(await connect_and_consume(account_stream), 1)
            print(res)

           # do other tasks downstream
           do_stuff()


if __name__ == "__main__":
    asyncio.run(AsyncFlowOptionSpreadTrading(expiration_date="2022-12-30", debug=True))
1 Like

this won’t work when creating a deployment, all parameters must be explicitly provided

apart from that it looks OK

I’m sick and OOO, LMK if you manage to solve it, otherwise I can cross check when I’m back

hi @anna_geller what is the best way to limit concurrency task in async task?

I have tried the aiohttp way which is

conn = client.TCPConnector(limit=10)
    aio_session = client.ClientSession(connector=conn)

It does not seem to limit the run to my task to the limit at all

Hi, I’m not familiar with TCP, but Prefect can set limits on prefect task concurrent runs using tags and this command from the CLI (or the analogue in the Python SDK):

prefect concurrency-limit create 'my-tag-name-here' 10

More details with this:

prefect concurrency-limit create --help