Does Prefect support caching at the flow level rather than only at the task level? How can we avoid recomputing several task runs in a flow?

Hi there,

We have been using Prefect (0.14.x) for data-intensive processing and ML training jobs, and we use the checkpointing feature to persist and cache critical/expensive tasks so they don’t have to be recomputed every time.

But we have encountered some issues in practice.

  • Prefect flow execution always evaluates the graph from beginning to end, and checkpointing is scoped to each individual task. This means that even if the final task’s result is cached, the executor still runs every upstream task before realising that the final output is already cached.
  • Even if we checkpoint every task in the flow/graph, evaluating the final output of the flow can still take a long time, because every cache check requires loading all of the task’s inputs (even inputs that the cache target function doesn’t need), and those inputs can be fairly large (e.g. DataFrames). This wastes a considerable part of the time the checkpoints/cache are supposed to save.

Ideally, it would be a lot more powerful if a Prefect flow could support checkpointing/caching by returning the final output immediately when it is cached, or provide some kind of flow-level (or sub-graph) caching so that when the input parameters of the flow/sub-graph haven’t changed, it simply returns the final cached result instead of re-evaluating every single upstream task first.

I guess our use case is fairly specific, since we use Prefect as a data-flow/pipeline tool; this may be less of an issue for more general workflow execution.

Are there any plans or solutions to tackle this problem? Or has it already been resolved in Prefect 2.0? :slightly_smiling_face:

Thanks!

Anthony

Based on the use case you described, it looks like you should use time-based caching via the cache_for argument on the task decorator, rather than checkpointing.

cache_for will mark the task as Cached and will prevent recomputation of the task run for the time delta duration you configure (a minimal sketch follows this list). This way:

  • Prefect won’t run the task at all - it will simply see that the task has a Cached state in the backend and won’t recompute it until the time delta you set expires,
  • Prefect won’t load/evaluate the checkpointed results, because the task state is Cached, which is a subclass of Success.
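For illustration, here’s a minimal sketch of time-based caching on a single task (the task name and body are hypothetical):

from datetime import timedelta
from prefect import task

# Hypothetical task: its Cached state is kept for two hours, so any run
# within that window skips recomputing it.
@task(cache_for=timedelta(hours=2))
def expensive_transform(df):
    # ... expensive computation ...
    return df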

I think what may be confusing here is that, with this pattern, Prefect doesn’t cache the actual task result but rather the task state.

To prevent the upstream tasks from being re-run, you would need to cache them as well, not only the final task.

For that, you could actually use the flow-of-flows orchestration pattern and cache the create_flow_run task - this would effectively give you the end result you’re looking for. You could do something like:

from datetime import timedelta
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run


with Flow("parent_flow") as flow:
    # Trigger the child flow; the Cached state is reused for two hours,
    # so a new child flow run is not created within that window.
    child_flow_run_id = create_flow_run(
        flow_name="child_flow_name",
        run_name="custom_run_name",
        task_args=dict(cache_for=timedelta(hours=2)),
    )
    # Wait for the child flow run to finish and raise on failure.
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(cache_for=timedelta(hours=2)),
    )
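With cache_for set on both create_flow_run and wait_for_flow_run, the parent flow should reuse their Cached states for the configured two hours, so within that window it won’t trigger a new child flow run and therefore won’t recompute any of the child flow’s tasks.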

You’re in the right place! The pattern you’re describing is very much in line with the problems Prefect is solving, and many users have likely come across a very similar problem. I believe both approaches described above allow you to solve the issue already in Prefect < 2.0:

  1. Flow-level caching when triggering the flow from a parent flow,
  2. Time-based caching with cache_for set on all tasks you wish to avoid recomputing, rather than only on the last task.

But LMK if this didn’t answer your issue.

Thanks for the reply Anna,

The flow-of-flows pattern in Prefect 2.0 seems to solve the issue to some extent (sub-graph caching/checkpointing).

Sorry, maybe I didn’t describe the problem clearly or with enough context.

The reasons we use checkpointing are:

  • We treat the output of a set of tasks as a dataset that will be persisted (possibly forever) and is expected to be queryable and manageable outside of Prefect (via the data storage).
  • If we run many different flows (which may share a subset of tasks/sub-flows), they are expected to hit the same checkpointed data without recomputing it.
  • Checkpointing also supports flexible location formatting via the target parameter and lets us choose the Result type (Local/S3), whereas cache_for only lets you set a validator and a cache key, with most of the control left to Prefect.

Correct me if I’m wrong about cache_for (I haven’t used it much). Based on the docs:
"Out of the box, Prefect Core does not persist cached data in a permanent fashion. All data, results, and cached states are only stored in memory within the Python process running the flow."

  • My understanding is that cache_for is used within the execution scope of one flow run, so it knows the status/data of the tasks within that run. It seems to work well when one flow runs the same task multiple times or retries after a downstream task fails.
  • Does cache_for work across different flow runs? E.g. in another flow run with a different flow run id, would it know the state of a task that was already computed by a previous run (I assume the task run id will be different in different flow runs)?

The reason we prefer not to checkpoint every task in a flow is that, for a data-intensive process, persisting all the intermediate outputs can generate a lot of redundant data and eat up a lot of storage.

My bad if it is still not clear; I think caching and checkpointing got a bit mixed up in my original post.

Cheers,

Anthony


Yes. You can think of it this way: cache_for doesn’t cache the actual task result but rather the task state. And it will be respected across many runs.

This is a really important distinction so let me highlight this so that we understand each other. Caching and checkpointing task run results are two completely different concepts.

  • Caching can be used to prevent recomputation of expensive (time-consuming or storage/compute-heavy) data processing - this seems to be what you want.
  • The checkpointing (results) feature is used to recover from failure and is often used with the Restart functionality. A short sketch contrasting the two follows this list.
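To make the distinction concrete, here’s a rough sketch in Prefect 1 syntax (the task names, result location, and duration are purely illustrative):

from datetime import timedelta
from prefect import task
from prefect.engine.results import LocalResult

# Checkpointing: persist the task's return value to a Result location
# (here a local directory) so a failed run can be restarted from it.
@task(
    checkpoint=True,
    result=LocalResult(dir="/tmp/prefect-results"),
    target="build_dataset.prefect",
)
def build_dataset():
    ...

# Caching: record a Cached state in the backend so the task is not
# recomputed for the configured duration, even across flow runs.
@task(cache_for=timedelta(hours=2))
def train_model(dataset):
    ...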

Note that in my last comment I recommended caching, rather than checkpointing:

Thanks for the reply Anna!
To sum up, for the two issues I listed originally:

  1. Prefect re-evaluates/checks upstream tasks even when the final task is cached
    • Solution: currently we need to set all upstream tasks to be cached to avoid recomputation.
  2. The overhead of checking a cached task
    • Solution: use cache_for instead of checkpointing, since it only checks the task’s state.

Sorry for being a bit of a bother here; I just want to make sure I understand the full usage of cache_for.
I still have two follow-up questions about using cache_for, since you mentioned the cached state can be shared across different flow runs.

  • a) Does that mean it also stores the output of the cached task somewhere, so that a downstream task from a different flow can load the same data as its input? Again, this is specific to a data-flow scenario, since we need to pass the cached output as input to downstream tasks across different flows that run in different processes/clusters.
  • b) If a) is true: when using cache_for, is there a way to configure the output location and serialisation (parquet, feather, json, compressed, etc.), just like using the target parameter/function to format the output location with checkpointing?

I appreciate your help and patience.

Cheers

Anthony

Sorry to reopen this old convo, but I’m still confused about how to cache a flow/subflow in Prefect 2. The above example with create_flow_run is for Prefect 1.

So far it seems I need to rework my pipeline inefficiently. For example, say I want to load some data and train an sklearn ML model:

from prefect import flow, task
from prefect.tasks import task_input_hash
import time

# Let's say we imported an sklearn model
# It's expensive to compute
def sklearn_model(data, hyperparams):
    time.sleep(2)
    return hash(data + hyperparams)

# function that gets some big data from a file
# don't want to have to cache this
def load_data(filename):
    time.sleep(1)
    return 123 

To make this into a flow I WANT to do this:

# want to just use the cached trained model if all inputs are the same
@flow(cache_key_fn=task_input_hash)
def train_model(filename, hyperparams):
    data = load_data(filename)
    return sklearn_model(data, hyperparams)

But it seems like I have to do this:

# Have to wrap model training in a task
@task(cache_key_fn=task_input_hash)
def train_sklearn_model(data, hyperparams):
    return sklearn_model(data, hyperparams)

# Have to cache big data wasting space
@task(cache_key_fn=task_input_hash)
def load_data(filename):
    time.sleep(1)
    return 123 

@flow()
def train_model(filename, hyperparams):
    data = load_data(filename)
    return train_sklearn_model(data, hyperparams)

train_model("file.csv", 3)
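With this setup, calling train_model("file.csv", 3) a second time with the same arguments should resolve both load_data and train_sklearn_model from their cached states instead of rerunning them - but at the cost of also persisting the large intermediate data from load_data.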

Another option seems to be something like the following, but this seems wrong too as it forces you to move multiple tasks into a “god task” just to allow caching.

@task(cache_key_fn=task_input_hash)
def train_and_load(filename, hyperparams):
    data = load_data(filename)
    return sklearn_model(data, hyperparams)

@flow
def train_and_load_flow(filename, hyperparams):
    return train_and_load(filename, hyperparams)

train_and_load_flow('test.csv', 2)

Hi, this is an excellent question and a great problem description. I asked some colleagues, and we will consider adding caching to the flow decorator based on your request. Thanks for raising that!
