How can I configure my flow to run with Dask?

Prefect 2.0

Prefect integrates with Dask via the task runner interface. The DaskTaskRunner runs Prefect tasks using Dask’s distributed scheduler. It can be used locally on a single machine, but it’s most useful when scaling out across multiple nodes. This documentation page provides more details.

from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def my_flow():
    ...

Prefect 1.0

There are two ways to leverage Dask with Prefect 1.0.

1) Dask executor

The first and easiest way to use Dask is to offload task run execution to a Dask cluster via an executor. You can do that by assigning one of the Dask executors:

  • LocalDaskExecutor
  • DaskExecutor

For a detailed comparison between those two, check out this topic:

Also, if you want to learn more about the differences between temporary and static Dask clusters, check out:

You can then attach the executor directly to your Flow object:

from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("parallel_task_runs", executor=DaskExecutor()) as flow:
    ...  # define your tasks here

Note that the executor information is not stored in the backend during flow registration. Instead, it is retrieved from Storage at runtime since it may contain sensitive information such as the Dask scheduler address.

Then, once you use mapping, Prefect will automatically parallelize the task run execution across processes, threads, or even across several Dask workers. More on mapping:

2) Resource manager with Dask cluster client

The second way to use Dask with Prefect is to combine it with the resource manager abstraction. For more details on that, check out this blog post:

A short code snippet that illustrates this:

import coiled
from dask.distributed import Client
from prefect import Flow, Parameter, resource_manager

# Define a ResourceManager object
@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None, account=None, name=None):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name

    def setup(self):
        # Create the Dask client (and cluster) when the resource manager block starts
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
            )
            return Client(cluster)
        raise ValueError(f"Unknown cluster_type: {self.cluster_type!r}")

    def cleanup(self, client):
        # Close the client, and the Coiled cluster if one was created
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()

# Build Prefect Flow
# create_list, determine_cluster_type, get_github_data, to_dataframe and to_parquet
# are Prefect tasks defined elsewhere (not shown in this snippet)
with Flow(name="Github ETL Test") as flow:
    # define parameters
    n_workers = Parameter("n_workers", default=4)
    software = Parameter("software", default="coiled-examples/prefect")
    account = Parameter("account", default=None)
    name = Parameter("name", default="cluster-name")
    start_date = Parameter("start_date", default="01-01-2015")
    end_date = Parameter("end_date", default="31-12-2015")

    # build flow
    filenames = create_list(start_date=start_date, end_date=end_date)
    cluster_type = determine_cluster_type(filenames)

    # use ResourceManager object
    with DaskCluster(
        cluster_type=cluster_type,
        n_workers=n_workers,
        software=software,
        account=account,
        name=name,
    ) as client:
        push_events = get_github_data(filenames)
        df = to_dataframe(push_events)
        to_parquet(df)

# Run flow with parameters
flow.run(
    parameters=dict(
        end_date="02-01-2015",
        n_workers=15,
        name="prefect-on-coiled",
    )
)

Additionally, if you want to use Dask dataframes in your flow, check out this issue:

Note to anyone using DaskTaskRunner or RayTaskRunner:

From Prefect version 2.0b8 onwards, these task runners were moved to their respective Prefect Collections for better dependency management. The core library no longer requires dask or ray as dependencies; they can now be installed separately when needed.

The correct imports are now:

from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner