How to resume mapped task runs from failure at scale, or limit the number of mapped task runs allowed to fail?

View in #prefect-community on Slack

Anders_Segerberg @Anders_Segerberg: What is the best practice for the following situation?
I have tens of thousands of mapped tasks. Suppose that each writes to a remote database. If that database goes down, I will see cascading failures.
What I would like to do is:
a) manage the threshold of allowed failures across all the mapped tasks in aggregate
b) when that threshold is reached, halt the workflow
c) be able to resume the workflow from the point of failure

c) would, I think, require some kind of caching. But since there are no dependencies between the mapped tasks, I don't want to just cache the results of the successful tasks – I just want to remember which mapped tasks already ran to successful completion, and then, when the flow is restarted or resumed, map only over the tasks that did not run to completion or were never started.
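As a minimal sketch of the bookkeeping in c), assuming each mapped item has a unique string key, completed keys could be appended to a ledger file and filtered out before mapping on the next run. The names `load_completed`, `mark_completed`, and `remaining_items` and the one-JSON-key-per-line format are hypothetical; this is plain Python, not a Prefect API.

```python
import json
from pathlib import Path


def load_completed(ledger_path: Path) -> set:
    """Return the set of item keys recorded as successfully completed."""
    if not ledger_path.exists():
        return set()
    completed = set()
    # Hypothetical format: one JSON-encoded key per line, append-only.
    for line in ledger_path.read_text().splitlines():
        try:
            completed.add(json.loads(line))
        except json.JSONDecodeError:
            # A truncated line (e.g. from a crash mid-append) is ignored,
            # so that item simply runs again on resume.
            continue
    return completed


def mark_completed(ledger_path: Path, key: str) -> None:
    """Append a key once the item's write has succeeded."""
    with ledger_path.open("a") as f:
        f.write(json.dumps(key) + "\n")


def remaining_items(items, ledger_path: Path) -> list:
    """Keep only items not yet in the ledger, preserving their order."""
    done = load_completed(ledger_path)
    return [item for item in items if item not in done]
```

In a Prefect 1 flow, `remaining_items` could run in an upstream task whose output is passed to `.map()`, with each mapped task calling `mark_completed` after its write succeeds. A local file assumes everything runs on one machine; with workers spread across machines, a shared store would be needed instead.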

Kevin_Kho @Kevin_Kho: This is an interesting question. I don't know if I have the best answer immediately, but let me bounce some ideas and thoughts off you:

  1. Use the KV Store to count the number of failures across the mapped tasks. Set the counter to 0 at the start. If the count hits the threshold, use the GraphQL API to mark the flow run as Failed. I think anything that tries to update task state after that should just error out. I am not sure, though, whether the KV Store can handle that many concurrent connections, but you need some kind of low-latency cache.
  2. Other than that, the check has to happen at the flow level, before this task runs. Intuitively, I'm thinking of splitting the work into batches and running the map one batch at a time. As you said, the items are independent, so I would consider splitting the 10k items into batches of 1000, for example, and using task looping to run one batch at a time. The loop can keep track of the total failures; the problem, though, is that looping makes execution sequential.
  3. So I think what we are left with is some combination of subflows. Could we use subflows to fire off one batch at a time, sequentially, with the DaskExecutor on the subflows and the LocalExecutor on the main flow? Then you can use task looping in the main flow to submit the batches and keep track of the total failures there. I think something like this might work.
  4. But all of this loses caching if we use looping. With mapping, each task can be cached individually, but with looping it can't. One current flaw of Prefect 1 is that if you have a chain of tasks A -> B -> C, and A and B succeed but C fails, you can't retry A and B. This is one of the reasons Orion (Prefect 2.0) has no DAG. So you may need to combine tasks into one so they share the same cache. When you do that, you can use Prefect caching or targets to avoid re-running the same code.
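Ideas 1 to 3 can be approximated in plain Python to show the control flow: a failure counter that halts the run once a threshold is reached, with items processed concurrently inside each batch and batches processed one after another. This is a sketch under stated assumptions, not Prefect code: a `ThreadPoolExecutor` stands in for the Dask executor on each subflow, the outer `for` loop stands in for task looping in the parent flow, and an in-process counter replaces the KV Store. `run_in_batches`, `process`, and `FailureThresholdExceeded` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


class FailureThresholdExceeded(Exception):
    """Raised once the aggregate failure count reaches the allowed limit."""

    def __init__(self, failures, succeeded):
        super().__init__(f"{failures} failures reached the threshold; halting")
        self.failures = failures
        self.succeeded = succeeded  # items that finished before the halt


def run_in_batches(items, process, batch_size=1000, max_failures=100, max_workers=8):
    """Apply `process` to every item one batch at a time, halting on too many failures.

    Items within a batch run concurrently on a thread pool; batches run
    sequentially, and the failure count is checked after each batch finishes.
    Returns the items that succeeded.
    """
    items = list(items)
    failures = 0
    succeeded = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            futures = [(item, pool.submit(process, item)) for item in batch]
            for item, future in futures:
                # exception() blocks until the future completes; None means success.
                if future.exception() is None:
                    succeeded.append(item)
                else:
                    failures += 1
            if failures >= max_failures:
                raise FailureThresholdExceeded(failures, succeeded)
    return succeeded
```

Because the count is only checked between batches, the number of failures can overshoot `max_failures` by nearly a full batch before the run halts; a smaller `batch_size` tightens this at the cost of more sequential steps. The `succeeded` list carried on the exception is what a resume step would need to record.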

KV Store | Prefect Docs

Dynamic DAGs: Task Looping | Prefect Docs

Caching and Persisting Data | Prefect Docs

Anna_Geller @Anna_Geller: To manage the threshold of allowed failures, you may also limit the number of allowed concurrent runs. Check the docs for details

Anders_Segerberg @Anders_Segerberg: Thank you both for the suggestions
@Kevin_Kho An unrelated question I had: will a main-flow local Dask executor respect a subflow local executor?
@Anna_Geller Oh, we aren't using Prefect Cloud.
Would this be something we could configure via Prefect's local Dask executor?

Kevin_Kho @Kevin_Kho: For any flow, the executor is pulled from the flow storage.
No, concurrency limits are only available in Cloud in Prefect 1.

Anders_Segerberg @Anders_Segerberg: I’ve encountered a situation before where (it seemed) a local parent executor didn’t respect a local-dask sub-flow. Could have been a misconfiguration on my part, though

Kevin_Kho @Kevin_Kho: That should work if you attach the executor in the subflow file/storage; the parent executor shouldn't affect it. But using LocalDaskExecutor on both the parent and the subflow is odd, because you can hit resource contention, you know?

Anders_Segerberg @Anders_Segerberg: More generally: assume we crash the workflow on a single mapped task failure. What is the approach for restarting the flow without re-running the mapped tasks that already succeeded? I suppose that would mean straightforward flow-level caching, right? Prefect would see the cached results for the mapped tasks that already ran and skip over them.

Kevin_Kho @Kevin_Kho: If you use targets, which are file-based persistence, just re-run the subflow, and the new run will skip the tasks whose target files already exist, thanks to the caching mechanism. Or you can restart the subflow run. This is worth a read:
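To illustrate the skip-on-rerun behavior, here is a plain-Python sketch of target-style caching: each call writes its result to a file whose name is templated from its arguments, and a later run that finds the file returns the stored result instead of re-running. In Prefect 1 the real mechanism is configured through the `target` and `result` arguments of `@task`; the `with_target` decorator below is a hypothetical stand-in, not that API, and it assumes JSON-serializable return values and keyword-only calls.

```python
import functools
import json
from pathlib import Path


def with_target(template: str, result_dir: Path):
    """Hypothetical decorator mimicking file-based targets.

    `template` is formatted with the call's keyword arguments, so each mapped
    item gets its own file; if that file exists, the stored value is returned.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(**kwargs):
            path = result_dir / template.format(**kwargs)
            if path.exists():
                # Output left by an earlier run: skip the work entirely.
                return json.loads(path.read_text())
            value = fn(**kwargs)
            path.parent.mkdir(parents=True, exist_ok=True)
            # Write to a temp file, then rename, so a crash mid-write
            # never leaves a partial file that looks like a finished item.
            tmp = path.with_name(path.name + ".tmp")
            tmp.write_text(json.dumps(value))
            tmp.replace(path)
            return value
        return wrapper
    return decorator
```

Re-running the whole batch with this wrapper only executes the items that have no output file yet, which is the "just re-run the subflow" behavior described above.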

Prefect Community: How do I restart a child flow run in a flow-of-flows?

Anna_Geller @Anna_Geller: Your parent and child flows would need to be in separate storage files.

Anders_Segerberg @Anders_Segerberg: awesome, thank you both