This is a simple flow of flows where parent flow passes a parameter value to a child flow as data dependency.
import uuid
import subprocess
from prefect import task, Flow, Parameter
from prefect.tasks.prefect import create_flow_run
PROJECT_NAME = "community"
@task(log_stdout=True)
def log_input(param):
print(param)
with Flow("child_flow_example") as child_flow:
some_param = Parameter("some_data", default="foo")
log_input(some_param)
@task(log_stdout=True)
def extract_some_data() -> str:
return "bar"
with Flow("parent_flow_example") as parent_flow:
some_data = extract_some_data()
child_flow_run = create_flow_run(
flow_name=child_flow.name,
project_name=PROJECT_NAME,
parameters={"some_data": some_data},
idempotency_key=str(uuid.uuid4()),
)
if __name__ == "__main__":
child_flow.register(PROJECT_NAME)
parent_flow_id = parent_flow.register(PROJECT_NAME)
subprocess.run(f"prefect run --id {parent_flow_id} --execute --watch", shell=True)
Deploy to GCP with a local agent
If you want to orchestrate such flow of flows using GCS storage and local agent (spun up e.g. on a GCS VM), here is the code that you may use:
Deploy to GCP with a Docker agent
And if you want to orchestrate such flow of flows using GCS storage and Docker agent (spun up e.g. on a GCS VM), you may use this example: