How can I specify the retry behavior for a specific task?

Prefect 2.0

You can set the maximum number of retries as an integer via the retries argument on the @task decorator, and the retry delay in seconds via retry_delay_seconds:

@task(retries=2, retry_delay_seconds=60)
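
For example (a minimal sketch; flaky_lookup and its body are made-up placeholders):

from prefect import flow, task


@task(retries=2, retry_delay_seconds=60)
def flaky_lookup(x: int) -> int:
    # any exception raised here triggers up to 2 retries,
    # each scheduled 60 seconds apart
    if x < 0:
        raise ValueError("negative input")
    return x * 2


@flow
def my_flow():
    return flaky_lookup(7)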

Prefect 1.0

Prefect 1.0 uses a slightly different syntax: the number of retries is set via max_retries, and retry_delay expects a datetime.timedelta object:

@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
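
The Prefect 1.0 equivalent with the required import (again a sketch; the task body is a placeholder):

import datetime

from prefect import task


@task(max_retries=2, retry_delay=datetime.timedelta(minutes=1))
def flaky_lookup(x):
    # retried up to 2 times, one minute apart
    return x * 2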

(If you’d like me to open a new thread, I will do so - I wasn’t sure if I should post it here).

Hi, we have been in love with Prefect for the last ~1.5 years. Keep up the amazing work.

We could not find a way to implement conditional retries in Prefect 2.0.
Here is a typical scenario we have:

  • We execute tasks with the DaskExecutor; some tasks fail with exceptions that are unrecoverable, i.e., they should not be retried.
  • Some fail with errors that can be retried, e.g. TimeoutError, and we'd like those to follow the default Prefect 2.0 mechanism (@task(retries …)).

I tried to use prefect.orion.orchestration.dependencies.temporary_task_policy and override CoreTaskPolicy with a custom list returned from priority(), using a ConditionalRetryFailedTasks rule instead of RetryFailedTasks, but I couldn't get it to work.

As a side note, in prefect 1.0 we successfully used a state_handler to achieve this.

Any other ideas?
Thanks,
Shay

instead of using a state handler, you can operate on a state directly within a flow using if/else
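
for example, something along these lines (a minimal sketch, assuming a Prefect 2 release where task calls accept return_state=True; fragile_task is a made-up name):

from prefect import flow, task


@task(retries=3, retry_delay_seconds=10)
def fragile_task(x):
    return 1 / x


@flow
def my_flow(x):
    state = fragile_task(x, return_state=True)
    if state.is_failed():
        # handle the failure here instead of in a state handler
        return None
    return state.result()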

and the retries argument still exists on a task decorator

if this doesn’t help, can you share your state handler?

If I understand correctly, I'll have to re-raise the exception if retries > 1, which makes my code a bit verbose.
Here is an example:

# prefect 2.0
import random

from prefect import task


def should_exception_be_retried(exception: Exception) -> bool:
    return isinstance(exception, TimeoutError)


@task()
def conditional_failure(x):
    retries = 3
    for attempt in range(1, retries + 1):
        # TODO: we should replace print with the prefect logger
        print("ATTEMPT", attempt)
        try:
            # choice = random.choice([0, 1, 2])
            # weighted choice: 0 -> ZeroDivisionError (unrecoverable),
            # 2 -> TimeoutError (retriable), 1 -> success
            choice = random.choice([0]*1 + [1]*2 + [2]*8)
            print("DENOMINATOR", choice)
            if choice == 2:
                raise TimeoutError("A timeout error")
            return x / choice
        except Exception as e:
            if attempt < retries and should_exception_be_retried(e):
                # here we will log a warning
                continue
            raise
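
A hypothetical driver flow for the task above (demo_flow is a made-up name; it assumes a Prefect 2 release where calling a task inside a flow returns its result directly):

from prefect import flow


@flow
def demo_flow():
    # each call retries internally on TimeoutError only;
    # a ZeroDivisionError (choice == 0) fails immediately
    return [conditional_failure(x) for x in range(5)]


if __name__ == "__main__":
    demo_flow()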

For completeness, this is how we used to solve it with Prefect 1.0:

# prefect 1.0 code
# here is our old state_handler
import logging
from typing import cast

import numpy as np
from toolz import curry

import prefect
from prefect import Task
from prefect.engine import state

# NOTE: TrackedObjectType (from Prefect's notification utilities) and our
# _format_message_and_create_slack_api_request helper are not shown here


@curry
def when_retry_exhausted_return_a_managed_failure_state_handler(
        tracked_obj: TrackedObjectType,
        old_state: state.State,
        new_state: state.State,
        # ignore_states: list = None,
        # only_states: list = None,
        webhook_secret: str = None,
        backend_info: bool = True,
) -> state.State:  # pragma: no cover
    """
    A state handler that acts when attempts are exhausted and returns a managed failure (quasi-state.Success) state notification.

    Copied from prefect open source and extended.
    For more information, see `better_slack_notifier()` method documentation.

    Args:
        tracked_obj: Task or Flow object the handler is
            registered with
        old_state: previous state of tracked object
        new_state: new state of tracked object
        webhook_secret: the name of the Prefect Secret that stores your slack
            webhook URL; defaults to `"SLACK_WEBHOOK_URL"`
        backend_info: Whether to supply slack notification with urls
            pointing to backend pages; defaults to True

    Returns:
        the `new_state` object that was provided
    """

    _logger: logging.Logger = prefect.context.get("logger")

    # while discarding flows is not mandatory, it simplifies the method
    if not isinstance(tracked_obj, Task):
        return new_state  # no notification

    task = cast(Task, tracked_obj)

    webhook_url = cast(
        str, prefect.client.Secret(webhook_secret or "SLACK_WEBHOOK_URL").get()
    )

    def managed_failure_state(log_msg: str) -> state.State:
        """A helper function that returns a quasi failure state."""
        new_state.context.update(managed_failure=True)  # Let others know this is a quasi-Successful state

        # this is our generated signals.FAIL?
        assert new_state.context.get("fail_on_mapped", False), \
            "While we have a `managed_failure`, `fail_on_mapped` is not True. How can this be?"

        managed_failed_state = state.Success(new_state.message,
                                             result=new_state.result, context=new_state.context)
        _logger.info(log_msg)
        return _format_message_and_create_slack_api_request(tracked_obj, new_state=managed_failed_state,
                                                            webhook_url=webhook_url, backend_info=backend_info)

    if new_state.is_failed():
        task_run_name = new_state.context.get('task_run_name',
                                              task.task_run_name)  # get task run name from context (on task, it is unformatted)
        # NOTE: the `should_retry` depends on the type of the error. It shall be updated by the task that failed resulting signals.FAIL()
        should_retry = new_state.context.get("should_retry", True)
        if not should_retry:
            return managed_failure_state(
                f"The task `{task_run_name}` encountered an exception with should_retry=False and hence will result in a managed failure.")

        attempted = prefect.context.get("task_run_count", 1)
        max_retries = task.max_retries if task.max_retries else np.inf
        if attempted == max_retries:
            return managed_failure_state(
                f"All retries of the task `{task_run_name}` were exhausted and hence this will result in a managed failure.")

    return new_state
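
For reference, we attach the handler to a task via the Prefect 1.0 state_handlers argument (a sketch; the task and secret name below are placeholders):

import datetime

from prefect import task


@task(
    max_retries=3,
    retry_delay=datetime.timedelta(minutes=1),
    state_handlers=[
        when_retry_exhausted_return_a_managed_failure_state_handler(
            webhook_secret="SLACK_WEBHOOK_URL"
        )
    ],
)
def some_conditional_task(x):
    return x / 2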

thanks for sharing the code!

could you add more background about what problem you are trying to solve this way? do you want to retry on a specific exception type, e.g. retry here on a TimeoutError?

we have an open issue for that here:

It looks like a similar request to the one I have. I'll try it 🙂
Thanks for the answers
