How can I trigger downstream tasks based on an upstream task’s state?

Prefect 2.0

Orion allows you to run arbitrary Python code in your flow, so no special abstraction is needed to react to upstream task states. The example below shows how tasks can be run or skipped based on a previous task’s state.

import random
from prefect import task, flow

@task
def do_something_important():
    bool_ = random.random() > 0.5
    print(f"Is the number > 0.5? {bool_}")
    if bool_:
        raise ValueError("Non-deterministic error has occurred.")

@task
def fail():
    print("Failure")

@task
def succeed():
    print("Success")

@task
def always_run():
    print("Running regardless of upstream task's state")

@flow(log_prints=True)
def main_flow():
    a = do_something_important.submit()
    # equivalent of all_failed or any_failed trigger in Prefect 1.0:
    # wait() blocks until the task run finishes and returns its final state
    if a.wait().is_failed():
        fail.submit()  # the task is skipped if the condition is not true
    # equivalent of all_successful trigger in Prefect 1.0
    else:
        succeed.submit()  # the task is skipped on upstream failure

    # equivalent of always_run or all_finished trigger in Prefect 1.0:
    always_run.submit()

if __name__ == "__main__":
    main_flow()

Prefect 1.0

To accomplish the same in Prefect 1.0, you need to use triggers.

Full flow example:

import random

from prefect.triggers import all_successful, all_failed, all_finished
from prefect import task, Flow

@task
def do_something_important():
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")

@task(trigger=all_successful)
def succeed():
    print("Success")

@task(trigger=all_failed)
def fail():
    print("Failure")

@task(trigger=all_finished)
def always_run():
    print("Running regardless of upstream task's state")

with Flow("Trigger example") as flow:
    success = succeed(upstream_tasks=[do_something_important])
    failed_task = fail(upstream_tasks=[do_something_important])
    always_run(upstream_tasks=[do_something_important])

if __name__ == "__main__":
    flow.run()

I don’t think the 2.0 example is correct. The do_something_important() task is not “submitted” so it won’t return a future, and thus you can’t call a.wait().

I’m struggling to handle resource cleanup when all tasks are “submitted”. The examples show use of a plain context manager when the tasks are “called”, but I don’t want to block within the context manager. I need to acquire multiple resources and run a DAG of tasks for each resource, concurrently. If part of a DAG fails, the associated resource still needs to be cleaned up.
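
One way to picture the requirement above is a plain-Python sketch built on `concurrent.futures` instead of Prefect. Every name in it (`acquire`, `step`, `run_pipeline`, `run_all`, `cleaned_up`) is hypothetical and made up for illustration, and how this maps onto Prefect (for example, one wrapper per resource) is an assumption that has not been checked against the Prefect API. The idea is that each resource gets its own worker, and the context manager blocks only inside that worker, so the caller never blocks inside a `with` block and cleanup still runs when a step fails.

```python
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager

cleaned_up = []  # records released resources (illustration only)


@contextmanager
def acquire(name):
    """Hypothetical resource; stands in for a connection, cluster, etc."""
    try:
        yield name
    finally:
        # Runs whether the steps succeeded or raised.
        cleaned_up.append(name)


def step(resource, fail):
    """Hypothetical unit of work that can be told to fail."""
    if fail:
        raise ValueError(f"step failed for {resource}")
    return f"{resource}: done"


def run_pipeline(name, fail=False):
    """Acquire one resource, run its chain of steps, always release it."""
    with acquire(name) as resource:
        step(resource, fail=False)   # first step always succeeds here
        return step(resource, fail)  # second step may raise


def run_all(specs):
    """Run one pipeline per resource concurrently; collect result or error."""
    with ThreadPoolExecutor() as pool:
        futures = {
            name: pool.submit(run_pipeline, name, fail)
            for name, fail in specs.items()
        }
    # Leaving the executor block waits for all pipelines to finish.
    outcomes = {}
    for name, future in futures.items():
        error = future.exception()
        outcomes[name] = repr(error) if error else future.result()
    return outcomes


if __name__ == "__main__":
    print(run_all({"a": False, "b": True}))
    print(sorted(cleaned_up))
```

In this sketch a failure in one pipeline does not affect the others, and each resource is released by its own `finally` clause regardless of the outcome.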

Thanks for pointing that out. It was code from January 2022 using an old Prefect version in which tasks were submitted to the task runner and returned futures by default (and at that time it was correct).

I updated the code example now - ty! :pray:

I believe there is a similar issue in many examples on this page:

The examples never use .submit() but assume the return value from any task call is a future object.

ty so much, added PR to fix it :pray: