How to take action on a state change of a task run? (task-level state handler)

Prefect 2.0

Sometimes you may want to react to a specific state change of a task run, e.g., by sending a Slack message if a particular task run fails. Calling a task inside a flow returns a PrefectFuture object that represents the task run.

  • To check the current state of a task run, you can call get_state() on a PrefectFuture.
  • To check the final state of a task run, you need to call wait() to block until the underlying task run is complete, and then call the get_state() method on the PrefectFuture.

Here is an example that sends a Slack notification when a task run fails:

from prefect import task, flow
from prefect.futures import PrefectFuture
import requests

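def send_slack_alert_on_failure(task_future: PrefectFuture):
    """Wait for the task run to finish and post to Slack if it failed."""
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():
        name_ = task_future.task_run.name
        id_ = task_future.task_run.flow_run_id
        # Replace the placeholder below with your own Slack incoming webhook URL
        requests.post(
            "https://hooks.slack.com/services/XXX/XXX/XXX",
            json={"text": f"The task `{name_}` failed in a flow run `{id_}`"},
            timeout=10,  # avoid hanging the flow if Slack is unreachable
        )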

@task
def fail_successfully(x):
    return 1 / x

@flow
def main_flow(nr: int):
    future_obj = fail_successfully(nr)
    send_slack_alert_on_failure(future_obj)

if __name__ == "__main__":
    main_flow(nr=0)
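
The alert helper above mixes two concerns: building the message and sending it. As a minimal sketch, assuming you want to check the message text without contacting Slack, the two can be separated using only the standard library. The names `build_failure_payload` and `post_to_slack` are hypothetical and not part of Prefect. The webhook URL you pass in would be the same placeholder as above.

```python
import json
import urllib.request


def build_failure_payload(task_name: str, flow_run_id: str) -> dict:
    """Return the JSON body for a Slack incoming-webhook message (hypothetical helper)."""
    return {"text": f"The task `{task_name}` failed in a flow run `{flow_run_id}`"}


def post_to_slack(webhook_url: str, payload: dict, timeout: float = 10.0) -> int:
    """POST the payload to the given webhook URL and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError for 4xx/5xx responses
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Inside send_slack_alert_on_failure, the requests.post(...) call could then be replaced by post_to_slack(url, build_failure_payload(name_, id_)), and build_failure_payload can be tested on its own.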

Prefect 1.0

Prefect 1.0 uses the concept of state_handlers that can be attached to tasks.

  • Here is the same example, sending a Slack message on a failed task run, written for Prefect 1.0:

    import prefect
    from prefect import task, Flow
    from prefect.tasks.notifications import SlackTask
    
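    def send_slack_alert_on_failure(task, old_state, new_state):
        # Called on every state change of the task; only act on failures
        if new_state.is_failed():
            name_ = prefect.context.task_name
            id_ = prefect.context.flow_run_id
            # SlackTask reads the webhook URL from the SLACK_WEBHOOK_URL secret by default
            SlackTask(message=f"The task `{name_}` failed in a flow run `{id_}`").run()
        # A state handler must return the state the task run should end up in
        return new_state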
    
    @task(state_handlers=[send_slack_alert_on_failure])
    def fail_successfully(x):
        return 1 / x
    
    with Flow(name="fail") as flow:
        result = fail_successfully(x=0)
    
    if __name__ == "__main__":
        flow.run()
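
To make the state handler contract in the Prefect 1.0 example easier to see, here is a toy model in plain Python. It is not Prefect's implementation: `ToyState`, `apply_handlers`, and `record_failure` are hypothetical names used only for illustration. It shows the shape of the contract: each handler receives the task, the old state, and the new state, and the state it returns is passed on to the next handler.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class ToyState:
    """Stand-in for a Prefect state; only the name matters in this sketch."""
    name: str

    def is_failed(self) -> bool:
        return self.name == "Failed"


Handler = Callable[[object, ToyState, ToyState], Optional[ToyState]]


def apply_handlers(task: object, old_state: ToyState, new_state: ToyState,
                   handlers: List[Handler]) -> ToyState:
    """Call each handler in order, feeding its result to the next one."""
    for handler in handlers:
        # A handler that returns None leaves the state unchanged
        new_state = handler(task, old_state, new_state) or new_state
    return new_state


alerts: List[str] = []


def record_failure(task: object, old_state: ToyState, new_state: ToyState) -> ToyState:
    """Collect a message instead of calling Slack, so the sketch has no side effects."""
    if new_state.is_failed():
        alerts.append(f"{task} failed")
    return new_state


final_state = apply_handlers(
    "fail_successfully", ToyState("Running"), ToyState("Failed"), [record_failure]
)
```

After this runs, alerts holds one message and final_state is still the failed state, which mirrors why the real handler ends with return new_state.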