How does Prefect send work to a DaskExecutor and handle memory?

Prefect executors determine how to execute your task runs.

Here is a simplified description of what Prefect does under the hood when using a DaskExecutor:

  • Prefect initializes a Dask client
  • Prefect then submits each task run to Dask as a future with client.submit()
  • For a mapped task, Prefect calls client.submit() once per mapped child, then collects the results back on the client with client.gather()
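The submit-then-gather pattern above can be sketched with the standard library's ThreadPoolExecutor standing in for dask.distributed.Client, so the example runs without a Dask cluster (with Dask you would call client.submit() and client.gather() on a real Client; the task function here is illustrative):

```python
from concurrent.futures import ThreadPoolExecutor

def task(x):
    # Stand-in for a single Prefect task run
    return x * 2

# ThreadPoolExecutor stands in for dask.distributed.Client here.
with ThreadPoolExecutor() as client:
    # Mapped task: one future per mapped child (the client.submit() step)
    futures = [client.submit(task, x) for x in [1, 2, 3]]
    # Collect results back on the driver (the client.gather() step)
    results = [f.result() for f in futures]

print(results)  # [2, 4, 6]
```

Note that every result lands in the driver's memory at the gather step, which is exactly why large mapped results can become a problem.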

Collecting results and how it affects memory

  • The Dask scheduler orchestrates gathering results and can run out of memory if it has to collect many large results.
  • When collecting task run results after mapping, the driver node needs enough memory to hold all of the mapped tasks' results at once; otherwise the process can die.
  • In distributed compute, the driver node is the one that orchestrates the work. Its memory use accumulates across tasks, so it can cumulatively run out of memory from all of the previous tasks.
  • To reduce the memory footprint of mapped tasks, you can manually save each result to a file (or object store) and return its location instead, so that downstream tasks read the data only when they need it.
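A minimal sketch of that save-and-return-the-location pattern (the function names and file paths are illustrative, not Prefect API):

```python
import json
import os
import tempfile

def produce_result(i):
    # Instead of returning the large object itself, persist it and
    # return only the small location string to be gathered.
    big = list(range(100_000))  # stand-in for a large task result
    path = os.path.join(tempfile.gettempdir(), f"mapped_result_{i}.json")
    with open(path, "w") as f:
        json.dump(big, f)
    return path  # tiny payload for client.gather()

def consume_result(path):
    # Downstream task loads the data from storage when it runs.
    with open(path) as f:
        return len(json.load(f))

location = produce_result(0)
print(consume_result(location))  # 100000
```

Only the short path strings travel back to the driver, so gathering thousands of mapped results stays cheap.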

Tackling unmanaged memory

There are ways to tackle unmanaged memory in Dask. One of them is setting an environment variable that influences how aggressively Dask's workers return unmanaged memory to the operating system.

  • According to the Dask documentation, you can set MALLOC_TRIM_THRESHOLD_ (note the trailing underscore) to 65536 or a lower value, even 0, to trim unmanaged memory more aggressively.
  • You can also set the environment variable on the scheduler to trim unmanaged memory there.
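As a sketch, the variable must be present in the environment before the worker and scheduler processes start so that they inherit it; setting it inside an already-running driver process does not affect remote workers:

```python
import os

# The trailing underscore is part of the name; "0" trims most aggressively.
# Set this before launching dask-worker / dask-scheduler (e.g. in the shell
# or in your cluster spec) so the spawned processes inherit it.
os.environ["MALLOC_TRIM_THRESHOLD_"] = "0"
```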

More resources

Read the Dask executor source code:

Read Dask documentation:
http://distributed.dask.org/en/latest/client.html