Can my subflow use a different task runner than my parent flow?

Prefect 2.0

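Yes. In Prefect 2.0 (Orion), every flow, including a subflow, can declare its own task runner through the task_runner argument of the flow decorator. The Task Runners concept page covers this in more detail. The example below runs the parent flow with SequentialTaskRunner and the subflow with DaskTaskRunner: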

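from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, SequentialTaskRunner
# Note: in later Prefect 2 releases, DaskTaskRunner lives in the separate
# prefect-dask package (from prefect_dask import DaskTaskRunner), and tasks
# are handed to the flow's task runner via .submit(), e.g. hello_dask.submit().

@task
def hello_local():
    print("Hello!")

@task
def hello_dask():
    print("Hello from Dask!")

# Parent flow: its tasks run one at a time.
@flow(task_runner=SequentialTaskRunner())
def my_flow():
    hello_local()
    my_subflow()  # the subflow runs with its own task runner
    hello_local()

# Subflow: its tasks run on Dask, independently of the parent's runner.
@flow(task_runner=DaskTaskRunner())
def my_subflow():
    hello_dask()

# The guard matters when Dask starts worker processes.
if __name__ == "__main__":
    my_flow()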
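
To make the idea concrete without depending on a particular Prefect release, here is a minimal toy model of "one runner per flow". It is not Prefect's implementation: toy_flow, SequentialRunner, and ThreadedRunner are hypothetical names invented for this sketch, built only on the standard library. It shows that a nested flow uses the runner it was declared with rather than inheriting the parent's.

```python
from concurrent.futures import ThreadPoolExecutor


class SequentialRunner:
    """Hypothetical runner: executes each callable in the calling thread."""
    name = "sequential"

    def run_all(self, funcs):
        return [f() for f in funcs]


class ThreadedRunner:
    """Hypothetical runner: executes callables in a thread pool."""
    name = "threaded"

    def __init__(self, max_workers=4):
        self.max_workers = max_workers

    def run_all(self, funcs):
        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            futures = [pool.submit(f) for f in funcs]
            # Collect results in submission order.
            return [fut.result() for fut in futures]


def toy_flow(runner):
    """Bind a runner to a function, loosely mimicking a flow decorator."""
    def decorator(fn):
        def wrapper(*args, **kwargs):
            # The flow body always receives the runner it was declared with.
            return fn(runner, *args, **kwargs)
        return wrapper
    return decorator


@toy_flow(ThreadedRunner(max_workers=2))
def child(runner):
    results = runner.run_all([lambda i=i: i + 1 for i in range(3)])
    return runner.name, results


@toy_flow(SequentialRunner())
def parent(runner):
    greeting = runner.run_all([lambda: "hello"])
    child_runner, child_results = child()  # child brings its own runner
    return {
        "parent_runner": runner.name,
        "child_runner": child_runner,
        "greeting": greeting,
        "child_results": child_results,
    }


summary = parent()
print(summary)
```

The parent never passes its runner down. Each decorated function closes over its own runner, which is the same separation the task_runner argument gives each flow in the example above.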

Prefect 1.0

Prefect 1.0 lets you set an executor on each flow separately, so a parent flow and the child flow it triggers can use different executors.

:point_right: Note that Prefect does not store the executor information in the backend. Instead, it reads the executor from the flow retrieved from storage at runtime.

Here is an example showing a parent flow that uses the default LocalExecutor (no executor is set on it) and a child flow that uses LocalDaskExecutor:

  1. Child flow:
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor

@task
def generate_random_numbers():
    # Despite the name, this returns a fixed list: 1 through 49.
    return list(range(1, 50))

@task
def add_one(x):
    return x + 1

@task(log_stdout=True)
def print_results(res):
    print(res)

with Flow("map_example", executor=LocalDaskExecutor()) as flow:
    numbers = generate_random_numbers()
    result = add_one.map(numbers)
    print_results(result)
  2. Parent flow:
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

@task
def task_1():
    pass

@task
def task_2():
    pass

@task
def task_3():
    pass

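# No executor is passed here, so the parent flow uses the default LocalExecutor.
with Flow("parent_of_map") as flow:
    t1 = task_1()
    t2 = task_2(upstream_tasks=[t1])
    t3 = task_3(upstream_tasks=[t2])
    # Trigger the registered child flow by name once task_3 has finished.
    id_ = create_flow_run(flow_name="map_example", project_name="xyz", upstream_tasks=[t3])
    # Wait for the child flow run, stream its logs, and reflect its final state in the parent.
    wait_for_flow_run(id_, stream_logs=True, raise_final_state=True)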