How can I trigger downstream tasks based on upstream task’s state?

Prefect 2.0

Orion allows you to run an arbitrary Python code in your flow. Therefore, there is no need for any special abstraction to react to various state triggers. The example below shows how certain tasks can be skipped based on previous task’s state.

import random
from prefect import task, flow

@task
def do_something_important():
    bool_ = random.random() > 0.5
    print(f"Is the number > 0.5? {bool_}")
    if bool_:
        raise ValueError("Non-deterministic error has occured.")

@task
def fail():
    print("Failure")

@task
def succeed():
    print("Success")

@task
def always_run():
    print("Running regardless of upstream task's state")

@flow
def main_flow():
    a = do_something_important()
    # equivalent of all_failed or any_failed trigger in Prefect 1.0:
    if isinstance(a.wait().result(raise_on_failure=False), Exception):
        fail()  # the task is skipped if condition is not true
    # equivalent of all_succesful trigger in Prefect 1.0
    else:
        succeed()  # the task is skipped on upstream failure

    # equivalent of always_run or all_finished trigger in Prefect 1.0:
    always_run()

if __name__ == "__main__":
    main_flow()

Prefect 1.0

To accomplish the same in Prefect 1.0, you would need to leverage the triggers.

Full flow example:

import random

from prefect.triggers import all_successful, all_failed, all_finished
from prefect import task, Flow

@task
def do_something_important():
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")

@task(trigger=all_successful)
def succeed():
    print("Success")

@task(trigger=all_failed)
def fail():
    print("Failure")

@task(trigger=all_finished)
def always_run():
    print("Running regardless of upstream task's state")

with Flow("Trigger example") as flow:
    success = succeed(upstream_tasks=[do_something_important])
    failed_task = fail(upstream_tasks=[do_something_important])
    always_run(upstream_tasks=[do_something_important])

if __name__ == "__main__":
    flow.run()