We do have a very similar functionality called targets. Targets provide file-based caching functionality allowing you to prevent computation if a specific file already exists. Additionally, target Result location can be templated so that you can dynamically generate filenames based on runtime-specific information such as execution date.
Below is a more detailed explanation.
What are targets used for?
For caching - this way, Prefect will check if the given (templated) target file exists in your specified Result location directory:
- if so (the file exists), the task run will immediately enter a Cached state and Prefect won’t run this task,
- if not (the file doesn’t exist yet), the task run will be normally executed and the task run output (the target file) will be stored in the specified Result location.
So targets basically combine two features together:
- Results, i.e. a way of storing task run results used mostly for restarting from failure
- Caching, i.e. a way of caching the computation state to avoid recomputation.
Many workflow authors may recognize this pattern from tools like Make or Luigi, where tasks define “targets” (usually, files on disk), and task computation is avoided in favor of using the data from the target if the target exists.
How to set a target in a flow?
To enable this behavior for a task, provide the target location to the task’s target kwarg, along with the result and checkpoint kwargs necessary to enable checkpointing.
Here are a couple of examples:
from prefect import task, Flow
from prefect.engine.results import LocalResult

@task(result=LocalResult(), target="{task_name}-{today}")
def get_data():
    return [1, 2, 3, 4, 5]

@task
def print_data(data):
    print(data)

with Flow("using-targets") as flow:
    data = get_data()
    print_data(data)
Another one that explicitly specifies a directory:
from prefect.engine.results import LocalResult
from prefect import task, Task

# create a task via the task decorator
@task(target="func_task_target.txt", checkpoint=True, result=LocalResult(dir="~/.prefect"))
def func_task():
    return 99
Gotchas
Here are things you need to be aware of when using targets:
- Your tasks must return something (as you can see in all examples above) in order for Prefect to be able to persist this returned data.
- By default, the result that your task run returns is pickled, i.e. stored as a .pickle file. If you want a different type of serialization, check out the Serializers and their API reference.
- Targets provide file-based rather than time-based caching. If the given file already exists, Prefect won’t rerun the task, and Prefect never deletes your files, so this “cache” never expires on its own. To invalidate it and force Prefect to rerun the task, you have to delete the file manually.
- The target argument on the @task decorator refers to the file name, while the Result location is used to specify a directory for this file.
- The target location is not a Python f-string! Instead, it is a plain string template in Python’s str.format style, which Prefect fills in at runtime, so make sure to use e.g. target="{task_name}" rather than target=f"{task_name}".
- There are different Result classes you may use - choose the one that works well with your execution environment. For instance, local Result will not work well in ephemeral compute such as a Kubernetes job or a Docker container because the output won’t be persisted, but e.g. S3Result will work well (provided your container or Kubernetes job is properly authenticated with S3).
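To see why the f-string form breaks, it helps to compare the two in plain Python. The sketch below does not call Prefect. It assumes the template is filled in later by `str.format`-style keyword substitution, and the `context` dictionary with its values is made up for illustration.

```python
# Hypothetical runtime values, standing in for what is only known during a run
context = {"task_name": "get_data", "today": "2021-06-28"}

# A plain string keeps its placeholders until something formats it later
template = "{task_name}-{today}"
rendered = template.format(**context)
print(rendered)  # get_data-2021-06-28

# An f-string is evaluated right away, at the place where the task is defined.
# No variable called task_name exists there, so Python raises NameError.
try:
    target = f"{task_name}"
    fstring_error = None
except NameError as exc:
    fstring_error = exc
print(type(fstring_error).__name__)  # NameError
```

If a variable named `task_name` did happen to exist at definition time, the f-string would silently bake in that value instead of the runtime one, which is arguably worse than the error.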
Can I provide a relative path?
No. The Result(dir="/Users/you/your/path") requires an absolute path, not a relative one.
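If your path starts out relative or uses `~`, you can turn it into an absolute one with the standard library before handing it to the Result. A minimal sketch, where the helper `to_absolute` and the directory names are only examples and nothing is Prefect-specific:

```python
import os

def to_absolute(path):
    """Expand a leading ~ and resolve the path against the current working directory."""
    return os.path.abspath(os.path.expanduser(path))

relative_dir = "results"    # example relative path
home_dir = "~/.prefect"     # example path that uses ~

absolute_dir = to_absolute(relative_dir)
absolute_home = to_absolute(home_dir)

print(os.path.isabs(absolute_dir))
print(os.path.isabs(absolute_home))
```

The resulting string can then be passed as the directory, e.g. `LocalResult(dir=absolute_dir)`.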
Can I define the Result globally on a Flow?
Yes. Result can be set on the flow object, and then all tasks will use that Result type. This is useful when you want all tasks in your flow to write their results to a single directory or bucket. You can then use the target to verify the existence of that result prior to each task run.
Result Locations vs. Targets
If you provide a location to a task’s Result and a target, then the target will be used as the location of the result.
Can I provide a callable for custom formatting of the Result location?
Yes! Here’s an example using a Parameter to template a local result location.
import os
from prefect import task, Flow, Parameter
from prefect.engine.results import LocalResult

result_basepath = Parameter("result_basepath", default="~")

def format_location(result_basepath, task_name, **kwargs):
    # Notice we expand the user at runtime so the user of the parameter
    # does not need to worry about the path to the home directory on the
    # server that the flow runs on
    return f"{os.path.expanduser(result_basepath)}/{task_name}.prefect"

@task(result=LocalResult(location=format_location))
def my_task():
    return [1, 2, 3, 4]

with Flow("local-result-parametrized") as flow:
    my_task()
    # Ensure the parameter is registered to the flow even though no task uses it
    flow.add_task(result_basepath)
And here is an example that allows manipulation of the timestamp. For example, this writes the result to ~/2042-42_my_task.prefect based on the imaginary date 2042-42-42.
import os
from prefect import task, Flow
from prefect.engine.results import LocalResult

def format_location(date, task_name, **kwargs):
    # Build the file name from the run date and the task name
    return os.path.join(
        os.path.expanduser("~"), f"{date.year}-{date.month}_{task_name}.prefect"
    )

@task(result=LocalResult(location=format_location))
def my_task():
    return [1, 2, 3, 4]

with Flow("local-result-with-date-parsing") as flow:
    my_task()
Templates can also format the date directly. For example, the following task_run_name will create a name like Monday-Jun-28:
from prefect import task, Flow

@task(task_run_name="{date:%A}-{date:%b}-{date:%d}")
def compute():
    pass

with Flow("template-example") as flow:
    compute()
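The `{date:%A}` syntax is Python's ordinary format specification for datetime objects, so you can try out a template locally before putting it on a task. The sketch below assumes the template is rendered with `str.format` and that `date` is a datetime. The date itself is chosen only to match the example name above. Day and month names follow the process locale, which is English unless it has been changed.

```python
from datetime import datetime

template = "{date:%A}-{date:%b}-{date:%d}"

# Example value standing in for the run's date (an assumption for this sketch)
run_date = datetime(2021, 6, 28)

name = template.format(date=run_date)
print(name)  # Monday-Jun-28
```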
More about templating the Result location
Targets in Prefect are templatable strings for the file name that are used to check for the existence of a task run result.
What are the benefits of using templating with targets?
If you use the target target="{task_name}-{today}", then only the first run of this task on any given day will run and write the result. Any other runs up until the next calendar day will use the cached result stored under the templated file name in the Result location directory.
Use cases for targets
- You want a task to only run and write data to a given location once
- You don’t want to rerun an expensive computation if some file (the output of that computation) already exists.
How does it work under the hood?
Whenever a task is run, it will first check to see if the storage backend configured by the result has a file matching the name of the target, and if so, will enter a Cached state with the data from the target file as the task’s return value. If it has not been cached, the output of the task will be written to the target location and be available as a cache for future runs of this task, even between running Python processes.
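The check-then-run behaviour described here can be imitated in a few lines of plain Python. This is only a rough sketch of the idea, not Prefect's implementation: the helper `run_with_target`, the state strings it returns, and the dates are all made up, and it uses pickle files in a temporary directory where Prefect would go through the configured Result class.

```python
import os
import pickle
import tempfile

def run_with_target(fn, target, result_dir, **context):
    """Return (state, value): reuse the target file if present, else run fn and write it."""
    path = os.path.join(result_dir, target.format(**context))
    if os.path.exists(path):
        # Target exists: skip the computation and load the stored value
        with open(path, "rb") as f:
            return "Cached", pickle.load(f)
    value = fn()
    with open(path, "wb") as f:
        pickle.dump(value, f)
    return "Success", value

calls = []

def get_data():
    calls.append(1)  # count how often the task body really runs
    return [1, 2, 3, 4, 5]

result_dir = tempfile.mkdtemp()
template = "{task_name}-{today}"

first = run_with_target(get_data, template, result_dir, task_name="get_data", today="2021-06-28")
second = run_with_target(get_data, template, result_dir, task_name="get_data", today="2021-06-28")
next_day = run_with_target(get_data, template, result_dir, task_name="get_data", today="2021-06-29")

# Deleting the file is what invalidates this kind of cache
os.remove(os.path.join(result_dir, "get_data-2021-06-28"))
after_delete = run_with_target(get_data, template, result_dir, task_name="get_data", today="2021-06-28")

print(first[0], second[0], next_day[0], after_delete[0])
print(len(calls))
```

The second call on the same day is served from the file, the next day's call renders a new file name and runs again, and removing the file forces a rerun. Because the cache lives on disk rather than in memory, it also survives across separate Python processes.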
References