Prefect 2.0
Sometimes you may want to react to a specific state change of a task run, e.g., by sending a Slack message if a particular task run fails. Each task run returns a PrefectFuture object.
- To check the current state of a task run, call get_state() on the PrefectFuture.
- To check the final state of a task run, first call wait() to block until the underlying task run is complete, and then call get_state() on the PrefectFuture.
Here is an example that sends a Slack notification when a task run fails:
from prefect import task, flow
from prefect.futures import PrefectFuture
import requests


def send_slack_alert_on_failure(task_future: PrefectFuture):
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():
        name_ = task_future.task_run.name
        id_ = task_future.task_run.flow_run_id
        # Replace the placeholder URL with your own Slack incoming webhook
        requests.post(
            "https://hooks.slack.com/services/XXX/XXX/XXX",
            json={"text": f"The task `{name_}` failed in a flow run `{id_}`"},
        )


@task
def fail_successfully(x):
    return 1 / x  # raises ZeroDivisionError when x == 0


@flow
def main_flow(nr: int):
    # Note: in Prefect 2 releases where calling a task returns its result
    # directly, use fail_successfully.submit(nr) to obtain a PrefectFuture.
    future_obj = fail_successfully(nr)
    send_slack_alert_on_failure(future_obj)


if __name__ == "__main__":
    main_flow(nr=0)
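The helper above combines the state check with the HTTP call, which makes it hard to try out without a running flow and a real webhook. The following is a minimal sketch of the same wait, get_state, is_failed sequence with the message-building step separated out. It assumes only the three members the example relies on (wait(), get_state(), and task_run). FakeState, FakeTaskRun, FakeFuture, and build_failure_alert are hypothetical names introduced for illustration and are not part of Prefect.

```python
from dataclasses import dataclass
from typing import Optional
from uuid import UUID, uuid4


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state; only is_failed() is modeled."""
    failed: bool

    def is_failed(self) -> bool:
        return self.failed


@dataclass
class FakeTaskRun:
    """Hypothetical stand-in carrying the two attributes the alert reads."""
    name: str
    flow_run_id: UUID


class FakeFuture:
    """Hypothetical stand-in for PrefectFuture: wait(), get_state(), task_run."""

    def __init__(self, task_run: FakeTaskRun, final_state: FakeState):
        self.task_run = task_run
        self._final_state = final_state
        self.waited = False

    def wait(self) -> FakeState:
        self.waited = True  # a real future would block here until completion
        return self._final_state

    def get_state(self) -> FakeState:
        return self._final_state


def build_failure_alert(task_future) -> Optional[dict]:
    """Return the Slack JSON payload for a failed task run, else None."""
    task_future.wait()  # block until completion, as in the example above
    if not task_future.get_state().is_failed():
        return None
    name_ = task_future.task_run.name
    id_ = task_future.task_run.flow_run_id
    return {"text": f"The task `{name_}` failed in a flow run `{id_}`"}
```

With this split, the payload returned by build_failure_alert could be passed to requests.post(..., json=payload) only when it is not None, so the decision logic can be checked without any network call.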
Prefect 1.0
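Prefect 1.0 uses the concept of state_handlers that can be attached to tasks. A state handler is called on every state change of the task and receives the task, the old state, and the new state.
Here is the same example, sending a Slack message on a failed task run, in Prefect 1.0: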
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask


def send_slack_alert_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        name_ = prefect.context.task_name
        id_ = prefect.context.flow_run_id
        SlackTask(message=f"The task `{name_}` failed in a flow run `{id_}`").run()
    return new_state


@task(state_handlers=[send_slack_alert_on_failure])
def fail_successfully(x):
    return 1 / x


with Flow(name="fail") as flow:
    result = fail_successfully(x=0)

if __name__ == "__main__":
    flow.run()
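To make the handler contract concrete, here is a toy model in plain Python of how a (task, old_state, new_state) handler might be invoked on each state change. It is a sketch for illustration only and not Prefect's implementation: State, apply_state_handlers, run_transitions, and record_failure are hypothetical names, and the state names are simplified to strings.

```python
from typing import Callable, List, Optional


class State:
    """Toy state; only the name and is_failed() are modeled."""

    def __init__(self, name: str):
        self.name = name

    def is_failed(self) -> bool:
        return self.name == "Failed"


# A handler sees the task plus the old and new state, and may return a new state.
Handler = Callable[[object, State, State], Optional[State]]


def apply_state_handlers(task, old_state: State, new_state: State,
                         handlers: List[Handler]) -> State:
    """Call each handler in order; a handler returning None keeps the state."""
    for handler in handlers:
        new_state = handler(task, old_state, new_state) or new_state
    return new_state


def run_transitions(task, state_names: List[str], handlers: List[Handler]) -> State:
    """Walk a task through the named states, firing handlers on each change."""
    current = State(state_names[0])
    for name in state_names[1:]:
        current = apply_state_handlers(task, current, State(name), handlers)
    return current


alerts: List[str] = []


def record_failure(task, old_state: State, new_state: State) -> State:
    # Same shape as send_slack_alert_on_failure, but appends to a list
    if new_state.is_failed():
        alerts.append(f"{task} failed ({old_state.name} -> {new_state.name})")
    return new_state


final = run_transitions(
    "fail_successfully", ["Pending", "Running", "Failed"], [record_failure]
)
```

In this toy run the handler fires on both transitions but records an alert only for the one that ends in the failed state, which mirrors why the Prefect 1.0 handler checks new_state.is_failed() before sending a message.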