This is just a copy of this documentation page - copied to make it easier to read about the temporary vs. static Dask cluster differences in one place:
The DaskExecutor runs Prefect tasks using Dask’s Distributed Scheduler. It can
be used locally on a single machine (much like the LocalDaskExecutor above),
but is most useful when scaling out across multiple nodes.
Prefect’s DaskExecutor has 3 operating modes:
1. Using a Local Cluster
By default, when you use a DaskExecutor, it creates a temporary local Dask
cluster.
from prefect.executors import DaskExecutor
# By default this will use a temporary local Dask cluster
flow.executor = DaskExecutor()
The number of workers used is based on the number of cores on your machine. The
default should provide a mix of processes and threads that works well for
most workloads. If you want to specify this explicitly, you can pass
n_workers or threads_per_worker to cluster_kwargs.
# Use 4 worker processes, each with 2 threads
flow.executor = DaskExecutor(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2}
)
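If you would rather derive these values from the machine than hard-code them, a small helper can do the arithmetic. The sketch below is only an illustration: the helper name local_cluster_kwargs and its rounding rule are assumptions made here, and it does not reproduce the heuristic Dask itself uses for its defaults.

```python
import os


def local_cluster_kwargs(threads_per_worker=2, total_cores=None):
    """Split the available cores into worker processes of a fixed thread count.

    Hypothetical helper: NOT Dask's own default heuristic.
    """
    if threads_per_worker < 1:
        raise ValueError("threads_per_worker must be at least 1")
    # Fall back to 1 core if the core count cannot be determined
    cores = total_cores or os.cpu_count() or 1
    # Always keep at least one worker process
    n_workers = max(1, cores // threads_per_worker)
    return {"n_workers": n_workers, "threads_per_worker": threads_per_worker}


# Intended use (requires Prefect and Dask to be installed):
# flow.executor = DaskExecutor(cluster_kwargs=local_cluster_kwargs())
```

Passing total_cores explicitly makes the result reproducible across machines, which can be handy in tests.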
Using a DaskExecutor with a local cluster is very similar to using a
LocalDaskExecutor with processes=True. You may find it more performant in
certain situations (this scheduler does a better job of managing memory),
but generally they should perform equivalently for most Prefect workflows. The
DaskExecutor becomes much more useful when used in a distributed context.
2. Using a Temporary Cluster
The DaskExecutor is capable of creating a temporary cluster using any of
Dask’s cluster-manager options.
This can be useful when you want each flow run to have its own Dask cluster,
allowing for adaptive scaling per-flow.
To configure, you need to provide a cluster_class. This can be either a
string specifying the import path to the cluster class (e.g.
"dask_cloudprovider.aws.FargateCluster"), the cluster class itself, or a
function for creating a custom cluster. You can also configure
cluster_kwargs, which takes a dictionary of keyword arguments to pass to
cluster_class when starting the flow run.
For example, to configure a flow to use a temporary
dask_cloudprovider.aws.FargateCluster with 4 workers running with an image
named my-prefect-image:
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    cluster_kwargs={"n_workers": 4, "image": "my-prefect-image"},
)
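To make the three accepted forms of cluster_class (import-path string, class, or function) more concrete, here is a rough sketch of how such a value could be normalized into a callable. This is an assumption about the general technique, not Prefect's actual implementation; resolve_cluster_class is a hypothetical name, and the demo uses a standard-library class so it runs without Dask installed.

```python
import importlib


def resolve_cluster_class(cluster_class):
    """Return a callable given a dotted import path, a class, or a function.

    Hypothetical helper illustrating the idea; not Prefect's own code.
    """
    if isinstance(cluster_class, str):
        # Split "package.module.ClassName" into module path and attribute name
        module_name, _, attr = cluster_class.rpartition(".")
        if not module_name:
            raise ValueError(f"Expected a dotted import path, got {cluster_class!r}")
        module = importlib.import_module(module_name)
        return getattr(module, attr)
    if callable(cluster_class):
        # Classes and factory functions are already callable
        return cluster_class
    raise TypeError("cluster_class must be an import path string or a callable")


# Demo with a standard-library class instead of a real cluster manager
ordered_dict_cls = resolve_cluster_class("collections.OrderedDict")
```

One consequence of the string form is that the cluster library only has to be importable where the string is resolved, not wherever the flow is defined.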
Specifying Worker Images
Several Dask cluster managers run using images. It’s important that the images
the Dask workers are using have all the Python libraries that are required to
run your flow. You have a few options here:
- Build a static image that has everything required, and specify the image name
  explicitly in cluster_kwargs. All Prefect flows running on Dask will need
  at least prefect, dask, and distributed installed - depending on your
  flow you may have additional dependencies.
- If your flow is running on an agent that also uses images (e.g. Kubernetes,
  Docker, ECS, …), you can also access the main image used for your flow run
  at runtime at prefect.context.image. Note that you can’t put this
  directly into cluster_kwargs (since that will resolve at registration
  time, not run time) - instead you’ll need to define a custom function for
  creating your cluster. For example:

  import prefect
  from dask_cloudprovider.aws import FargateCluster
  from prefect.executors import DaskExecutor

  def fargate_cluster(n_workers=4):
      """Start a fargate cluster using the same image as the flow run"""
      return FargateCluster(n_workers=n_workers, image=prefect.context.image)

  flow.executor = DaskExecutor(
      cluster_class=fargate_cluster,
      cluster_kwargs={"n_workers": 4}
  )
For more information on Prefect and Docker images, see here.
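The registration-time versus run-time distinction above is ordinary eager versus deferred evaluation in Python. The sketch below imitates it with a plain dictionary standing in for prefect.context (an assumption made so the example runs without Prefect). The names context, eager_kwargs and cluster_factory are illustrative only, and the real context object may behave differently when a key is missing.

```python
# Stand-in for prefect.context: empty while the flow is being defined,
# populated only once a flow run starts (assumption for illustration)
context = {}

# "Registration time": the dict literal is evaluated immediately,
# so it captures whatever the context holds right now (nothing yet)
eager_kwargs = {"n_workers": 4, "image": context.get("image")}


def cluster_factory(n_workers=4):
    """Read the image only when called, i.e. at "run time"."""
    return {"n_workers": n_workers, "image": context.get("image")}


# The flow run starts and the image becomes known
context["image"] = "my-prefect-image"

# "Run time": the factory now sees the populated context
lazy_kwargs = cluster_factory()
```

Here eager_kwargs still holds no image, while lazy_kwargs picks up my-prefect-image, which is why the lookup belongs inside a function rather than directly in cluster_kwargs.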
Adaptive Scaling
One nice feature of using a DaskExecutor is the ability to scale adaptively
to the workload. Instead of specifying n_workers as a fixed number, this lets
you specify a minimum and maximum number of workers to use, and the Dask
cluster will scale up and down as needed.
To do this, you can pass adapt_kwargs to DaskExecutor. This takes the
following fields:
- maximum (int or None, optional): the maximum number of workers to scale
  to. Set to None for no maximum.
- minimum (int or None, optional): the minimum number of workers to scale
  to. Set to None for no minimum.
For example, here we configure a flow to run on a FargateCluster scaling up
to at most 10 workers.
flow.executor = DaskExecutor(
    cluster_class="dask_cloudprovider.aws.FargateCluster",
    adapt_kwargs={"maximum": 10}
)
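Since adapt_kwargs is a plain dictionary, a typo or an inverted range would only surface when the cluster starts. A small validating helper can catch that earlier. This is a minimal sketch: make_adapt_kwargs is a hypothetical name, and leaving a key out is assumed here to be equivalent to passing None, because both fields are described as optional.

```python
def make_adapt_kwargs(minimum=None, maximum=None):
    """Build an adapt_kwargs dict, rejecting obviously invalid bounds.

    Hypothetical helper; unset bounds are simply left out of the result.
    """
    bounds = (("minimum", minimum), ("maximum", maximum))
    for name, value in bounds:
        if value is None:
            continue
        # bool is a subclass of int, so exclude it explicitly
        if isinstance(value, bool) or not isinstance(value, int) or value < 0:
            raise ValueError(f"{name} must be a non-negative int or None")
    if minimum is not None and maximum is not None and minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    return {name: value for name, value in bounds if value is not None}


# Intended use (requires Prefect and dask_cloudprovider to be installed):
# flow.executor = DaskExecutor(
#     cluster_class="dask_cloudprovider.aws.FargateCluster",
#     adapt_kwargs=make_adapt_kwargs(minimum=1, maximum=10),
# )
```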
3. Connecting to an Existing Cluster
Multiple Prefect flow runs can all use the same existing Dask cluster. You
might manage a single long-running Dask cluster (maybe using the Helm Chart)
and configure flows to connect to it during execution. This has a few downsides
when compared to using a temporary cluster (as described above):
- All workers in the cluster must have dependencies installed for all flows you
  intend to run. When using a temporary cluster, each cluster’s workers only
  need the dependencies for the flow they are associated with.
- Multiple flow runs may compete for resources. Dask tries to do a good job
  sharing resources between tasks, but you may still run into issues.
- When cancelling a flow run, any actively running tasks can’t be hard-stopped
  when using a shared Dask cluster - instead the flow runner will stop
  submitting tasks but will let all active tasks run to completion. With a
  temporary cluster, the cluster can be shut down to force-stop any active
  tasks, speeding up cancellation.
That said, you may find managing a single long-running cluster simpler (the
choice here is largely a matter of preference). To configure a DaskExecutor
to connect to an existing cluster, pass in the address of the scheduler to
the address argument:
# Connect to an existing cluster running at a specified address
flow.executor = DaskExecutor(address="tcp://...")
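If the same flow should use a shared cluster in one environment and a temporary local cluster in another, the choice can be driven by configuration. The sketch below reads the scheduler address from an environment variable. The variable name MY_DASK_SCHEDULER_ADDRESS and the helper executor_kwargs_from_env are hypothetical, chosen only for this example.

```python
import os


def executor_kwargs_from_env(env=None, var="MY_DASK_SCHEDULER_ADDRESS"):
    """Return DaskExecutor keyword arguments based on an environment variable.

    Hypothetical helper: if the variable is set, connect to that scheduler;
    otherwise return no arguments, which selects the default temporary
    local cluster described in mode 1.
    """
    env = os.environ if env is None else env
    address = env.get(var)
    if address:
        return {"address": address}
    return {}


# Intended use (requires Prefect to be installed):
# flow.executor = DaskExecutor(**executor_kwargs_from_env())
```

Accepting the environment as a parameter keeps the helper easy to test without modifying the real process environment.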