How to use targets to cache task run results based on the presence of a specific file (i.e. Makefile- or Luigi-style file-based caching)?

I have a flow where an image goes through 4 transformations, and the image is saved after each step. There can be 25K+ pages, so that is 100K+ tasks.

If this runs in Luigi and fails after 80K tasks, then I can rerun and it only has to check whether the 80K files exist. In Prefect it has to read all 80K data files, which obviously takes much longer than just checking that the files exist.

One solution might be to create the flow dynamically in Prefect, starting with the last task for each image, so that the flow is only built for tasks which need to run. Does that seem the best approach? Are there any examples of doing Makefile-style flows in Prefect?

We do have very similar functionality called targets. Targets provide file-based caching, allowing you to skip computation if a specific file already exists. Additionally, the target's Result location can be templated so that you can dynamically generate file names based on runtime-specific information such as the execution date.

Below is a more detailed explanation.

What are targets used for?

For caching - this way, Prefect will check if the given (templated) target file exists in your specified Result location directory:

  • if so (the file exists), the task run will immediately enter a Cached state and Prefect won’t run this task,
  • if not (the file doesn’t exist yet), the task run will be executed normally and the task run output (the target file) will be stored in the specified Result location.

So targets basically combine two features together:

  • Results, i.e. a way of storing task run results used mostly for restarting from failure
  • Caching, i.e. a way of caching the computation state to avoid recomputation.

Many workflow authors may recognize this pattern from tools like Make or Luigi, where tasks define “targets” (usually, files on disk), and task computation is avoided in favor of using the data from the target if the target exists.

How to set a target in a flow?

To enable this behavior for a task, provide the target location to the task’s target kwarg along with the result and checkpoint kwargs necessary to enable checkpointing.

Here are a couple of examples:

from prefect import task, Flow
from prefect.engine.results import LocalResult

@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
    return [1, 2, 3, 4, 5]

@task
def print_data(data):
    print(data)


with Flow("using-targets") as flow:
    data = get_data()
    print_data(data)

Another one that explicitly specifies a directory:

from prefect.engine.results import LocalResult
from prefect import task, Task


# create a task via the task decorator
@task(target="func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def func_task():
    return 99

Gotchas

Here are things you need to be aware of when using targets:

  • Your tasks must return something (as in all the examples above) in order for Prefect to be able to persist the returned data.
  • By default, the result a task run returns is pickled, i.e. stored as a .pickle file. If you want a different serialization format, check out the Serializers and their API reference.
  • Targets provide file-based rather than time-based caching. If the given file already exists, Prefect won’t rerun the task; and since Prefect never deletes your files, this “cache” never expires on its own. To invalidate the cache and force Prefect to rerun the task, you have to manually delete the file.
  • The target argument on the @task decorator refers to the file name, while the Result location specifies the directory for this file.
  • The target location is not a Python f-string! It is templated at runtime instead, so make sure to use e.g. target="{task_name}" rather than target=f"{task_name}".
  • There are different Result classes you may use; choose the one that works well with your execution environment. For instance, LocalResult will not work well in ephemeral compute such as a Kubernetes job or a Docker container because the output won’t be persisted, but e.g. S3Result will work well (provided your container or Kubernetes job is properly authenticated with S3).
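To make the f-string gotcha concrete, here is a quick plain-Python illustration. The str.format call below merely simulates runtime placeholder filling for the sake of the example; it is not Prefect's actual templating engine:

```python
template = "{task_name}-{today}"

# Prefect fills placeholders like {task_name} from its runtime context;
# str.format simulates that filling here for illustration only:
filled = template.format(task_name="get_data", today="2023-01-01")
print(filled)  # get_data-2023-01-01

# An f-string, by contrast, is evaluated immediately at definition time,
# before Prefect ever sees it:
try:
    bad = f"{task_name}"  # task_name is not defined at this point
except NameError as exc:
    print("f-string failed:", exc)
```

The literal string keeps its placeholders until the task actually runs, while the f-string needs task_name to exist in the current scope the moment the decorator line is evaluated.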

Can I provide a relative path?

No. Result(dir="/Users/you/your/path") requires an absolute path, not a relative one.

Can I define the Result globally on a Flow?

Yes. A Result can be set on the Flow object, and then all tasks will use that Result type. This is useful when you want all tasks in your flow to write their results to a single directory or bucket; you can then use the target to verify the existence of each result prior to each task run.

Result Locations vs. Targets

If you provide a location to a task’s Result and a target then the target will be used as the location of the result.

Can I provide a callable for custom formatting of the Result location?

Yes! Here’s an example using a Parameter to template a local result location.

import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

result_basepath = Parameter("result_basepath", default="~")

def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/{task_name}.prefect"

@task(result=LocalResult(location=format_location))
def my_task():
    return [1, 2, 3, 4]

with Flow("local-result-parametrized") as flow:
    my_task()

# Ensure the parameter is registered to the flow even though no task uses it
flow.add_task(result_basepath)

And here is an example that manipulates the timestamp. For example, given the imaginary date 2042-42-42, this writes the result to ~/2042-42_my_task.prefect.

import os
from prefect import task, Flow
from prefect.engine.results import LocalResult


def format_location(date, task_name, **kwargs):
    return os.path.join(
        os.path.expanduser("~"), f"{date.year}-{date.month}_{task_name}.prefect"
    )

@task(result=LocalResult(location=format_location))
def my_task():
    return [1, 2, 3, 4]

with Flow("local-result-with-date-parsing") as flow:
    my_task()

Can I use the Python built-in date formatting?

Yes! For example, the following will create a name like Monday-Jun-28:

from prefect import task, Flow

@task(task_run_name="{date:%A}-{date:%b}-{date:%d}")
def compute():
    pass

with Flow("template-example") as flow:
    compute()

More about templating the Result location

Targets in Prefect are templatable strings for the file name that are used to check for the existence of a task run result.

What are the benefits of using templating with targets?

If you use target="{task_name}-{today}", then only the first run of this task on any given day will actually execute and write the result. Any other runs until the next calendar day will use the cached result, stored under the templated file name in the templated Result location directory.

Use cases for targets

  • You want a task to only run and write data to a given location once
  • You don’t want to rerun an expensive computation if some file (the output of that computation) already exists.

How does it work under the hood?

Whenever a task runs, it first checks whether the storage backend configured by the Result contains a file matching the target name. If so, the task run enters a Cached state, with the data from the target file as the task’s return value. If not, the task’s output is written to the target location and becomes available as a cache for future runs of this task, even across Python processes.
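Stripped of Prefect's state machinery, the decision logic described above can be sketched in plain Python. The run_with_target helper is made up for this sketch, and pickle stands in for whatever serializer the Result is configured with:

```python
import os
import pickle


def run_with_target(compute, target_path):
    """Sketch of target-based caching: skip `compute` if the target exists.

    `compute` is any zero-argument callable. This mirrors the decision
    logic only, not Prefect's internals.
    """
    if os.path.exists(target_path):
        # Cache hit: load the stored result instead of recomputing.
        with open(target_path, "rb") as f:
            return pickle.load(f)
    # Cache miss: compute, then persist the output as the target file.
    result = compute()
    with open(target_path, "wb") as f:
        pickle.dump(result, f)
    return result
```

The first call computes and writes the file; every later call finds the file and returns its contents without invoking the computation again.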

I am already using targets. However, it does not work well because Prefect operates top-down: if there are four tasks in sequence, all four tasks will read their input data even if a task does not need to run.

Luigi operates bottom-up. You run task4, and if its output file exists then the flow is complete; no files are read. Upstream tasks are only executed as required.

With 80K tasks, Luigi is much faster to restart. To replicate this in Prefect, I would need a function that takes a Prefect task and builds the flow upstream only as required. I am not sure this is straightforward and hoped there might be an existing solution.

Thanks for explaining, I understand what you mean now. In that case, you may need a somewhat different strategy, such as:

  • time-based caching configured with a custom cache_validator that mimics the targets functionality without loading the data,
  • conditional logic that lets you branch the computation and skip some tasks based on a specific condition. You could even combine that with the KV Store: store some value, and based on that key determine whether the computation needs to take place. This wouldn’t load any results, and you could likely even reuse some keys (think of this strategy as custom cache keys stored in the KV Store based on your own logic).
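The first strategy might look something like the sketch below. In Prefect 1.0, a cache validator is a callable that receives the cached state, the current inputs, and the flow parameters and returns True if the cache is still valid. The factory name make_file_exists_validator and its {placeholder}-style path_template are invented for illustration, not Prefect APIs:

```python
import os


def make_file_exists_validator(path_template):
    """Build a validator that checks file existence instead of loading data.

    `path_template` uses {placeholder} fields filled from the flow's
    parameters, e.g. "/data/{image_id}/step4.png". Hypothetical helper,
    shown only to illustrate the idea.
    """
    def validator(state, inputs, parameters):
        # Return True (cache valid) only if the output file still exists
        # on disk; no result data is ever loaded.
        path = path_template.format(**(parameters or {}))
        return os.path.exists(path)
    return validator
```

The key point is that validation touches only the filesystem metadata, so restarting a flow with 80K finished tasks costs 80K existence checks rather than 80K file reads.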

Or, if Luigi works well for that specific workflow, maybe you can keep the workflow there and only call/trigger the Luigi workflow run from Prefect? I’m not sure whether something like this is possible (perhaps via an API call or a shell command), but it may be worth exploring.

Prefect 2.0

Unfortunately, it’s unlikely that we are going to support any bottom-up computation: in Prefect 2.0 we are moving past the DAG and focusing on general-purpose workflow orchestration that executes workflows as “normal” Python scripts rather than building a DAG first and executing it.

Maybe there is a way of solving your problem more elegantly without relying on targets. For instance, you could explore building a custom cache key callable that accomplishes what you need, as described above (e.g. one that returns the file name without relying on targets or results). This way, you could do pretty much the same as what targets do, but without:

  • checking for the existence of a file in some directory,
  • loading any data when a given task doesn’t have to be computed in the first place.

In Prefect 2.0, you could also write your own target checking function and put it at the beginning of your task (or decorate your task with it). If this function finds the target file, just return early; otherwise run the task itself.
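A minimal sketch of that do-it-yourself approach as a decorator. The name skip_if_target_exists, the {placeholder}-style template, and the convention of passing the resolved target into the task body are all invented for illustration; this is not a Prefect API. On a cache hit it returns the target path immediately, so no data is loaded:

```python
import functools
import os


def skip_if_target_exists(target_template):
    """Decorator sketch: return early if the task's output file exists.

    `target_template` is a {placeholder}-style string filled from the
    call's keyword arguments. The decorated function must accept a
    `target` keyword and is responsible for writing that file.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(**kwargs):
            target = target_template.format(**kwargs)
            if os.path.exists(target):
                # Target found: skip the task body entirely and return
                # only the path, loading no data.
                return target
            fn(target=target, **kwargs)
            return target
        return wrapper
    return decorator
```

In Prefect 2.0 you could wrap the body of a @task-decorated function this way, or simply inline the same existence check as the first lines of the task.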

References for building a custom cache key function

For reference, here is how you can create a custom cache key function for Prefect 2.0:
https://orion-docs.prefect.io/concepts/tasks/#caching
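For orientation, a Prefect 2.0 cache_key_fn is a callable that receives the task run context and a dict of the call's parameters and returns a string key; task runs that produce the same key share a cached result. A minimal sketch keyed on an input file path (the parameter name "path" is an assumption for this example):

```python
def filename_cache_key(context, parameters):
    # Two runs over the same input file produce the same key, so the
    # second run reuses the first run's cached result. The context
    # argument is unused in this simple sketch.
    return str(parameters["path"])
```

You would then pass it to the task decorator, e.g. @task(cache_key_fn=filename_cache_key).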

And here is about cache_validators in Prefect 1.0:

Thanks for the detailed reply. Prefect 2 looks great, especially avoiding the need for a DAG.

It seems that the target functionality has been removed in Prefect 2, but it has a database that can persist status, which is cleaner, so I will use that. Regarding the Makefile-type functionality, I think the best approach is to pass big data between tasks as filenames only. This avoids the issue of unnecessarily loading data.
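That filenames-only pattern can be sketched as below: each step receives a path, writes its output next to the input, and returns only the new path, so a restarted flow never loads image data for steps that are already done. The helper name and the "<stem>.<step>.out" naming scheme are invented for illustration:

```python
import os


def transform_step(in_path, step_name, transform):
    """Sketch: tasks exchange file paths, not data.

    `transform` maps input bytes to output bytes. If the output file
    already exists, the step returns its path without reading anything.
    """
    out_path = f"{os.path.splitext(in_path)[0]}.{step_name}.out"
    if os.path.exists(out_path):
        # Output already present: skip the work, return the path only.
        return out_path
    with open(in_path, "rb") as f:
        data = f.read()
    with open(out_path, "wb") as f:
        f.write(transform(data))
    return out_path
```

Chaining four such steps per image means a rerun after a crash costs one existence check per completed step instead of a full read of each intermediate file.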

I note the database can be reset, but is it possible to reset only individual tasks with specific inputs, to force them to rerun? E.g. if I have 80K successful tasks and want to reset 50?

You would need to build some custom logic to make that possible; there is no built-in feature that works out of the box here so far. The best way to stay up to date on that is to follow our Topics tagged release-notes here on Discourse.

I have been using Prefect 2 and found that it generally results in really simple, clean flows! However, some of the areas above still seem awkward to implement:

  • You mentioned targets and templates, but both seem to be Prefect 1 only. Is there a plan to have targets/templates in Prefect 2?

  • Caching in Prefect 2 works with encoded locations. Are there plans to allow user-defined or templated locations?

  • I have 10K tasks and 30 failures and want to rerun the flow only for the incomplete tasks:

    • using caching or cache_key_fn will still add 10K task runs, since the only way to identify a target location is the cache key stored in the database. Ideally, completed tasks would be skipped entirely and not even appear in the task run logs.
    • using conditional logic before each task messes up the clean Prefect 2 code for a flow and is repetitive if required for each task. It also won’t always work, since the flow runs before the outputs are created.
    • using a task decorator won’t work, since the output target is not known until execution time.
      My current solution is a bespoke Task.call that checks whether the output exists, using the task “name” to hold a template of the target file path so it can be resolved at execution time. This works but seems awkward, so I wonder if there is another way?
  • It can be better to pass file paths rather than data between tasks, or to allow for either. This is easy to do for inputs using a task decorator, but the output path is not known until execution, and the task returns a future, so the actual result value is not known until execution completes. Is there a way to hook into task completion?

Thanks for your feedback!

Regarding targets and templates: indirectly, yes. We’ll have various caching functions that you’ll be able to use; it’s going to be less confusing and more user-friendly. The same applies to user-defined or templated cache locations.

For rerunning a flow only for its incomplete tasks, the best way to accomplish that is through retries rather than caching.

For skipping completed tasks entirely, I think you would need some conditional logic to accomplish that.

As for hooking into task completion: I agree, and I honestly don’t think you should rely on that. Instead, you should persist the output on your own.