Prefect 2.0
Orion allows your tasks to run in parallel, as long as you attach a DaskTaskRunner. Under the hood, it creates a temporary local Dask cluster and parallelizes work across local threads and processes.
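```
from prefect import flow
from prefect_dask import DaskTaskRunner
@flow(task_runner=DaskTaskRunner())
def my_flow(): ...  # hypothetical flow name; call your @task functions here
```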
Similarly, when you attach a RayTaskRunner(), a local Ray cluster will be created:
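```
from prefect import flow
from prefect_ray import RayTaskRunner
@flow(task_runner=RayTaskRunner())
def my_flow(): ...  # hypothetical flow name; call your @task functions here
```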
Lastly, when you don’t attach any task runner explicitly, Prefect 2.0 uses a ConcurrentTaskRunner() by default, which runs your tasks concurrently.
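To get a feel for what "concurrently" buys you without installing Prefect, here is a minimal sketch that uses the standard library's concurrent.futures.ThreadPoolExecutor as a stand-in for a concurrent task runner. This is an analogy only; it does not describe how ConcurrentTaskRunner is implemented internally. fake_task is a hypothetical I/O-bound job that just sleeps: run as plain calls, four of them take roughly four times as long as when they are submitted to the pool.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_task(seconds: float) -> float:
    """Hypothetical I/O-bound job: it only waits, so threads can overlap it."""
    time.sleep(seconds)
    return seconds


def run_serially(durations):
    # Plain function calls: each one blocks until the previous one finishes.
    start = time.perf_counter()
    results = [fake_task(d) for d in durations]
    return results, time.perf_counter() - start


def run_concurrently(durations, max_workers=4):
    # Submitting to a pool lets the waits overlap, similar in spirit to
    # handing tasks to a concurrent task runner.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(fake_task, d) for d in durations]
        results = [f.result() for f in futures]
    return results, time.perf_counter() - start


if __name__ == "__main__":
    jobs = [0.3] * 4
    _, serial_time = run_serially(jobs)
    _, concurrent_time = run_concurrently(jobs)
    print(f"serial: {serial_time:.2f}s, concurrent: {concurrent_time:.2f}s")
```

This only helps for work that spends its time waiting (I/O, sleeps). For CPU-bound pure-Python work, threads in standard CPython are limited by the GIL, which is why the Dask and Ray runners above can also spread work across processes.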
Prefect 1.0
LocalDaskExecutor parallelizes task run execution across local threads and processes.
```
from prefect import Flow
from prefect.executors import LocalDaskExecutor
with Flow("parallel_task_runs", executor=LocalDaskExecutor()) as flow:
    ...  # call your @task-decorated functions here
```
Important note!
Prefect can only parallelize actual tasks, i.e. functions decorated with @task. Given that Prefect 2.0 allows you to run arbitrary Python code in your flows, this distinction is important. Check this topic for more details:
I think I discovered the issue; if the function is not decorated with @task, it will run in serial!
The Proof
WITH the decorator, however, it properly utilizes the processes.
With Dask, I checked htop because updating the logging formatter is a bit tedious (notice CPU at >100%).
[image: htop screenshot]
With Ray, the output automatically shows the unique PIDs.
(begin_task_run pid=93880) 2013-01-01T12:00:00.000000000
(begin_task_run pid=93881) 2013-01-01T00:00:00.000000000
(begin_task_run pid=93878) 201…
and this GitHub issue:
Opened 03:22 AM UTC, 11 Apr 2022; closed 06:02 PM UTC, 13 Apr 2023. Labels: docs, status:stale
**Update**: the function has to be decorated with `@task` for it to run automatically in parallel
## Description
I see that four workers have been spawned, but the CPU utilization is negligible.
![image](https://user-images.githubusercontent.com/15331990/162659673-b9c68865-68c0-4191-b957-45fcea1e31ea.png)
Using native Dask, I was able to cut the serial runtime from 2 min 39 s down to 46 s, but there was no speedup using DaskTaskRunner; the full benchmark is here:
https://discourse.prefect.io/t/why-is-dasktaskrunner-slower-than-dask-compute/729/8?u=ahuang11
## Expected Behavior
100% CPU utilization on every process spawned.
## Reproduction
```
import dask
import xarray as xr
import matplotlib.pyplot as plt
from cartopy import crs as ccrs
from cartopy import feature as cfeature
import matplotlib.textpath
import matplotlib.patches
from matplotlib.font_manager import FontProperties
import numpy as np
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner


def plot(ds, time):
    plt.figure()
    ax = plt.axes(projection=ccrs.Orthographic())
    ax.add_feature(cfeature.LAKES.with_scale("10m"))
    ax.add_feature(cfeature.OCEAN.with_scale("10m"))
    ax.add_feature(cfeature.LAND.with_scale("10m"))
    ax.add_feature(cfeature.STATES.with_scale("10m"))
    ax.add_feature(cfeature.COASTLINE.with_scale("10m"))
    ax.pcolormesh(ds["lon"], ds["lat"], ds["air"], transform=ccrs.PlateCarree())

    # generate a matplotlib path representing the word "cartopy"
    fp = FontProperties(family='Bitstream Vera Sans', weight='bold')
    logo_path = matplotlib.textpath.TextPath((-175, -35), 'cartopy',
                                             size=1, prop=fp)
    # add a background image
    im = ax.stock_img()
    # clip the image according to the logo_path. mpl v1.2.0 does not support
    # the transform API that cartopy makes use of, so we have to convert the
    # projection into a transform manually
    plate_carree_transform = ccrs.PlateCarree()._as_mpl_transform(ax)
    im.set_clip_path(logo_path, transform=plate_carree_transform)

    # add the path as a patch, drawing black outlines around the text
    patch = matplotlib.patches.PathPatch(logo_path,
                                         facecolor='none', edgecolor='black',
                                         transform=ccrs.PlateCarree())
    ax.add_patch(patch)
    ax.set_global()
    plt.savefig(str(time)[:16])
    plt.close()


ds = xr.tutorial.open_dataset('air_temperature').isel(
    time=slice(0, 100))


@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4, "processes": True}))
def process():
    # plot() is a plain function, not a @task, so these calls run one after
    # another inside the flow instead of being distributed to the Dask workers
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)


# guard the entry point: process-based Dask workers may re-import this module
if __name__ == "__main__":
    process()
```
## Environment
prefect>=2.0
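The @task rule can be illustrated with a toy model in plain Python, no Prefect required. In the sketch below, a hypothetical toy_task decorator (not Prefect's @task) makes calls to the wrapped function go to a thread pool and return a future, loosely mirroring how a task runner receives task runs. An undecorated function is just an ordinary call that runs inline, one at a time, in the flow's own thread. The names toy_task, where_am_i and flow_body are illustrative assumptions.

```python
import functools
import threading
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical stand-in for a task runner: a small shared thread pool.
_RUNNER_POOL = ThreadPoolExecutor(max_workers=4)


def toy_task(fn):
    """Hypothetical decorator (not Prefect's @task): calling the decorated
    function submits it to the pool and returns a Future."""
    @functools.wraps(fn)
    def submit(*args, **kwargs) -> Future:
        return _RUNNER_POOL.submit(fn, *args, **kwargs)
    return submit


def where_am_i(label):
    # Report which thread actually executed this call.
    return label, threading.get_ident()


# The same function, wrapped so that its calls are handed to the "runner".
where_am_i_task = toy_task(where_am_i)


def flow_body(n=4):
    flow_thread = threading.get_ident()
    # Undecorated: ordinary calls, executed inline and one at a time
    # in the flow's own thread.
    plain = [where_am_i(f"plain-{i}") for i in range(n)]
    # Decorated: each call is submitted to the pool; results are collected
    # from the futures afterwards.
    futures = [where_am_i_task(f"task-{i}") for i in range(n)]
    submitted = [f.result() for f in futures]
    return flow_thread, plain, submitted


if __name__ == "__main__":
    flow_thread, plain, submitted = flow_body()
    print("flow thread:", flow_thread)
    print("plain calls:", plain)
    print("submitted calls:", submitted)
```

The plain calls all report the flow's own thread id, while the wrapped calls report worker thread ids. This is the same kind of evidence gathered in the linked topic with htop and Ray's PID-prefixed log lines, just at thread level instead of process level.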