After prefect>=2.0b9 to run in parallel, calling the submit method is required!

import time

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=DaskTaskRunner())
def count_to(highest_number):
    for number in range(highest_number):
        shout(number)


count_to(3)

Outputs

...

15:10:41.015 | INFO    | Flow run 'gigantic-curassow' - Created task run 'shout-58a68b34-0' for task 'shout'
15:10:41.632 | INFO    | Task run 'shout-58a68b34-0' - Finished in state Completed()
15:10:41.663 | INFO    | Flow run 'gigantic-curassow' - Created task run 'shout-58a68b34-1' for task 'shout'
#0
15:10:42.231 | INFO    | Task run 'shout-58a68b34-1' - Finished in state Completed()
15:10:42.260 | INFO    | Flow run 'gigantic-curassow' - Created task run 'shout-58a68b34-2' for task 'shout'
#1
15:10:42.839 | INFO    | Task run 'shout-58a68b34-2' - Finished in state Completed()
#2
15:10:44.278 | INFO    | Flow run 'gigantic-curassow' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=None),
 Completed(message=None, type=COMPLETED, result=None),
 Completed(message=None, type=COMPLETED, result=None)]

Note it’s sequential! To execute in parallel, use the submit method.

import time

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner


@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")


@flow(task_runner=DaskTaskRunner())
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)


count_to(3)

Outputs:


15:11:52.396 | INFO    | Flow run 'misty-vole' - Created task run 'shout-58a68b34-0' for task 'shout'
15:11:52.443 | INFO    | Flow run 'misty-vole' - Created task run 'shout-58a68b34-1' for task 'shout'
15:11:52.611 | INFO    | Flow run 'misty-vole' - Created task run 'shout-58a68b34-2' for task 'shout'
/Users/andrew/mambaforge/lib/python3.9/site-packages/snowflake/connector/options.py:96: UserWarning: You have an incompatible version of 'pyarrow' installed (8.0.0), please install a version that adheres to: 'pyarrow<6.1.0,>=6.0.0; extra == "pandas"'
  warn_incompatible_dep(
/Users/andrew/mambaforge/lib/python3.9/site-packages/snowflake/connector/options.py:96: UserWarning: You have an incompatible version of 'pyarrow' installed (8.0.0), please install a version that adheres to: 'pyarrow<6.1.0,>=6.0.0; extra == "pandas"'
  warn_incompatible_dep(
/Users/andrew/mambaforge/lib/python3.9/site-packages/snowflake/connector/options.py:96: UserWarning: You have an incompatible version of 'pyarrow' installed (8.0.0), please install a version that adheres to: 'pyarrow<6.1.0,>=6.0.0; extra == "pandas"'
  warn_incompatible_dep(
#0
#2
#1
15:11:58.581 | INFO    | Flow run 'misty-vole' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=None),
 Completed(message=None, type=COMPLETED, result=None),
 Completed(message=None, type=COMPLETED, result=None)]
1 Like