How can I create a subflow and block until it’s completed?

Prefect 2.0

By default, Orion runs your subflows sequentially: calling a subflow blocks the parent flow until that subflow run has finished. In the example below, subflow_2 starts only after subflow_1 has completed.

from prefect import flow, task
import time

@flow
def subflow_1():
    print("Subflow 1 started!")
    time.sleep(3)
    return "Hello from subflow!"

@flow
def subflow_2():
    print("Subflow 2 started!")
    time.sleep(3)
    return "Hello from the second subflow!"

@task
def normal_task():
    print("A normal task")

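@flow
def main_flow():
    # Each subflow call blocks until that subflow run has finished,
    # so subflow_2 starts only after subflow_1 returns.
    state_subflow_1 = subflow_1()
    state_subflow_2 = subflow_2()
    # wait_for makes the dependency explicit: the task runs after both subflows.
    normal_task(wait_for=[state_subflow_1, state_subflow_2])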

if __name__ == "__main__":
    main_flow_state = main_flow()

Even if you assign a DaskTaskRunner, subflows will still be executed sequentially.

from prefect import flow
from prefect.task_runners import DaskTaskRunner

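# subflow_1 and subflow_2 are the flows defined in the previous example.
@flow(task_runner=DaskTaskRunner())
def main_flow():
    # The task runner only affects tasks; these subflow calls
    # still run one after another.
    subflow_1()
    subflow_2()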

For parallel execution of subflows, see the following topic:

Prefect 1.0

Prefect 1.0 uses an additional task, wait_for_flow_run, to block until the child flow run is completed.

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent_flow") as flow:
    child_flow_run_id = create_flow_run(
        flow_name="child_flow_name", run_name="custom_run_name"
    )
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id, raise_final_state=True, stream_logs=True
    )

The raise_final_state flag ensures that the state of this task is set to the final state of the child flow run on completion. Combined with the default all_successful trigger, this prevents downstream tasks from running if the child flow run fails.

Setting stream_logs=True lets you see the child flow run's logs directly in the parent flow run.
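To make the blocking and raise_final_state behavior more concrete, here is a minimal plain-Python sketch of the general idea: poll a run's state until it is final, then optionally surface a failure so that downstream work is skipped. This is not Prefect's implementation. The names wait_until_finished, fake_backend, ChildFlowRunFailed, and the state strings are all hypothetical and chosen only for illustration, and raising an ordinary exception is a simplification of how a task would adopt the child's final state.

```python
import time
from typing import Callable, Iterable

# Hypothetical set of terminal states, for illustration only.
FINAL_STATES = {"Success", "Failed", "Cancelled"}


class ChildFlowRunFailed(Exception):
    """Raised when the child run ends in a non-successful final state."""


def wait_until_finished(
    get_state: Callable[[], str],
    poll_interval: float = 0.01,
    raise_final_state: bool = True,
) -> str:
    """Block until get_state() reports a final state, then return it."""
    while True:
        state = get_state()
        if state in FINAL_STATES:
            break
        # Not finished yet: wait a little before asking again.
        time.sleep(poll_interval)
    if raise_final_state and state != "Success":
        # Surface the failure so the caller does not continue downstream.
        raise ChildFlowRunFailed(f"child flow run finished as {state}")
    return state


def fake_backend(states: Iterable[str]) -> Callable[[], str]:
    """Hypothetical stand-in for a backend: returns one state per poll."""
    iterator = iter(states)
    return lambda: next(iterator)


if __name__ == "__main__":
    final = wait_until_finished(fake_backend(["Scheduled", "Running", "Success"]))
    print(f"Child run finished as {final}; downstream work may start.")
```

With raise_final_state=False in this sketch, the caller gets the final state back even when the child failed and has to decide for itself whether to continue.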

An alternative is to use the StartFlowRun task with wait=True:

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_flow_run = StartFlowRun(project_name="PROJECT_NAME", wait=True)

with Flow("FLOW_NAME") as flow:
    staging = start_flow_run(flow_name="child_flow_name")