[Flow Modelling] Fan-Out & Reducer Pattern

Hi folks.

We often encounter flows that fetch a list of items, process the items in parallel, and finally gather the results of the parallel steps into a single flow result.

Example: First, get satellite images for Northern Europe, then run ML on every image in parallel, and finally merge all ML results into one prediction.
Prefect models the dependencies from the fan-out tasks to the reducer task well: the reducer consumes the fan-out task results directly, so the UI draws dependency arrows between them.

However, the dependency between the initial “generate_list” task and the processor tasks it fans out into does not show up as a direct dependency, because we cannot use the usual .submit() interface there; we have to use .submit().result() or call the task synchronously.
Root cause: submitting the “generate_list” task and then fanning out with a for loop is not possible, because we cannot iterate over a PrefectFuture: the length of the result is not known yet.
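To illustrate the root cause outside of Prefect, here is a minimal plain-Python sketch. It uses the standard library's concurrent.futures as a stand-in for Prefect futures (an analogy only, not Prefect's API), and reuses the task names from this post as ordinary functions. It shows that a future is a handle rather than a sequence, so a for loop can only start once the result has been resolved.

```python
from concurrent.futures import ThreadPoolExecutor


def generate_list():
    # Stand-in for the upstream task; its length is only known after it has run.
    return [1, 2, 3, 4, 5]


def fan_out_processor(item):
    return item * 2


with ThreadPoolExecutor() as pool:
    list_future = pool.submit(generate_list)

    # A future is a handle, not a sequence, so it cannot drive a for loop.
    try:
        for item in list_future:
            pass
        iterable = True
    except TypeError:
        iterable = False

    # Blocking on the result is what makes the length known.
    items = list_future.result()
    item_futures = [pool.submit(fan_out_processor, item) for item in items]
    total = sum(f.result() for f in item_futures)

print(iterable, total)  # prints: False 30
```

The same constraint applies in the flow: something has to resolve the list before the loop can run, which is why the upstream future no longer reaches the fan-out tasks as an input.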

Is there a better way to do this than our current approach?



Mock code related to the screenshot above:

from time import sleep
from prefect import flow, task


@task
def generate_list():
    sleep(5)
    return [1, 2, 3, 4, 5]


@task
def fan_out_processor(item):
    sleep(5)
    return item * 2


@task
def reducer(results):
    sleep(5)
    sum_of_results = sum(results)
    print(sum_of_results)
    return sum_of_results


@flow(log_prints=True)
def main():
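    # Calling the task directly blocks until the list is available.
    item_list = generate_list()

    # Fan out: submit one processor task per item and collect the futures.
    results = []
    for item in item_list:
        result = fan_out_processor.submit(item)
        results.append(result)

    # Reduce: passing the futures makes the reducer depend on every processor task.
    reducer.submit(results)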


if __name__ == "__main__":
    main()

One option is to call .submit() on generate_list to capture it as a future, and then pass that future to wait_for on each fan-out task to establish the dependency explicitly. For example:

@flow(log_prints=True)
def main():
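    # Keep the future so it can be passed as an explicit upstream dependency.
    item_list_task = generate_list.submit()
    # Block until the list is available so that its length is known.
    item_list = item_list_task.result()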

    results = []
    for item in item_list:
        result = fan_out_processor.submit(item, wait_for=[item_list_task])
        results.append(result)

    reducer.submit(results)