How to sequentially loop several cleanup tasks over a set of files to process for ETL

How to use loops in a convoluted task structure?

Sorry for the long post, but I have tried to be as concise and precise as possible with my problem.
I have 100+ files that need to be cleaned each day. Cleaning a single file takes a lot of memory, so the files have to be processed one at a time. Many operations are required to clean each file, and I would like to make use of task features (like max_retries) for the cleaning functions.

Attempt 1

My setup is similar to what I have written below (written in a pseudo-code manner):

# Define many small operations that are to be performed on each file
# Using many small functions as suggested in https://docs.prefect.io/core/concepts/tasks.html#overview
@task
def etl_1(x, max_retries=3):
    # some cleaning
    return x
    
@task
def etl_2(x, max_retries=5, retry_delay=datetime.timedelta(minutes=10)):
    # some other cleaning
    return x
.
.
.
@task
def etl_15(x):
    # some more cleaning
    return x

# make a function that takes one file as input, cleans it and then saves it.    
@task
def clean_file(file):
    x = load_file(file)
    x = etl_1(x)
    x = etl_2(x)
    .
    .
    .
    x = etl_15(x)
    save_file(x)

# initiate flow for cleaning all the files
with Flow('clean all files') as flow:
    all_files = ['file1.csv', 'file2.csv', ..., 'file100.csv']
    clean_file.map(file=all_files)

This fails because the etl_n tasks are called outside of an active flow context (ValueError: Could not infer an active Flow context).

Attempt 2

Wrapping the etl functions in a flow like this:

@task 
def clean_file(file):
    with Flow('inner flow') as inner_flow:
        x = etl_1(file)
        x = etl_2(x)
        .
        .
        .
        x = etl_15(x)
        save_file(x)
    inner_flow.run()

results in the following error once the flow has been registered and run on my local Prefect server:

Traceback (most recent call last):
  File "/home/.../lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 264, in run
    state, task_states, run_context, task_contexts = self.initialize_run(
  File "/home/.../lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 400, in initialize_run
    raise KeyError(
KeyError: 'Task slug Constant[list]-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'

Actual question

Given my requirements of

  1. Not having one single very large ETL function (essentially copy/pasting all the etl_n functions into a single clean_file function),
  2. being able to use the task decorator for each etl operation, and
  3. looping over each file one by one,

is there any way to structure the code such that this is possible?

Attempt 1

Regarding Attempt 1: calling tasks inside other tasks is not really supported in Prefect 1.0, unless you call the task's run method directly:

@task
def clean_file(file):
    x = load_file(file)
    x = etl_1.run(x)
    x = etl_2.run(x)

Alternatively, you could rewrite your code so that those cleanup ETL functions are normal Python functions rather than tasks:

def etl_1(x):
    # some cleaning
    return x

def etl_2(x):
    # some other cleaning
    return x
.
.
.
def etl_15(x):
    # some more cleaning
    return x

# make a function that takes one file as input, cleans it and then saves it.    
@task
def clean_file(file):
    x = load_file(file)
    x = etl_1(x)
    x = etl_2(x)
    .
    .
    .
    x = etl_15(x)
    save_file(x)

# initiate flow for cleaning all the files
with Flow('clean all files') as flow:
    all_files = Parameter("files", default=['file1.csv', 'file2.csv', ..., 'file100.csv'])
    clean_file.map(file=all_files)

The downside is that you lose observability and features such as retries for those small cleanup functions.
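If you still want some retry behaviour for those plain functions, one workaround (not Prefect functionality, just a sketch with made-up with_retries/retries/delay names) is a small retry decorator:

import functools
import time


def with_retries(retries=3, delay=60):
    # hypothetical helper: retry a plain cleanup function a few times
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(1, retries + 1):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == retries:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator


@with_retries(retries=3, delay=60)
def etl_1(x):
    # some cleaning
    return x

Note, however, that such retries would not show up in the Prefect UI the way task-level retries do.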

Attempt 2

Calling a flow from a task is not supported.
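If you really do need one flow to trigger another, the supported pattern in Prefect 1.0 is a flow-of-flows, where a task creates a flow run through the backend instead of calling flow.run() inside a task. A rough sketch (the flow and project names are placeholders, and the child flow must already be registered):

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent flow") as parent_flow:
    # kick off the registered child flow via the backend
    child_run_id = create_flow_run(flow_name="clean all files", project_name="etl")
    # optionally wait for the child flow run to finish before continuing
    wait_for_flow_run(child_run_id)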

Suggested solution

It’s probably not documented well enough, but if you use mapping with the LocalExecutor rather than the LocalDaskExecutor, your files will be processed sequentially, which is exactly what you wanted. This way, you can use mapping for your use case without losing observability and features such as retries for each cleanup function:

from prefect import task, Flow, Parameter
from prefect.executors import LocalExecutor
from datetime import timedelta


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def read_data_from_file(file_name: str):
    return "some_data"


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_1(x):
    # some cleaning
    return x


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_2(x):
    # some cleaning
    return x


@task(max_retries=5, retry_delay=timedelta(minutes=10))
def etl_15(x):
    # some cleaning
    return x


with Flow("clean all files", executor=LocalExecutor()) as flow:
    all_files = Parameter(
        "files", default=["file1.csv", "file2.csv", ..., "file100.csv"]
    )
    data = read_data_from_file.map(all_files)
    transformed_data = etl_1.map(data)
    transformed_data = etl_2.map(transformed_data)
    transformed_data = etl_15.map(transformed_data)
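
To try this out locally, you can run the flow directly and override the parameter (a minimal usage sketch; the file names are placeholders):

flow.run(parameters={"files": ["file1.csv", "file2.csv"]})

With the LocalExecutor, the mapped children of each task are processed one after another.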

For sequential looping, you can also check out Dynamic DAGs: Task Looping | Prefect Docs
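
For reference, that task-looping pattern looks roughly like this (a sketch based on those docs, not part of the mapping solution above): the task raises the LOOP signal to schedule another iteration of itself and carries its state in task_loop_result:

import prefect
from prefect import task, Flow, Parameter
from prefect.engine.signals import LOOP


@task
def clean_files_one_by_one(files):
    # read the payload left by the previous iteration, if any
    loop_payload = prefect.context.get("task_loop_result", {"remaining": files})
    remaining = loop_payload["remaining"]
    if not remaining:
        return  # all files cleaned, stop looping
    current = remaining[0]
    # ... load, clean and save `current` here ...
    # re-run this task with the remaining files
    raise LOOP(message=f"Cleaned {current}", result={"remaining": remaining[1:]})


with Flow("clean files with task looping") as loop_flow:
    files = Parameter("files", default=["file1.csv", "file2.csv"])
    clean_files_one_by_one(files)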


You're an angel Anna - your solution with mapping worked! Thanks :)
I want to add a bit of additional help in case people have (cleaning) functions that take additional static input, for example:

import pandas as pd

from prefect import task, Flow, Parameter, unmapped
from prefect.executors import LocalExecutor

@task
def clean(df: pd.DataFrame, column: str):
    # do some cleaning on df[column] and return the cleaned dataframe
    return df

with Flow("clean all files", executor=LocalExecutor()) as flow:
    all_files = Parameter(
        "files", default=["file1.csv", "file2.csv", ..., "file100.csv"]
    )
    data = read_data_from_file.map(all_files)
    clean.map(data, unmapped('A'))  # note that the column name is wrapped in the unmapped function

When we want to clean column "A" of each dataframe, the static column name has to be wrapped in the unmapped function so it is not mapped over.
Thanks again Anna :slight_smile:
