View in #prefect-community on Slack
@Ahmed_Ezzat: What is the equivalent of gather in Prefect 2.0? I’m aware that tasks return an instance of PrefectFuture.
What I want is a way to get finished futures as soon as each one completes. Keep in mind I’m talking about a list of futures, not just one.
@Kevin_Kho: Maybe this, but it’s probably harder to use than just doing a list comprehension
[x.wait() for x in list_of_futures]
@Ahmed_Ezzat: @Kevin_Kho using wait() is blocking; I’m looking for a non-blocking way.
Meaning that if A has finished and B is still running, this would block execution until B finishes. What I’m trying to get is some sort of stream: if A finishes, I get A regardless of the state of B,
something like https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.as_completed
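For reference, here is a minimal sketch of the streaming behavior described above, using only the standard library rather than Prefect. The `work` function, its delays, and `run_in_completion_order` are hypothetical stand-ins chosen for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(name, delay):
    # Hypothetical stand-in for a task: sleep, then return its name.
    time.sleep(delay)
    return name


def run_in_completion_order():
    finished = []
    with ThreadPoolExecutor(max_workers=2) as pool:
        # B is submitted first but takes longer than A.
        futures = [pool.submit(work, "B", 0.5), pool.submit(work, "A", 0.1)]
        for future in as_completed(futures):
            # Each future is yielded as soon as it finishes,
            # regardless of the state of the others.
            finished.append(future.result())
    return finished


print(run_in_completion_order())
```

A arrives before B even though B was submitted first, which is the behavior a list of wait() calls cannot give.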
@Kevin_Kho: Ok will need to ask someone and get back to you.
@Anna_Geller: You could call your async tasks and use asyncio.gather():
import asyncio
from prefect import task, flow, get_run_logger


@task
async def print_values(values):
    logger = get_run_logger()
    for value in values:
        await asyncio.sleep(1)
        logger.info(value)


@flow(name="async_flow")
async def async_flow():
    await print_values([1, 2])  # runs immediately
    coros = [print_values("abcd"), print_values("6789")]
    await asyncio.gather(*coros)


if __name__ == "__main__":
    asyncio.run(async_flow())
@Ahmed_Ezzat: asyncio.gather is also blocking: it awaits all futures to finish and then returns the results. To cut it short, I’m trying to figure out a hacky way around .map for Orion, as it’s really powerful and beneficial for us to have.
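To make the distinction concrete, here is a small plain-asyncio sketch (no Prefect involved) contrasting gather with asyncio.as_completed. The `work` coroutine, its delays, and the two collector functions are hypothetical and exist only for this illustration.

```python
import asyncio


async def work(name, delay):
    # Hypothetical stand-in for an async task.
    await asyncio.sleep(delay)
    return name


async def collect_as_completed():
    order = []
    coros = [work("B", 0.3), work("A", 0.05)]
    # as_completed yields awaitables in the order the work finishes.
    for next_done in asyncio.as_completed(coros):
        order.append(await next_done)
    return order


async def collect_with_gather():
    # gather returns only once everything is done, in input order.
    return await asyncio.gather(work("B", 0.3), work("A", 0.05))


print(asyncio.run(collect_as_completed()))
print(asyncio.run(collect_with_gather()))
```

With as_completed, A can be handled while B is still running; with gather, both results arrive together once B is done.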
@Anna_Geller: Thanks for explaining. Mapping is on the roadmap, for now you can do the same with a for loop
@Michael_Adkins: For what it’s worth, using gather as in that example is only awaiting submission concurrently, not completion.
We don’t have an as_completed feature, but that’s a great feature request. I’ll write a quick blurb as a workaround:
from prefect import flow, task


@task
def add(x, y):
    return x + y


def as_completed(*futures):
    # Poll the futures and yield each state as soon as its task run completes.
    futures = set(futures)
    completed = set()
    while len(completed) < len(futures):
        for future in futures.difference(completed):
            state = future.get_state()
            # Only COMPLETED states match; a run that ends in another state
            # (e.g. failed) is never yielded, so this loop would not exit.
            if state.is_completed():
                completed.add(future)
                yield state


@flow
def my_flow():
    futures = []
    # Create 100 futures
    for i in range(100):
        futures.append(add(1, i))

    for state in as_completed(*futures):
        print(state)


my_flow()
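The as_completed generator above polls in a tight loop. Below is a sketch of the same polling pattern with a short pause between passes, written against standard-library futures so it runs without Prefect. The names `poll_as_completed`, `is_done`, and `interval` are hypothetical. With Prefect futures, one would presumably pass a predicate built on get_state(), but that is an assumption and is not shown here.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def poll_as_completed(futures, is_done, interval=0.01):
    # Yield each future once is_done(future) is true, checking all
    # pending futures on every pass and sleeping between passes.
    pending = list(futures)
    while pending:
        still_pending = []
        for future in pending:
            if is_done(future):
                yield future
            else:
                still_pending.append(future)
        pending = still_pending
        if pending:
            time.sleep(interval)  # avoid spinning at full CPU


def slow_value(name, delay):
    # Hypothetical stand-in for a task run.
    time.sleep(delay)
    return name


def demo():
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [
            pool.submit(slow_value, "slow", 0.3),
            pool.submit(slow_value, "fast", 0.05),
        ]
        done_check = lambda f: f.done()
        return [f.result() for f in poll_as_completed(futures, done_check)]


print(demo())
```

The sleep trades a little latency for much lower CPU use while waiting; the interval is a tuning choice, not a recommended value.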