By default, Orion runs your subflows sequentially. This way,
subflow_2 will start after
subflow_1 is completed.
from prefect import flow, task import time @flow def subflow_1(): print("Subflow 1 started!") time.sleep(3) return "Hello from subflow!" @flow def subflow_2(): print("Subflow 2 started!") time.sleep(3) return "Hello from the second subflow!" @task def normal_task(): print("A normal task") @flow def main_flow(): state_subflow_1 = subflow_1() state_subflow_2 = subflow_2() normal_task(wait_for=[state_subflow_1, state_subflow_2]) if __name__ == "__main__": main_flow_state = main_flow()
Even if you assign a
DaskTaskRunner, subflows will still be executed sequentially.
from prefect import flow from prefect.task_runners import DaskTaskRunner @flow(task_runner=DaskTaskRunner()) def main_flow(): subflow_1() subflow_2()
For parallel execution of subflows, see the following topic:
Prefect 1.0 uses an additional task
wait_for_flow_run to block until the child flow run is completed.
from prefect import Flow from prefect.tasks.prefect import create_flow_run, wait_for_flow_run with Flow("parent_flow") as flow: child_flow_run_id = create_flow_run( flow_name="child_flow_name", run_name="custom_run_name" ) child_flowrunview = wait_for_flow_run( child_flow_run_id, raise_final_state=True, stream_logs=True )
raise_final_state flag ensures that the state of this task will be set to the final state of the child flow run on completion. By using the default
all_successful trigger, it prevents from running downstream tasks if the child flow run fails.
stream_logs=True allows to see the child flow run logs directly in the parent flow run.
The alternative is to use the
StartFlowRun task with
from prefect import Flow from prefect.tasks.prefect import StartFlowRun start_flow_run = StartFlowRun(project_name="PROJECT_NAME", wait=True) with Flow("FLOW_NAME") as flow: staging = start_flow_run(flow_name="child_flow_name")