I am trying to create flows where even if a task fails, the next task will run.
In prefect 1.0 there was a way to do that, but I can’t find documentation on how to do it in Prefect 2.
Looking at: Task Runners - Prefect Docs
and at How can I handle errors, allowing my flow runs to continue until complete and not raise upon failure?
it was not clear how to achieve this behavior.
from prefect import task, flow, get_run_logger
@task()
def run_first(x):
logger = get_run_logger()
logger.info(f"x: {x}")
if x > 5:
raise IOError('Failed...')
else:
return x * 5
@task()
def run_last(y):
logger = get_run_logger()
logger.info(f"y: {y}")
return y ** 2
@flow(name="Test running on failure")
def run_test_tasks():
for x in [2, 8]:
r = run_first.submit(x)
final_state = r.wait()
print(f"is_final: {final_state.is_final()}, is_failed: {final_state.is_failed()}")
y = r.result()
r2 = run_last.submit(y, wait_for=final_state.is_final())
new_y = r2.result()
print(new_y)
if __name__ == "__main__":
run_test_tasks()
Can you refer me to the proper documentation ?