Another example:
Orchestrator pattern (a parent flow creates runs of an already registered child flow and waits for them to finish)
Child flow
from prefect import Flow, Parameter, task

@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}!")

with Flow("dummy-child-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)

if __name__ == "__main__":
    # register to project "p" so the parent flow can reference it by name
    flow.register("p")
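To sanity-check the child flow before registering it, it can also be run once locally; a minimal optional snippet (the parameter name matches the Parameter defined above):

if __name__ == "__main__":
    # optional: run the flow once locally (instead of registering) to verify it
    state = flow.run(parameters={"user_input": "Marvin"})
    assert state.is_successful()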
Parent flow
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    # one child flow run per parameter dict
    parameters = [
        dict(user_input="Arthur"),
        dict(user_input="Marvin"),
        dict(user_input="Ford"),
    ]
    mapped_flow_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("p"),
    )
    # wait for every child flow run and fail the parent if any child run failed
    wait_for_flow_run.map(mapped_flow_ids, raise_final_state=unmapped(True))

if __name__ == "__main__":
    flow.register("p")
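When the child runs must not overlap, the same two tasks can also be chained sequentially instead of mapped. A minimal sketch, reusing the flow, project, and parameter names from the examples above and enforcing ordering via upstream_tasks:

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("sequential_flows") as flow:
    first_id = create_flow_run(
        flow_name="dummy-child-flow",
        project_name="p",
        parameters=dict(user_input="Arthur"),
    )
    first_done = wait_for_flow_run(first_id, raise_final_state=True)

    second_id = create_flow_run(
        flow_name="dummy-child-flow",
        project_name="p",
        parameters=dict(user_input="Marvin"),
        upstream_tasks=[first_done],  # start the second child run only after the first finished
    )
    wait_for_flow_run(second_id, raise_final_state=True)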
Another example: splitting the same flow into 8 flow runs, grouped by parameter values, so that each child flow run processes its own chunk of numbers.
from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

@task
def generate_thousand_numbers(start, stop, step):
    nrs = range(start, stop, step)
    return list(nrs)

with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    # each task call produces one chunk of numbers for one child flow run
    flow_run_1 = generate_thousand_numbers(1, 1000, 1)
    flow_run_2 = generate_thousand_numbers(1000, 2000, 1)
    flow_run_3 = generate_thousand_numbers(2000, 3000, 1)
    flow_run_4 = generate_thousand_numbers(3000, 4000, 1)
    flow_run_5 = generate_thousand_numbers(4000, 5000, 1)
    # ... until 8
    # one parameter dict per child flow run; the registered child flow is
    # assumed to define a "list_of_numbers" Parameter
    parameters = [
        dict(list_of_numbers=flow_run_1),
        dict(list_of_numbers=flow_run_2),
        dict(list_of_numbers=flow_run_3),
        dict(list_of_numbers=flow_run_4),
        dict(list_of_numbers=flow_run_5),
        # ... until 8
    ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )
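For this variant, the registered child flow would need to accept a list_of_numbers parameter. The flow below is a hypothetical sketch of such a child flow; the process_numbers task is a placeholder, not part of the original example:

from prefect import Flow, Parameter, task

@task(log_stdout=True)
def process_numbers(list_of_numbers):
    # placeholder: report how many numbers this child flow run received
    print(f"processing {len(list_of_numbers)} numbers")

with Flow("dummy-child-flow") as flow:
    numbers = Parameter("list_of_numbers")
    process_numbers(numbers)

if __name__ == "__main__":
    flow.register("community")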