How to take action on a state change of a task run? (task-level state handler)

Prefect 2.0

Sometimes you may want to react to a specific state change of a task run, e.g., sending a Slack message if a particular task run fails. Each task run started with .submit() returns a PrefectFuture object.

  • To check the current state of a task run, you can call get_state() on a PrefectFuture.
  • To check the final state of a task run, you need to call wait() to block until the underlying task run is complete, and then call the get_state() method on the PrefectFuture.

Here is an example sending a Slack notification on a failed task run:

from prefect import task, flow
from prefect.futures import PrefectFuture
import requests

def send_slack_alert_on_failure(task_future: PrefectFuture):
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():
        name_ = task_future.task_run.name
        id_ = task_future.task_run.flow_run_id
        requests.post(
            "https://hooks.slack.com/services/XXX/XXX/XXX",
            json={"text": f"The task `{name_}` failed in a flow run `{id_}`"},
        )

@task
def fail_successfully(x):
    return 1 / x

@flow
def main_flow(nr: int):
    future_obj = fail_successfully.submit(nr)
    send_slack_alert_on_failure(future_obj)

if __name__ == "__main__":
    main_flow(nr=0)

Prefect 1.0

Prefect 1.0 uses the concept of state_handlers that can be attached to tasks.

  • The same example sending a Slack message on a failed task run in Prefect 1.0.

    import prefect
    from prefect import task, Flow
    from prefect.tasks.notifications import SlackTask
    
    def send_slack_alert_on_failure(task, old_state, new_state):
        if new_state.is_failed():
            name_ = prefect.context.task_name
            id_ = prefect.context.flow_run_id
            SlackTask(message=f"The task `{name_}` failed in a flow run `{id_}`").run()
        return new_state
    
    @task(state_handlers=[send_slack_alert_on_failure])
    def fail_successfully(x):
        return 1 / x
    
    with Flow(name="fail") as flow:
        result = fail_successfully(x=0)
    
    if __name__ == "__main__":
        flow.run()
    

I’m trying to get this sample to work, but send_slack_alert_on_failure doesn’t seem to get called. I’m using 2.0 and, instead of posting to Slack, calling print(f"The task `{name_}` failed in a flow run `{id_}`"). What should I be checking?


To have the above example working with Prefect 2.0, I made a change to the code as below:

from prefect.task_runners import SequentialTaskRunner

@flow(
    task_runner=SequentialTaskRunner() # can also be other runner types
)
def main_flow(nr: int):
    future_obj = fail_successfully.submit(nr) # now this returns a PrefectFuture object rather than float
    send_slack_alert_on_failure(future_obj)

if __name__ == "__main__":
    main_flow(nr=0)

Actually, it got much easier now that we have the Notifications feature - would you perhaps want to try that instead?

Hi @anna_geller, thanks for replying.

With Prefect 2.0, could you provide an example of the Notifications feature so we know how to implement it in our code, please?

Yes, check out this page

Thanks, Anna. After reading the article, I think we need a more complex handler for failed cases than just sending out notifications. It’s cool and important in the pipeline, though.

However, this is just something that came up in this tutorial. We didn’t actually try to send out a notification following the example; in fact, we just tried to print the message “I'm called” with the logger, because we wanted to see whether a custom function can be triggered when a Future object returns a failed status. So send_slack_alert_on_failure can be anything in our pipeline, not just something that sends out a notification.

E.g., the task function read_some_files may fail during execution for many reasons, such as a file that no longer exists. When that happens, we want to keep the whole flow running and have another specific handler triggered, let’s say remember_file_that_failed.
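The pattern described here, keeping the run going while routing a failure to a specific handler, might be sketched in plain Python as follows. The Prefect decorators are left out so the snippet runs on its own. read_some_files and remember_file_that_failed are the names from the post; run_with_handlers, log_permission_problem, and the HANDLERS mapping are hypothetical and only there for illustration.

```python
from typing import Callable, Dict, List, Optional, Type

failed_files: List[str] = []
permission_problems: List[str] = []


def remember_file_that_failed(path: str, exc: Exception) -> None:
    """Record a file that could not be read (name taken from the post)."""
    failed_files.append(path)


def log_permission_problem(path: str, exc: Exception) -> None:
    """Hypothetical second handler for a different failure reason."""
    permission_problems.append(path)


# Hypothetical mapping from failure reason to the handler that should run.
HANDLERS: Dict[Type[Exception], Callable[[str, Exception], None]] = {
    FileNotFoundError: remember_file_that_failed,
    PermissionError: log_permission_problem,
}


def read_some_files(path: str) -> str:
    """Stand-in for the real task: raises if the file cannot be opened."""
    with open(path, encoding="utf-8") as handle:
        return handle.read()


def run_with_handlers(path: str) -> Optional[str]:
    """Run the step; on a known failure, call its handler and keep going."""
    try:
        return read_some_files(path)
    except tuple(HANDLERS) as exc:
        for exc_type, handler in HANDLERS.items():
            if isinstance(exc, exc_type):
                handler(path, exc)
                break
        return None  # the caller can move on to the next file
```

Failures that are not listed in HANDLERS still propagate, so unexpected errors are not silently swallowed.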

The above example for Prefect 2.0 doesn’t work unless we submit the task with future_obj = fail_successfully.submit(nr), so that future_obj becomes a PrefectFuture object. The Python exception then won’t prevent the next statement from being called, and send_slack_alert_on_failure can do what it’s expected to.

Maybe our dev environment is missing something, or the example above needs to be revised.

Gotcha, you can also return state directly though:

Hi Anna

Thanks for your help. I work with @vietnguyengit so we have been discussing this. In our particular case we are processing a number of files and need to do three things when a particular task fails:

  • Move a file from one s3 bucket to another
  • Send an email notification
  • Do not fail the flow and continue on processing the next file

The last two of these seem to be incompatible. The flow needs to fail for the email to be sent but it needs to succeed to continue with the next file. How would I resolve this?

Here is a cut down representation of what we are trying to do.

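from prefect import flow, tags, task
from prefect.futures import PrefectFuture

# Completed and move_to_failed are used below; the post does not show where they come from.

def on_state_change(task_future: PrefectFuture, raw_object):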
    task_future.wait()  # block until completion
    if task_future.get_state().is_failed():
        print(f'This task failed: {task_future}')
        failed_object = move_to_failed(raw_object['Key'])
    else:
        print(f'This task did not fail: {task_future}')


@task()
def ingest_to_parquet(raw_object):
    print(f'Ingest {raw_object["Key"]} to Parquet')

@flow
def process_file(s3_object):
    # Process the file
    print(f'Processing {s3_object}')
    result = ingest_to_parquet.submit(s3_object)
    on_state_change(result, s3_object)
    return Completed(message="Done")  # TODO: This will prevent the email message as well because the flow is not failed


@flow
def process_files(objects):
    # Process each file
    for s3_object in objects:
        with tags('email_on_failure'):
            process_file(s3_object)

move a file

You can call a function that sends an email from the part of the code where you need it. You can do that using this collection:

Starting from the 2.0.1 release, you can even use the .map() syntax for that.

In regards to “The last two of these seem to be incompatible. The flow needs to fail for the email to be sent but it needs to succeed to continue with the next file. How would I resolve this?”

Maybe you can wrap the call in a try/except?

@flow
def process_files(objects):
    # Process each file
    for s3_object in objects:
        with tags('email_on_failure'):
            try:
                process_file(s3_object)
            except Exception as e:
                # email here the exception
                pass
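
Building on the try/except idea, one possible shape is to collect the failures inside the loop and send a single summary once every file has been attempted, so that the notification no longer depends on the flow failing. This is a plain-Python sketch without Prefect decorators. process_files mirrors the loop above, while FileResult and the process_file and notify callables passed in are hypothetical stand-ins for illustration.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Iterable, List


@dataclass
class FileResult:
    """Hypothetical summary of one pass over all objects."""
    succeeded: List[str] = field(default_factory=list)
    failed: Dict[str, str] = field(default_factory=dict)  # key -> error text


def process_files(
    objects: Iterable[dict],
    process_file: Callable[[dict], None],
    notify: Callable[[str], None],
) -> FileResult:
    """Try every object; record failures instead of stopping at the first one."""
    result = FileResult()
    for s3_object in objects:
        key = s3_object["Key"]
        try:
            process_file(s3_object)
            result.succeeded.append(key)
        except Exception as exc:
            # Keep going with the next file, but remember what went wrong.
            result.failed[key] = repr(exc)
    if result.failed:
        lines = [f"{key}: {error}" for key, error in result.failed.items()]
        notify("Failed files:\n" + "\n".join(lines))
    return result
```

In a real flow, notify could be whatever sends the email, and moving the file to the failed bucket could happen in the same except branch.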

Also, I think this needs to be updated with submit


@task
def fail_successfully(x):
    return 1 / x

@flow
def main_flow(nr: int):
    future_obj = fail_successfully.submit(nr)
    send_slack_alert_on_failure(future_obj)