Trouble getting started with using existing Dask scheduler example

I am trying to modify the prefect-dask “shout” example to use an existing scheduler following the example here: https://prefecthq.github.io/prefect-dask/examples_catalog/


import time
  
from prefect import flow, task
from prefect_dask import DaskTaskRunner
  
@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")
  
@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)
  
if __name__ == "__main__":
    count_to(10)

This runs fine, but it stalls when I try to use an existing Dask cluster.

First, I start the cluster in another terminal:
$ dask scheduler

2023-08-14 09:47:38,000 - distributed.scheduler - INFO - -----------------------------------------------
2023-08-14 09:47:38,359 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2023-08-14 09:47:38,395 - distributed.scheduler - INFO - State start
2023-08-14 09:47:38,400 - distributed.scheduler - INFO - -----------------------------------------------
2023-08-14 09:47:38,401 - distributed.scheduler - INFO - Scheduler at: tcp://192.168.86.33:8786
2023-08-14 09:47:38,401 - distributed.scheduler - INFO - dashboard at: http://192.168.86.33:8787/status

Everything looks good on the dashboard.

Then I modify the example to use this cluster (and reduce the count_to argument to 3 to cut down on clutter):

import time
  
from prefect import flow, task
from prefect_dask import DaskTaskRunner
  
@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")
  
@flow(task_runner=DaskTaskRunner(address="tcp://192.168.86.33:8786"))
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)
  
if __name__ == "__main__":
    count_to(3)

Running this from another terminal:
$ python the_above_example.py

09:52:07.565 | INFO | prefect.engine - Created flow run 'spectacular-mule' for flow 'count-to'
09:52:07.568 | WARNING | Flow run 'spectacular-mule' - Task runner 'DaskTaskRunner' does not implement the duplicate method and will fail if used for concurrent execution of the same flow.
09:52:07.569 | INFO | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://192.168.86.33:8786
09:52:07.968 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://192.168.86.33:8787/status
09:52:08.468 | INFO | Flow run 'spectacular-mule' - Created task run 'shout-2' for task 'shout'
09:52:08.490 | INFO | Flow run 'spectacular-mule' - Submitted task run 'shout-2' for execution.
09:52:08.514 | INFO | Flow run 'spectacular-mule' - Created task run 'shout-0' for task 'shout'
09:52:08.523 | INFO | Flow run 'spectacular-mule' - Submitted task run 'shout-0' for execution.
09:52:08.679 | INFO | Flow run 'spectacular-mule' - Created task run 'shout-1' for task 'shout'
09:52:08.684 | INFO | Flow run 'spectacular-mule' - Submitted task run 'shout-1' for execution.

And that’s it; nothing else happens. The dashboard shows the 3 shout tasks queued and no workers.

The scheduler terminal shows the new connection:

2023-08-14 09:52:07,961 - distributed.scheduler - INFO - Receive client connection: Client-85a6e7a7-3aba-11ee-9ae0-f4b7e20319e3
2023-08-14 09:52:07,967 - distributed.core - INFO - Starting established connection to tcp://192.168.86.33:55476

More likely than not I’m overlooking something necessary to fire off the actual work.
Using python 3.11.4 and prefect 2.11.3.

I never started a dask-worker after starting the dask scheduler, so the cluster had a scheduler but no workers to run the tasks.

So, before running the example, running this in another terminal

$ dask-worker 192.168.86.33:8786

starts the actual workers. In my own use of dask.distributed I’ve always created an explicit client, which apparently does this for me.
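
A pre-flight check along these lines might catch the "tasks queued, no workers" state before the flow is submitted. This is only a rough sketch: has_workers and ensure_workers are hypothetical helper names, the 30-second timeout is an arbitrary choice, and it assumes the distributed package is installed. It relies on Client.scheduler_info() and Client.wait_for_workers() from dask.distributed.

```python
def has_workers(scheduler_info: dict) -> bool:
    """Return True if a scheduler-info dict lists at least one worker."""
    return bool(scheduler_info.get("workers"))


def ensure_workers(address: str, timeout: float = 30.0) -> None:
    """Connect to an existing scheduler and wait until a worker has joined.

    Hypothetical helper; `timeout` (seconds) is an illustrative default.
    """
    # Imported lazily so has_workers() can be used without dask installed.
    from distributed import Client

    with Client(address) as client:
        if not has_workers(client.scheduler_info()):
            print("No workers connected yet; start one with: dask-worker", address)
            # Blocks until one worker registers, or raises on timeout.
            client.wait_for_workers(n_workers=1, timeout=timeout)
```

Calling ensure_workers("tcp://192.168.86.33:8786") just before count_to(3) should make a missing worker show up as a printed hint (and eventually a timeout error) instead of a flow that sits there with queued tasks.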