What is the default TaskRunner (executor)?

Prefect 2.0

By default, Prefect executes your tasks sequentially, and you don’t have to worry about task runners: your tasks run like normal Python functions.

If you call .submit() on a task, Prefect submits this task to a task runner, and this task runner executes your tasks in the background. The type of the task runner determines whether your task runs sequentially, concurrently or in parallel. The default task runner is the ConcurrentTaskRunner.

What does it mean in practice?

  • If .submit() is not called, Prefect runs the task immediately and sequentially without using any task runner.
  • If .submit() is called, Prefect submits the task to the task runner you specified on your flow decorator; if you haven’t specified a task runner explicitly, Prefect submits your tasks to a ConcurrentTaskRunner by default.

Example with and without .submit()

This example demonstrates the difference:

  1. Using .submit():
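import asyncio

from prefect import flow, task


@task
async def shout(number):
    # The sleep must be awaited; otherwise the coroutine never runs.
    await asyncio.sleep(1)
    print(f"#{number}")


@flow()
async def count_to(highest_number):
    for number in range(highest_number):
        # .submit() hands the task to the flow's task runner.
        await shout.submit(number)


# Top-level await works in a notebook or IPython;
# in a script, use asyncio.run(count_to(3)) instead.
await count_to(3)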

Outputs:

#0
#2
#1

Note that #2 comes before #1 here: the submitted tasks run concurrently, so their completion order is not guaranteed.

  2. NOT using .submit():
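import asyncio

from prefect import flow, task


@task
async def shout(number):
    # The sleep must be awaited; otherwise the coroutine never runs.
    await asyncio.sleep(1)
    print(f"#{number}")


@flow()
async def count_to(highest_number):
    for number in range(highest_number):
        # Calling the task directly runs it to completion before the next one starts.
        await shout(number)


# Top-level await works in a notebook or IPython;
# in a script, use asyncio.run(count_to(3)) instead.
await count_to(3)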

Outputs:

#0
#1
#2

Note that #1 comes before #2 here, because each task finishes before the next one starts.
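
For intuition about why the two variants order their output differently, here is a rough analogy in plain asyncio with no Prefect involved. It is only a sketch of the general idea (awaiting each call in turn versus scheduling all calls and then waiting), not a description of how Prefect's task runners are implemented. The helper names and the delays are made up for illustration; the delays differ so that the reordering is reproducible, whereas in the example above every task sleeps for the same second and the order can vary between runs.

```python
import asyncio


async def shout(number, delay, log):
    # Stand-in for a task: wait, then record that it finished.
    await asyncio.sleep(delay)
    log.append(number)


async def run_sequentially(delays):
    # Awaiting each call directly: the next one starts only after the previous one finishes.
    log = []
    for number, delay in enumerate(delays):
        await shout(number, delay, log)
    return log


async def run_concurrently(delays):
    # Scheduling every call first and waiting afterwards: the sleeps overlap.
    log = []
    pending = [asyncio.create_task(shout(n, d, log)) for n, d in enumerate(delays)]
    await asyncio.gather(*pending)
    return log


delays = [0.15, 0.05, 0.10]  # hypothetical durations, chosen to force a reordering
sequential_order = asyncio.run(run_sequentially(delays))
concurrent_order = asyncio.run(run_concurrently(delays))
print(sequential_order)  # [0, 1, 2]
print(concurrent_order)  # [1, 2, 0]
```

In the sequential case the total runtime is the sum of the delays; in the concurrent case it is roughly the longest single delay.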

Explicitly defining a task runner

Here is how you can explicitly attach a task runner to your flow:

from prefect import flow
from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())
def your_sequential_flow():
    ...
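
To see why the choice of runner matters even when the submitting code stays the same, here is a small standard-library analogy. It uses concurrent.futures rather than Prefect, so treat it as a sketch of the concept only: a pool with one worker plays the role of a sequential runner, and a pool with several workers plays the role of a concurrent one. The function run_with and its parameters are hypothetical names for this illustration.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def run_with(max_workers, n_tasks=3, delay=0.1):
    # Track how many "tasks" are in flight at the same time.
    lock = threading.Lock()
    running = 0
    peak = 0

    def work(number):
        nonlocal running, peak
        with lock:
            running += 1
            peak = max(peak, running)
        time.sleep(delay)  # simulate work
        with lock:
            running -= 1
        return number

    # The submitting code is identical in both cases; only the pool size differs.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(work, n) for n in range(n_tasks)]
        results = [f.result() for f in futures]
    return results, peak


one_at_a_time = run_with(max_workers=1)
overlapping = run_with(max_workers=3)
print(one_at_a_time)  # ([0, 1, 2], 1): never more than one task in flight
print(overlapping)    # same results, but with a peak above 1
```

The results come back in submission order either way, because they are read from the futures in that order; what changes is how many tasks are in progress at once.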

Note that some task runners, such as DaskTaskRunner and RayTaskRunner, rely on external dependencies and must therefore be installed as separate collections:

pip install prefect-dask
pip install prefect-ray

To import those in your flow:

from prefect import flow
from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner

@flow(task_runner=DaskTaskRunner())
def your_dask_flow():
    ...

Prefect 1.0

The LocalExecutor is the default executor and runs tasks sequentially. The syntax for setting it explicitly:

from prefect.executors import LocalExecutor

# Assumes `flow` is an existing Prefect 1.0 Flow object.
flow.executor = LocalExecutor()

ConcurrentTaskRunner is the default again as of 2.0b9.