Prefect 2.0
By default, Prefect executes your tasks sequentially, and you don’t have to worry about task runners - your tasks run as any normal Python functions.
If you call .submit()
on a task, Prefect submits this task to a task runner, and this task runner executes your tasks in the background. The type of the task runner determines whether your task runs sequentially, concurrently or in parallel. The default task runner is the ConcurrentTaskRunner
.
What does it mean in practice?
- If
.submit()
is not called, Prefect runs the task immediately and sequentially without using any task runner. - If submit is called, Prefect submits the task to a task runner you specified on your flow decorator; if you haven’t specified any task runner explicitly but you still called
.submit()
on your tasks, Prefect submits your tasks by default to aConcurrentTaskRunner
.
Example with and without .submit()
This example demonstrates the difference:
- Using
.submit()
:
import time
import asyncio
from prefect import flow, task
@task
async def shout(number):
asyncio.sleep(1)
print(f"#{number}")
@flow()
async def count_to(highest_number):
for number in range(highest_number):
await shout.submit(number)
await count_to(3)
Outputs:
#0
#2
#1
Note #2
comes before #1
here!
- NOT using
.submit()
:
import time
import asyncio
from prefect import flow, task
@task
async def shout(number):
asyncio.sleep(1)
print(f"#{number}")
@flow()
async def count_to(highest_number):
for number in range(highest_number):
await shout(number)
await count_to(3)
Outputs:
#0
#1
#2
Note #1
comes before #2
here!
Explicitly defining a task runner
Here is how you can explicitly attach a task runner to your flow:
from prefect.task_runners import SequentialTaskRunner
@flow(task_runner=SequentialTaskRunner())
Note that: some task runners such as DaskTaskRunner
or RayTaskRunner
rely on external dependencies and, therefore, must be installed via separate collections:
pip install prefect-dask
pip install prefect-ray
To import those in your flow:
from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner
@flow(task_runner=DaskTaskRunner())
def your_dask_flow():
...
Prefect 1.0
The local executor is the default mechanism to make tasks run sequentially. The syntax:
from prefect.executors import LocalExecutor
flow.executor = LocalExecutor()