View in #prefect-community on Slack
@Adi_Gandra: Hey, I was trying to figure out whether it's possible to set up a task so that, if it fails, it retries 3 times (I figured this part out). But if it still fails, I want to kick off another task so the rest of the flow can continue. Basically, I'm trying to swap in another task as a backup when my task fails, so my entire flow can carry on.
@Anna_Geller: That’s an interesting question.
Here is an example using a state handler for that use case. Here, after 3 unsuccessful task runs, we send a Slack message as part of the 3rd retry - you can replace it with any action you would like to take:
from datetime import timedelta

import prefect
from prefect import Flow, task
from prefect.tasks.notifications import SlackTask


def take_action_after_three_failed_task_runs(task, old_state, new_state):
    # allowing 3 retries - on the 4th task run, we give up retrying and do something else:
    if new_state.is_retrying() and new_state.run_count == 3:
        # take some action
        SlackTask(message="3 task runs didn't cut it - doing sth else now").run()
    # state handlers must return the state the task should end up in
    return new_state


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=1),
    state_handlers=[take_action_after_three_failed_task_runs],
)
def retry_test():
    logger = prefect.context.get("logger")
    run_count = prefect.context.get("task_run_count")
    logger.info("%s. TaskRun", run_count)
    raise Exception("Failing in order to test retries...")


with Flow("retry-tester") as flow:
    retry_test()

if __name__ == "__main__":
    flow.run()
The output when running this:
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:02+0100] INFO - prefect.retry_test | 1. TaskRun
[2022-02-27 18:27:02+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:02+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:02+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:02+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:03.683739+00:00
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:03+0100] INFO - prefect.retry_test | 2. TaskRun
[2022-02-27 18:27:03+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:03+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:03+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:03+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:04.732207+00:00
[2022-02-27 18:27:04+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:04+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:04+0100] INFO - prefect.retry_test | 3. TaskRun
[2022-02-27 18:27:04+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Retrying'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run RUNNING: terminal tasks are incomplete.
[2022-02-27 18:27:05+0100] INFO - prefect.retry-tester | Waiting for next available Task run at 2022-02-27T17:27:05.763641+00:00
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'retry-tester'
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Starting task run...
[2022-02-27 18:27:05+0100] INFO - prefect.retry_test | 4. TaskRun
[2022-02-27 18:27:05+0100] ERROR - prefect.TaskRunner | Task 'retry_test': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/anna/.conda/envs/prefectCloudFlows/lib/python3.8/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs) # type: ignore
  File "/Users/anna/PycharmProjects/prefectCloudFlows/000_Community/do_sth_on_third_retry.py", line 24, in retry_test
    raise Exception("Failing in order to test retries...")
Exception: Failing in order to test retries...
[2022-02-27 18:27:05+0100] INFO - prefect.TaskRunner | Task 'retry_test': Finished task run for task with final state: 'Failed'
[2022-02-27 18:27:05+0100] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
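For the original question (swap in a backup once the retries are exhausted, so the rest of the flow can continue), the control flow can be sketched in plain Python. This is not Prefect's API: `run_with_fallback`, `flaky_extract` and `backup_extract` are hypothetical names made up for illustration, and the sketch only mirrors the behaviour visible in the log above, where `max_retries=3` leads to 4 task runs in total.

```python
import time
from datetime import timedelta
from typing import Callable, TypeVar

T = TypeVar("T")


def run_with_fallback(
    primary: Callable[[], T],
    fallback: Callable[[], T],
    max_retries: int = 3,
    retry_delay: timedelta = timedelta(seconds=0),
) -> T:
    """Run `primary`; if the first run and all retries fail, run `fallback`.

    Hypothetical helper for illustration only, not part of Prefect.
    """
    total_runs = max_retries + 1  # first attempt plus the retries
    for run_count in range(1, total_runs + 1):
        try:
            return primary()
        except Exception as exc:
            print(f"{run_count}. run failed: {exc}")
            if run_count < total_runs:
                # wait before the next attempt, like retry_delay on a task
                time.sleep(retry_delay.total_seconds())
    # every run failed: hand over to the backup so the caller can continue
    print("All runs failed - switching to the backup")
    return fallback()


def flaky_extract() -> str:
    # stand-in for the task that keeps failing
    raise Exception("Failing in order to test retries...")


def backup_extract() -> str:
    # stand-in for the backup task
    return "data from backup source"


result = run_with_fallback(flaky_extract, backup_extract)
print(result)
```

Downstream steps only see `result`, so they do not need to know whether the primary or the backup produced it. That is the property the question asks for; how to wire the same idea into a flow depends on the Prefect version in use.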