It’s hard to give a single general answer because it depends on your infrastructure.
- If you want to provide custom keyword arguments to a Dask `cluster_class` ad hoc per flow run, you could pass a dynamic function to a `DaskExecutor`'s `cluster_class`. This function could retrieve values such as `n_workers` from a `Parameter` task, as follows:
```python
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor


def dynamic_executor():
    from distributed import LocalCluster

    # could instead be some other cluster class,
    # e.g. from dask_cloudprovider.aws import FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])


with Flow(
    "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
    flow.add_task(Parameter("n_workers", default=5))
```
As a result, you could start a new flow run with a different value of `n_workers` defined ad hoc, as shown in the sketch below.
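For instance, here is a minimal sketch of triggering such a run from Python with Prefect's `Client`; the flow ID is a placeholder you would replace with the ID of your registered flow:

```python
from prefect import Client

client = Client()

# "your-flow-id" is a placeholder; look up the ID of your registered flow
client.create_flow_run(
    flow_id="your-flow-id",
    parameters={"n_workers": 10},  # overrides the default of 5 for this run only
)
```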
- The second option would be to assign more memory in your run configuration on a per-flow-run basis, e.g. you could override the `memory_request` set on a `KubernetesRun` from the UI:
```python
from prefect import Flow
from prefect.run_configs import KubernetesRun

with Flow(
    FLOW_NAME,
    storage=STORAGE,
    run_config=KubernetesRun(
        labels=["k8s"],
        cpu_request=0.5,
        memory_request="2Gi",
    ),
) as flow:
    ...
```
The above run configuration requests 2 GiB of memory, but if you notice that your flow run ended with an OOM error, you could trigger a new flow run from the UI with a higher memory request.
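If you prefer to do this programmatically rather than from the UI, a per-run override can also be passed when the run is created. This is a sketch under the same assumptions as above (the flow ID is a placeholder):

```python
from prefect import Client
from prefect.run_configs import KubernetesRun

client = Client()

# "your-flow-id" is a placeholder; the run_config below applies to this run only
client.create_flow_run(
    flow_id="your-flow-id",
    run_config=KubernetesRun(
        labels=["k8s"],
        cpu_request=0.5,
        memory_request="4Gi",  # bumped from the 2Gi defined on the flow
    ),
)
```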
- The last option would be to override the executor values directly in your flow definition:
```python
import coiled
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",
    },
)
```
As long as you use script storage (e.g. one of the Git storage classes such as GitHub, Git, GitLab, or Bitbucket) rather than pickle storage, and you commit your code with a modified value of `worker_memory`, this should be reflected in your new flow run, because metadata about the executor is not stored in the backend - it's retrieved from your flow storage at runtime.
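For illustration, a minimal sketch of such script-based storage using the `GitHub` storage class; the repository, path, and branch names are placeholders you would replace with your own:

```python
from prefect.storage import GitHub

# placeholders: point these at the repo/branch/path where the flow script lives
flow.storage = GitHub(
    repo="your-org/your-repo",
    path="flows/coiled_flow.py",
    ref="main",
)
```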