What is the best way to pass a list of files to a mapped task and save outputs to targets based on the filename?

I have a function that accepts a filepath parameter. I would like to use Prefect's map to run it over a number of files and save each output to output/{filename}. I tried target templates with {os.path.basename(path)} and path.split('/')[-1]. Neither worked. I could create another task that splits the filepath into folder and basename and then pass two parameters to the function. However, this seems awkward. Is there a better way?


There are at least two ways you could pass the list of files:

  1. You could pass those as parameter values (see the run sketch after the second example below):
from prefect import task, Flow, Parameter
from prefect.executors import LocalExecutor
from datetime import timedelta


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def read_data_from_file(file_name: str):
    return "some_data"


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl(x):
    # some cleaning
    return x


with Flow("do_some_file_processing", executor=LocalExecutor()) as flow:
    all_files = Parameter(
        "files", default=["file1.csv", "file2.csv", ..., "file100.csv"]
    )
    data = read_data_from_file.map(all_files)
    transformed_data = etl.map(data)
  2. Or retrieve the files in a separate task and pass those as data dependencies:
from prefect import task, Flow
from prefect.executors import LocalExecutor
from datetime import timedelta


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def get_files_to_process():
    # query some database, data lake or other stateful store where you have this info
    return ["file1.csv", "file2.csv", ..., "file100.csv"]


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def read_data_from_file(file_name: str):
    return "some_data"


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl(x):
    # some cleaning
    return x


with Flow("do_some_file_processing", executor=LocalExecutor()) as flow:
    all_files = get_files_to_process()
    data = read_data_from_file.map(all_files)
    transformed_data = etl.map(data)
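
For the first (Parameter-based) example, here is a quick sketch of how the flow could be run locally while overriding the "files" parameter — the file names below are placeholders. If no override is given, the default list defined on the Parameter is used.

if __name__ == "__main__":
    # run the flow locally, replacing the default "files" parameter value
    state = flow.run(parameters={"files": ["fileA.csv", "fileB.csv"]})
    print(state)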

Regarding saving outputs using results, targets, and templating: it would be easiest to store the outputs yourself at the end of processing, because results and targets are more of a failure-recovery mechanism than a way to persist data to end systems. But if you want to configure them anyway, e.g. to handle failures in your data pipeline, you could do something like this (see the etl task):

import awswrangler as wr
import pandas as pd
from prefect import task, Flow, Task
from prefect.executors import LocalExecutor
from prefect.engine.results import S3Result
from prefect.engine.serializers import PandasSerializer
from datetime import timedelta


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def get_files_to_process():
    # query some database, data lake or other stateful store where you have this info
    return ["file1.csv", "file2.csv", ..., "file100.csv"]


@task(
    max_retries=5,
    retry_delay=timedelta(minutes=10),
    result=S3Result(bucket="prefectdata", serializer=PandasSerializer(file_type="csv")),
    checkpoint=True,
    target="{file_name}__{task_name}__{today}__{task_run_id}__{map_index}.csv",
)
def etl(file_name):
    # read data e.g.
    df = pd.read_csv(file_name)

    # some cleaning
    # ...

    # finally load it somewhere
    wr.s3.to_parquet(
        df=df,
        path=f"s3://prefectdata/somedatasetname/{file_name}/",
        dataset=True,
        mode="append",
        database="default",
        table=file_name,
    )

    # return the DataFrame so the configured result/target can checkpoint it
    return df


with Flow("do_some_file_processing", executor=LocalExecutor()) as flow:
    all_files = get_files_to_process()
    etl.map(all_files)
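
If I understand the templating correctly, the target string is rendered with plain Python str.format, with the placeholders filled from prefect.context and the task's inputs — which is also why expressions such as {os.path.basename(path)} cannot be evaluated inside the template. A rough illustration (all values below are made up):

# illustrative only: how a target template like the one above gets filled in
target = "{file_name}__{task_name}__{today}__{task_run_id}__{map_index}.csv"
rendered = target.format(
    file_name="file1.csv",
    task_name="etl",
    today="2021-10-01",
    task_run_id="abc123",
    map_index=0,
)
print(rendered)
# file1.csv__etl__2021-10-01__abc123__0.csv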

For more on results, targets, and templating, check out the Prefect documentation on results.

LMK if this didn’t answer your question.

Thanks for the reply. I am using targets to diagnose problems rather than to persist data forever. For example, I have 4 tasks running over 1000 pages. If there is an issue, I want to iterate over the step-2 outputs to diagnose it. Similar to a makefile, it makes sense to use the same files to track completed tasks rather than having two parallel sets of output files. I am actually testing out Prefect by trying to replicate an existing Luigi project, so I would like to stick to the same outputs if possible.

I have this working now using a "basename" task. The task function has an extra parameter basename=None which is only used by Prefect for the target template.
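
In case it is useful, here is a minimal sketch of that approach as I understand it — the LocalResult, the output directory, and the file handling are placeholders, and the basename argument exists only so the target template can reference it:

import os
from prefect import task, Flow
from prefect.engine.results import LocalResult


@task
def get_basename(path: str) -> str:
    return os.path.basename(path)


@task(
    checkpoint=True,
    result=LocalResult(dir="output"),
    target="{basename}",
)
def process_file(path: str, basename: str = None):
    # basename is unused in the body; it only exists for the target template
    with open(path) as f:
        return f.read()


with Flow("process_files") as flow:
    paths = ["data/file1.csv", "data/file2.csv"]
    basenames = get_basename.map(paths)
    processed = process_file.map(paths, basenames)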

An alternative would be to use the map_index mentioned in the post above, but it is hard to know which index refers to which file. Perhaps I could have a task that renames the outputs from the map_index versions to the original names, but I am not sure if it is possible to get the full location/target path?


I think for a full path you would need to construct it manually from the Result location (e.g. the S3 path) plus the target path. It would be great if you could post your findings here once you find out more. You could also share a simplified flow that I can reproduce, and perhaps we can investigate it further together.
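
Something along these lines could be a starting point, assuming the S3Result stores each object under the rendered target string as its key within the configured bucket (the bucket name and the rendered target below are placeholders):

# hypothetical helper to rebuild the full object path from the result's
# bucket and the rendered target template
def full_target_path(bucket: str, rendered_target: str) -> str:
    return f"s3://{bucket}/{rendered_target}"


print(full_target_path("prefectdata", "file1.csv__etl__2021-10-01__abc123__0.csv"))
# s3://prefectdata/file1.csv__etl__2021-10-01__abc123__0.csv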