Hi,
I’m making a data processing pipeline with Prefect, where each task creates an intermediate file. For each run I’d like to be able to trace where all the intermediate files are stored.
Some tasks are expensive, so I use caching. The problem is that when a task is skipped and its cached result is used, I can’t find where that cached result came from. Prefect generates the cache key and retrieves the persisted result, but as far as I can tell, neither the UI nor the run context says where that persisted result was loaded from.
The only workaround I’ve found so far is to have each task return the path to a file with the data, rather than the data itself, and then log each path or record it as an artifact:

    from prefect import flow, task
    from prefect.tasks import task_input_hash
    from prefect.artifacts import create_markdown_artifact


    @task(cache_key_fn=task_input_hash)
    def my_task():
        path = "some-unique-path"
        # ...write something to path...
        return path


    @flow
    def my_flow():
        path = my_task()
        create_markdown_artifact(key="my-task-output-path", markdown=path)
        # or:
        print(f"my_task output is at {path}")

…and do the same for every task of every flow. Is there a better way to do this?
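To show the kind of repetition I mean, the reporting can at least be pulled into a small wrapper. This is only a minimal sketch in plain Python: `traced` and `report_path` are hypothetical names of my own, not part of Prefect, and `fake_task` is a stand-in for a real `@task`. It doesn’t tell me where Prefect itself stored the cached result; it only reports the path that the task returns.

```python
import logging
from typing import Callable

logger = logging.getLogger("pipeline.paths")


def report_path(name: str, path: str) -> None:
    """Default reporter (hypothetical helper): write the path to the log."""
    logger.info("%s output is at %s", name, path)


def traced(
    task_fn: Callable[..., str],
    report: Callable[[str, str], None] = report_path,
) -> Callable[..., str]:
    """Wrap a callable that returns a file path so each call reports that path.

    The reporting happens in the wrapper, outside the wrapped callable,
    so it runs on every call regardless of what happens inside task_fn.
    """
    name = getattr(task_fn, "__name__", "task")

    def wrapper(*args, **kwargs) -> str:
        path = task_fn(*args, **kwargs)
        report(name, path)
        return path

    wrapper.__name__ = name
    return wrapper


def fake_task() -> str:
    """Stand-in for a task that writes a file and returns its path."""
    return "some-unique-path"


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    traced(fake_task)()
```

Inside a flow I would then call `traced(my_task)()` instead of `my_task()`, and could pass a `report` function that calls `create_markdown_artifact` instead of logging. I believe the wrapper still reports the path when the task is served from the cache, since it runs in the flow rather than in the task body, but it is still one wrapper call per task.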