Sorry for the long post, but I have tried to be as concise and precise as possible with my problem.
I have 100+ files that need to be cleaned each day. Cleaning one file takes a lot of memory, so it has to be done by looping over the files one by one. Many operations are required to clean each file, and I would like to make use of the task options (like `max_retries`) for the cleaning functions.
My setup is similar to the pseudo-code below:
```python
# Define the many small operations to be performed on each file.
# Using many small functions, as suggested in
# https://docs.prefect.io/core/concepts/tasks.html#overview
@task(max_retries=3)
def etl_1(x):
    # some cleaning
    return x

@task(max_retries=5, retry_delay=datetime.timedelta(minutes=10))
def etl_2(x):
    # some other cleaning
    return x

# ...

@task
def etl_15(x):
    # some more cleaning
    return x

# A function that takes one file as input, cleans it and then saves it.
@task
def clean_file(file):
    x = load_file(file)
    x = etl_1(x)
    x = etl_2(x)
    # ...
    x = etl_15(x)
    save_file(x)

# Initiate flow for cleaning all the files
with Flow('clean all files') as flow:
    all_files = ['file1.csv', 'file2.csv', ..., 'file100.csv']
    clean_file.map(file=all_files)
```
Calling the `etl_n` tasks inside `clean_file` fails because there is no active flow context at that point (`ValueError: Could not infer an active Flow context`).
Wrapping the etl functions in a flow like this:
```python
@task
def clean_file(file):
    with Flow('inner flow') as inner_flow:
        x = etl_1(file)
        x = etl_2(x)
        # ...
        x = etl_15(x)
        save_file(x)
    inner_flow.run()
```
results in the following error once it has been registered and set to run on my local Prefect server:
```
Traceback (most recent call last):
  File "/home/.../lib/python3.9/site-packages/prefect/engine/flow_runner.py", line 264, in run
    state, task_states, run_context, task_contexts = self.initialize_run(
  File "/home/.../lib/python3.9/site-packages/prefect/engine/cloud/flow_runner.py", line 400, in initialize_run
    raise KeyError(
KeyError: 'Task slug Constant[list]-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.
- Did you change the flow without re-registering it?
- Did you register the flow without updating it in your storage location (if applicable)?'
```
Given my requirements of
- not having one single very large ETL function (essentially copy/pasting all the `etl_n` functions into one function),
- being able to use the task decorator (with options like `max_retries`) for each etl operation, and
- looping over the files one by one,

is there any way to structure the code such that this is possible?