Are there any guidelines for using a temporary vs. static Dask cluster? How do I set one up on Kubernetes?

This is a copy of the :point_down: documentation page, reproduced here so the differences between temporary and static Dask clusters can be read in one place:


The DaskExecutor runs Prefect tasks using Dask’s Distributed Scheduler. It can be used locally on a single machine (much like the LocalDaskExecutor), but it is most useful when scaling out across multiple nodes.

Prefect’s DaskExecutor has 3 operating modes:

1. Using a Local Cluster

By default, when you use a DaskExecutor it creates a temporary local Dask
cluster.

from prefect.executors import DaskExecutor

# By default this will use a temporary local Dask cluster
flow.executor = DaskExecutor()

The number of workers used is based on the number of cores on your machine. The
default provides a mix of processes and threads that should work well for
most workloads. If you want to specify this explicitly, you can pass
n_workers or threads_per_worker in cluster_kwargs.
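The default split can be approximated in plain Python. This is a sketch modeled on the heuristic that distributed’s LocalCluster applies when neither n_workers nor threads_per_worker is given, assuming the DaskExecutor leaves that choice to LocalCluster; the helper names are hypothetical and the exact rule may differ between versions.

```python
from __future__ import annotations

import math
import os


def factors(n: int) -> set[int]:
    """Return all positive divisors of n."""
    result = set()
    for i in range(1, math.isqrt(n) + 1):
        if n % i == 0:
            result.add(i)
            result.add(n // i)
    return result


def default_processes_threads(n_cores: int) -> tuple[int, int]:
    """Split n_cores into (worker processes, threads per worker).

    Hypothetical helper modeled on distributed's default heuristic;
    details may vary by version.
    """
    if n_cores <= 4:
        # Small machines: one single-threaded worker process per core
        processes = n_cores
    else:
        # Larger machines: smallest divisor of n_cores that is >= sqrt(n_cores)
        processes = min(f for f in factors(n_cores) if f >= math.sqrt(n_cores))
    threads = n_cores // processes
    return processes, threads


if __name__ == "__main__":
    cores = os.cpu_count() or 1
    print(cores, default_processes_threads(cores))
```

On a 32-core machine this gives 8 processes with 4 threads each, which is the kind of mix that passing {"n_workers": 4, "threads_per_worker": 2} overrides explicitly.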

# Use 4 worker processes, each with 2 threads
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)

Using a DaskExecutor with a local cluster is very similar to using a
LocalDaskExecutor with processes=True. You may find it more performant in
certain situations (this scheduler does a better job of managing memory),
but generally they should perform equivalently for most Prefect workflows. The
DaskExecutor becomes much more useful when used in a distributed context.

2. Using a Temporary Cluster

The DaskExecutor is capable of creating a temporary cluster using any of
Dask’s cluster-manager options.
This can be useful when you want each flow run to have its own Dask cluster,
allowing for adaptive scaling per-flow.
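The per-run lifecycle of a temporary cluster can be sketched in plain Python. This is a minimal sketch assuming a hypothetical FakeCluster class standing in for a real cluster manager such as FargateCluster; it illustrates the general create, use, and always-close pattern, not Prefect’s own implementation.

```python
from contextlib import contextmanager


class FakeCluster:
    """Hypothetical stand-in for a Dask cluster manager (e.g. FargateCluster)."""

    def __init__(self, n_workers=1):
        self.n_workers = n_workers
        self.closed = False

    def close(self):
        self.closed = True


@contextmanager
def temporary_cluster(cluster_class, cluster_kwargs=None):
    """Create a cluster for a single flow run and always tear it down afterwards."""
    cluster = cluster_class(**(cluster_kwargs or {}))
    try:
        yield cluster
    finally:
        # Runs on success and on error, so no workers outlive the flow run
        cluster.close()


with temporary_cluster(FakeCluster, {"n_workers": 4}) as cluster:
    print(cluster.n_workers)  # 4
print(cluster.closed)  # True
```

Because the cluster belongs to one run, shutting it down is also what makes force-stopping active tasks possible on cancellation.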

To configure, you need to provide a cluster_class. This can be either a
string specifying the import path to the cluster class (e.g.
"dask_cloudprovider.aws.FargateCluster"), the cluster class itself, or a
function for creating a custom cluster. You can also configure
cluster_kwargs, which takes a dictionary of keyword arguments to pass to
cluster_class when starting the flow run.
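The three accepted forms of cluster_class (an import path string, a class, or a factory function) can be normalized with importlib. This is a hypothetical sketch of that idea, not Prefect’s internal code; resolve_cluster_class and start_cluster are illustrative names.

```python
import importlib


def resolve_cluster_class(spec):
    """Turn a cluster_class spec into a callable.

    Accepts a dotted import path such as "dask_cloudprovider.aws.FargateCluster",
    a class, or a factory function.
    """
    if isinstance(spec, str):
        module_name, _, attr = spec.rpartition(".")
        if not module_name:
            raise ValueError(f"Expected a dotted import path, got {spec!r}")
        module = importlib.import_module(module_name)
        return getattr(module, attr)
    if callable(spec):
        # Classes and plain functions are both callable
        return spec
    raise TypeError(f"cluster_class must be a str or callable, got {type(spec).__name__}")


def start_cluster(cluster_class, cluster_kwargs=None):
    """Resolve cluster_class, then call it with cluster_kwargs as keyword arguments."""
    factory = resolve_cluster_class(cluster_class)
    return factory(**(cluster_kwargs or {}))


# types.SimpleNamespace is a harmless stand-in for a real cluster class here
ns = start_cluster("types.SimpleNamespace", {"n_workers": 4, "image": "my-prefect-image"})
print(ns.n_workers, ns.image)  # 4 my-prefect-image
```

Passing "dask_cloudprovider.aws.FargateCluster" instead would import and start a real Fargate cluster, which is why the example uses a standard-library class.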

For example, to configure a flow to use a temporary
dask_cloudprovider.aws.FargateCluster with 4 workers running with an image
named my-prefect-image:

flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)

Specifying Worker Images

Several Dask cluster managers run using images. It’s important that the images
the Dask workers use have all the Python libraries required to
run your flow. You have a few options here:

  • Build a static image that has everything required, and specify the image name
    explicitly in cluster_kwargs. All Prefect flows running on Dask need
    at least prefect, dask, and distributed installed; depending on your
    flow, you may have additional dependencies.

  • If your flow is running on an agent that also uses images (e.g. Kubernetes,
    Docker, ECS, …), you can also access the main image used for your flow run
    at runtime via prefect.context.image. Note that you can’t put this
    directly into cluster_kwargs (that value would be resolved at registration
    time, not run time); instead, you’ll need to define a custom function for
    creating your cluster. For example:

    import prefect
    from dask_cloudprovider.aws import FargateCluster
    from prefect.executors import DaskExecutor
    
    def fargate_cluster(n_workers=4):
        """Start a fargate cluster using the same image as the flow run"""
        return FargateCluster(n_workers=n_workers, image=prefect.context.image)
    
    flow.executor = DaskExecutor(
        cluster_class=fargate_cluster,
        cluster_kwargs={"n_workers": 4}
    )
    

For more information on Prefect and Docker images, see here.
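Why the image has to be looked up inside a function rather than placed in cluster_kwargs comes down to when each expression is evaluated. This plain-Python sketch uses a hypothetical dict named context as a stand-in for prefect.context and a hypothetical image name; it only demonstrates eager versus deferred evaluation.

```python
# Hypothetical stand-in for prefect.context: it is only populated once a
# flow run actually starts.
context = {}

# Evaluated immediately (the "registration time" case): no run exists yet,
# so the image lookup captures None.
eager_kwargs = {"n_workers": 4, "image": context.get("image")}


def lazy_cluster_kwargs(n_workers=4):
    # Evaluated only when called (the "run time" case): reads the current image.
    return {"n_workers": n_workers, "image": context.get("image")}


# Later, when the flow run starts, the runner fills in the context
# (hypothetical image name).
context["image"] = "my-registry/my-flow:latest"

print(eager_kwargs["image"])           # None
print(lazy_cluster_kwargs()["image"])  # my-registry/my-flow:latest
```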

Adaptive Scaling

One nice feature of using a DaskExecutor is the ability to scale adaptively
to the workload. Instead of specifying n_workers as a fixed number, this lets
you specify a minimum and maximum number of workers to use, and the Dask
cluster will scale up and down as needed.

To do this, you can pass adapt_kwargs to DaskExecutor. This takes the
following fields:

  • maximum (int or None, optional): the maximum number of workers to scale
    to. Set to None for no maximum.
  • minimum (int or None, optional): the minimum number of workers to scale
    to. Set to None for no minimum.
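The bounding behavior of the minimum and maximum fields can be sketched as a clamp where None means "no bound on that side". This is a hypothetical helper showing only that step; Dask’s real adaptive logic decides the desired worker count from the scheduler’s workload, which is not modeled here.

```python
def clamp_workers(desired, minimum=None, maximum=None):
    """Clamp a desired worker count to adapt_kwargs-style bounds.

    Hypothetical helper: None disables the bound on that side.
    """
    if minimum is not None and maximum is not None and minimum > maximum:
        raise ValueError("minimum cannot exceed maximum")
    if minimum is not None:
        desired = max(desired, minimum)
    if maximum is not None:
        desired = min(desired, maximum)
    return desired


print(clamp_workers(25, maximum=10))            # 10
print(clamp_workers(0, minimum=2, maximum=10))  # 2
print(clamp_workers(5))                          # 5
```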

For example, here we configure a flow to run on a FargateCluster scaling up
to at most 10 workers.

flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)

3. Connecting to an Existing Cluster

Multiple Prefect flow runs can all use the same existing Dask cluster. You
might manage a single long-running Dask cluster (maybe using the Helm Chart)
and configure flows to connect to it during execution. This has a few downsides
when compared to using a temporary cluster (as described above):

  • All workers in the cluster must have dependencies installed for all flows you
    intend to run. When using a temporary cluster, each worker only needs the
    dependencies of the flow it’s associated with.

  • Multiple flow runs may compete for resources. Dask tries to do a good job
    sharing resources between tasks, but you may still run into issues.

  • When cancelling a flow run, any actively running tasks can’t be hard-stopped
    when using a shared Dask cluster; instead, the flow runner will stop
    submitting tasks but will let all active tasks run to completion. With a
    temporary cluster, the cluster can be shut down to force-stop any active tasks,
    speeding up cancellation.

That said, you may find managing a single long-running cluster simpler (the
choice here is largely a matter of preference). To configure a DaskExecutor to
connect to an existing cluster, pass the scheduler’s address to the address
argument:

# Connect to an existing cluster running at a specified address
flow.executor = DaskExecutor(address="tcp://...")
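The three operating modes can be summarized as a small decision function over the arguments discussed above. This is a hypothetical helper for illustration, not Prefect’s code; in particular, rejecting address combined with cluster_class or cluster_kwargs is an assumption made to keep the modes distinct.

```python
def choose_mode(address=None, cluster_class=None, cluster_kwargs=None):
    """Return which DaskExecutor operating mode a configuration implies.

    Hypothetical helper; the address/cluster_class conflict check is an
    assumption for clarity.
    """
    if address is not None:
        if cluster_class is not None or cluster_kwargs is not None:
            raise ValueError("address cannot be combined with cluster_class/cluster_kwargs")
        return "existing"   # 3. connect to a long-running cluster
    if cluster_class is not None:
        return "temporary"  # 2. one cluster per flow run
    return "local"          # 1. temporary local cluster (cluster_kwargs allowed)


print(choose_mode())                                  # local
print(choose_mode(cluster_kwargs={"n_workers": 4}))   # local
print(choose_mode(cluster_class="dask_cloudprovider.aws.FargateCluster"))  # temporary
```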