What's the best practice to get all the results of PrefectFutures without blocking?

import asyncio
from prefect import flow, task

@task
async def test_task(num):
    if num < 5:
        await asyncio.sleep(1)
    print(num)
    return str(num)


@flow
def test_flow():
    nums = ""
    nums += "".join([test_task.submit(num) for num in range(0, 10)])


test_flow()
# expect output
5678901234

I want to submit all numbers first, then join them into a string. Is there a prefect.gather method to turn them into actual strings?

1 Like

Have you tried using mapping instead of list comprehension? I believe this might work

1 Like

Yeah I tried map, but it didn’t like a single boolean, says length was different)

Sorry I meant in my actual code, the task had a flag.

from asyncio import gather, sleep
from prefect import flow, task

@task
async def test_task(num):
    if num < 5:
        await sleep(3)
    return str(num)

@flow
def test_flow():
    results = test_task.map(range(0, 10))
    a = " ".join(results)
    return a

await test_flow()

Map also returns PrefectFuture

Have to loop through it the final time to materialize the futures:

from asyncio import sleep
from prefect import flow, task

@task
async def test_task(num):
    if num < 5:
        await sleep(3)
    return str(num)


@flow
def test_flow():
    results = [future.result() for future in test_task.map(range(0, 10))]
    a = " ".join(results)
    return a

test_flow()

Since map maintains the order, it returns:
‘0 1 2 3 4 5 6 7 8 9’

1 Like

Thanks for posting internally to figure it out and sharing the solution here, much appreciated! :100:

1 Like