Prefect 2.0
You can use the wait_for
keyword to specify that your task depends on the previous task being completed without errors. Here is an example that shows how you can use the wait_for
keyword in your flow:
import random
from prefect import task, flow
@task
def do_something_important():
bool_ = random.random() > 0.5
print(f"Is the number > 0.5? {bool_}")
if bool_:
raise ValueError("Non-deterministic error has occured.")
@task
def fail():
raise RuntimeError("Bad task")
@task
def succeed():
print("Success")
@task
def always_run():
print("Running regardless of upstream task's state")
@flow
def main_flow():
a = do_something_important()
fail(wait_for=[a])
succeed(wait_for=[a])
always_run()
if __name__ == "__main__":
main_flow()
Prefect 1.0
By default, Prefect 1.0 uses the all_successful
trigger so that a task failure propagates to downstream tasks. This results in downstream tasks ending in a TriggerFailed
state.