In general, the main difference between those two is the choice of scheduler. LocalDaskExecutor can be configured to use either threads or processes as its scheduler. In contrast, DaskExecutor uses the Dask Distributed scheduler. As a result, LocalDaskExecutor provides parallelism (multithreading or multiprocessing) out of the box in a very lightweight way. However, it can only run on a single machine and has fewer features than the Dask Distributed scheduler.
For local testing and small workflows, DaskExecutor introduces unnecessary overhead because it creates a local Dask cluster. LocalDaskExecutor does not create a local cluster. Instead, it relies on a multiprocessing pool, which uses threads by default.
This has some implications for sharing data between tasks:
- if you use LocalDaskExecutor(scheduler="threads"), tasks run in threads of the same process, so they can share data directly (provided the shared objects are thread-safe)
- in contrast, if you use LocalDaskExecutor(scheduler="processes"), tasks run in separate processes rather than threads. The Dask scheduler therefore has to copy the data to pass it to the workers, which introduces some additional overhead.
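The difference can be illustrated without Prefect at all. In the sketch below, threads from the standard library's concurrent.futures.ThreadPoolExecutor append to one shared list. This works because threads share memory, and a threading.Lock keeps the mutation thread-safe. For the process case, a pickle round-trip stands in for the serialization that happens when data crosses a process boundary, so the "worker" ends up with a copy. The function names (record, run_with_threads, simulate_process_boundary) are hypothetical. This only simulates the behavior and is not Prefect's or Dask's implementation.

```python
import pickle
import threading
from concurrent.futures import ThreadPoolExecutor

# Shared, in-memory state that every thread in this process can see.
results = []
results_lock = threading.Lock()  # makes concurrent appends thread-safe


def record(item):
    """Hypothetical task: writes directly into shared state (threads only)."""
    with results_lock:
        results.append(item * 2)
    return item


def run_with_threads(items, num_workers=4):
    # Threads share the parent's memory, so `results` is mutated in place.
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        list(pool.map(record, items))
    return sorted(results)


def simulate_process_boundary(obj):
    # A separate process cannot see the parent's memory: data is serialized
    # (pickled) and rebuilt on the other side, i.e. the worker gets a copy.
    return pickle.loads(pickle.dumps(obj))


shared = run_with_threads(range(5))
print(shared)  # [0, 2, 4, 6, 8]

original = {"rows": [1, 2, 3]}
worker_copy = simulate_process_boundary(original)
worker_copy["rows"].append(4)  # a change made "inside the worker"
print(original["rows"])  # [1, 2, 3]: the parent's data is untouched
```

The copy is also where the extra overhead comes from: for large objects, serializing and deserializing can cost more than the task itself.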
When DaskExecutor is spun up on a local Dask cluster, it behaves almost the same as LocalDaskExecutor with scheduler="processes":
```python
from prefect import Flow
from prefect.executors import DaskExecutor, LocalDaskExecutor

with Flow("dask_flow", executor=LocalDaskExecutor(scheduler="processes")) as flow:
    pass

# On a local Dask cluster, this is almost equivalent to:
with Flow("dask_flow", executor=DaskExecutor()) as flow:
    pass
```
By default, the number of threads or processes equals the number of available cores. You can set it explicitly by passing num_workers:
```python
from prefect.executors import LocalDaskExecutor

# `flow` is an existing Flow object
# Use 8 threads
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# Use 8 processes
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)
```
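To make the default sizing concrete, the sketch below uses the standard library's multiprocessing.pool.ThreadPool as a stand-in for a thread-based local scheduler. The pool is sized to os.cpu_count(), mirroring the default described above, and every task runs inside the current process with no cluster to start. This is an illustrative analogue, not Prefect's or Dask's actual code, and fake_task is a hypothetical placeholder for a task.

```python
import os
import threading
from multiprocessing.pool import ThreadPool


def fake_task(x):
    """Hypothetical stand-in for a task run: returns a result plus where it ran."""
    return x * x, os.getpid(), threading.current_thread().name


# Default sizing: one worker per available core (fall back to 1 if unknown).
num_workers = os.cpu_count() or 1

# A pool of threads inside the current process: no cluster, scheduler
# process, or worker processes need to be started.
with ThreadPool(processes=num_workers) as pool:
    outputs = pool.map(fake_task, range(6))  # map preserves input order

squares = [value for value, _, _ in outputs]
pids = {pid for _, pid, _ in outputs}

print(squares)  # [0, 1, 4, 9, 16, 25]
print(pids == {os.getpid()})  # True: every task ran in this process
```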
In some scenarios, a local cluster spun up by DaskExecutor can perform better than LocalDaskExecutor(scheduler="processes"), because the Dask distributed scheduler can manage memory resources more effectively. For most workflows, however, the two are roughly equivalent.
You should use scheduler="threads" if:
- Your tasks are often IO bound (e.g. API requests, uploading/downloading data, database calls). Tasks like these can sometimes benefit from having more threads than cores, but usually by no more than a factor of 2-4 (e.g. if you have 4 cores available, set num_workers to at most 16).
- Your tasks make use of separate processes (e.g. ShellTask).
- Your tasks make use of libraries like scikit-learn that release the global interpreter lock (GIL). The default value of num_workers is likely sufficient: tasks like these are CPU bound and won't benefit from multiple threads per core.
You should use scheduler="processes" in most other cases. These tasks are also usually CPU bound, so the default value of num_workers should be sufficient.
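The rules of thumb above can be condensed into a small decision helper. suggest_local_dask_settings is hypothetical and not part of Prefect. It returns a (scheduler, num_workers) pair that could then be passed to LocalDaskExecutor. The IO-bound multiplier io_factor is an assumption restricted to the 2-4 range mentioned above.

```python
import os
from typing import Optional, Tuple


def suggest_local_dask_settings(
    io_bound: bool = False,
    uses_subprocesses: bool = False,
    releases_gil: bool = False,
    cores: Optional[int] = None,
    io_factor: int = 4,
) -> Tuple[str, int]:
    """Hypothetical helper encoding the rules of thumb for LocalDaskExecutor.

    Returns (scheduler, num_workers). `io_factor` is an assumed multiplier
    for IO-bound work, kept within the 2-4 range suggested in the text.
    """
    if not 2 <= io_factor <= 4:
        raise ValueError("io_factor should be between 2 and 4")
    cores = cores or os.cpu_count() or 1  # default: one worker per core

    if io_bound:
        # IO-bound tasks mostly wait, so more threads than cores can help.
        return "threads", cores * io_factor
    if uses_subprocesses or releases_gil:
        # The heavy work runs outside the GIL; one thread per core suffices.
        return "threads", cores
    # Everything else is CPU-bound Python code: use separate processes.
    return "processes", cores


print(suggest_local_dask_settings(io_bound=True, cores=4))      # ('threads', 16)
print(suggest_local_dask_settings(releases_gil=True, cores=4))  # ('threads', 4)
print(suggest_local_dask_settings(cores=4))                     # ('processes', 4)
```

The returned pair could be unpacked into LocalDaskExecutor(scheduler=scheduler, num_workers=num_workers). Lowering io_factor to 2 gives the more conservative end of the suggested range.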
On Kubernetes, this translates to:
- LocalDaskExecutor = all task runs are executed within the flow run pod
- DaskExecutor(cluster_class=KubeCluster) = the flow run is executed within the flow run pod, but task runs are executed on a distributed Dask KubeCluster running across many pods and compute nodes