Consider the following code:
```python
import random

from prefect import flow, task


@task
def may_fail(x):
    if random.random() > 0.5:
        raise RuntimeError()
    return x


@task
def add_one(x):
    return x + 1


@flow
def blocking():
    # Run a collection of jobs where some may fail.
    futures = may_fail.map(range(10))

    # Filter out failures.
    # Note: because we call PrefectFuture.result(), we wait for all `may_fail`
    # tasks to finish before starting any `add_one` tasks.
    results = [
        result
        for f in futures
        if not isinstance(result := f.result(raise_on_failure=False), Exception)
    ]

    # Run the second set of tasks based on the first, now that failures have
    # been filtered out.
    return [x.result() for x in add_one.map(results)]


@flow
def nonblocking_but_fails():
    # Same as above.
    futures = may_fail.map(range(10))

    # Attempt to pass the futures directly to the second set of tasks without
    # filtering out failures.
    #
    # This approach raises:
    # `prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.`
    #
    # The reason: PrefectFutures from `may_fail` runs that do fail kick off
    # downstream tasks that cannot complete and are stuck in `Pending`, even if
    # we call `PrefectFuture.wait()` or `PrefectFuture.result()`.
    return [x.result(raise_on_failure=False) for x in add_one.map(futures)]


if __name__ == "__main__":
    print(blocking())
    print("\n\n\n\nStarting next pipeline!")
    print(nonblocking_but_fails())
```
For the `blocking` flow, we submit all `may_fail` tasks, then are forced to wait for all of their results before filtering out failures and proceeding with the `add_one` tasks. However, if some `may_fail` tasks have already succeeded, it would be desirable to let their results begin processing as input for downstream tasks immediately, rather than awaiting the remaining `may_fail` tasks.
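For reference, the behavior I'm after resembles what the stdlib's `concurrent.futures.as_completed` gives you outside of Prefect: successes are consumed and handed to the next stage as soon as each future finishes, while failures are filtered out individually without blocking on the whole batch. A minimal sketch of those semantics (plain Python, not Prefect):

```python
import random
from concurrent.futures import ThreadPoolExecutor, as_completed


def may_fail(x):
    if random.random() > 0.5:
        raise RuntimeError()
    return x


def add_one(x):
    return x + 1


with ThreadPoolExecutor() as pool:
    futures = [pool.submit(may_fail, x) for x in range(10)]
    results = []
    # as_completed yields each future as soon as it finishes, so successful
    # results flow into add_one immediately; a failed future only skips its
    # own entry and never blocks the others.
    for f in as_completed(futures):
        try:
            results.append(add_one(f.result()))
        except RuntimeError:
            pass

print(results)
```

I'd like the equivalent dataflow expressed with Prefect tasks and `PrefectFuture`s, so the dependency tracking and orchestration still happen inside the flow.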
Is there a built-in or idiomatic way of supporting this sort of non-blocking error filtering?
Thanks in advance for your help!