I am trying to modify the prefect-dask “shout” example to use an existing scheduler following the example here: https://prefecthq.github.io/prefect-dask/examples_catalog/
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=DaskTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(10)
This runs fine. The problem appears when I try to use an existing Dask cluster.
First, I start the cluster in another terminal:
$ dask scheduler
2023-08-14 09:47:38,000 - distributed.scheduler - INFO - -----------------------------------------------
2023-08-14 09:47:38,359 - distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
2023-08-14 09:47:38,395 - distributed.scheduler - INFO - State start
2023-08-14 09:47:38,400 - distributed.scheduler - INFO - -----------------------------------------------
2023-08-14 09:47:38,401 - distributed.scheduler - INFO - Scheduler at: tcp://192.168.86.33:8786
2023-08-14 09:47:38,401 - distributed.scheduler - INFO - dashboard at: http://192.168.86.33:8787/status
Everything looks good on the dashboard.
Then I modify the example to use this cluster (and reduce the count_to argument to 3 to cut down on clutter):
import time

from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=DaskTaskRunner(address="tcp://192.168.86.33:8786"))
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


if __name__ == "__main__":
    count_to(3)
Running this from another terminal:
$ python the_above_example.py
09:52:07.565 | INFO | prefect.engine - Created flow run ‘spectacular-mule’ for flow ‘count-to’
09:52:07.568 | WARNING | Flow run ‘spectacular-mule’ - Task runner ‘DaskTaskRunner’ does not implement the duplicate
method and will fail if used for concurrent execution of the same flow.
09:52:07.569 | INFO | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://192.168.86.33:8786
09:52:07.968 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://192.168.86.33:8787/status
09:52:08.468 | INFO | Flow run ‘spectacular-mule’ - Created task run ‘shout-2’ for task ‘shout’
09:52:08.490 | INFO | Flow run ‘spectacular-mule’ - Submitted task run ‘shout-2’ for execution.
09:52:08.514 | INFO | Flow run ‘spectacular-mule’ - Created task run ‘shout-0’ for task ‘shout’
09:52:08.523 | INFO | Flow run ‘spectacular-mule’ - Submitted task run ‘shout-0’ for execution.
09:52:08.679 | INFO | Flow run ‘spectacular-mule’ - Created task run ‘shout-1’ for task ‘shout’
09:52:08.684 | INFO | Flow run ‘spectacular-mule’ - Submitted task run ‘shout-1’ for execution.
And that’s it: no further output appears, and the flow never finishes. The dashboard shows the shout function with 3 tasks queued and no workers.
The scheduler terminal shows the new connection:
2023-08-14 09:52:07,961 - distributed.scheduler - INFO - Receive client connection: Client-85a6e7a7-3aba-11ee-9ae0-f4b7e20319e3
2023-08-14 09:52:07,967 - distributed.core - INFO - Starting established connection to tcp://192.168.86.33:55476
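The "no workers" observation from the dashboard can also be checked from Python. The following is only a rough sketch, assuming the scheduler address from the log above; the helper names (has_workers, describe, check_scheduler) are made up for illustration, and newer distributed releases may cap how many workers scheduler_info() lists, so it only tests for "none" versus "at least one".

```python
def has_workers(scheduler_info):
    """Return True if a scheduler_info()-style dict lists at least one worker."""
    return len(scheduler_info.get("workers", {})) > 0


def describe(scheduler_info):
    """Summarize whether submitted tasks have anywhere to run."""
    if has_workers(scheduler_info):
        return "at least one worker is connected"
    return "no workers connected: submitted tasks will stay queued"


def check_scheduler(address, timeout=5):
    """Connect to an existing scheduler and report on its workers.

    The import is local so the pure helpers above work without
    distributed installed.
    """
    from distributed import Client

    with Client(address, timeout=timeout) as client:
        return describe(client.scheduler_info())
```

Calling print(check_scheduler("tcp://192.168.86.33:8786")) while the scheduler is running would report which case applies. If it reports no workers, that would be consistent with the queued tasks above, since `dask scheduler` on its own only starts the scheduler process; a worker can be attached from another terminal with `dask worker tcp://192.168.86.33:8786`.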
More likely than not I’m overlooking something necessary to fire off the actual work.
I am using Python 3.11.4 and Prefect 2.11.3.