I am getting memory usage warnings from Dask and eventually my flow crashes. My task parameters and return values are just file paths, but within the tasks I read and write files. It looks like memory is retained after a task has completed, which is not needed because the next task reads its data from disk. The Dask dashboard shows an increasing number of tasks "in memory", and with 50+ tasks this becomes a problem. How can I release memory sooner? My flow is below:
all_matched = []
for png in pdf_pages:
    rotated = pngtasks.rotate(png, csv)
    boxes = pngtasks.OCR(rotated, csv)
    parsed = pngtasks.parse(boxes)
    matched = pngtasks.match(parsed, csv)
    all_matched.append(matched)
It turned out this is not caused by tasks staying in memory: Dask does release memory as tasks finish and keeps only the results. The real issue is that Dask splits the total memory limit among the workers, so each worker only gets a small allowance. In my case 5GB is allowed in total, but it is split across 8 workers, leaving only about 780M per worker, and I have one task that needs more than that.
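For anyone checking the same thing, you can inspect the per-worker limit directly. This is a minimal sketch assuming you can attach a dask.distributed Client to the running cluster (the scheduler address is just an example):

from dask.distributed import Client

# Attach to the running cluster (replace with your scheduler's address).
client = Client("tcp://127.0.0.1:8786")

# Each worker reports its own memory_limit in bytes; the total allowance is
# split across workers, so this is the budget a single task must fit into.
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info["memory_limit"] / 1e9, "GB")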
BTW there is a bug in Dask memory management: it ignores memory-related Dask settings applied after the cluster has started. The workaround is to set the environment variable MALLOC_TRIM_THRESHOLD_="65536" before starting the Dask cluster.
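A minimal sketch of that workaround, assuming the cluster is started from the same Python process (worker processes inherit the environment, so the variable must be set before the cluster is created):

import os

# Must be in the environment before any worker process is spawned; glibc
# then trims freed memory back to the OS instead of retaining it in-process.
os.environ["MALLOC_TRIM_THRESHOLD_"] = "65536"

# ...now create the Dask cluster / task runner as usual.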
If you still see warning messages, reduce the number of workers so each worker gets more RAM, or add more memory to the machine.
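As an illustrative sketch of the "fewer workers" route, assuming the cluster is created through the task runner (n_workers, threads_per_worker and memory_limit are passed through to the default distributed.LocalCluster; the numbers are examples only):

from prefect_dask import DaskTaskRunner

# 2 workers instead of 8, so the same 5GB budget gives each worker ~2.5GB.
runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 2,
        "threads_per_worker": 1,
        "memory_limit": "2.5GB",  # per-worker limit, not the total
    }
)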
Note to anyone using DaskTaskRunner or RayTaskRunner:
from Prefect version 2.0b8 onwards, those task runners were moved to their respective Prefect Collections for better dependency management (the core library no longer requires dask or ray as dependencies; they can now be installed separately when needed).
The correct imports are now:
from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner
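For example, a minimal sketch after installing the collection (the package names are prefect-dask and prefect-ray on PyPI; the flow body and runner arguments here are placeholders):

from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def my_flow():
    ...  # submit your tasks here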