How can I change the number of Dask workers in a DaskExecutor based on a custom parameter value?

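You can pass a callable as the DaskExecutor's cluster_class argument. The callable is invoked at runtime, when the flow run starts, so it can read the parameter value and size the Dask cluster accordingly.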

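import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    # Imported inside the function so the import happens at runtime
    from distributed import LocalCluster
    # could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
    # The parameter value is read from the run context when the flow run starts
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])

# cluster_class receives the callable itself, not an already-built cluster
with Flow("example", executor=DaskExecutor(cluster_class=dynamic_executor)) as flow:
    flow.add_task(Parameter("n_workers", default=5))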
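
Why a callable rather than a ready-made cluster? The parameter value is only known once a run starts, so the cluster has to be built at that point. The sketch below illustrates the deferred-call pattern using only the standard library. LazyExecutor, run_context and make_cluster_spec are hypothetical stand-ins for DaskExecutor, prefect.context and dynamic_executor; none of them are Prefect APIs.

```python
# Stand-in for prefect.context: filled in only when a run starts (hypothetical).
run_context = {}


def make_cluster_spec():
    # Called at run time, so it sees the parameters of the current run.
    return {"n_workers": run_context["parameters"]["n_workers"]}


class LazyExecutor:
    """Toy executor that stores a factory and calls it only in start()."""

    def __init__(self, cluster_class):
        # Keep the callable; do not call it yet.
        self.cluster_class = cluster_class

    def start(self):
        # The factory is invoked here, once per run.
        return self.cluster_class()


# No parameter values are needed at definition time.
executor = LazyExecutor(cluster_class=make_cluster_spec)

# Two runs with different parameter values yield differently sized specs.
run_context["parameters"] = {"n_workers": 8}
first = executor.start()

run_context["parameters"] = {"n_workers": 2}
second = executor.start()
```

Because the factory is called once per run, the same flow definition can produce a differently sized cluster for each run.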

Example use case:

View in #prefect-community on Slack

Eddie_Atkinson @Eddie_Atkinson: This is a really silly question, but I can’t quite clue out the answer from Dask’s and Prefect’s documentation. My aim is to use Prefect to orchestrate flows with varying memory requirements using a Dask cluster.

As an MVP I’ve set up an ECSRun flow using the LocalDaskExecutor with 30GB of RAM. For large jobs this flow OOMs and gets killed by the Prefect scheduler. My question is this: If I set up a Dask cluster to run these jobs would it gracefully handle memory issues?

That is to say if I had 30GB of RAM in the cluster and a job that required 50GB would Dask OOM or would it simply run slower? Do I need to modify my code to use Dask dataframes or is there some smarts here I’m not quite across?

Kevin_Kho @Kevin_Kho: It would OOM. Dask does have memory spillover, but I think the default is that 30% can be spilled to disk. Your memory requirements would likely need a lot more. It is also not performant once you hit this. So you really need to bump up resources. What you can do though is parameterize the size of the Dask cluster. See this

Prefect Community: How can I change the number of Dask workers in a DaskExecutor based on a custom parameter value?

Not a silly question btw 🙂

Eddie_Atkinson @Eddie_Atkinson: The parameterisation is really cool
Thanks
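
To pick a value for the n_workers parameter discussed above, a rough estimate from the job's memory needs may be enough. The helper below is only a sketch: workers_needed, the 8 GB worker size and the usable_fraction headroom are illustrative assumptions, not Dask or Prefect settings, and real memory use depends on the workload.

```python
import math


def workers_needed(job_gb, worker_gb, usable_fraction=0.6):
    """Estimate how many workers are needed to hold job_gb in memory.

    usable_fraction is an assumed share of each worker's memory that the
    job may fill before spilling or pausing becomes a concern.
    """
    if job_gb <= 0 or worker_gb <= 0:
        raise ValueError("job_gb and worker_gb must be positive")
    if not 0 < usable_fraction <= 1:
        raise ValueError("usable_fraction must be in (0, 1]")
    usable_per_worker = worker_gb * usable_fraction
    # Round up so the total usable memory covers the job; at least one worker.
    return max(1, math.ceil(job_gb / usable_per_worker))


# Example: the 50 GB job from the question, on hypothetical 8 GB workers.
n_workers = workers_needed(job_gb=50, worker_gb=8)
```

The result could then be passed as the n_workers parameter when the flow run is created.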