Sometimes we don’t want our flow to end immediately when a task or flow would normally throw an exception. All we want is observability into what failed. By setting return_result=True
, an exception won’t be thrown until the result is retrieved.
from prefect import flow, task
@task
def good_task():
return True
@task()
def bad_task():
raise Exception("Error raised for testing")
@flow()
def first_flow():
bad_task()
good_task()
@flow
def second_flow():
good_task()
if __name__ == '__main__':
first_flow(return_state=True)
second_flow(return_state=True)
If we want observability into which tasks failed, we can use .submit()
like so:
from prefect import flow, task
@task
def good_task():
return True
@task()
def bad_task():
raise Exception("Error raised for testing")
@flow()
def first_flow():
bad_task_future = bad_task.submit()
good_task()
@flow
def second_flow():
good_task()
if __name__ == '__main__':
first_flow(return_state=True)
second_flow(return_state=True)
so that bad_task_future
is a future, rather than result, and as such the error won’t be thrown until the result is returned.
Here is a handy example when considering what you would like your tasks and flows to return:
from prefect import flow, task
@task
def add_one(x):
return x + 1
def subflow():
return 42
@flow
def flow_of_all_flows():
result = add_one(1) # int
state = add_one(1, return_state=True) # Prefect State
future = add_one.submit(1) # Future, runs through the task runner
state = add_one.submit(1, return_state=True) # State, runs through the task runner
mapped_future = add_one.map([1, 2, 3]) # Future, runs through the task runner
mapped_future = add_one.map([1, 2, 3], return_state=True) # State, runs through the task runner
result = subflow() # int
state = subflow(return_state=True) # Prefect State
Happy engineering!