How to allocate more memory or more worker nodes on a per flow run basis?

It’s hard to give a single general answer because it depends on your infrastructure.

  1. If you want to provide custom keyword arguments to a Dask cluster_class ad-hoc per flow run, you could pass a dynamic function to a DaskExecutor's cluster_class. This function could retrieve values such as n_workers from a Parameter task, as follows:
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    from distributed import LocalCluster

    # could be instead some other class e.g. from import FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])

with Flow(
    "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
    flow.add_task(Parameter("n_workers", default=5))

As a result, you could start a new flow run with a different value of n_workers defined ad-hoc.

  1. The second option would be to assign more memory in your run configuration on a per-flow-run basis - e.g. you could overwrite the memory_request set on a KubernetesRun from a UI:
with Flow(
) as flow:

The above run configuration defines 2 GB, but if you notice your flow run ended with an OOM error, you could trigger a new flow run from the UI with a higher memory request.

  1. The last option would be to override the executor values directly in your flow definition:
import coiled
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",

As long as you use script storage (e.g. one of Git storage classes such as GitHub, Git, Gitlab, Bitbucket, etc) rather than pickle storage, and you commit your code with a modified value of worker_memory, this should be reflected in your new flow run because metadata about the executor is not stored in the backend - it’s retrieved from your flow storage at runtime.