Prefect 2.0
Yes, Orion supports that. You can find more about that on the Task Runners concept page. Here is a flow example that demonstrates how you can use different task runners per subflow:
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner, SequentialTaskRunner
@task
def hello_local():
print("Hello!")
@task
def hello_dask():
print("Hello from Dask!")
@flow(task_runner=SequentialTaskRunner())
def my_flow():
hello_local()
my_subflow()
hello_local()
@flow(task_runner=DaskTaskRunner())
def my_subflow():
hello_dask()
my_flow()
Prefect 1.0
Prefect 1.0 allows specifying an executor in each flow separately.
Note that Prefect does not store the executor information in the backend. Instead, it retrieves it from storage.
Here is an example showing a parent flow using LocalExecutor
and a child flow using LocalDaskExecutor
:
- Child flow:
from prefect import task, Flow
from prefect.executors import LocalDaskExecutor
@task
def generate_random_numbers():
return list(range(1, 50))
@task
def add_one(x):
return x + 1
@task(log_stdout=True)
def print_results(res):
print(res)
with Flow("map_example", executor=LocalDaskExecutor()) as flow:
numbers = generate_random_numbers()
result = add_one.map(numbers)
print_results(result)
- Parent flow:
from prefect import task, Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
@task
def task_1():
pass
@task
def task_2():
pass
@task
def task_3():
pass
with Flow("parent_of_map") as flow:
t1 = task_1()
t2 = task_2(upstream_tasks=[t1])
t3 = task_3(upstream_tasks=[t2])
id_ = create_flow_run(flow_name="map_example", project_name="xyz", upstream_tasks=[t3])
wait_for_flow_run(id_, stream_logs=True, raise_final_state=True)