It’s not directly possible to pass data in memory from the create_flow_run and StartFlowRun tasks to the parent flow, because the respective flow runs execute on different executors. The data therefore has to be persisted in an intermediate location.
The code below is for Prefect 1.0. In Prefect 2.0, subflows are a first-class feature, and you can pass data dependencies between flows the same way you do between tasks.
Here is how you can pass data between flows in Prefect 1.0 (example from this article):
    from typing import List

    from prefect import Flow, Parameter, task
    from prefect.engine.results import LocalResult
    from prefect.tasks.prefect import create_flow_run, get_task_run_result

    # Persist the return value so that another flow run can load it later.
    @task(result=LocalResult())
    def create_some_data(length: int):
        return list(range(length))

    with Flow("child") as child_flow:
        data_size = Parameter("data_size", default=5)
        data = create_some_data(data_size)

    @task(log_stdout=True)
    def transform_and_show(data: List[int]) -> List[int]:
        print(f"Got: {data!r}")
        new_data = [x + 1 for x in data]
        print(f"Created: {new_data!r}")
        return new_data

    with Flow("parent") as parent_flow:
        # Start a run of the child flow; this returns the flow run ID.
        child_run_id = create_flow_run(
            flow_name=child_flow.name, parameters=dict(data_size=10)
        )
        # Load the persisted result of the child task, identified by its slug.
        child_data = get_task_run_result(child_run_id, "create_some_data-1")
        transform_and_show(child_data)
The transform_and_show task could be anything (including downstream tasks of the parent flow).
Note: create_flow_run, wait_for_flow_run, and get_task_run_result were implemented to provide a more consistent interface for working with flow runs (unlike StartFlowRun, whose return value depended on the wait argument). The get_task_run_result task retrieves the result based on a task slug.
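To make the idea of intermediate persistence more concrete, here is a minimal plain-Python sketch of the pattern, with no Prefect involved. It is only a conceptual model and not how Prefect implements results: the STORE directory, the persist_result and load_result helpers, and the run ID "child-run-001" are all hypothetical names made up for illustration. The child side writes its value to shared storage under a (run ID, task slug) key, and the parent side reads it back knowing only that key.

```python
import json
import tempfile
from pathlib import Path
from typing import Any

# Hypothetical shared storage location. In Prefect 1.0 this role is played
# by the configured Result (e.g. LocalResult), not by this directory.
STORE = Path(tempfile.mkdtemp(prefix="flow_results_"))


def persist_result(run_id: str, task_slug: str, value: Any) -> Path:
    """Write a task's return value to storage under (run_id, task_slug)."""
    path = STORE / run_id / f"{task_slug}.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(value))
    return path


def load_result(run_id: str, task_slug: str) -> Any:
    """Read a previously persisted value back, as the parent side would."""
    path = STORE / run_id / f"{task_slug}.json"
    return json.loads(path.read_text())


# "Child" side: compute the data and persist it instead of handing it over
# in memory.
persist_result("child-run-001", "create_some_data-1", list(range(10)))

# "Parent" side: it knows only the run ID and the task slug.
child_data = load_result("child-run-001", "create_some_data-1")
new_data = [x + 1 for x in child_data]
```

The point of the sketch is that the two sides share nothing except the storage location and the key, which is why the child task in the Prefect example needs a result configured and why the parent has to name the task slug.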