Prefect 2
To create a subflow, decorate your subflow function with @flow
and call it within your parent flow. Example (updated for Prefect>=2.0.0):
from prefect import flow


@flow
def subflow():
    return "Hello from subflow!"


@flow
def main_flow():
    result = subflow()
    print(result)


if __name__ == "__main__":
    main_flow_state = main_flow()
Prefect 1
Prefect 1 uses separate tasks to create a child flow run from within a parent flow. The create_flow_run and StartFlowRun tasks trigger a child flow run and return its flow_run_id. To migrate these to Orion (Prefect 2), call your child flows directly from a parent flow.
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PROJECT_NAME = "community"

with Flow("sample_flow") as flow:
    child_run_id = create_flow_run(
        flow_name="child_flow",
        project_name=PROJECT_NAME,
        task_args={"name": "Friendly name for a DAG node"},
    )
    extract_load_wait_task = wait_for_flow_run(
        child_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args={"name": "Friendly wait task name"},
    )
TMYK:
Technically speaking, subflows in the true sense do not exist in Prefect 1; the first-class subflow feature was introduced in Orion. Prefect 1 does, however, provide the orchestrator pattern, which lets a parent flow trigger child flows via an API call.
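
To make the distinction concrete, here is a minimal pure-Python sketch of the two styles. It does not use Prefect at all: ToyBackend, orchestrator_parent and subflow_parent are hypothetical names invented for illustration, and the "Success"/"Failed" strings are only toy state labels. In the orchestrator style the parent holds nothing but a run ID and a final state, while in the subflow style it gets the child's return value directly.

```python
import itertools
from typing import Callable, Dict


class ToyBackend:
    """Hypothetical in-memory stand-in for an orchestration API (not Prefect code)."""

    def __init__(self) -> None:
        self._flows: Dict[str, Callable[[], object]] = {}
        self._pending: Dict[str, str] = {}
        self._ids = itertools.count(1)

    def register(self, name: str, fn: Callable[[], object]) -> None:
        # Make a flow known to the backend under a name.
        self._flows[name] = fn

    def create_flow_run(self, flow_name: str) -> str:
        # Schedule a run and hand back only its ID, as an API call would.
        run_id = f"run-{next(self._ids)}"
        self._pending[run_id] = flow_name
        return run_id

    def wait_for_flow_run(self, run_id: str) -> str:
        # Execute the scheduled run and report its final state, not its result.
        fn = self._flows[self._pending.pop(run_id)]
        try:
            fn()
        except Exception:
            return "Failed"
        return "Success"


def child_flow() -> str:
    return "Hello from child!"


def orchestrator_parent(backend: ToyBackend) -> str:
    # Orchestrator style: trigger by name, keep the run ID, wait for a state.
    run_id = backend.create_flow_run("child_flow")
    return backend.wait_for_flow_run(run_id)


def subflow_parent() -> str:
    # Subflow style: call the child like a function and use its return value.
    return child_flow()


backend = ToyBackend()
backend.register("child_flow", child_flow)
```

Calling orchestrator_parent(backend) yields only the child's final state, whereas subflow_parent() yields the child's return value. That difference is why the migration amounts to replacing the trigger-and-wait pair with a plain function call.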