Currently, it is not possible to call synchronous tasks and asynchronous tasks from the same flow:
from prefect import flow, task
import asyncio, time
@task
async def async_task(n):
await asyncio.sleep(1)
return n
@task
def sync_task(res):
time.sleep(1)
print(res)
@flow
async def async_flow():
res = await asyncio.gather(*(async_task(n) for n in range(0, 3, 1)))
sync_task(res)
if __name__ == "__main__":
asyncio.run(async_flow())
This is a bug that we are working on fixing. This is the workaround for now:
from prefect import flow, task
import asyncio, time
@task
async def async_task(n):
await asyncio.sleep(1)
return n
@task
def sync_task(res):
time.sleep(1)
res.append(1)
@flow
async def async_flow():
res = await asyncio.gather(*(async_task(n) for n in range(0, 3, 1)))
return res
@flow
def sync_flow(res):
return sync_task(res)
if __name__ == "__main__":
res = asyncio.run(async_flow())
sync_flow(res)
Basically, you create one flow for asynchronous tasks and one flow for synchronous tasks