Prefect 2.0
Sometimes you may want to react to a specific state change of a task run, e.g., sending a Slack message if a particular task run fails. Each task run returns a PrefectFuture
object.
- To check the current state of a task run, you can call
get_state()
on a PrefectFuture
.
- To check the final state of a task run, you need to call
wait()
to block until the underlying task run is complete,
- and then call the
get_state()
method on the PrefectFuture
.
Here is an example sending a slack notification on a failed task run:
from prefect import task, flow
from prefect.futures import PrefectFuture
import requests
def send_slack_alert_on_failure(task_future: PrefectFuture):
task_future.wait() # block until completion
if task_future.get_state().is_failed():
name_ = task_future.task_run.name
id_ = task_future.task_run.flow_run_id
requests.post(
"https://hooks.slack.com/services/XXX/XXX/XXX",
json={"text": f"The task `{name_}` failed in a flow run `{id_}`"},
)
@task
def fail_successfully(x):
return 1 / x
@flow
def main_flow(nr: int):
future_obj = fail_successfully.submit(nr)
send_slack_alert_on_failure(future_obj)
if __name__ == "__main__":
main_flow(nr=0)
Prefect 1.0
Prefect 1.0 uses the concept of state_handlers
that can be attached to tasks.
-
The same example sending a Slack message on a failed task run in Prefect 1.0.
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
def send_slack_alert_on_failure(task, old_state, new_state):
if new_state.is_failed():
name_ = prefect.context.task_name
id_ = prefect.context.flow_run_id
SlackTask(message=f"The task `{name_}` failed in a flow run `{id_}`").run()
return new_state
@task(state_handlers=[send_slack_alert_on_failure])
def fail_successfully(x):
return 1 / x
with Flow(name="fail") as flow:
result = fail_successfully(x=0)
if __name__ == "__main__":
flow.run()
I’m trying to get this sample to work but send_slack_alert_on_failure
doesn’t seem to get called. Using 2.0 and instead of posting to slack print(f"The task `{name_}` failed in a flow run `{id_}`")
What should I be checking?
1 Like
To have the above example working with Prefect 2.0, I made a change to the code as below:
from prefect.task_runners import SequentialTaskRunner
@flow(
task_runner=SequentialTaskRunner() # can also be other runner types
)
def main_flow(nr: int):
future_obj = fail_successfully.submit(nr) # now this returns a PrefectFuture object rather than float
send_slack_alert_on_failure(future_obj)
if __name__ == "__main__":
main_flow(nr=0)
Actually, it got much easier now that we have the Notifications feature - would you perhaps want to try that instead?
Hi @anna_geller , thanks for replying
With Prefech 2.0, can you provide the example for the Notifications feature so we know how to implement it in our code please?
Thanks for Anna, after reading the article, I think we need a more complex failed cases handler than just sending out notifications. It’s cool and important in the pipeline tho.
However, this is just something that comes up in this tutorial, we didn’t try to actually send out a notification following the example (in fact, we just tried to print out a message I'm called
with the logger, we want to see if the customised function can be triggered when a Future object returns failed status. So send_slack_alert_on_failure
can be anything not just for sending out notification in our pipeline.
E.g. task function read_some_files
during execution, may be failed because of many reasons such as the file no longer exists, when that happens, we want to keep the whole flow running and another specific handler will then be triggered, let’s say remember_file_that_failed
.
The above example to use with Prefect 2.0, doesn’t work unless we submit
the task future_obj = fail_successfully.submit(nr)
, so future_obj
becomes a PrefectFuture object, and the Python exception won’t prevent the next statement to be called, and send_slack_alert_on_failure
can execute what it’s expected to.
Maybe our dev environment missing something, or the example above needs to be revised.
Gotcha, you can also return state directly though:
Hi Anna
Thanks for your help. I work with @vietnguyengit so we have been discussing this. In our particular case we are processing a number of files and need to do three things when a particular task fails:
- Move a file from one s3 bucket to another
- Send an email notification
- Do not fail the flow and continue on processing the next file
The last two of these seem to be incompatible. The flow needs to fail for the email to be sent but it needs to succeed to continue with the next file. How would I resolve this?
Here is a cut down representation of what we are trying to do.
def on_state_change(task_future: PrefectFuture, raw_object):
task_future.wait() # block until completion
if task_future.get_state().is_failed():
print(f'This task failed: {task_future}')
failed_object = move_to_failed(raw_object['Key'])
else:
print(f'This task did not fail: {task_future}')
@task()
def ingest_to_parquet(raw_object):
print(f'Ingest {raw_object["Key"]} to Parquet')
@flow
def process_file(s3_object):
# Process the file
print(f'Processing {s3_object}')
result = ingest_to_parquet.submit(s3_object)
on_state_change(result, s3_object)
return Completed(message="Done") # TODO: This will prevent the email message as well because the flow is not failed
@flow
def process_files(objects):
# Process each file
for s3_object in objects:
with tags('email_on_failure'):
process_file(s3_object)
move a file
you can call some function that sends an email from the part of the code where you need it - you can do that using this collection:
Starting from 2.0.1 release, you can even use .map() syntax for that
In regards to “The last two of these seem to be incompatible. The flow needs to fail for the email to be sent but it needs to succeed to continue with the next file. How would I resolve this?”
Maybe you can wrap a try/except
?
@flow
def process_files(objects):
# Process each file
for s3_object in objects:
with tags('email_on_failure'):
try:
process_file(s3_object)
except Exception as e:
# email here the exception
pass
1 Like
Also, I think this needs to be updated with submit
@task
def fail_successfully(x):
return 1 / x
@flow
def main_flow(nr: int):
future_obj = fail_successfully.submit(nr)
send_slack_alert_on_failure(future_obj)
1 Like