When writing async flows or tasks, all the standard rules of async Python apply.
It’s good practice to run any blocking code in a separate thread. For example, running time.sleep in a worker thread works well:
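import asyncio
import random
import time
from prefect.utilities.asyncutils import run_sync_in_worker_thread

async def async_func():
    if random.random() > 0.3:
        # run_sync_in_worker_thread is a coroutine function, so it must be awaited
        await run_sync_in_worker_thread(time.sleep, 1)
    await asyncio.sleep(random.randint(1, 5))
    return 1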
But doing this instead:
import asyncio
import random
import time
from prefect import flow, task, get_run_logger

async def async_func():
    if random.random() > 0.3:
        time.sleep(1)
    await asyncio.sleep(random.randint(1, 5))
    return 1

@task
async def my_task():
    result = await async_func()
    return result

@flow()
async def my_flow():
    tasks = []
    for _ in range(500):
        t = my_task()
        tasks.append(t)
    await asyncio.gather(*tasks, return_exceptions=True)

if __name__ == "__main__":
    asyncio.run(my_flow())
would result in a timeout error:
sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached, connection timed out
When running the API in “ephemeral mode”, async API calls are made in-process, from the main event loop. An async task within an async flow is also called from the main event loop, so it shares that loop with those API calls. In this case, the timeouts most likely occur because database transactions are held open while the main event loop is blocked by time.sleep, so other requests cannot acquire a connection from the pool in time.
Even when running against a hosted API, blocking the main event loop of your async flow will cause issues because async client calls will be made from the main event loop.
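To make the effect of a blocked event loop concrete, here is a minimal sketch that uses only the standard library. It is not Prefect code: asyncio.to_thread stands in for run_sync_in_worker_thread, and the heartbeat, measure and run_demo names are made up for illustration. A background coroutine tries to wake up every 10 ms, and we record the longest gap between wake-ups while a 0.3 s sleep runs either directly on the loop or in a worker thread.

```python
import asyncio
import time


async def heartbeat(gaps, stop):
    """Record the delay between consecutive wake-ups of this coroutine."""
    last = time.perf_counter()
    while not stop.is_set():
        await asyncio.sleep(0.01)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now


async def measure(blocking):
    """Return the longest heartbeat gap seen while a 0.3 s sleep runs."""
    gaps = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(gaps, stop))
    await asyncio.sleep(0.05)  # let the heartbeat start ticking
    if blocking:
        time.sleep(0.3)  # blocks the event loop: no other coroutine can run
    else:
        await asyncio.to_thread(time.sleep, 0.3)  # loop stays responsive
    await asyncio.sleep(0.05)  # give the heartbeat a chance to tick again
    stop.set()
    await beat
    return max(gaps)


def run_demo():
    """Run both variants and return (blocked_gap, offloaded_gap) in seconds."""
    blocked = asyncio.run(measure(blocking=True))
    offloaded = asyncio.run(measure(blocking=False))
    return blocked, offloaded


if __name__ == "__main__":
    blocked, offloaded = run_demo()
    print(f"longest stall with blocking call:  {blocked:.3f}s")
    print(f"longest stall with worker thread: {offloaded:.3f}s")
```

With the blocking call, the heartbeat stalls for at least the full duration of the sleep. In a flow, the async client calls sharing the loop would stall in the same way, which is consistent with the connection pool timeouts described above.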
Hi @anna_geller, I was wondering how this could be implemented in async tasks like a callback, such as a websocket session? Here’s what I have. It works (sort-of), but I don’t think it is fully operational because I get a runtime warning on run_sync_in_worker_thread after a few runs.
import websockets, asyncio, ast, json
import requests
from prefect import task, flow, get_run_logger
from prefect.task_runners import ConcurrentTaskRunner
from prefect.utilities.asyncutils import run_sync_in_worker_thread

@task
async def createAccountEventStream(base_url=None, token = None):
    logger = get_run_logger()
    logger.info(f"Creating Account Event Stream...")
    url = base_url + '/accounts/events/session'
    r = requests.post(f'{url}',
                      data={},
                      params={'stream': 'true'},
                      headers={'Authorization': f'Bearer {token}',
                               'Accept': 'application/json',
                               'Content-length': '0'})
    if r.status_code == 200:
        logger.info(f"Account Event Stream created.")
        return r.json()['stream']
    else:
        raise Exception(f"Error creating Account Event Stream: {r.status_code} - {r.text}")

async def connect_and_consume(rss=None):
    logger = get_run_logger()
    uri = "wss://sandbox-ws.tradier.com/v1/accounts/events"
    payload = json.dumps({"events": ["order"], "sessionid": f"{rss.get('sessionid')}", "excludeAccounts": []})
    async with websockets.connect(uri) as websocket:
        await websocket.send(payload)
        logger.info(f"Sent: {payload}")
        while True:
            response = await websocket.recv()
            logger.info(f"< {response}")
            # await asyncio.sleep(10)
            return response

@flow(name = "Put Spread Algorithm ASYNC", task_runner=ConcurrentTaskRunner())
async def AsyncFlowOptionSpreadTrading(*args, **kwargs):
    # create account event stream
    account_stream = await createAccountEventStream(api_url, api_token)
    while True:
        # return response message when you get one
        res = run_sync_in_worker_thread(await connect_and_consume(account_stream), 1)
        print(res)
        # do other tasks downstream
        do_stuff()

if __name__ == "__main__":
    asyncio.run(AsyncFlowOptionSpreadTrading(expiration_date="2022-12-30", debug=True))
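One possible reading of the runtime warning, offered as a guess rather than a confirmed diagnosis: run_sync_in_worker_thread is itself a coroutine function that expects a synchronous callable plus its arguments. In the flow above it receives the already-awaited result of connect_and_consume and is never awaited, which would typically produce a "coroutine ... was never awaited" warning. The sketch below shows the general split with the standard library only. asyncio.to_thread stands in for run_sync_in_worker_thread, and create_session_blocking and consume are hypothetical stand-ins for the requests.post call and the websocket receive.

```python
import asyncio
import time


def create_session_blocking():
    """Hypothetical stand-in for the blocking requests.post call."""
    time.sleep(0.1)
    return {"sessionid": "demo-session"}


async def consume(messages):
    """Hypothetical stand-in for websocket.recv(): wait for one message."""
    return await messages.get()


async def main():
    # Synchronous, blocking work goes to a worker thread.
    # Pass the function itself, not the result of calling it.
    session = await asyncio.to_thread(create_session_blocking)

    # Simulate one incoming message for the demo.
    messages = asyncio.Queue()
    await messages.put(f"order event for {session['sessionid']}")

    # A coroutine is already non-blocking, so it is awaited directly
    # instead of being handed to a worker thread.
    return await consume(messages)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Applied to the flow above, this would suggest awaiting connect_and_consume(account_stream) directly and reserving the worker thread for the requests.post call inside createAccountEventStream.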
Hi, I’m not familiar with TCP, but Prefect can limit the number of concurrent task runs using tags, with this CLI command (or its analogue in the Python SDK):