How can I stop a flow run execution based on a condition?

Prefect 2.0

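To stop a flow run when a custom condition is met, raise an exception inside your flow. The exception ends the run immediately, and the flow run finishes in a Failed state rather than a Cancelled one.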

from prefect import task, flow
from random import random

@task
def random_number():
    return random()

@flow
def short_circuit():
    while True:
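        # In Prefect 2.0 GA and later, calling a task inside a flow returns its
        # result directly; early betas needed random_number().wait().result().
        num = random_number()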
        print(num)
        if num > 0.5:
            raise ValueError("Big number!")

if __name__ == "__main__":
    short_circuit()
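
If you want to tell a deliberate stop apart from a genuine failure, one option is a dedicated exception type that the caller catches. The sketch below is only an illustration of that control flow: it uses plain functions as stand-ins for the task and flow above so that it runs without Prefect installed, and the names StopFlowRun, run_once, and the rng and limit parameters are hypothetical, not part of the Prefect API.

```python
import random


class StopFlowRun(Exception):
    """Hypothetical marker exception for a deliberate, condition-based stop."""


def random_number(rng):
    # Stand-in for the @task above; rng is any object with a random() method.
    return rng.random()


def short_circuit(rng, limit=0.5):
    # Stand-in for the @flow above: keep drawing until a value exceeds limit.
    small_draws = 0
    while True:
        num = random_number(rng)
        if num > limit:
            raise StopFlowRun(
                f"Big number after {small_draws} small draw(s): {num:.3f}"
            )
        small_draws += 1


def run_once(seed):
    # Catch only the deliberate stop; any other exception still propagates.
    try:
        short_circuit(random.Random(seed))
    except StopFlowRun as exc:
        return f"stopped: {exc}"


if __name__ == "__main__":
    print(run_once(seed=42))
```

In a real flow you would keep the @task and @flow decorators from the example above and raise the custom exception in place of ValueError. Whether to catch it at the call site or let the run be reported as failed is a design choice that depends on how you want the run to show up in your monitoring.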

Prefect 1.0

You can use the case() context manager to branch your flow's DAG so that downstream tasks run only when a condition is satisfied. Tasks inside a branch whose condition does not match are skipped.

Example flow:

from random import random
from prefect import task, Flow, case

@task
def check_condition():
    return random() > 0.5

@task
def action_if_true():
    return "Continuing execution..."

@task(log_stdout=True)
def another_action(val):
    print(val)

@task(log_stdout=True)
def stop_flow_run():
    print("Stopping the flow run...")

with Flow("conditional-branches") as flow:
    cond = check_condition()

    with case(cond, True):
        val = action_if_true()
        another_action(val)

    with case(cond, False):
        stop_flow_run()

if __name__ == "__main__":
    flow.run()