What is a recommended strategy for setting memory limits on a task?

I have a flow with several easily parallelizable tasks, some of which require large amounts of memory. The flow is being run in an environment that has a fixed number of cores and amount of memory, both of which I don’t have control over but would like to use efficiently.

If I allow the flow to use all available cores, it runs out of memory. I can limit the number of cores it uses, but that also keeps the flow from making progress on the (many) simple tasks that have only minimal memory requirements.
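(For reference, this is roughly how I limit the cores at the moment, assuming the default LocalCluster backend; the worker counts are just examples.)

from prefect_dask.task_runners import DaskTaskRunner

# Throttle the whole flow to a handful of single-threaded Dask workers.
# This keeps memory under control, but it also starves the many cheap tasks.
limited_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 1},
)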

I would like some way to tag each task with how much memory it will need, and to have Prefect avoid starting tasks whose combined memory requirements would exceed some limit. Is there a recommended way to do this? (A sketch of the kind of thing I'm imagining is below, after the example flow.)

In practice, tasks can also require different numbers of cores, so it would also be helpful to control the total number of cores in use across all tasks. But limiting the number of cores each task uses is relatively simple to do outside of Prefect, at least compared to controlling memory usage; a sketch of what I mean is just below.
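(Assuming the heavy work goes through threaded numeric libraries like BLAS/OpenMP, the per-task core cap is essentially this; threadpool_limits comes from the threadpoolctl package, and heavy_numeric_work is just a stand-in for the real computation.)

import numpy as np
from threadpoolctl import threadpool_limits

def heavy_numeric_work():
    # Stand-in for the real computation.
    return np.linalg.svd(np.random.rand(500, 500))

# Inside a task body: cap the threads BLAS/OpenMP may use for this one task.
with threadpool_limits(limits=2):
    heavy_numeric_work()

Here is a stripped-down version of the flow itself: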

import prefect
from prefect_dask.task_runners import DaskTaskRunner

@prefect.task
def task_with_heavy_memory_needs(id):
    print(f"Creating piles of data! {id=}")

@prefect.task
def task2_with_heavy_memory_needs(id):
    print(f"Processing data, so much of it! {id=}")

@prefect.task
def task_with_minimal_memory_needs(id):
    print(f"One of many very simple, easily parallelized tasks {id=}")

@prefect.flow(
    name="Flow with Memory Needs",
    task_runner=DaskTaskRunner(),
)
def main(data):
    task_with_heavy_memory_needs.map(data)
    task2_with_heavy_memory_needs.map(data)
    task_with_minimal_memory_needs.map(data)

if __name__ == "__main__":
    main(range(3))
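To make the question concrete, here is the kind of thing I'm imagining on top of that example, assuming Dask's worker resources could be used this way through prefect-dask. The MEMORY resource name, the numbers, and the use of dask.annotate for resources are all guesses on my part (prefect-dask documents dask.annotate for priorities, so I'm hoping resources work the same way):

import dask

# A single multi-threaded worker that advertises an artificial "MEMORY"
# resource (units are arbitrary; think GB). Dask's resource-aware scheduling
# would then only run as many annotated tasks at once as fit in that budget.
memory_aware_runner = DaskTaskRunner(
    cluster_kwargs={
        "n_workers": 1,
        "threads_per_worker": 16,
        "resources": {"MEMORY": 64},
    },
)

@prefect.flow(
    name="Flow with Memory Needs (tagged)",
    task_runner=memory_aware_runner,
)
def main_with_memory_tags(data):
    # Heavy tasks declare how much of the MEMORY budget they need, so only a
    # few of them should run at the same time.
    with dask.annotate(resources={"MEMORY": 20}):
        task_with_heavy_memory_needs.map(data)
        task2_with_heavy_memory_needs.map(data)
    # The cheap tasks stay unannotated and can fill the remaining threads.
    task_with_minimal_memory_needs.map(data)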

I am aware that Dask can be configured to spill to disk when a memory threshold is exceeded, but I would like to avoid that spilling as well (it is too slow).
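(For completeness, my understanding is that spilling can be turned off entirely with the distributed worker memory settings, something like the following; I haven't verified this is the right combination, it's just what I gathered from the Dask docs.)

import dask

# Turn off spilling to disk; workers then rely only on the pause/terminate
# thresholds (or simply run out of memory) instead of writing to disk.
dask.config.set({
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
})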