(If you’d like me to open a new thread, I will do so; I wasn’t sure whether I should post it here.)
Hi, we have been in love with Prefect for the last ~1.5 years. Keep up the amazing work.
We could not find a way to implement conditional retries in prefect 2.0.
Here is a typical scenario we have:
We execute tasks with the DaskExecutor; some tasks fail with exceptions that are unrecoverable, i.e., they should not be retried.
Others fail with errors that are worth retrying, e.g. TimeoutError, and we’d like those to follow the default prefect 2.0 retry mechanism (@task(retries …)).
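For concreteness, here is a minimal sketch of the two failure modes (the task names, bodies, and retry settings are made up for illustration):

from prefect import task


@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    # may raise TimeoutError: transient, so we DO want Prefect's retries here
    ...


@task(retries=3, retry_delay_seconds=10)
def parse_data(payload: dict) -> dict:
    # may raise ValueError on malformed input: unrecoverable, retrying is
    # pointless, yet the plain `retries` mechanism retries it all the same
    ...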
I tried to use prefect.orion.orchestration.dependencies.temporary_task_policy and to override CoreTaskPolicy with a custom list returned from priority(), with a ConditionalRetryFailedTasks rule in place of RetryFailedTasks, but I couldn’t get it to work (rough sketch below).
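Roughly, this is the shape of what I tried. Treat it as a sketch: ConditionalRetryFailedTasks and the "unrecoverable" message marker are our own conventions, and the rule interface is my reading of the Orion internals:

from prefect.orion.orchestration.core_policy import CoreTaskPolicy, RetryFailedTasks


class ConditionalRetryFailedTasks(RetryFailedTasks):
    async def before_transition(self, initial_state, proposed_state, context):
        # leave the FAILED transition untouched when the failure is unrecoverable
        if proposed_state.message and "unrecoverable" in proposed_state.message:
            return
        await super().before_transition(initial_state, proposed_state, context)


class ConditionalCoreTaskPolicy(CoreTaskPolicy):
    @staticmethod
    def priority():
        # CoreTaskPolicy's rule list, with our conditional rule swapped in
        return [
            ConditionalRetryFailedTasks if rule is RetryFailedTasks else rule
            for rule in CoreTaskPolicy.priority()
        ]

The intent was then to inject ConditionalCoreTaskPolicy through temporary_task_policy, but the custom rule never seemed to take effect.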
As a side note, in prefect 1.0 we successfully used a state_handler to achieve this.
For completeness, this is how we used to solve it with prefect 1.0:
# prefect 1.0 code
# here is our old state_handler
import logging
from typing import Union, cast

import numpy as np
import prefect
from prefect import Task
from prefect.engine import state
from toolz import curry

# alias as used by Prefect 1.0's notification utilities
TrackedObjectType = Union["prefect.Flow", Task]


@curry
def when_retry_exhausted_return_a_managed_failure_state_handler(
    tracked_obj: TrackedObjectType,
    old_state: state.State,
    new_state: state.State,
    # ignore_states: list = None,
    # only_states: list = None,
    webhook_secret: str = None,
    backend_info: bool = True,
) -> state.State:  # pragma: no cover
    """
    A state handler that acts when retry attempts are exhausted and returns a
    managed-failure (quasi-`state.Success`) state notification.

    Copied from the Prefect open source and extended.
    For more information, see the `better_slack_notifier()` method documentation.

    Args:
        tracked_obj: Task or Flow object the handler is registered with
        old_state: previous state of the tracked object
        new_state: new state of the tracked object
        webhook_secret: the name of the Prefect Secret that stores your Slack
            webhook URL; defaults to `"SLACK_WEBHOOK_URL"`
        backend_info: whether to supply the Slack notification with URLs
            pointing to backend pages; defaults to True

    Returns:
        the `new_state` object that was provided
    """
    _logger: logging.Logger = prefect.context.get("logger")

    # While discarding flows is not mandatory, it simplifies the method.
    if not isinstance(tracked_obj, Task):
        return new_state  # no notification
    task = cast(Task, tracked_obj)
    webhook_url = cast(
        str, prefect.client.Secret(webhook_secret or "SLACK_WEBHOOK_URL").get()
    )

    def managed_failure_state(log_msg: str) -> state.State:
        """A helper function that returns a quasi failure state."""
        # Let others know this is a quasi-successful state.
        new_state.context.update(managed_failure=True)
        # this is our generated signals.FAIL?
        assert new_state.context.get("fail_on_mapped", False), (
            "While we have a `managed_failure`, `fail_on_mapped` is not True. "
            "How can this be?"
        )
        managed_failed_state = state.Success(
            new_state.message, result=new_state.result, context=new_state.context
        )
        _logger.info(log_msg)
        # `_format_message_and_create_slack_api_request` is our internal helper
        # that formats and posts the Slack notification, then returns the state.
        return _format_message_and_create_slack_api_request(
            tracked_obj,
            new_state=managed_failed_state,
            webhook_url=webhook_url,
            backend_info=backend_info,
        )

    if new_state.is_failed():
        # Get the task run name from the state context (on the task object it
        # is still unformatted).
        task_run_name = new_state.context.get("task_run_name", task.task_run_name)
        # NOTE: `should_retry` depends on the type of the error. It is set by
        # the failing task on the context of the `signals.FAIL()` it raises.
        should_retry = new_state.context.get("should_retry", True)
        if not should_retry:
            return managed_failure_state(
                f"The task `{task_run_name}` encountered an exception with "
                "should_retry=false and hence will result in a managed failure."
            )
        attempted = prefect.context.get("task_run_count", 1)
        max_retries = task.max_retries if task.max_retries else np.inf
        if attempted == max_retries:
            return managed_failure_state(
                f"All the task `{task_run_name}` retries were exhausted and "
                "hence will result in a managed failure."
            )
    return new_state
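For context, the handler was attached like any other prefect 1.0 state handler. The task below and the secret name are illustrative; the @curry decorator is what let us pre-bind the keyword arguments:

from datetime import timedelta

from prefect import task


@task(
    max_retries=3,
    retry_delay=timedelta(seconds=30),
    state_handlers=[
        # pre-bind the handler's keyword arguments; Prefect supplies
        # (tracked_obj, old_state, new_state) on every state change
        when_retry_exhausted_return_a_managed_failure_state_handler(
            webhook_secret="MY_SLACK_WEBHOOK_URL"  # illustrative secret name
        )
    ],
)
def flaky_task():
    ...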
Could you add more background about the problem you are trying to solve this way? Do you want to retry on a specific exception type, e.g. retry on a TimeoutError here?