How to get finished futures as soon as they complete in a non-blocking way?


@Ahmed_Ezzat: What is the equivalent of gather in Prefect 2.0? I’m aware that tasks return an instance of PrefectFuture. What I want is a way to get finished futures as soon as they complete. Keep in mind I’m talking about a list of futures, not just one.

@Kevin_Kho: Maybe this, but it’s probably harder to use than just doing a list comprehension:

[x.wait() for x in list_of_futures]

@Ahmed_Ezzat: @Kevin_Kho using wait() is blocking; I’m looking for a non-blocking way.
If A has finished and B is still running, this would block execution until B finishes. What I’m trying to get is some sort of stream: if A finishes, I get A regardless of the state of B.
Something like https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed
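For readers unfamiliar with the linked function, here is a minimal sketch of the streaming behaviour being asked for, using only the standard library rather than Prefect. The `work` helper, the task names, and the delays are made up for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(name, delay):
    # Hypothetical stand-in for a task: sleep, then return the task's name.
    time.sleep(delay)
    return name


def run_in_completion_order():
    finished = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        # B is submitted first but takes longer than A.
        futures = [pool.submit(work, "B", 0.5), pool.submit(work, "A", 0.05)]
        # as_completed yields each future as soon as it is done,
        # regardless of submission order.
        for future in as_completed(futures):
            finished.append(future.result())
    return finished


if __name__ == "__main__":
    print(run_in_completion_order())
```

Here A is handed back while B is still running, which is the "stream" described above.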

@Kevin_Kho: OK, I will need to ask someone and get back to you.

@Anna_Geller: You could make your tasks async and call them with asyncio.gather():

import asyncio
from prefect import task, flow, get_run_logger


@task
async def print_values(values):
    logger = get_run_logger()
    for value in values:
        await asyncio.sleep(1)
        logger.info(value)


@flow(name="async_flow")
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(async_flow())

@Ahmed_Ezzat: asyncio.gather is also blocking: it waits for all futures to finish and then returns the results. To cut it short, I’m trying to figure out a hacky way around .map for Orion, as it’s really powerful and beneficial for us to have.
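To illustrate the difference from gather, here is a small sketch with plain asyncio coroutines, not Prefect tasks. The `work` coroutine and its delays are hypothetical, and whether the same pattern can be applied to Prefect task calls is not confirmed anywhere in this thread.

```python
import asyncio


async def work(name, delay):
    # Hypothetical stand-in for an async task.
    await asyncio.sleep(delay)
    return name


async def stream_results():
    coros = [work("B", 0.3), work("A", 0.05)]
    finished = []
    # asyncio.as_completed hands back awaitables in completion order,
    # so each result is available as soon as its coroutine finishes.
    for next_done in asyncio.as_completed(coros):
        finished.append(await next_done)
    return finished


if __name__ == "__main__":
    print(asyncio.run(stream_results()))
```

With `asyncio.gather`, both results would arrive together once B has finished; here A is processed first.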

@Anna_Geller: Thanks for explaining. Mapping is on the roadmap; for now you can do the same with a for loop.

@Michael_Adkins: For what it’s worth, using gather as in that example only awaits submission concurrently, not completion.
We don’t have an as_completed feature, but that’s a great feature request. I’ll write a quick blurb as a workaround:

from prefect import flow, task


@task
def add(x, y):
    return x + y


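def as_completed(*futures):
    """Yield the state of each future once it reaches a completed state."""
    futures = set(futures)
    completed = set()
    # Poll in a loop until every future has been yielded.
    # Note: this is a busy-wait with no sleep between passes.
    while len(completed) < len(futures):
        for future in futures.difference(completed):
            state = future.get_state()
            # Only completed states count here, so a future that never
            # reaches a completed state keeps this loop polling.
            if state.is_completed():
                completed.add(future)
                yield state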


@flow
def my_flow():
    futures = []
    # Create 100 futures
    for i in range(100):
        futures.append(add(1, i))

    for state in as_completed(*futures):
        print(state)


my_flow()

Adding type hints to the above - shared by Malthe Karbo:

from typing import Generator, Iterable, TypeVar
from prefect.futures import PrefectFuture
from prefect.orion.schemas.states import State
from prefect.utilities.asyncio import Sync


T = TypeVar("T")


def as_completed(
    futures: Iterable[PrefectFuture[T, Sync]]
) -> Generator[State[T], None, None]:
    _futures = set(futures)
    completed = set()
    while len(completed) < len(_futures):
        for future in _futures.difference(completed):
            state = future.get_state()
            if state.is_completed():
                completed.add(future)
                yield state
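Both versions above poll without pausing and only yield completed states. A possible variation, sketched here under stated assumptions, sleeps between passes, also yields non-successful final states, and gives up after a timeout. `FakeState` and `FakeFuture` are hypothetical stand-ins so the sketch runs without Prefect. The `is_final()` method, `poll_interval`, and `timeout` are assumptions for illustration; check them against the state API of your Prefect version before adapting this.

```python
import time


class FakeState:
    """Hypothetical stand-in for a task run state."""

    def __init__(self, name):
        self.name = name

    def is_final(self):
        # Assumption: both success and failure count as final.
        return self.name in {"COMPLETED", "FAILED"}


class FakeFuture:
    """Hypothetical future that reaches its final state after N polls."""

    def __init__(self, label, ready_after, final="COMPLETED"):
        self.label = label
        self.ready_after = ready_after
        self.final = final
        self.polls = 0

    def get_state(self):
        self.polls += 1
        done = self.polls >= self.ready_after
        return FakeState(self.final if done else "RUNNING")


def as_completed_polling(futures, poll_interval=0.01, timeout=5.0):
    """Yield (future, state) pairs as each future reaches a final state."""
    pending = list(futures)
    deadline = time.monotonic() + timeout
    while pending:
        still_pending = []
        for future in pending:
            state = future.get_state()
            if state.is_final():
                yield future, state
            else:
                still_pending.append(future)
        pending = still_pending
        if pending:
            if time.monotonic() > deadline:
                raise TimeoutError(f"{len(pending)} futures still pending")
            # Pause between passes instead of spinning.
            time.sleep(poll_interval)


if __name__ == "__main__":
    demo = [FakeFuture("slow", 3), FakeFuture("fast", 1)]
    for future, state in as_completed_polling(demo):
        print(future.label, state.name)
```

Yielding the future together with its state lets the caller tell which task finished, and failed tasks are surfaced rather than keeping the loop running.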