DaskExecutor vs LocalDaskExecutor in general
In general, the main difference between the two is the choice of scheduler. LocalDaskExecutor can be configured to use either threads or processes as its scheduler, whereas DaskExecutor uses the Dask Distributed scheduler. As a result, LocalDaskExecutor provides multithreading or multiprocessing parallelism out of the box in a very lightweight way. However, it can only run on a single machine and offers fewer features than the Dask Distributed scheduler.
For local testing and small workflows, DaskExecutor introduces unnecessary overhead because it creates a local Dask cluster. LocalDaskExecutor does not create a cluster. Instead, it relies on a multiprocessing pool, which is backed by threads by default.
This has some implications for sharing data between tasks:
- If you use LocalDaskExecutor(scheduler="threads"), tasks running in threads can share data directly (provided the data is thread-safe).
- If you use either DaskExecutor or LocalDaskExecutor(scheduler="processes"), tasks run in separate processes rather than threads. The Dask scheduler therefore has to copy (serialize) the data to pass it to workers, which introduces some additional overhead.
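The contrast described above can be illustrated with the standard library alone: tasks in a thread pool share data, while process-based workers receive copies. This is a minimal sketch, not Prefect or Dask code. It assumes that process-based execution hands each task a serialized copy of its inputs. A `pickle` round-trip stands in for that hand-off here, so no real worker processes are spawned. The names `append_item`, `run_in_threads`, and `run_with_copies` are hypothetical.

```python
import pickle
from multiprocessing.pool import ThreadPool


def append_item(args):
    """Toy 'task' that mutates the list it is given."""
    target, item = args
    target.append(item)  # list.append is atomic under CPython's GIL
    return item


def run_in_threads(items, num_workers=4):
    shared = []
    # Threads live in one process, so every task sees the same list object.
    with ThreadPool(num_workers) as pool:
        pool.map(append_item, [(shared, i) for i in items])
    return shared


def run_with_copies(items):
    shared = []
    # Stand-in for a process-based worker: each task gets a pickled copy
    # of the list, so its mutation never reaches the parent's object.
    for i in items:
        copy_for_worker = pickle.loads(pickle.dumps(shared))
        append_item((copy_for_worker, i))
    return shared


print(sorted(run_in_threads(range(5))))  # [0, 1, 2, 3, 4]
print(run_with_copies(range(5)))         # []
```

With real processes, the copying cost grows with the size of the data passed to each task. This is the overhead the second bullet refers to.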
DaskExecutor vs LocalDaskExecutor on a local Dask cluster
When DaskExecutor spins up a local Dask cluster, the result is almost the same as using LocalDaskExecutor with scheduler="processes":
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("dask_flow", executor=LocalDaskExecutor(scheduler="processes")) as flow:
    pass

# on a local Dask cluster, this is almost equivalent to:
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("dask_flow", executor=DaskExecutor()) as flow:
    pass
LocalDaskExecutor and how it can be configured
By default, the number of threads or processes equals the number of available cores. You can set it explicitly by passing in num_workers.
# Use 8 threads
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)
# Use 8 processes
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
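Here is a standard-library sketch that makes the two settings concrete. It picks a worker pool from a scheduler name and defaults num_workers to the core count. The sketch is loosely modeled on the description in this section and is not Prefect's actual implementation. The helpers `make_local_pool` and `resolve_num_workers` are hypothetical.

```python
import os
from multiprocessing.pool import Pool, ThreadPool


def resolve_num_workers(num_workers=None):
    # Documented default: one worker per available core.
    return num_workers if num_workers is not None else (os.cpu_count() or 1)


def make_local_pool(scheduler="threads", num_workers=None):
    """Hypothetical helper: choose a thread or process pool by name."""
    n = resolve_num_workers(num_workers)
    if scheduler == "threads":
        return ThreadPool(n)
    if scheduler == "processes":
        return Pool(n)
    raise ValueError(f"unknown scheduler: {scheduler!r}")


# Threads need no pickling, so even a lambda can be mapped.
with make_local_pool("threads", num_workers=8) as pool:
    print(pool.map(lambda x: x * x, range(4)))  # [0, 1, 4, 9]
```

If you use the "processes" branch, keep two constraints in mind. The mapped function must be picklable. On platforms that start workers with spawn, the pool must also be created under an `if __name__ == "__main__":` guard.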
Selecting a scheduler and num_workers
In some scenarios, a local cluster spun up by DaskExecutor can perform better than LocalDaskExecutor(scheduler="processes"). This is because the Dask Distributed scheduler manages memory resources more effectively. For most workflows, however, the two are equivalent.
You should use scheduler="threads" if:
- Your tasks are often IO bound (e.g. API requests, uploading or downloading data, database calls). Tasks like these can sometimes benefit from having more threads than cores, but usually by no more than a factor of 2-4. For example, if you have 4 cores available, set num_workers=16 at most.
- Your tasks make use of separate processes (e.g. ShellTask).
- Your tasks make use of libraries like numpy, pandas, or scikit-learn that release the global interpreter lock (GIL). The default value for num_workers is likely sufficient here. Tasks like these are CPU bound and won't benefit from multiple threads per core.
You should use scheduler="processes" in most other cases. These tasks are also usually CPU bound, so the default value of num_workers should be sufficient.
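The sizing guidance can be condensed into a small worked rule of thumb. This is a hypothetical helper, `suggest_num_workers`, and not a Prefect API. It assumes only two cases: IO-bound thread workloads are oversubscribed by a factor of 2-4, and everything else keeps one worker per core.

```python
import os


def suggest_num_workers(scheduler, io_bound=False, cores=None, oversubscribe=2):
    """Hypothetical rule of thumb based on the scheduler guidance."""
    cores = cores or os.cpu_count() or 1
    if scheduler == "threads" and io_bound:
        # IO-bound threads may exceed the core count, but only by 2-4x.
        if not 2 <= oversubscribe <= 4:
            raise ValueError("oversubscribe should be between 2 and 4")
        return cores * oversubscribe
    # CPU-bound work (GIL-releasing libraries on threads, or processes):
    # keep the default of one worker per core.
    return cores


print(suggest_num_workers("threads", io_bound=True, cores=4, oversubscribe=4))  # 16
print(suggest_num_workers("threads", cores=4))    # 4
print(suggest_num_workers("processes", cores=4))  # 4
```

You could pass the result straight to the executor, e.g. `LocalDaskExecutor(scheduler="threads", num_workers=suggest_num_workers("threads", io_bound=True))`.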
Implications of using LocalDaskExecutor vs. DaskExecutor on Kubernetes
- LocalDaskExecutor: all task runs are executed within the flow run pod.
- DaskExecutor(cluster_class=KubeCluster): the flow run is executed within the flow run pod, but task runs are executed on a distributed Dask KubeCluster running across many pods and compute nodes.
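Here is a configuration sketch for the DaskExecutor case, following the pattern used in the Prefect 1.x executor docs. It rests on several assumptions:
- Prefect 1.x is installed.
- An older dask-kubernetes release is installed that still exports `make_pod_spec` and the classic `KubeCluster` at the top level.
- The flow's image is available as `prefect.context.image` at run time.
- The adaptive scaling bounds are illustrative values only.

`cluster_class` receives a callable rather than a cluster instance, so the cluster is only created when the flow run starts.

```python
import prefect
from dask_kubernetes import KubeCluster, make_pod_spec  # assumes classic dask-kubernetes API
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("k8s_flow") as flow:
    pass

# Task runs are shipped to a temporary Dask KubeCluster. The lambda defers
# cluster creation until the flow run starts inside its pod.
flow.executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(make_pod_spec(image=prefect.context.image)),
    adapt_kwargs={"minimum": 2, "maximum": 3},  # illustrative scaling bounds
)
```

With LocalDaskExecutor, no cluster configuration is needed. Everything runs inside the flow run pod, so that pod's CPU and memory requests bound the available parallelism.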