Prefect 2.0
Prefect integrates with Dask via the task runner interface. The DaskTaskRunner runs Prefect tasks on Dask's distributed scheduler. It can be used locally on a single machine, but it's most useful when scaling out across multiple nodes. This documentation page provides more details.
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
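For example, here is a minimal runnable sketch (the task and flow names are illustrative); each task submitted inside the flow is executed on a Dask worker:

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def say_hello(name):
    print(f"hello {name}")

@flow(task_runner=DaskTaskRunner())
def greetings(names):
    # each submitted task run is scheduled on the Dask cluster
    for name in names:
        say_hello.submit(name)

if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford"])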
Prefect 1.0
There are two ways to leverage Dask with Prefect 1.0.
1) Dask executor
The first and easiest way to use Dask is to offload task run execution to a Dask cluster via an executor. You can do that by assigning one of the Dask executors:
- LocalDaskExecutor
- DaskExecutor
For a detailed comparison between those two, check out this topic:
Also, if you want to learn more about the differences between a temporary and a static Dask cluster, check out:
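To make the distinction concrete, here is a sketch of the common configurations (the scheduler address is a placeholder, not a real endpoint):

from prefect.executors import DaskExecutor, LocalDaskExecutor

# threads or processes on the machine running the flow
executor = LocalDaskExecutor(scheduler="threads", num_workers=8)

# a temporary Dask cluster created (and torn down) per flow run
executor = DaskExecutor(cluster_class="dask.distributed.LocalCluster")

# an existing static cluster, identified by its scheduler address
executor = DaskExecutor(address="tcp://scheduler-host:8786")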
You can then attach the executor directly to your Flow object:
from prefect import Flow
from prefect.executors import DaskExecutor

with Flow("parallel_task_runs", executor=DaskExecutor()) as flow:
    ...
Note that the executor information is not stored in the backend during flow registration. Instead, it is retrieved from Storage at runtime since it may contain sensitive information such as the Dask scheduler address.
Then, once you use mapping, Prefect automatically parallelizes task run execution across processes, threads, or even several Dask workers, as in the sketch below. More on mapping:
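A minimal sketch of a mapped flow (the inc task and flow name are illustrative): mapping over a list spawns one task run per element, and the executor runs them in parallel.

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

@task
def inc(x):
    return x + 1

with Flow("mapped_flow", executor=LocalDaskExecutor(scheduler="threads")) as flow:
    # one task run per list element, executed in parallel by Dask
    results = inc.map([1, 2, 3])

flow.run()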
2) Resource manager with Dask cluster client
The second way to use Dask with Prefect is to combine it with the resource manager abstraction. For more details on that, check out this blog post:
A short code snippet that illustrates this (the task definitions, such as create_list and get_github_data, are omitted for brevity):
import coiled
from dask.distributed import Client
from prefect import Flow, Parameter, resource_manager

# Define a ResourceManager object
@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None, account=None, name=None):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name

    def setup(self):
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
            )
            return Client(cluster)

    def cleanup(self, client):
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()
# Build Prefect Flow
with Flow(name="Github ETL Test") as flow:
    # define parameters
    n_workers = Parameter("n_workers", default=4)
    software = Parameter("software", default='coiled-examples/prefect')
    account = Parameter("account", default=None)
    name = Parameter("name", default='cluster-name')
    start_date = Parameter("start_date", default="01-01-2015")
    end_date = Parameter("end_date", default="31-12-2015")

    # build flow
    filenames = create_list(start_date=start_date, end_date=end_date)
    cluster_type = determine_cluster_type(filenames)

    # use ResourceManager object
    with DaskCluster(
        cluster_type=cluster_type,
        n_workers=n_workers,
        software=software,
        account=account,
        name=name,
    ) as client:
        push_events = get_github_data(filenames)
        df = to_dataframe(push_events)
        to_parquet(df)

# Run flow with parameters
flow.run(
    parameters=dict(
        end_date="02-01-2015",
        n_workers=15,
        name="prefect-on-coiled",
    )
)
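Note that the resource manager's cleanup step runs even when tasks inside the block fail, so the Dask cluster gets shut down regardless of the flow run's outcome.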