How can you run conditional subflows and take action depending on whether a subflow fails or succeeds?

Here is an example flow that demonstrates that pattern:

from prefect import flow, task
from prefect.orion.schemas.states import StateType
import random


@task
def on_failure_task():
    """Print the message that stands in for failure handling."""
    failure_message = "Taking action on failure here 😡"
    print(failure_message)


@task
def on_success_task():
    """Print the message that stands in for success handling."""
    success_message = "Taking action on success here 🚀"
    print(success_message)


@flow
def bad_subflow():
    """Subflow that fails at random to demonstrate both outcomes.

    Raises:
        ValueError: when ``random.random()`` returns a value above 0.5,
            i.e. on roughly half of the runs. Otherwise the flow finishes
            without returning anything.
    """
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occurred.")


@flow(log_prints=True, name="Conditional subflows")
def main():
    """Run ``bad_subflow`` and pick a follow-up task from its final state.

    A completed subflow triggers ``on_success_task``, a failed one triggers
    ``on_failure_task``; any other state type is only reported via ``print``.
    """
    # Ask for the state object rather than the subflow's result so the
    # outcome can be inspected below.
    subflow_state = bad_subflow(return_state=True)
    outcome = subflow_state.type

    if outcome == StateType.FAILED:
        print("Subflow failed! Run on_failure_task")
        on_failure_task()
        return

    if outcome == StateType.COMPLETED:
        print("Subflow succeeded! Run on_success_task")
        on_success_task()
        return

    # Neither FAILED nor COMPLETED: report the state's name and do nothing.
    print(f"Returned unexpected state: {subflow_state.name}")


# Run the parent flow only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()

Failed run:

Completed run:

1 Like