This error usually happens when an upstream failure is propagated downstream through a task that Prefect creates implicitly to represent a collection such as a `List`, `Dict` or `Tuple`. The example below shows how an intermediate `List` task gets created when the results of previous tasks are combined into a list and passed on as a data dependency.
```python
import prefect
from prefect import task, Flow
from typing import Any, List


@task
def task_fail_a() -> str:
    raise RuntimeError("Expected Error A")


@task
def task_fail_b() -> str:
    raise RuntimeError("Expected Error B")


# all_finished: run once every upstream task has finished, even if some failed
@task(trigger=prefect.triggers.all_finished)
def print_errors(potential_errors: List[Any]):
    print(f"potential_errors: {potential_errors!r}")


with Flow("tmp") as flow:
    a = task_fail_a()
    b = task_fail_b()
    # The list literal [a, b] is turned into an implicit List task
    print_errors(potential_errors=[a, b])

if __name__ == "__main__":
    flow.visualize()
    result = flow.run()
```
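In graph terms, the list literal `[a, b]` is not passed straight through: it becomes a node of its own between the two failing tasks and `print_errors`. The toy graph builder below, in plain Python, illustrates that shape. It is not Prefect's implementation; `Graph`, `add` and `call` are hypothetical names, and it assumes every keyword argument is either an upstream node or a list of upstream nodes.

```python
from typing import Any, List, Tuple


class Graph:
    """Toy flow graph: nodes are plain names, edges are (upstream, downstream)."""

    def __init__(self) -> None:
        self.nodes: List[str] = []
        self.edges: List[Tuple[str, str]] = []

    def add(self, name: str) -> str:
        self.nodes.append(name)
        return name

    def call(self, name: str, **kwargs: Any) -> str:
        # Assumes every keyword argument is an upstream node name
        # or a list of upstream node names.
        target = self.add(name)
        for value in kwargs.values():
            if isinstance(value, list):
                # The list literal becomes its own "List" node between
                # the upstream nodes and the task that consumes it.
                wrapper = self.add("List")
                self.edges.extend((item, wrapper) for item in value)
                self.edges.append((wrapper, target))
            else:
                # A single upstream node is wired directly to the consumer.
                self.edges.append((value, target))
        return target


g = Graph()
a = g.add("task_fail_a")
b = g.add("task_fail_b")
g.call("print_errors", potential_errors=[a, b])
print(g.edges)
# [('task_fail_a', 'List'), ('task_fail_b', 'List'), ('List', 'print_errors')]
```

Because `print_errors` is only connected to the `List` node, what it receives depends on that node's outcome rather than directly on `task_fail_a` and `task_fail_b`.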
You might expect `print_errors` to print a list of two exceptions. Because of the intermediate `List` task, however, it prints:

```
potential_errors: TRIGGERFAIL('Trigger was "all_successful" but some of the upstream tasks failed.')
```
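This happens because the implicit `List` task uses the default `all_successful` trigger. Since both of its upstream tasks failed, the `List` task itself fails with `TRIGGERFAIL`, and that failure, rather than the two original exceptions, is what gets passed downstream to `print_errors`.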
To solve it, create an explicit task in between that uses the `all_finished` trigger, merges the upstream results into a list, and passes that list to the `print_errors` task:
```python
import prefect
from prefect import task, Flow
from typing import Any, List


@task
def task_fail_a() -> str:
    raise RuntimeError("Expected Error A")


@task
def task_fail_b() -> str:
    raise RuntimeError("Expected Error B")


@task(trigger=prefect.triggers.all_finished)
def print_errors(potential_errors: List[Any]):
    print(f"potential_errors: {potential_errors!r}")


# Explicit replacement for the implicit List task: with all_finished it runs
# even though both upstream tasks failed, and returns their results as a list.
@task(trigger=prefect.triggers.all_finished)
def merge_elements_to_a_list(a, b):
    return [a, b]


with Flow("tmp") as flow:
    a = task_fail_a()
    b = task_fail_b()
    list_ = merge_elements_to_a_list(a, b)
    print_errors(potential_errors=list_)

if __name__ == "__main__":
    flow.visualize()
    result = flow.run()
```
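To see both behaviours side by side, here is a toy model of trigger semantics in plain Python. It is not Prefect's executor: `run_task`, `TriggerFailed` and the two trigger functions are hypothetical stand-ins named after `prefect.triggers.all_successful` and `prefect.triggers.all_finished`, and it assumes, as a simplification, that a failed task's result is just the exception it raised.

```python
from typing import Any, Callable, List


class TriggerFailed(Exception):
    """Hypothetical stand-in for Prefect's TRIGGERFAIL signal."""


def all_successful(upstream: List[Any]) -> bool:
    # Run only if no upstream result is an exception.
    return not any(isinstance(r, BaseException) for r in upstream)


def all_finished(upstream: List[Any]) -> bool:
    # Every upstream has already run in this toy, so always fire.
    return True


def run_task(fn: Callable[..., Any], upstream: List[Any],
             trigger: Callable[[List[Any]], bool]) -> Any:
    """Return fn's result, the exception it raised, or TriggerFailed."""
    if not trigger(upstream):
        return TriggerFailed(f"Trigger {trigger.__name__!r} failed")
    try:
        return fn(*upstream)
    except Exception as exc:
        # Simplification: a failed task's result is the exception itself.
        return exc


def failing(msg: str) -> Callable[[], Any]:
    def fn() -> Any:
        raise RuntimeError(msg)
    return fn


def to_list(*items: Any) -> List[Any]:
    return list(items)


def print_errors(potential_errors: Any) -> Any:
    print(f"potential_errors: {potential_errors!r}")
    return potential_errors


a = run_task(failing("Expected Error A"), [], all_successful)
b = run_task(failing("Expected Error B"), [], all_successful)

# Implicit collection node with the default trigger: the failures stop here.
implicit = run_task(to_list, [a, b], all_successful)
# Explicit merge node with all_finished: the exceptions pass through as data.
merged = run_task(to_list, [a, b], all_finished)

# print_errors uses all_finished in both cases, so it always runs.
run_task(print_errors, [implicit], all_finished)
# potential_errors: TriggerFailed("Trigger 'all_successful' failed")
run_task(print_errors, [merged], all_finished)
# potential_errors: [RuntimeError('Expected Error A'), RuntimeError('Expected Error B')]
```

In this model the two intermediate nodes differ only in their trigger, which mirrors the difference between the implicit `List` task and `merge_elements_to_a_list`.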