What is the difference between a DaskExecutor and a LocalDaskExecutor?

DaskExecutor vs LocalDaskExecutor in general

In general, the main difference between the two is the choice of scheduler. LocalDaskExecutor can be configured to use either a thread-based or a process-based local scheduler. In contrast, DaskExecutor uses the Dask Distributed scheduler. As a result, LocalDaskExecutor provides parallelism through multithreading or multiprocessing out of the box in a very lightweight way, but it can only run on a single machine and offers fewer features than the Dask Distributed scheduler.

For local testing and small workflows, DaskExecutor introduces unnecessary overhead: it creates a local Dask cluster, whereas LocalDaskExecutor does not. Instead, LocalDaskExecutor relies on a local worker pool, which uses threads by default.

This has some implications on sharing data between tasks:

  • if you use LocalDaskExecutor(scheduler="threads"), tasks running in different threads can share data directly, because all threads live in the same process (provided the shared objects are thread-safe)
  • in contrast, if you use either a DaskExecutor or LocalDaskExecutor(scheduler="processes"), you rely on processes rather than threads, so the Dask scheduler has to make copies of the data and pass them to the workers, which introduces some additional overhead.
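
The difference can be illustrated without Prefect or Dask. The sketch below uses only the Python standard library: a thread pool stands in for scheduler="threads", and a pickle round trip stands in for the serialization step a process-based scheduler performs when it ships data to a worker. The names shared, append_in_thread and copy_for_worker are made up for this illustration.

```python
import pickle
from concurrent.futures import ThreadPoolExecutor

# Data owned by the main thread.
shared = {"rows": [1, 2, 3]}


def append_in_thread(value):
    # Runs in a worker thread and mutates the very same dict the caller holds.
    shared["rows"].append(value)
    return id(shared)


with ThreadPoolExecutor(max_workers=2) as pool:
    ids_seen_by_threads = list(pool.map(append_in_thread, [4, 5]))

# Every thread worked on the original object, no copy was made.
same_object_in_threads = all(i == id(shared) for i in ids_seen_by_threads)

# Assumption for illustration: a process-based worker receives a serialized
# copy of its inputs, which is simulated here with a pickle round trip.
copy_for_worker = pickle.loads(pickle.dumps(shared))
copy_for_worker["rows"].append(99)

# The change made on the copy is invisible to the original.
worker_copy_is_separate = (
    copy_for_worker is not shared and 99 not in shared["rows"]
)
```

With threads, the appended values show up in the original dict. With the simulated process worker, the change stays in the copy, and a large object would have to be serialized for every task that needs it.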

DaskExecutor vs LocalDaskExecutor on a local Dask cluster

Running DaskExecutor on a local Dask cluster is almost the same as using a LocalDaskExecutor with scheduler="processes":

from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("dask_flow", executor=LocalDaskExecutor(scheduler="processes")) as flow:
    pass

# on a local Dask cluster, this is almost equivalent to:
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("dask_flow", executor=DaskExecutor()) as flow:
    pass

LocalDaskExecutor and how it can be configured

By default, the number of threads or processes used equals the number of available cores. You can set it explicitly by passing num_workers.

# Use 8 threads
flow.executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# Use 8 processes
flow.executor = LocalDaskExecutor(scheduler="processes", num_workers=8)

Selecting a scheduler and num_workers

In some scenarios, a local cluster spun up by the DaskExecutor can perform better than LocalDaskExecutor(scheduler="processes"), because the Dask Distributed scheduler manages memory more effectively. For most workflows, however, the two are equivalent.

You should use scheduler="threads" if:

  • Your tasks are often IO bound (e.g. API requests, uploading/downloading data, database calls). Tasks like these can sometimes benefit from having more threads than cores, but usually by no more than a factor of 2-4 (e.g. if you have 4 cores available, set num_workers=16 at most).
  • Your tasks make use of separate processes (e.g. ShellTask).
  • Your tasks make use of libraries like numpy, pandas, or scikit-learn that release the global interpreter lock (GIL). The default value for num_workers is likely sufficient - tasks like these are CPU bound and won’t benefit from multiple threads per core.
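
To see why IO-bound tasks can benefit from extra threads, here is a minimal standard-library sketch (not Prefect code). The function fake_io_call is a hypothetical stand-in for an API request that only waits, and timed_run is a helper invented for this example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(i):
    # Stand-in for an API request or database call: the thread only waits,
    # so it does not compete with other threads for the CPU.
    time.sleep(0.1)
    return i


def timed_run(num_workers, n_tasks=8):
    # Run n_tasks waiting tasks on a thread pool and measure wall-clock time.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        results = list(pool.map(fake_io_call, range(n_tasks)))
    return results, time.perf_counter() - start


results_one, seconds_one = timed_run(num_workers=1)
results_many, seconds_many = timed_run(num_workers=8)
```

Both runs return the same results, but the run with 8 threads finishes much sooner because the waits overlap. A CPU-bound pure-Python function would not show this gain with threads, since the GIL lets only one thread execute Python bytecode at a time.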

You should use scheduler="processes" in most other cases, for example when tasks run CPU-bound pure-Python code that holds the GIL. These tasks are also usually CPU bound, so the default value of num_workers should be sufficient.
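
The rules of thumb above can be condensed into a small helper. This is only a sketch: suggest_local_dask_settings and all of its parameters are hypothetical and not part of Prefect, and the default factor of 2 for IO-bound work is an assumption taken from the lower end of the 2-4 range.

```python
import os


def suggest_local_dask_settings(io_bound=False, releases_gil=False,
                                spawns_subprocesses=False, cores=None,
                                io_thread_factor=2):
    """Hypothetical helper encoding the guidance above (not a Prefect API)."""
    cores = cores or os.cpu_count() or 1
    if io_bound:
        # IO-bound tasks may use more threads than cores, capped at 4x.
        factor = min(max(io_thread_factor, 1), 4)
        return {"scheduler": "threads", "num_workers": cores * factor}
    if releases_gil or spawns_subprocesses:
        # The work happens outside the GIL, so one thread per core is enough.
        return {"scheduler": "threads", "num_workers": cores}
    # Everything else: separate processes, one per core.
    return {"scheduler": "processes", "num_workers": cores}


io_settings = suggest_local_dask_settings(io_bound=True, cores=4,
                                          io_thread_factor=4)
numpy_settings = suggest_local_dask_settings(releases_gil=True, cores=4)
default_settings = suggest_local_dask_settings(cores=4)
```

Because the keys match the scheduler and num_workers arguments shown earlier, the resulting dict could be passed on as LocalDaskExecutor(**io_settings).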

Implications of using LocalDaskExecutor vs. DaskExecutor on Kubernetes

  • LocalDaskExecutor = all task runs are executed within the flow run pod
  • DaskExecutor(cluster_class=KubeCluster) = the flow run is executed within the flow run pod, but task runs are executed on a distributed Dask KubeCluster running across many pods and compute nodes
