Prefect integrates with Dask via the task runner interface. The DaskTaskRunner runs Prefect tasks using Dask's distributed scheduler. It can be used locally on a single machine, but it is most useful when scaling out across multiple nodes. This documentation page provides more details.
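```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def my_flow(): ...  # hypothetical flow name; call your tasks here
```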
There are two ways to use Dask with Prefect 1.0.
The first and easiest way is to offload task run execution to a Dask cluster via an executor. You can do that by assigning one of the two Dask executors, LocalDaskExecutor or DaskExecutor.
For a detailed comparison between those two, check out this topic:
Also, if you want to learn more about the differences between a temporary and a static Dask cluster, check out:
You can then attach the executor directly to your Flow object:
```python
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("parallel_task_runs", executor=DaskExecutor()) as flow:
    ...  # define your tasks here
```
Note that the executor information is not stored in the backend during flow registration. Instead, it is retrieved from Storage at runtime since it may contain sensitive information such as the Dask scheduler address.
Once you use mapping, Prefect automatically parallelizes the mapped task runs across threads, processes, or even several Dask workers, depending on the executor you chose. More on mapping:
The second way to use Dask with Prefect is to combine it with the resource manager abstraction. For more details on that, check out this blog post:
A short code snippet that illustrates this:
```python
import coiled
from dask.distributed import Client
from prefect import Flow, Parameter, resource_manager

# create_list, determine_cluster_type, get_github_data, to_dataframe and
# to_parquet are Prefect tasks defined in the blog post mentioned above


# Define a ResourceManager object
@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None, account=None, name=None):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name

    def setup(self):
        # Runs when the `with` block is entered; the returned Client is the resource
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
            )
            return Client(cluster)
        raise ValueError(f"Unknown cluster_type: {self.cluster_type!r}")

    def cleanup(self, client):
        # Runs when the `with` block is exited
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()


# Build Prefect Flow
with Flow(name="Github ETL Test") as flow:
    # define parameters
    n_workers = Parameter("n_workers", default=4)
    software = Parameter("software", default="coiled-examples/prefect")
    account = Parameter("account", default=None)
    name = Parameter("name", default="cluster-name")
    start_date = Parameter("start_date", default="01-01-2015")
    end_date = Parameter("end_date", default="31-12-2015")

    # build flow
    filenames = create_list(start_date=start_date, end_date=end_date)
    cluster_type = determine_cluster_type(filenames)

    # use ResourceManager object
    with DaskCluster(
        cluster_type=cluster_type,
        n_workers=n_workers,
        software=software,
        account=account,
        name=name,
    ) as client:
        push_events = get_github_data(filenames)
        df = to_dataframe(push_events)
        to_parquet(df)

# Run flow with parameters
flow.run(
    parameters=dict(
        end_date="02-01-2015",
        n_workers=15,
        name="prefect-on-coiled",
    )
)
```