How to run conditional subflows and take action if a subflow fails or succeeds?

Here is an example flow that demonstrates that pattern:

from prefect import flow, task
from prefect.orion.schemas.states import StateType
import random

def on_failure_task():
    print("Taking action on failure here 😡")

def on_success_task():
    print("Taking action on success here 🚀")

def bad_subflow():
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")

@flow(log_prints=True, name="Conditional subflows")
def main():
    state = bad_subflow(return_state=True)
    if state.type == StateType.FAILED:
        print("Subflow failed! Run on_failure_task")
    elif state.type == StateType.COMPLETED:
        print("Subflow succeeded! Run on_success_task")
        print(f"Returned unexpected state: {}")

if __name__ == "__main__":

Failed run:

Completed run:

1 Like