import time
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
@task
def shout(number):
time.sleep(0.5)
print(f"#{number}")
@flow(task_runner=DaskTaskRunner())
def count_to(highest_number):
for number in range(highest_number):
shout(number)
count_to(3)
Outputs
...
15:10:41.015 | INFO | Flow run 'gigantic-curassow' - Created task run 'shout-58a68b34-0' for task 'shout'
15:10:41.632 | INFO | Task run 'shout-58a68b34-0' - Finished in state Completed()
15:10:41.663 | INFO | Flow run 'gigantic-curassow' - Created task run 'shout-58a68b34-1' for task 'shout'
#0
15:10:42.231 | INFO | Task run 'shout-58a68b34-1' - Finished in state Completed()
15:10:42.260 | INFO | Flow run 'gigantic-curassow' - Created task run 'shout-58a68b34-2' for task 'shout'
#1
15:10:42.839 | INFO | Task run 'shout-58a68b34-2' - Finished in state Completed()
#2
15:10:44.278 | INFO | Flow run 'gigantic-curassow' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=None),
Completed(message=None, type=COMPLETED, result=None),
Completed(message=None, type=COMPLETED, result=None)]
Note it’s sequential! To execute in parallel, use the submit
method.
import time
from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner
@task
def shout(number):
time.sleep(0.5)
print(f"#{number}")
@flow(task_runner=DaskTaskRunner())
def count_to(highest_number):
for number in range(highest_number):
shout.submit(number)
count_to(3)
Outputs:
15:11:52.396 | INFO | Flow run 'misty-vole' - Created task run 'shout-58a68b34-0' for task 'shout'
15:11:52.443 | INFO | Flow run 'misty-vole' - Created task run 'shout-58a68b34-1' for task 'shout'
15:11:52.611 | INFO | Flow run 'misty-vole' - Created task run 'shout-58a68b34-2' for task 'shout'
/Users/andrew/mambaforge/lib/python3.9/site-packages/snowflake/connector/options.py:96: UserWarning: You have an incompatible version of 'pyarrow' installed (8.0.0), please install a version that adheres to: 'pyarrow<6.1.0,>=6.0.0; extra == "pandas"'
warn_incompatible_dep(
/Users/andrew/mambaforge/lib/python3.9/site-packages/snowflake/connector/options.py:96: UserWarning: You have an incompatible version of 'pyarrow' installed (8.0.0), please install a version that adheres to: 'pyarrow<6.1.0,>=6.0.0; extra == "pandas"'
warn_incompatible_dep(
/Users/andrew/mambaforge/lib/python3.9/site-packages/snowflake/connector/options.py:96: UserWarning: You have an incompatible version of 'pyarrow' installed (8.0.0), please install a version that adheres to: 'pyarrow<6.1.0,>=6.0.0; extra == "pandas"'
warn_incompatible_dep(
#0
#2
#1
15:11:58.581 | INFO | Flow run 'misty-vole' - Finished in state Completed('All states completed.')
[Completed(message=None, type=COMPLETED, result=None),
Completed(message=None, type=COMPLETED, result=None),
Completed(message=None, type=COMPLETED, result=None)]