Prefect 2.0
You can call a function that reacts to a state change, as described in this topic:
There will be more ways to configure custom failure notifications in Orion. Watch the Announcements category in the coming weeks and months for updates.
Prefect 1.0
You can leverage Automations or attach a state handler to your tasks.
State handler with a full exception traceback
To get a full exception traceback you may use the following state handler logic:
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast
import traceback
def exception_to_string(excp):
    """Render an exception together with a stack trace as a single string.

    The trace is the current call stack (minus its three most recent
    entries) followed by the frames recorded in ``excp.__traceback__``.
    The exception's class and its string form are appended on a final line.

    Args:
        excp: The exception instance to describe.

    Returns:
        The formatted stack entries joined together, followed by
        ``"\\n <class> <message>"``.
    """
    # from https://stackoverflow.com/questions/4564559/get-exception-description-and-stack-trace-which-caused-an-exception-all-as-a-st/58764987#58764987
    # extract_stack() must be called directly in this frame: the [:-3] slice
    # is counted relative to this function's own stack entry.
    caller_frames = traceback.extract_stack()[:-3]
    raise_frames = traceback.extract_tb(excp.__traceback__)  # add limit=??
    rendered = "".join(traceback.format_list(caller_frames + raise_frames))
    return f"{rendered}\n {excp.__class__} {excp}"
def post_to_slack_on_failure(task, old_state, new_state):
    """Send a Slack message, including a full traceback, when a task fails.

    Intended for use as a task state handler (it is passed in
    ``state_handlers`` of the ``@task`` decorator). Nothing is sent unless
    ``new_state.is_failed()`` is true.

    Args:
        task: The task whose state is changing (not used here).
        old_state: The state being left (not used here).
        new_state: The state being entered; its ``result`` and ``message``
            are used to build the notification text.

    Returns:
        ``new_state``, unchanged, so the state transition proceeds as usual.
    """
    if not new_state.is_failed():
        return new_state

    failure = new_state.result
    if isinstance(failure, Exception):
        value = "```{}```".format(repr(failure))
        # Only build the traceback for real exceptions: exception_to_string
        # reads ``__traceback__``, which a non-exception result does not
        # have, and an AttributeError here would prevent the alert from
        # being sent at all.
        trace = (
            " \n"
            f"and a full exception traceback: {exception_to_string(failure)}"
        )
    else:
        value = cast(str, new_state.message)
        trace = ""

    msg = (
        f"The task `{prefect.context.task_name}` failed "
        f"in a flow run {prefect.context.flow_run_id} "
        f"with an exception {value}{trace}"
    )
    SlackTask(message=msg).run()
    return new_state
@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    """Return the reciprocal of ``b - a``.

    Raises ZeroDivisionError when ``a`` equals ``b``.
    """
    difference = b - a
    return 1 / difference
# Build a flow containing a single divide_numbers call. With a=1 and b=1 the
# denominator (b - a) is zero, so the task raises ZeroDivisionError; this is
# meant to exercise the failure state handler attached to divide_numbers.
with Flow(name="state-inspection-handler") as flow:
    result = divide_numbers(1, 1)


# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    flow.run()
Simple state handler
If you prefer a shorter, more concise message that contains only the exception itself rather than the full traceback, that’s possible, too:
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast
def post_to_slack_on_failure(task, old_state, new_state):
    """Send a short Slack message when a task enters a failed state.

    Intended for use as a task state handler (it is passed in
    ``state_handlers`` of the ``@task`` decorator). The message names the
    task and flow run taken from ``prefect.context`` and includes either the
    repr of the exception held in ``new_state.result`` or, when the result is
    not an exception, ``new_state.message``.

    Args:
        task: The task whose state is changing (not used here).
        old_state: The state being left (not used here).
        new_state: The state being entered.

    Returns:
        ``new_state``, unchanged.
    """
    # Guard clause: anything other than a failure passes straight through.
    if not new_state.is_failed():
        return new_state

    outcome = new_state.result
    value = (
        "```{}```".format(repr(outcome))
        if isinstance(outcome, Exception)
        else cast(str, new_state.message)
    )
    text = (
        f"The task `{prefect.context.task_name}` failed "
        f"in a flow run {prefect.context.flow_run_id} "
        f"with an exception {value}"
    )
    SlackTask(message=text).run()
    return new_state
@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    """Return the reciprocal of ``b - a``.

    Raises ZeroDivisionError when ``a`` equals ``b``.
    """
    difference = b - a
    return 1 / difference
# Build a flow containing a single divide_numbers call. With a=1 and b=1 the
# denominator (b - a) is zero, so the task raises ZeroDivisionError; this is
# meant to exercise the failure state handler attached to divide_numbers.
with Flow(name="state-inspection-handler") as flow:
    result = divide_numbers(1, 1)


# Run the flow only when this file is executed as a script.
if __name__ == "__main__":
    flow.run()