View in #prefect-community on Slack
@Marius_Haberstock: Hi, I have a question about apply_map. Details following in the thread.
I have a flow where I first download a zip file, unpack it, and then further process some of the files from the zip. I now want to split these steps into individual tasks.
My idea was to use a task for the downloading and unpacking part, which returns a list of file names.
Then I would use an iterate_files task to decide, for each file, whether to call task process_file_a or task process_file_b.
I tried using apply_map based on the example in the documentation (https://docs.prefect.io/core/concepts/mapping.html#complex-mapped-pipelines).
However, the merge function does not seem to work inside a task.
This is my current code:
with Flow("zip-processing") as flow:  # Flow requires a name in Prefect 1.x
    zip_filename = Parameter("zip_filename")
    files = download_zipfile(zip_filename)
    infos = apply_map(iterate_files, file=files, zip_filename=unmapped(zip_filename))
    upload_file.map(info=infos)
@task(log_stdout=True)
def iterate_files(file, zip_filename):
    is_recfile = file[0].endswith("...")
    with case(is_recfile, True):
        result_1 = parse_recfile.run(file, zip_filename)
    is_xmlfile = file[0] == '...'
    with case(is_xmlfile, True):
        result_2 = process_xml_file.run(file, zip_filename)
    return merge(result_1, result_2).run()
Does anyone have an idea how to solve this?
@Kevin_Kho: I think it works. The example in the docs uses it here. Maybe don't call run()?
There are a couple of things off:
- apply_map is applied to a function, not a task
- all of the contents of that function have to be tasks
- you don't call the run method inside it
@Marius_Haberstock: This is the error message if I remove run()
Task 'iterate_files[0]': Exception encountered during task execution!
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/Users/marius/Documents/Projekte/Pexon/FMC/project-panorama/prefect-experimentation/5008-flow/flow.py", line 44, in iterate_files
  File "/usr/local/lib/python3.9/site-packages/prefect/core/task.py", line 633, in __call__
    new.bind(
  File "/usr/local/lib/python3.9/site-packages/prefect/core/task.py", line 693, in bind
    raise ValueError(
ValueError: Could not infer an active Flow context while creating edge to <Task: parse_recfile>. This often means you called a task outside a `with Flow(...)` block. If you're trying to run this task outside of a Flow context, you need to call `parse_recfile.run(...)`
@Kevin_Kho: That is because you still have the task decorator above iterate_files
@Marius_Haberstock: If I remove the task decorator, it seems like it calls the function while registering the flow, not while running. I get this error message:
Traceback (most recent call last):
  File "/Users/marius/Documents/Projekte/Pexon/FMC/project-panorama/prefect-experimentation/5008-flow/flow.py", line 113, in <module>
    infos = apply_map(iterate_files, file=files, zip_filename=unmapped(zip_filename))
  File "/Users/marius/miniforge3/envs/prefect-fmc/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 131, in apply_map
    res = func(*args2, **kwargs2)
  File "/Users/marius/Documents/Projekte/Pexon/FMC/project-panorama/prefect-experimentation/5008-flow/flow.py", line 41, in iterate_files
    is_recfile = file[0].endswith("...")
AttributeError: 'GetItem' object has no attribute 'endswith'
@Kevin_Kho: That is because the contents of the function have to be tasks, but this line:
is_recfile = file[0].endswith("...")
is not a task, so it executes eagerly. A task will defer the execution.
Same with this one:
is_xmlfile = file[0] == '...'
@Marius_Haberstock: So I have to move the logic for checking what type of file it is into its own task?
@Kevin_Kho: Yes, exactly, because everything inside apply_map has to be a task. Basically, apply_map just unpacks all of the tasks inside it and adds them to the Flow block, so the same rule applies: you can't have non-tasks in the Flow block (for the most part).
@Marius_Haberstock: Okay, thank you! It works now, but it takes a very long time (there are about 30 files per zip file, so it runs a lot of tasks).
Do you think my flow design for this use case is a good idea or would you handle it differently?
@Kevin_Kho: Looks fine to me right? I don’t know what more you can optimize. What is a very long time? 1 hour?
@Marius_Haberstock: Just 3 minutes. But without individual tasks it was around 20 seconds
@Kevin_Kho: Ahh ok then Prefect is the bottleneck. Each task has at least 3 API calls to update state (Pending -> Submitted -> Running -> Success), which is where the overhead comes from.
So it's up to you if you want to combine them to reduce the overhead.
@Anna_Geller: @Marius_Haberstock catching up here: I didn't see any executor attached to your flow. If you want to optimize the performance of this flow, you can run your tasks in parallel by attaching e.g. a LocalDaskExecutor:
flow.executor = LocalDaskExecutor()