Prefect 2.0
To stop a flow run early based on a custom condition, raise an exception inside your flow; the unhandled exception ends the run in a failed state.
from prefect import task, flow
from random import random
@task
def random_number():
    """Return a fresh pseudo-random float produced by ``random()``."""
    drawn = random()
    return drawn
@flow
def short_circuit():
    """Draw random numbers until one exceeds 0.5, then abort the flow run.

    Each number is produced by the ``random_number`` task and printed.

    Raises:
        ValueError: As soon as a drawn number is greater than 0.5. The
            exception is deliberately left unhandled so that it ends the
            flow run.
    """
    while True:
        # Calling a task inside a flow returns its result directly in
        # Prefect 2, so there is no future to wait on; chaining
        # ``.wait().result()`` onto the float would raise AttributeError.
        num = random_number()
        print(num)
        if num > 0.5:
            raise ValueError("Big number!")
if __name__ == "__main__":
    # Run the flow only when this file is executed as a script.
    short_circuit()
Prefect 1.0
You can use the `with case()` context manager to branch your DAG so that no further tasks are executed when some condition is satisfied.
Example flow:
from random import random
from prefect import task, Flow, case
@task
def check_condition():
    """Return ``True`` when a fresh ``random()`` draw is greater than 0.5."""
    draw = random()
    return draw > 0.5
@task
def action_if_true():
    """Return the message used when the flow keeps executing."""
    message = "Continuing execution..."
    return message
@task(log_stdout=True)
def another_action(val):
    """Print ``val`` to standard output.

    Args:
        val: The value to print.
    """
    print(val)
@task(log_stdout=True)
def stop_flow_run():
    """Print a notice that the flow run is stopping."""
    notice = "Stopping the flow run..."
    print(notice)
with Flow("conditional-branches") as flow:
    # Evaluate the condition once; both branches below key off this result.
    cond = check_condition()

    # Branch for a True condition: produce the message and print it.
    with case(cond, True):
        val = action_if_true()
        another_action(val)

    # Branch for a False condition: only the stop notice task is added.
    with case(cond, False):
        val = stop_flow_run()