How to use loops in a convoluted task structure?
Sorry for the long post, but I have tried to be as concise and precise as possible with my problem.
I have 100+ files that need to be cleaned each day. Cleaning one file takes a lot of memory, so it has to be done by looping over the files one by one. There are many operations required to clean each file, and I would like to make use of the various task options (like max_retries) for the cleaning functions.
Attempt 1
My setup is similar to what I have written below (written in a pseudo-code manner):
# Define many small operations to be performed on each file,
# using many small functions as suggested in
# https://docs.prefect.io/core/concepts/tasks.html#overview
import datetime
from prefect import task, Flow

@task(max_retries=3, retry_delay=datetime.timedelta(minutes=1))  # retry_delay is required when max_retries is set
def etl_1(x):
    # some cleaning
    return x

@task(max_retries=5, retry_delay=datetime.timedelta(minutes=10))
def etl_2(x):
    # some other cleaning
    return x

# ...

@task
def etl_15(x):
    # some more cleaning
    return x
# Make a function that takes one file as input, cleans it, and then saves it.
@task
def clean_file(file):
    x = load_file(file)
    x = etl_1(x)
    x = etl_2(x)
    # ...
    x = etl_15(x)
    save_file(x)
# Initiate flow for cleaning all the files
with Flow('clean all files') as flow:
    all_files = ['file1.csv', 'file2.csv', ..., 'file100.csv']
    clean_file.map(file=all_files)
This fails with ValueError: Could not infer an active Flow context, because the etl_n tasks are called inside clean_file, i.e. at task run time, when no Flow context is active.
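For context, the composition itself works fine as plain Python; it is only calling @task-decorated steps outside a Flow block that raises. A minimal sketch of that composition, with placeholder string-cleaning bodies standing in for the real etl logic (note this gives up the per-step max_retries that the question is about):

```python
from functools import reduce

def etl_1(x):
    # placeholder for "some cleaning": strip surrounding whitespace
    return x.strip()

def etl_2(x):
    # placeholder for "some other cleaning": normalise case
    return x.lower()

PIPELINE = [etl_1, etl_2]  # the real list would run up to etl_15

def clean(x):
    # apply every step in order to one file's data
    return reduce(lambda acc, step: step(acc), PIPELINE, x)
```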
Attempt 2
Wrapping the etl functions in an inner flow like this:
@task
def clean_file(file):
    with Flow('inner flow') as inner_flow:
        x = etl_1(file)
        x = etl_2(x)
        # ...
        x = etl_15(x)
        save_file(x)
    inner_flow.run()
results in the following error once it has been registered and set to run on my local Prefect server:
Traceback (most recent call last):
  File "/home/.../lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 264, in run
    state, task_states, run_context, task_contexts = self.initialize_run(
  File "/home/.../lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 400, in initialize_run
    raise KeyError(
KeyError: 'Task slug Constant[list]-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.\n- Did you change the flow without re-registering it?\n- Did you register the flow without updating it in your storage location (if applicable)?'
Actual question
Given my requirements of
- not having one single very large ETL function (essentially copy/pasting all the etl_n functions into a single clean_file function),
- being able to use the task decorator for each etl operation, and
- looping over each file one by one,
is there any way to structure the code such that this is possible?
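For concreteness, here is the control flow I am after, sketched without Prefect (load/save stand in for load_file/save_file, and the step functions are placeholders):

```python
# Desired behaviour, expressed without Prefect: files are processed
# strictly one at a time, so only one file's data is in memory at once,
# while each cleaning step stays a small, individually addressable
# function (to which a retry policy could be attached).
def clean_all(files, steps, load, save):
    for f in files:          # loop over files one by one (memory constraint)
        x = load(f)
        for step in steps:   # apply etl_1 ... etl_15 in order
            x = step(x)
        save(f, x)
```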