Why is my function not running in parallel even with DaskTaskRunner / RayTaskRunner?

TL;DR: The function must be wrapped with the @task decorator for it to run in parallel.
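
In other words, the pattern that ends up working is roughly the following minimal sketch (using the same Prefect 2 beta APIs as the examples below; without the @task decorator, the calls run serially inside the flow):

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task  # without this decorator, the call below runs in the flow process, serially
def double(x):
    return x * 2


@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def my_flow():
    for i in range(8):
        double(i)  # submitted to the Dask workers because double is a @task


my_flow()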

Serial execution finishes in 14.6s on my MacBook.

import xarray as xr
import matplotlib.pyplot as plt

def plot(ds, time):
    plt.figure()
    ds['air'].plot()
    plt.savefig(str(time)[:16])
    plt.close()

def process():
    ds = xr.tutorial.open_dataset('air_temperature').isel(
        time=slice(0, 250))
    [plot(ds.sel(time=time), time) for time in ds['time'].values]

process()

Dask with four workers (processes) finishes in 7.1 seconds.

import dask
import xarray as xr
import matplotlib.pyplot as plt

@dask.delayed()
def plot(ds, time):
    plt.figure()
    ds['air'].plot()
    plt.savefig(str(time)[:16])
    plt.close()

def dask_process():
    ds = xr.tutorial.open_dataset('air_temperature').isel(
        time=slice(0, 250))
    dask.compute([
        plot(ds.sel(time=time), time) for time in ds['time'].values],
        scheduler="processes", num_workers=4
    )

dask_process()

Prefect with four workers in DaskTaskRunner finishes in 18.4 seconds.

import xarray as xr
import matplotlib.pyplot as plt
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

def plot(ds, time):
    plt.figure()
    ds['air'].plot()
    plt.savefig(str(time)[:16])
    plt.close()


@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def prefect_process():
    ds = xr.tutorial.open_dataset('air_temperature').isel(
        time=slice(0, 250))
    [plot(ds.sel(time=time), time) for time in ds['time'].values]

prefect_process()

I suspect I am just using threads rather than processes for the DaskTaskRunner, but I don’t know how to specify the scheduler here.

With the default ConcurrentTaskRunner, it finishes in 15.4s.

import xarray as xr
import matplotlib.pyplot as plt
from prefect import flow, task

def plot(ds, time):
    plt.figure()
    ds['air'].plot()
    plt.savefig(str(time)[:16])
    plt.close()


@flow()
def prefect_process():
    ds = xr.tutorial.open_dataset('air_temperature').isel(
        time=slice(0, 250))
    [plot(ds.sel(time=time), time) for time in ds['time'].values]


prefect_process()

Interestingly, if I wrap plot() with @task, the run time doubles to 29.6 seconds; I suspect that's because I am spamming the logs?

import xarray as xr
import matplotlib.pyplot as plt
from prefect import flow, task

@task
def plot(ds, time):
    plt.figure()
    ds['air'].plot()
    plt.savefig(str(time)[:16])
    plt.close()


@flow()
def prefect_process():
    ds = xr.tutorial.open_dataset('air_temperature').isel(
        time=slice(0, 250))
    [plot(ds.sel(time=time), time) for time in ds['time'].values]


prefect_process()

Here is how you could ensure threads are used rather than processes:

@flow(name="dask-threads-flow", task_runner=DaskTaskRunner(cluster_kwargs=dict(processes=False)))

I’m not 100% sure about this, but I think you may also be able to use Dask annotations for that:

import dask
from prefect import flow
from prefect.task_runners import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def my_flow():
    with dask.annotate(resources={'thread': 8}):
        ...  # submit your tasks here, under the annotation

Nice work benchmarking performance! We definitely need more examples about when to use Dask vs. async vs. Ray, and how to configure the various options here :+1:


Actually, I don’t want threads; I want separate processes. However, when I do this, it only seems to spawn threads.

import dask
import xarray as xr
import matplotlib.pyplot as plt
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task
def plot(ds, time):
    plt.figure()
    ds['air'].plot()
    plt.savefig(str(time)[:16])
    plt.close()



@flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=2, processes=True)))
def prefect_process():
    ds = xr.tutorial.open_dataset('air_temperature').isel(
        time=slice(0, 5))
    with dask.annotate(resources={'process': 2}):
        [plot(ds.sel(time=time), time) for time in ds['time'].values]


prefect_process()

I added %(process)s to the formatter:

export PREFECT_LOGGING_FORMATTERS_FLOW_RUNS_FORMAT="%(asctime)s.%(msecs)03d | %(levelname)-7s | %(flow_run_id)s - %(process)s - %(message)s"

And the output shows only 28535, meaning only one process was spawned.

19:12:04.013 | INFO    | cdb3ce4c-28fb-4a3f-bb5b-41ca86e4f625 - 28535 - Created task run 'plot-f5265e9c-0' for task 'plot'
19:12:04.037 | INFO    | cdb3ce4c-28fb-4a3f-bb5b-41ca86e4f625 - 28535 - Created task run 'plot-f5265e9c-1' for task 'plot'
19:12:04.055 | INFO    | cdb3ce4c-28fb-4a3f-bb5b-41ca86e4f625 - 28535 - Created task run 'plot-f5265e9c-2' for task 'plot'
19:12:04.071 | INFO    | cdb3ce4c-28fb-4a3f-bb5b-41ca86e4f625 - 28535 - Created task run 'plot-f5265e9c-3' for task 'plot'
19:12:04.087 | INFO    | cdb3ce4c-28fb-4a3f-bb5b-41ca86e4f625 - 28535 - Created task run 'plot-f5265e9c-4' for task 'plot'

Also, I wonder if there’s a way to set the formatter within Python rather than having to set environment variables / edit the YAML file, i.e. get_logger("task").setFormatter(fmt).


Interestingly, it also uses just one thread even though I moved the calls out of the list comprehension:
export PREFECT_LOGGING_FORMATTERS_FLOW_RUNS_FORMAT="%(asctime)s.%(msecs)03d | %(levelname)-7s | %(flow_run_id)s - %(process)s - %(thread)s - %(message)s"

import dask
import xarray as xr
import matplotlib.pyplot as plt
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner


@task
def plot(ds, time):
    plt.figure()
    ds['air'].plot()
    plt.savefig(str(time)[:16])
    plt.close()



@flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=2, processes=True)))
def prefect_process():
    ds = xr.tutorial.open_dataset('air_temperature').isel(
        time=slice(0, 5))
    with dask.annotate(resources={'process': 2}):
        for time in ds['time'].values:
            plot(ds.sel(time=time), time)
prefect_process()
19:19:40.909 | INFO    | prefect.engine - Created flow run 'quantum-binturong' for flow 'prefect-process'
19:19:40.909 | INFO    | b5674519-a998-4e70-9ab0-ef0ec0b12c50 - 29158 - 4374447488 - Using task runner 'DaskTaskRunner'
19:19:40.909 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
19:19:41.427 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
19:19:41.489 | INFO    | b5674519-a998-4e70-9ab0-ef0ec0b12c50 - 29158 - 4374447488 - Created task run 'plot-f5265e9c-0' for task 'plot'
19:19:41.595 | INFO    | b5674519-a998-4e70-9ab0-ef0ec0b12c50 - 29158 - 4374447488 - Created task run 'plot-f5265e9c-1' for task 'plot'
19:19:41.609 | INFO    | b5674519-a998-4e70-9ab0-ef0ec0b12c50 - 29158 - 4374447488 - Created task run 'plot-f5265e9c-2' for task 'plot'
19:19:41.624 | INFO    | b5674519-a998-4e70-9ab0-ef0ec0b12c50 - 29158 - 4374447488 - Created task run 'plot-f5265e9c-3' for task 'plot'
19:19:41.639 | INFO    | b5674519-a998-4e70-9ab0-ef0ec0b12c50 - 29158 - 4374447488 - Created task run 'plot-f5265e9c-4' for task 'plot'

Log formatting

I tried that, but get_run_logger() seems to return a PrefectLogAdapter rather than a logging.Logger, and you can’t add a handler to it.

I was trying to do:

@task
def test_task_run_logger():
    logger = get_run_logger()
    # https://github.com/PrefectHQ/prefect/blob/orion/src/prefect/logging/logging.yml#L53
    stdout_handler = logging.StreamHandler(sys.stdout)
    formatter = logging.Formatter(
        '"%(asctime)s.%(msecs)03d | %(levelname)-7s | Task run %(task_run_name)r - %(process)s - %(thread)s - %(message)s"',
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    stdout_handler.setFormatter(formatter)
    logger.addHandler(stdout_handler)

it returns:

AttributeError: 'PrefectLogAdapter' object has no attribute 'addHandler'

So to set custom formatting from the Python client, you would need an extra logger. You can do:

prefect config set PREFECT_LOGGING_EXTRA_LOGGERS='your_custom_logger'

Then, you can configure this logger with the format you like and use this logger in your flow. I haven’t tested that yet, but this should work in theory.
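
For example, something along these lines should let you attach a custom formatter from Python (untested sketch; the logger name my_app is only an example and has to match whatever you set in PREFECT_LOGGING_EXTRA_LOGGERS):

import logging
import sys

# assumes you previously ran:
#   prefect config set PREFECT_LOGGING_EXTRA_LOGGERS='my_app'
logger = logging.getLogger("my_app")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
    "%(asctime)s.%(msecs)03d | %(levelname)-7s | %(process)s - %(thread)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
))
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# then, inside your flows and tasks, log through this logger:
# logger.info("custom-formatted message")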

DaskTaskRunner

Regarding the DaskTaskRunner appearing to use the same thread and process: you were looking at the process/thread information of the flow run rather than the task runs.

To inspect thread and process for a task run, set this env variable:

export PREFECT_LOGGING_FORMATTERS_TASK_RUNS_FORMAT="%(asctime)s.%(msecs)03d | %(levelname)-7s | Task run %(task_run_name)r - %(process)s - %(thread)s - %(message)s"

You can combine the above task run format with the default flow run format:

export PREFECT_LOGGING_FORMATTERS_FLOW_RUNS_FORMAT="%(asctime)s.%(msecs)03d | %(levelname)-7s | Flow run %(flow_run_name)r - %(message)s"

This should give you output like the following:

13:23:36.564 | INFO    | prefect.engine - Created flow run 'strange-shrew' for flow 'dask-flow'
13:23:36.565 | INFO    | Flow run 'strange-shrew' - Using task runner 'DaskTaskRunner'
13:23:37.099 | INFO    | prefect.task_runner.dask - Creating a new Dask cluster with `distributed.deploy.local.LocalCluster`
13:23:40.759 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
13:23:41.097 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-0' for task 'compute_something'
13:23:41.562 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-1' for task 'compute_something'
13:23:41.775 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-2' for task 'compute_something'
13:23:41.792 | INFO    | Task run 'compute_something-86d821e8-0' - 26197 - 123145434738688 - Computing: 0 x 2 = 0
13:23:41.954 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-3' for task 'compute_something'
13:23:41.983 | INFO    | Task run 'compute_something-86d821e8-1' - 26198 - 123145526616064 - Computing: 1 x 2 = 2
13:23:42.127 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-4' for task 'compute_something'
13:23:42.189 | INFO    | Task run 'compute_something-86d821e8-2' - 26199 - 123145679376384 - Computing: 2 x 2 = 4
13:23:42.289 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-5' for task 'compute_something'
13:23:42.411 | INFO    | Task run 'compute_something-86d821e8-3' - 26200 - 123145602711552 - Computing: 3 x 2 = 6
13:23:42.454 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-6' for task 'compute_something'
13:23:42.505 | INFO    | Task run 'compute_something-86d821e8-4' - 26197 - 123145468317696 - Computing: 4 x 2 = 8
13:23:42.624 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-7' for task 'compute_something'
13:23:42.656 | INFO    | Task run 'compute_something-86d821e8-5' - 26198 - 123145560195072 - Computing: 5 x 2 = 10
13:23:42.878 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-8' for task 'compute_something'
13:23:42.896 | INFO    | Task run 'compute_something-86d821e8-6' - 26199 - 123145712955392 - Computing: 6 x 2 = 12
13:23:43.000 | INFO    | Task run 'compute_something-86d821e8-7' - 26200 - 123145636290560 - Computing: 7 x 2 = 14
13:23:43.064 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-9' for task 'compute_something'
13:23:43.242 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-10' for task 'compute_something'
13:23:43.405 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-11' for task 'compute_something'
13:23:43.575 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-12' for task 'compute_something'
13:23:43.817 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-13' for task 'compute_something'
13:23:43.981 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-14' for task 'compute_something'
13:23:44.226 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-15' for task 'compute_something'
13:23:44.399 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-16' for task 'compute_something'
13:23:44.583 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-17' for task 'compute_something'
13:23:44.745 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-18' for task 'compute_something'
13:23:44.918 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-19' for task 'compute_something'
13:23:44.930 | INFO    | Task run 'compute_something-86d821e8-0' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:45.008 | INFO    | Task run 'compute_something-86d821e8-1' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:45.106 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-20' for task 'compute_something'
13:23:45.206 | INFO    | Task run 'compute_something-86d821e8-2' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:45.281 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-21' for task 'compute_something'
13:23:45.370 | INFO    | Task run 'compute_something-86d821e8-19' - 26197 - 123145468317696 - Computing: 19 x 2 = 38
13:23:45.378 | INFO    | Task run 'compute_something-86d821e8-12' - 26198 - 123145560195072 - Computing: 12 x 2 = 24
13:23:45.444 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-22' for task 'compute_something'
13:23:45.460 | INFO    | Task run 'compute_something-86d821e8-4' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:45.460 | INFO    | Task run 'compute_something-86d821e8-3' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:45.561 | INFO    | Task run 'compute_something-86d821e8-11' - 26199 - 123145712955392 - Computing: 11 x 2 = 22
13:23:45.614 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-23' for task 'compute_something'
13:23:45.693 | INFO    | Task run 'compute_something-86d821e8-5' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:45.777 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-24' for task 'compute_something'
13:23:45.852 | INFO    | Task run 'compute_something-86d821e8-9' - 26200 - 123145636290560 - Computing: 9 x 2 = 18
13:23:45.860 | INFO    | Task run 'compute_something-86d821e8-6' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:45.866 | INFO    | Task run 'compute_something-86d821e8-8' - 26197 - 123145501896704 - Computing: 8 x 2 = 16
13:23:45.943 | INFO    | Task run 'compute_something-86d821e8-7' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:45.944 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-25' for task 'compute_something'
13:23:46.074 | INFO    | Task run 'compute_something-86d821e8-17' - 26198 - 123145526616064 - Computing: 17 x 2 = 34
13:23:46.182 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-26' for task 'compute_something'
13:23:46.218 | INFO    | Task run 'compute_something-86d821e8-13' - 26199 - 123145679376384 - Computing: 13 x 2 = 26
13:23:46.313 | INFO    | Task run 'compute_something-86d821e8-10' - 26200 - 123145669869568 - Computing: 10 x 2 = 20
13:23:46.362 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-27' for task 'compute_something'
13:23:46.542 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-28' for task 'compute_something'
13:23:46.708 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-29' for task 'compute_something'
13:23:47.136 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-30' for task 'compute_something'
13:23:47.301 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-31' for task 'compute_something'
13:23:47.473 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-32' for task 'compute_something'
13:23:47.643 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-33' for task 'compute_something'
13:23:47.812 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-34' for task 'compute_something'
13:23:47.985 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-35' for task 'compute_something'
13:23:48.159 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-36' for task 'compute_something'
13:23:48.328 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-37' for task 'compute_something'
13:23:48.403 | INFO    | Task run 'compute_something-86d821e8-12' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:48.428 | INFO    | Task run 'compute_something-86d821e8-19' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:48.611 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-38' for task 'compute_something'
13:23:48.613 | INFO    | Task run 'compute_something-86d821e8-11' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:48.777 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-39' for task 'compute_something'
13:23:48.779 | INFO    | Task run 'compute_something-86d821e8-20' - 26198 - 123145610563584 - Computing: 20 x 2 = 40
13:23:48.797 | INFO    | Task run 'compute_something-86d821e8-15' - 26197 - 123145501896704 - Computing: 15 x 2 = 30
13:23:48.805 | INFO    | Task run 'compute_something-86d821e8-8' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:48.897 | INFO    | Task run 'compute_something-86d821e8-9' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:48.942 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-40' for task 'compute_something'
13:23:48.977 | INFO    | Task run 'compute_something-86d821e8-16' - 26199 - 123145763323904 - Computing: 16 x 2 = 32
13:23:49.007 | INFO    | Task run 'compute_something-86d821e8-17' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:49.114 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-41' for task 'compute_something'
13:23:49.253 | INFO    | Task run 'compute_something-86d821e8-21' - 26197 - 123145535475712 - Computing: 21 x 2 = 42
13:23:49.258 | INFO    | Task run 'compute_something-86d821e8-13' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:49.258 | INFO    | Task run 'compute_something-86d821e8-10' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:49.276 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-42' for task 'compute_something'
13:23:49.288 | INFO    | Task run 'compute_something-86d821e8-14' - 26200 - 123145636290560 - Computing: 14 x 2 = 28
13:23:49.362 | INFO    | Task run 'compute_something-86d821e8-22' - 26198 - 123145526616064 - Computing: 22 x 2 = 44
13:23:49.438 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-43' for task 'compute_something'
13:23:49.605 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-44' for task 'compute_something'
13:23:49.641 | INFO    | Task run 'compute_something-86d821e8-23' - 26199 - 123145679376384 - Computing: 23 x 2 = 46
13:23:49.645 | INFO    | Task run 'compute_something-86d821e8-18' - 26200 - 123145703448576 - Computing: 18 x 2 = 36
13:23:49.792 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-45' for task 'compute_something'
13:23:49.960 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-46' for task 'compute_something'
13:23:50.135 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-47' for task 'compute_something'
13:23:50.319 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-48' for task 'compute_something'
13:23:50.509 | INFO    | Flow run 'strange-shrew' - Created task run 'compute_something-86d821e8-49' for task 'compute_something'
13:23:51.731 | INFO    | Task run 'compute_something-86d821e8-20' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:51.738 | INFO    | Task run 'compute_something-86d821e8-15' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:51.982 | INFO    | Task run 'compute_something-86d821e8-16' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:52.103 | INFO    | Task run 'compute_something-86d821e8-24' - 26197 - 123145535475712 - Computing: 24 x 2 = 48
13:23:52.109 | INFO    | Task run 'compute_something-86d821e8-25' - 26198 - 123145610563584 - Computing: 25 x 2 = 50
13:23:52.179 | INFO    | Task run 'compute_something-86d821e8-21' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:52.231 | INFO    | Task run 'compute_something-86d821e8-14' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:52.320 | INFO    | Task run 'compute_something-86d821e8-22' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:52.337 | INFO    | Task run 'compute_something-86d821e8-26' - 26199 - 123145679376384 - Computing: 26 x 2 = 52
13:23:52.537 | INFO    | Task run 'compute_something-86d821e8-28' - 26197 - 123145569054720 - Computing: 28 x 2 = 56
13:23:52.575 | INFO    | Task run 'compute_something-86d821e8-18' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:52.605 | INFO    | Task run 'compute_something-86d821e8-27' - 26200 - 123145636290560 - Computing: 27 x 2 = 54
13:23:52.615 | INFO    | Task run 'compute_something-86d821e8-23' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:52.675 | INFO    | Task run 'compute_something-86d821e8-29' - 26198 - 123145526616064 - Computing: 29 x 2 = 58
13:23:52.940 | INFO    | Task run 'compute_something-86d821e8-31' - 26200 - 123145737027584 - Computing: 31 x 2 = 62
13:23:52.974 | INFO    | Task run 'compute_something-86d821e8-30' - 26199 - 123145763323904 - Computing: 30 x 2 = 60
13:23:55.146 | INFO    | Task run 'compute_something-86d821e8-24' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:55.176 | INFO    | Task run 'compute_something-86d821e8-25' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:55.308 | INFO    | Task run 'compute_something-86d821e8-26' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:55.542 | INFO    | Task run 'compute_something-86d821e8-32' - 26197 - 123145569054720 - Computing: 32 x 2 = 64
13:23:55.549 | INFO    | Task run 'compute_something-86d821e8-33' - 26198 - 123145610563584 - Computing: 33 x 2 = 66
13:23:55.621 | INFO    | Task run 'compute_something-86d821e8-28' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:55.675 | INFO    | Task run 'compute_something-86d821e8-27' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:55.746 | INFO    | Task run 'compute_something-86d821e8-29' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:55.801 | INFO    | Task run 'compute_something-86d821e8-34' - 26199 - 123145847271424 - Computing: 34 x 2 = 68
13:23:55.987 | INFO    | Task run 'compute_something-86d821e8-36' - 26197 - 123145602633728 - Computing: 36 x 2 = 72
13:23:56.029 | INFO    | Task run 'compute_something-86d821e8-35' - 26200 - 123145737027584 - Computing: 35 x 2 = 70
13:23:56.038 | INFO    | Task run 'compute_something-86d821e8-31' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:56.062 | INFO    | Task run 'compute_something-86d821e8-30' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:56.120 | INFO    | Task run 'compute_something-86d821e8-37' - 26198 - 123145526616064 - Computing: 37 x 2 = 74
13:23:56.415 | INFO    | Task run 'compute_something-86d821e8-39' - 26200 - 123145770606592 - Computing: 39 x 2 = 78
13:23:56.439 | INFO    | Task run 'compute_something-86d821e8-38' - 26199 - 123145679376384 - Computing: 38 x 2 = 76
13:23:58.475 | INFO    | Task run 'compute_something-86d821e8-33' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:58.478 | INFO    | Task run 'compute_something-86d821e8-32' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:58.748 | INFO    | Task run 'compute_something-86d821e8-34' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:58.840 | INFO    | Task run 'compute_something-86d821e8-41' - 26198 - 123145610563584 - Computing: 41 x 2 = 82
13:23:58.884 | INFO    | Task run 'compute_something-86d821e8-40' - 26197 - 123145602633728 - Computing: 40 x 2 = 80
13:23:58.903 | INFO    | Task run 'compute_something-86d821e8-36' - 26197 - 4558319104 - Finished in state Completed(None)
13:23:58.970 | INFO    | Task run 'compute_something-86d821e8-35' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:59.050 | INFO    | Task run 'compute_something-86d821e8-37' - 26198 - 4663889408 - Finished in state Completed(None)
13:23:59.120 | INFO    | Task run 'compute_something-86d821e8-42' - 26199 - 123145679376384 - Computing: 42 x 2 = 84
13:23:59.268 | INFO    | Task run 'compute_something-86d821e8-44' - 26197 - 123145636212736 - Computing: 44 x 2 = 88
13:23:59.332 | INFO    | Task run 'compute_something-86d821e8-39' - 26200 - 4604902912 - Finished in state Completed(None)
13:23:59.333 | INFO    | Task run 'compute_something-86d821e8-43' - 26200 - 123145770606592 - Computing: 43 x 2 = 86
13:23:59.386 | INFO    | Task run 'compute_something-86d821e8-38' - 26199 - 4419104256 - Finished in state Completed(None)
13:23:59.419 | INFO    | Task run 'compute_something-86d821e8-45' - 26198 - 123145526616064 - Computing: 45 x 2 = 90
13:23:59.702 | INFO    | Task run 'compute_something-86d821e8-47' - 26200 - 123145804185600 - Computing: 47 x 2 = 94
13:23:59.765 | INFO    | Task run 'compute_something-86d821e8-46' - 26199 - 123145847271424 - Computing: 46 x 2 = 92
13:24:01.795 | INFO    | Task run 'compute_something-86d821e8-41' - 26198 - 4663889408 - Finished in state Completed(None)
13:24:01.840 | INFO    | Task run 'compute_something-86d821e8-40' - 26197 - 4558319104 - Finished in state Completed(None)
13:24:02.113 | INFO    | Task run 'compute_something-86d821e8-42' - 26199 - 4419104256 - Finished in state Completed(None)
13:24:02.155 | INFO    | Task run 'compute_something-86d821e8-49' - 26198 - 123145610563584 - Computing: 49 x 2 = 98
13:24:02.204 | INFO    | Task run 'compute_something-86d821e8-48' - 26197 - 123145636212736 - Computing: 48 x 2 = 96
13:24:02.285 | INFO    | Task run 'compute_something-86d821e8-43' - 26200 - 4604902912 - Finished in state Completed(None)
13:24:02.304 | INFO    | Task run 'compute_something-86d821e8-44' - 26197 - 4558319104 - Finished in state Completed(None)
13:24:02.376 | INFO    | Task run 'compute_something-86d821e8-45' - 26198 - 4663889408 - Finished in state Completed(None)
13:24:02.657 | INFO    | Task run 'compute_something-86d821e8-47' - 26200 - 4604902912 - Finished in state Completed(None)
13:24:02.710 | INFO    | Task run 'compute_something-86d821e8-46' - 26199 - 4419104256 - Finished in state Completed(None)
13:24:05.136 | INFO    | Task run 'compute_something-86d821e8-49' - 26198 - 4663889408 - Finished in state Completed(None)
13:24:05.179 | INFO    | Task run 'compute_something-86d821e8-48' - 26197 - 4558319104 - Finished in state Completed(None)
13:24:09.471 | INFO    | Flow run 'strange-shrew' - Finished in state Completed('All states completed.')

This was generated by this flow:

from prefect import flow, task
from prefect import get_run_logger
from prefect.task_runners import DaskTaskRunner
import time


@task
def compute_something(x):
    logger = get_run_logger()
    logger.info("Computing: %d x 2 = %d", x, x * 2)
    time.sleep(2)


@flow(task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=4, processes=True)))
def dask_flow():
    for i in range(50):
        compute_something(i)


if __name__ == "__main__":
    dask_flow()

@ahuang11 Regarding the main question here: executing your code with Dask alone, without Prefect, is faster because it only runs locally and doesn’t need to talk to any database. In contrast, your Prefect flow needs to update flow run and task run states in the backend. Even with a local SQLite DB, this introduces some overhead, so comparing pure Dask code to a Prefect flow with the DaskTaskRunner is a bit unfair :smile: since the Prefect flow does more work than the Dask code does.


There’s still something fishy about this, I think:

  1. Serial (no Prefect) - 14.4s
  2. Concurrent (with Prefect @flow) - 14.7s (overhead of 0.3 seconds)
  3. Parallel (with Dask @delayed 4 processes) - 6.6s
  4. Parallel (with DaskTaskRunner 4 processes) - 16.2s (slower than Concurrent)
  5. Parallel (with DaskTaskRunner 4 threads) - 15.2s

I would have expected the 4-process run to be closer to 9-10 seconds, but perhaps each mini task’s processing time is smaller than the overhead of preparing the logs and the cluster. I’ll try making these tasks larger to see if there’s any difference.

Okay, I added some super unnecessary computations to the function to make it run slower.

Setup (in one Jupyter cell; includes all the imports, with the data already loaded)

import dask
import xarray as xr
import matplotlib.pyplot as plt
from cartopy import crs as ccrs
from cartopy import feature as cfeature
import matplotlib.textpath
import matplotlib.patches
from matplotlib.font_manager import FontProperties
import numpy as np
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner


def plot(ds, time):
    plt.figure()
    ax = plt.axes(projection=ccrs.Orthographic())
    ax.add_feature(cfeature.LAKES.with_scale("10m"))
    ax.add_feature(cfeature.OCEAN.with_scale("10m"))
    ax.add_feature(cfeature.LAND.with_scale("10m"))
    ax.add_feature(cfeature.STATES.with_scale("10m"))
    ax.add_feature(cfeature.COASTLINE.with_scale("10m"))
    ax.pcolormesh(ds["lon"], ds["lat"], ds["air"], transform=ccrs.PlateCarree())

    # generate a matplotlib path representing the word "cartopy"
    fp = FontProperties(family='Bitstream Vera Sans', weight='bold')
    logo_path = matplotlib.textpath.TextPath((-175, -35), 'cartopy',
                                             size=1, prop=fp)

    # add a background image
    im = ax.stock_img()
    # clip the image according to the logo_path. mpl v1.2.0 does not support
    # the transform API that cartopy makes use of, so we have to convert the
    # projection into a transform manually
    plate_carree_transform = ccrs.PlateCarree()._as_mpl_transform(ax)
    im.set_clip_path(logo_path, transform=plate_carree_transform)

    # add the path as a patch, drawing black outlines around the text
    patch = matplotlib.patches.PathPatch(logo_path,
                                         facecolor='none', edgecolor='black',
                                         transform=ccrs.PlateCarree())
    ax.add_patch(patch)
    ax.set_global()

    plt.savefig(str(time)[:16])
    plt.close()

ds = xr.tutorial.open_dataset('air_temperature').isel(
    time=slice(0, 100))

A single plot() call takes about 1.6 seconds on its own.

Benchmark

Serial - 2m 39.5s

def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

Dask Delayed - 46.3s

def process():
    jobs = []
    for time in ds['time'].values:
        job = dask.delayed(plot)(ds.sel(time=time), time)
        jobs.append(job)
    dask.compute(jobs, scheduler="processes", num_workers=4)

process()

Prefect Concurrent - 2m 32.5s (slightly faster than serial alone!)

@flow()
def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

Prefect DaskTaskRunner Processes - 2m 36.1s (slower; see below)

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4, "processes": True}))
def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

I can see the cluster spawned four workers, but it’s not actually utilizing them.

Even when I annotate it:

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4, "processes": True}))
def process():
    with dask.annotate(resources={'process': 4}):
        for time in ds['time'].values:
            plot(ds.sel(time=time), time)

process()

Maybe this should be filed as a GitHub issue, because I don’t think the DaskTaskRunner is actually passing the jobs to its workers?


Thanks so much for testing this so thoroughly. You’re right, it’s worth opening a GitHub issue so that Michael and other engineers can have a look and estimate whether this is an actual issue. Could you share the GitHub issue link once you create it?


After I found out Ray could be installed on M1 Macs, I also tried Ray, but I think I have the same serial-execution issue.
https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support

import dask
import xarray as xr
import matplotlib.pyplot as plt
from cartopy import crs as ccrs
from cartopy import feature as cfeature
import matplotlib.textpath
import matplotlib.patches
from matplotlib.font_manager import FontProperties
import numpy as np
from prefect import flow, task, get_run_logger
from prefect.task_runners import RayTaskRunner


def plot(ds, time):
    print(time)
    plt.figure()
    ax = plt.axes(projection=ccrs.Orthographic())
    ax.add_feature(cfeature.LAKES.with_scale("10m"))
    ax.add_feature(cfeature.OCEAN.with_scale("10m"))
    ax.add_feature(cfeature.LAND.with_scale("10m"))
    ax.add_feature(cfeature.STATES.with_scale("10m"))
    ax.add_feature(cfeature.COASTLINE.with_scale("10m"))
    ax.pcolormesh(ds["lon"], ds["lat"], ds["air"], transform=ccrs.PlateCarree())

    # generate a matplotlib path representing the word "cartopy"
    fp = FontProperties(family='Bitstream Vera Sans', weight='bold')
    logo_path = matplotlib.textpath.TextPath((-175, -35), 'cartopy',
                                             size=1, prop=fp)

    # add a background image
    im = ax.stock_img()
    # clip the image according to the logo_path. mpl v1.2.0 does not support
    # the transform API that cartopy makes use of, so we have to convert the
    # projection into a transform manually
    plate_carree_transform = ccrs.PlateCarree()._as_mpl_transform(ax)
    im.set_clip_path(logo_path, transform=plate_carree_transform)

    # add the path as a patch, drawing black outlines around the text
    patch = matplotlib.patches.PathPatch(logo_path,
                                         facecolor='none', edgecolor='black',
                                         transform=ccrs.PlateCarree())
    ax.add_patch(patch)
    ax.set_global()

    plt.savefig(str(time)[:16])
    plt.close()

ds = xr.tutorial.open_dataset('air_temperature').isel(
    time=slice(0, 100))


@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 4}))
def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

Perhaps it’s helpful to create a separate issue for that? We already have one regarding Ray performance optimizations.

I think I discovered the issue: if the function is not decorated with @task, it will run serially!

The Proof

WITH the decorator, however, it properly utilizes the processes.

With Dask, I checked htop because updating the logging formatter is a bit tedious (notice CPU at >100%).

With Ray, the output automatically shows the unique PIDs.

(begin_task_run pid=93880) 2013-01-01T12:00:00.000000000
(begin_task_run pid=93881) 2013-01-01T00:00:00.000000000
(begin_task_run pid=93878) 2013-01-01T18:00:00.000000000
(begin_task_run pid=93879) 2013-01-01T06:00:00.000000000

Setup

import dask
import xarray as xr
import matplotlib.pyplot as plt
from cartopy import crs as ccrs
from cartopy import feature as cfeature
import matplotlib.textpath
import matplotlib.patches
from matplotlib.font_manager import FontProperties
import numpy as np
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner, RayTaskRunner


@task
def plot(ds, time):
    print(time)
    plt.figure()
    ax = plt.axes(projection=ccrs.Orthographic())
    ax.add_feature(cfeature.LAKES.with_scale("10m"))
    ax.add_feature(cfeature.OCEAN.with_scale("10m"))
    ax.add_feature(cfeature.LAND.with_scale("10m"))
    ax.add_feature(cfeature.STATES.with_scale("10m"))
    ax.add_feature(cfeature.COASTLINE.with_scale("10m"))
    ax.pcolormesh(ds["lon"], ds["lat"], ds["air"], transform=ccrs.PlateCarree())

    # generate a matplotlib path representing the word "cartopy"
    fp = FontProperties(family='Bitstream Vera Sans', weight='bold')
    logo_path = matplotlib.textpath.TextPath((-175, -35), 'cartopy',
                                             size=1, prop=fp)

    # add a background image
    im = ax.stock_img()
    # clip the image according to the logo_path. mpl v1.2.0 does not support
    # the transform API that cartopy makes use of, so we have to convert the
    # projection into a transform manually
    plate_carree_transform = ccrs.PlateCarree()._as_mpl_transform(ax)
    im.set_clip_path(logo_path, transform=plate_carree_transform)

    # add the path as a patch, drawing black outlines around the text
    patch = matplotlib.patches.PathPatch(logo_path,
                                         facecolor='none', edgecolor='black',
                                         transform=ccrs.PlateCarree())
    ax.add_patch(patch)
    ax.set_global()

    plt.savefig(str(time)[:16])
    plt.close()

ds = xr.tutorial.open_dataset('air_temperature').isel(
    time=slice(0, 100))

Benchmark

Running in a Jupyter notebook:

ConcurrentTaskRunner - 2m 32.7s

@flow()
def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

DaskTaskRunner - 1m 29.3s

@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4, "processes": True}))
def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

RayTaskRunner - 1m 35.2s

@flow(task_runner=RayTaskRunner(init_kwargs={"num_cpus": 4}))
def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

Native Dask (dropped the @task decorator) - 1m 31.4s (around the same time as the DaskTaskRunner!)

def process():
    jobs = []
    for time in ds['time'].values:
        job = dask.delayed(plot)(ds.sel(time=time), time)
        jobs.append(job)
    dask.compute(jobs, scheduler="processes", num_workers=4)

process()

Serial mode - 3m 15.9s (I think this could make for an exciting blog post or announcement, because just by wrapping with @flow and @task, which default to the ConcurrentTaskRunner, one can speed up the run by about 45 seconds!!)

def process():
    for time in ds['time'].values:
        plot(ds.sel(time=time), time)

process()

Conclusion:
Wrap your functions with @task to get parallelization.


This makes a lot of sense, since this is a “Task” runner in the end :smile:


:point_right: Note to anyone using DaskTaskRunner or RayTaskRunner:

From Prefect version 2.0b8 onwards, those task runners were moved to their respective Prefect Collections for better dependency management (the core library no longer requires dask or ray as dependencies; they can now be installed separately when needed).

The correct imports are now:

from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner
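
Usage is otherwise the same as in the examples above; only the import changes. A minimal sketch (assuming the collection is installed, e.g. via pip install prefect-dask):

# pip install prefect-dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def say_hi(name):
    print(f"hi {name}")


@flow(task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 4}))
def my_flow():
    for name in ["a", "b", "c"]:
        say_hi(name)  # in later Prefect 2 releases, use say_hi.submit(name) to send it to the task runner


my_flow()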