Why results need to be configured in order to use retries?

View in #prefect-community on Slack

Matthew_Roeschke @Matthew_Roeschke: I have a task in a functional API flow that has one task that calls map . I added max_retries to this task I got this Userwarning I don’t really know how to addressed based on the link. I thought I could pass the results from a functional task to another task?

UserWarning: Task <...> has retry settings but some upstream dependencies do not have result types. 
See <https://docs.prefect.io/core/concepts/results.html> for more details.

Anna_Geller @Anna_Geller: In order for your task to be retried by Prefect, it may need some input data from upstream tasks. For that reason, you would need to have results configured on your tasks. It looks like you don’t have those and that’s why Prefect warns you that the retries may not work properly.

You could add e.g. Prefect Results to make that easy, or something simple that works across platforms such as S3. This page explains it in detail but you could configure it as simple as:

from prefect import Flow, task
from prefect.engine.results import PrefectResult

@task(result=PrefectResult())
def my_task():
    return 3
    
>>> state.result[first_result]._result.value
3
>>> state.result[first_result]._result.location
'3'

Here is explanation from the docs:
“For example, suppose task A is configured to use result A, and task B to use result B, and that A passes data downstream to B. If B fails and requests a retry, it needs to cache its inputs, one of which came from A. If you are using Cloud, Cloud will use results to persist the input cache, and since the data is from task A it will use the result configured on A.”

by default, when you use e.g. S3 storage, Prefect will use S3 also for results automatically, unless you explicitly turn off checkpointing

@task(checkpoint=False) # to disable

Matthew_Roeschke @Matthew_Roeschke: Thanks for the context! Just the explanation I needed.