Prefect 2.0
By default, Orion runs your subflows sequentially: subflow_2 starts only after subflow_1 has completed.
from prefect import flow, task
import time


@flow
def subflow_1():
    print("Subflow 1 started!")
    time.sleep(3)
    return "Hello from subflow!"


@flow
def subflow_2():
    print("Subflow 2 started!")
    time.sleep(3)
    return "Hello from the second subflow!"


@task
def normal_task():
    print("A normal task")


@flow
def main_flow():
    state_subflow_1 = subflow_1()
    state_subflow_2 = subflow_2()
    normal_task(wait_for=[state_subflow_1, state_subflow_2])


if __name__ == "__main__":
    main_flow_state = main_flow()
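To make the ordering concrete without needing a Prefect installation, here is a minimal plain-Python sketch of the same call pattern. It is only a model, not Prefect code: fake_subflow, fake_main_flow and the events list are hypothetical names introduced for illustration. Each call blocks until it returns, so the second "subflow" cannot start before the first one finishes.

```python
import time

events = []  # ordered record of start/finish events


def fake_subflow(name, seconds=0.01):
    """Hypothetical stand-in for a subflow: blocks until its work is done."""
    events.append(f"{name} started")
    time.sleep(seconds)
    events.append(f"{name} finished")
    return f"Hello from {name}!"


def fake_main_flow():
    # Each call returns only after the callee has finished,
    # so the second call cannot begin before the first one ends.
    result_1 = fake_subflow("subflow_1")
    result_2 = fake_subflow("subflow_2")
    return [result_1, result_2]


results = fake_main_flow()
```

After running this, events holds the start and finish of subflow_1 followed by the start and finish of subflow_2, in that order.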
Even if you assign a DaskTaskRunner, subflows will still be executed sequentially.
from prefect import flow
from prefect.task_runners import DaskTaskRunner


@flow(task_runner=DaskTaskRunner())
def main_flow():
    subflow_1()
    subflow_2()
For parallel execution of subflows, see the following topic:
Prefect 1.0
Prefect 1.0 uses an additional task, wait_for_flow_run, to block until the child flow run is completed.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("parent_flow") as flow:
    child_flow_run_id = create_flow_run(
        flow_name="child_flow_name", run_name="custom_run_name"
    )
    child_flowrunview = wait_for_flow_run(
        child_flow_run_id, raise_final_state=True, stream_logs=True
    )
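Conceptually, "blocking until the child flow run is completed" amounts to checking the child run's state repeatedly until it reaches a final state. The sketch below models that idea in plain Python. It is not how wait_for_flow_run is implemented: wait_until_finished, get_state, FINAL_STATES and the polling parameters are all hypothetical names and values chosen for illustration.

```python
import time

# Assumed set of final state names, for illustration only.
FINAL_STATES = {"Success", "Failed", "Cancelled"}


def wait_until_finished(get_state, poll_interval=0.01, max_polls=100):
    """Block until get_state() returns a final state, then return it."""
    for _ in range(max_polls):
        state = get_state()
        if state in FINAL_STATES:
            return state
        time.sleep(poll_interval)
    raise TimeoutError("child run did not reach a final state")


# Hypothetical child run that reports a few non-final states before succeeding.
reported = iter(["Scheduled", "Running", "Running", "Success"])
final_state = wait_until_finished(lambda: next(reported))
```

The caller does not proceed past wait_until_finished until a final state is observed, which is the behavior the parent flow relies on.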
The raise_final_state flag ensures that the state of this task is set to the final state of the child flow run on completion. Combined with the default all_successful trigger, this prevents downstream tasks from running if the child flow run fails.
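As a rough illustration of that interaction, here is a simplified plain-Python model of the trigger check. It is a sketch under stated assumptions rather than Prefect's implementation: states are plain strings instead of Prefect state objects, and run_downstream is a hypothetical helper.

```python
def all_successful(upstream_states):
    """Simplified model of the trigger: True only if every upstream state is "Success"."""
    return all(state == "Success" for state in upstream_states)


def run_downstream(upstream_states):
    # If the trigger is not satisfied, the downstream task is not executed.
    if not all_successful(upstream_states):
        return "TriggerFailed"
    return "Success"


# With raise_final_state=True, the waiting task mirrors the child's final state.
child_succeeded = run_downstream(["Success"])
child_failed = run_downstream(["Failed"])
```

In this model, a failed child run means the downstream task never runs, which matches the behavior described above.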
Setting stream_logs=True lets you see the child flow run logs directly in the parent flow run.
The alternative is to use the StartFlowRun task with wait=True:
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

start_flow_run = StartFlowRun(project_name="PROJECT_NAME", wait=True)

with Flow("FLOW_NAME") as flow:
    staging = start_flow_run(flow_name="child_flow_name")