Hi all!

Consider the following code:
```python
import random

from prefect import flow, task


@task
def may_fail(x):
    if random.random() > 0.5:
        raise RuntimeError()
    return x


@task
def add_one(x):
    return x + 1


@flow
def blocking():
    # Run a collection of jobs where some may fail.
    futures = may_fail.map(range(10))

    # Filter out failures.
    # Note: because we call PrefectFuture.result(), we wait for all `may_fail`
    # tasks to finish before starting any `add_one` tasks.
    results = [
        result
        for f in futures
        if not isinstance(result := f.result(raise_on_failure=False), Exception)
    ]

    # Run the second set of tasks on the filtered results.
    return [x.result() for x in add_one.map(results)]


@flow
def nonblocking_but_fails():
    # Same as above.
    futures = may_fail.map(range(10))

    # Attempt to pass the futures directly to the second set of tasks without
    # filtering out failures.
    #
    # This approach raises:
    # `prefect.exceptions.UnfinishedRun: Run is in PENDING state, its result is not available.`
    #
    # This happens because each failed `may_fail` future still spawns an `add_one`
    # task that can never run and stays stuck in `Pending`, even if we call
    # `PrefectFuture.wait()` or `PrefectFuture.result()`.
    return [x.result(raise_on_failure=False) for x in add_one.map(futures)]


if __name__ == "__main__":
    print(blocking())
    print("\n\n\n\nStarting next pipeline!")
    print(nonblocking_but_fails())
```
In the `blocking` flow, we submit all `may_fail` tasks and are then forced to wait for their results before filtering them and proceeding with the `add_one` tasks.
However, if some `may_fail` tasks have already succeeded, it would be better to let their results start flowing into the downstream tasks right away instead of waiting for the remaining `may_fail` tasks.
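For comparison, here is a minimal sketch of the behavior I'm after, written outside Prefect with the standard library's `concurrent.futures.as_completed`. It is not Prefect's API: `pipeline` is a hypothetical helper, and `may_fail`/`add_one` are plain-function stand-ins for the tasks above. Each upstream result is checked as soon as it finishes, and successes are handed to the downstream function immediately, while failures are dropped.

```python
import random
from concurrent.futures import ThreadPoolExecutor, as_completed


def may_fail(x):
    # Plain-function stand-in for the Prefect task; fails about half the time.
    if random.random() > 0.5:
        raise RuntimeError(f"may_fail({x}) failed")
    return x


def add_one(x):
    # Plain-function stand-in for the downstream Prefect task.
    return x + 1


def pipeline(inputs, first=may_fail, second=add_one, max_workers=4):
    """Hypothetical helper: run `first` on every input, then start `second`
    on each successful result as soon as it is available."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        upstream = [pool.submit(first, x) for x in inputs]
        downstream = []
        # as_completed yields futures in completion order, so a fast success is
        # passed downstream without waiting for slower (or failing) siblings.
        for fut in as_completed(upstream):
            if fut.exception() is not None:
                continue  # filter out the failure instead of propagating it
            downstream.append(pool.submit(second, fut.result()))
        # Sort because completion order is nondeterministic.
        return sorted(f.result() for f in downstream)


if __name__ == "__main__":
    print(pipeline(range(10)))
```

The ordering of the output is lost because work is consumed in completion order; sorting at the end is just for readability.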
Is there a built-in or idiomatic way of supporting this sort of non-blocking error filtering?
Thanks in advance for your help!