Prefect executors determine how to execute your task runs.
Here is a simplified description of what Prefect does under the hood when using a Dask executor:
- Prefect initializes the Dask client
- Prefect then sends the task as a future to Dask with
- When you have a mapped task, Prefect calls client.submit() on each mapped child, and then uses client.gather() to collect the results back to the client.
- The Dask scheduler orchestrates gathering results and could run out of memory if it has to collect a lot of big results.
- To collect task run results after mapping, your Dask driver node needs enough memory to hold the results of all the mapped tasks at once; otherwise the process can die.
- The driver node in distributed compute is the one that orchestrates the work. Because it accumulates results from all of the previous tasks, its memory usage can keep growing until it runs out.
- To reduce the memory footprint of mapped tasks, you can manually save each result somewhere (for example, to a file) and return its location instead, so that downstream tasks can read it from there.
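The pattern in the last bullet can be sketched in plain Python. This is a minimal illustration and not Prefect's or Dask's actual implementation: a standard-library ThreadPoolExecutor stands in for the Dask client, and the names process_and_save, summarize, and run_mapped are hypothetical. In a real flow these functions would be Prefect tasks, and the output directory would need to be storage that every worker can reach.

```python
import json
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def process_and_save(item: int, out_dir: Path) -> str:
    """Build a large result, persist it, and return only its location."""
    big_result = [item] * 100_000  # stand-in for a large payload
    path = out_dir / f"result_{item}.json"
    path.write_text(json.dumps(big_result))
    return str(path)


def summarize(path: str) -> int:
    """Downstream step: load the saved result from its location."""
    return sum(json.loads(Path(path).read_text()))


def run_mapped(items, out_dir: Path) -> list:
    # One submit per mapped item, then collect the futures.
    # Only short path strings travel back to the caller, not the payloads.
    with ThreadPoolExecutor(max_workers=4) as pool:
        futures = [pool.submit(process_and_save, i, out_dir) for i in items]
        return [f.result() for f in futures]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        paths = run_mapped(range(5), Path(tmp))
        print([summarize(p) for p in paths])
```

Because each mapped call returns a short string, the process that collects the results holds only a list of paths, and each downstream step loads one payload at a time.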
There are ways to tackle unmanaged memory in Dask. This video explains how to set an environment variable that influences how Dask handles unmanaged memory.
- According to the Dask documentation, MALLOC_TRIM_THRESHOLD_ defaults to 65536. You can try setting it to 0 or a low number to aggressively trim memory.
- You can also set the environment variable on the scheduler to trim unmanaged memory there.
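One way to apply this setting is to pass the variable in the environment of the process you launch. The sketch below assumes the variable has to be present when the worker or scheduler process starts, and that the process runs on Linux with glibc, since MALLOC_TRIM_THRESHOLD_ is a glibc allocator setting. The helper names env_with_trim_threshold and launch are hypothetical, and a child Python process stands in for the real worker or scheduler command.

```python
import os
import subprocess
import sys


def env_with_trim_threshold(threshold: int = 0) -> dict:
    """Copy the current environment and add the glibc trim threshold."""
    env = dict(os.environ)
    env["MALLOC_TRIM_THRESHOLD_"] = str(threshold)
    return env


def launch(command, threshold: int = 0) -> subprocess.CompletedProcess:
    # The variable is part of the child's environment from process start.
    return subprocess.run(
        command,
        env=env_with_trim_threshold(threshold),
        capture_output=True,
        text=True,
        check=True,
    )


if __name__ == "__main__":
    # Stand-in command: a child process that echoes the variable it received.
    probe = [
        sys.executable,
        "-c",
        "import os; print(os.environ['MALLOC_TRIM_THRESHOLD_'])",
    ]
    print(launch(probe).stdout.strip())
```

To use this for real, replace the probe command with the command that starts your worker or scheduler. Setting the variable inside an already running Python process is not expected to have the same effect.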
Read the Dask executor source code:
Read Dask documentation: