How to map over flows with various parameter values?

Another example:

Orchestrator-pattern

Child flow

 from prefect import Flow, Parameter, task


@task(log_stdout=True)
def hello_world(user_input: str):
    print(f"hello {user_input}!")


with Flow("dummy-child-flow") as flow:
    param = Parameter("user_input", default="world")
    hw = hello_world(param)

if __name__ == '__main__':
    flow.register("p")

Parent flow

 from prefect import Flow, unmapped, task
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.executors import LocalDaskExecutor
from prefect.triggers import all_finished


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    parameters = [
        dict(user_input="Arthur"),
        dict(user_input="Marvin"),
        dict(user_input="Ford"),
    ]
    mapped_flow_ids = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("p"),
    )
    wait_for_flow_run.map(mapped_flow_ids, raise_final_state=unmapped(True))


if __name__ == "__main__":
    flow.register("p")

Another example splitting the same flow into 8 flow runs grouped based on parameter values

 from prefect import Flow, task, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor


@task
def generate_thousand_numbers(start, stop, step):
    nrs = range(start, stop, step)
    return list(nrs)


with Flow("mapped_flows", executor=LocalDaskExecutor()) as flow:
    flow_run_1 = generate_thousand_numbers(1, 1000, 1)
    flow_run_2 = generate_thousand_numbers(1000, 2000, 1)
    flow_run_3 = generate_thousand_numbers(2000, 3000, 1)
    flow_run_4 = generate_thousand_numbers(3000, 4000, 1)
    flow_run_5 = generate_thousand_numbers(4000, 5000, 1)
    # ... until 8
    parameters = [
        dict(list_of_numbers=flow_run_1),
        dict(list_of_numbers=flow_run_2),
        dict(list_of_numbers=flow_run_3),
        dict(list_of_numbers=flow_run_4),
        dict(list_of_numbers=flow_run_5),
        # ... until 8
    ]
    mapped_flows = create_flow_run.map(
        parameters=parameters,
        flow_name=unmapped("dummy-child-flow"),
        project_name=unmapped("community"),
    )