View in #prefect-community on Slack
@Erik_Schomburg: Hello! I’m trying to create a convert an existing script into a prefect flow, and the task.map
functionality doesn’t quite work as I had expected. I’m aggregating a bunch of data sources and rows into a DataFrame, and previously this had been sped up by running it over subsets of keys, with each subset running in a different process. So in prefect, I have a task that splits the keys into subsets, and then maps a task over those subsets. The problem is that there’s a small probability of failure on each subset, due to connection timeouts. I have added some retry logic, but still want to make sure that successful sub-tasks have their results checkpointed, and unsuccessful ones are not. But the results = task.map(subset=subsets)
code instead just stores the results
in a single file, and then does not re-run unsuccessful sub-tasks. I tried adding {map_index}
to the task target filename pattern, but this did not work (update: it does work, I just had extra {
brackets, i.e., {{map_index}}
:face_palm:).
Here’s the basic flow:
all_keys = get_keys_task()
key_subsets = partition_keys_task(all_keys, n_subsets)
data_subsets = get_data_task.map(keys=key_subsets)
all_data = concatenate_subsets_task(data_subsets=data_subsets)
I know I can work around this by writing my own utility to create a list of tasks with their own unique task names, but it seems like part of the point of .map
ought to be to do this sort of results management for you… Any tips? Maybe there’s just a parameter in the prefect.task
decorator or the task.map
function I don’t know about?
@Kevin_Kho: Hi @Erik_Schomburg, have you seen the docs on templating result names? This will separate out the files for you.
Templating names | Prefect Docs
@Erik_Schomburg: Yes, though maybe there’s something additional I need to do when using task.map
? For example, when I add target and location parameters to the get_data_task
task:
@prefect.task(target="{task_name}_{map_index}", checkpoint=True, result=LocalResult(location="{task_name}_{map_index}")
def get_data_task(keys):
...
the results across the mapped sub-tasks are stored in a single file called get_data_task_{map_index}.pkl
, rather than one file per mapped subset called get_data_task_0.pkl
, etc.
@Kevin_Kho: One sec let me try this
@Erik_Schomburg: actually, maybe I need a sec…
might’ve had a mistake in the {
brackets in my template and it didn’t recognize to use the map_index
variable properly
@Kevin_Kho: This works on local for me:
from prefect import task, Flow
import prefect
from prefect.engine.results import LocalResult
@task(checkpoint=True, result = LocalResult(dir="/Users/kevinkho/Work/scratch/"), target="test-{map_index}.pkl")
def get_values(x):
return x+1
with Flow("dynamic_tasks") as flow:
get_values.map([1,2,3,4])
flow.run()
as long as the env variable PREFECT__FLOWS__CHECKPOINTING=true
@Erik_Schomburg: right, yeah, like I said, problem was that in my construction of the target template I accidentally had extra {...}
because of a bug in a special pattern constructor tool
sorry, thanks for trying this out, my bad
@Kevin_Kho: No worries at all. Just wanted to make sure my advice was right a well