How can I map over a list of inputs?

Prefect 2.0

A .map() operator will be released for Orion soon. Until then, you can take advantage of Orion’s support for native Python to call tasks in loops:

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
def my_mapped_task(x):
    return x

@task
def my_mapped_task_2(x):
    return x

@task
def list_task():
    return [1, 2, 3, 4, 5]

@flow(task_runner=DaskTaskRunner())
def my_flow():

    # map over a constant
    for i in range(10):
        my_mapped_task(i)

    # map over a task's output
    l = list_task()
    for i in l.wait().result():
        my_mapped_task_2(i)

Note that when tasks are called on constant values, they cannot detect their upstream edges automatically. In this example, the loop iterates over the resolved result of list_task(), so my_mapped_task_2 receives plain values and does not know that it is downstream from list_task(). Orion will have convenience functions for detecting these associations, and Orion’s .map() operator will automatically track them.
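To make the point about upstream edges concrete, here is a toy sketch in plain Python. It is not how Orion works internally; ToyFuture, run_task, and the edges list are hypothetical names invented for this illustration. It only shows why a runner can record a dependency when it receives a future-like object, and cannot when it receives a value that has already been unwrapped.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple


@dataclass
class ToyFuture:
    """Hypothetical stand-in for a task future: a value plus the producing task's name."""
    task_name: str
    value: Any


# (upstream, downstream) pairs recorded by the toy runner
edges: List[Tuple[str, str]] = []


def run_task(name: str, fn: Callable[..., Any], *args: Any) -> ToyFuture:
    """Run fn, recording an edge for every argument that is a ToyFuture."""
    resolved = []
    for arg in args:
        if isinstance(arg, ToyFuture):
            # A future carries its origin, so the edge can be recorded.
            edges.append((arg.task_name, name))
            resolved.append(arg.value)
        else:
            # A plain value has no origin attached, so no edge is recorded.
            resolved.append(arg)
    return ToyFuture(name, fn(*resolved))


numbers = run_task("list_task", lambda: [1, 2, 3])

# Looping over the unwrapped list passes plain ints: no edges are recorded.
for n in numbers.value:
    run_task("my_mapped_task_2", lambda x: x, n)
edges_from_loop = list(edges)

# Passing the future itself lets the toy runner see the dependency.
total = run_task("sum_task", sum, numbers)
```

After this runs, edges_from_loop is empty, while edges holds a single pair linking list_task to sum_task. The loop in the flow above is in the same position as the first case: each call only sees an element of the resolved list.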

Prefect 1.0

Prefect 1.0 uses a flexible map/reduce model for dynamically executing parallel tasks.

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))

with Flow('Map Reduce', executor=LocalDaskExecutor()) as flow:
    mapped_result = map_fn.map(numbers)
    reduced_result = reduce_fn(mapped_result)
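
For readers unfamiliar with the map/reduce pattern itself, the same computation can be sketched with only the Python standard library. This is an analogy, not Prefect's implementation: the thread pool stands in for the executor, and map_fn and reduce_fn are ordinary functions rather than Prefect tasks.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, List

numbers = [1, 2, 3]


def map_fn(x: int) -> int:
    """Applied once per input element, potentially in parallel."""
    return x + 1


def reduce_fn(values: Iterable[int]) -> int:
    """Applied once to the collected outputs of the mapped calls."""
    return sum(values)


with ThreadPoolExecutor() as pool:
    # pool.map preserves input order, like a mapped task's list of results.
    mapped_result: List[int] = list(pool.map(map_fn, numbers))

reduced_result = reduce_fn(mapped_result)
print(mapped_result, reduced_result)  # [2, 3, 4] 9
```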

This is one of my most valued features in Prefect 1.0. Any ETA for this in Prefect 2.0?

Thanks,
Peter

No ETA, but you can already do that using a for loop, as shown in the Prefect 2.0 example in this post. The functionality is the same. Is there anything you can’t solve using a for loop? Let me know if you run into any issues doing that.

I actually haven’t tried, since I couldn’t get v2.0 to work on my work laptop running Windows. But I was excited to see that you have started implementing Windows compatibility in the latest update, and since I use the mapping feature a lot, I wanted to know the approximate time frame for when I could switch.

My main concern is that tasks looping over another task’s output cannot detect their upstream edges, and I’m not sure how to work around that. Right now the documentation and this post only state:

Orion will have convenience functions for detecting these associations, and Orion’s .map() operator will automatically track them.

- Peter


Sorry to hear that. As you noticed, we are working on it. For now, perhaps you could explore working from AWS Cloud9 or GitHub Codespaces? Both would give you a Linux-based IDE in the cloud where you could easily install and explore Prefect 2.0.

This is true. Thanks for showing interest in that feature, we’ll keep iterating on that and keep you posted via release notes.

Unfortunately, I haven’t had the time to experiment with Prefect outside of my work setup yet, but when I do, I could use one of those alternatives or WSL. For now I’ll look forward to Windows compatibility and the map function being implemented, and see if I can make the switch to v2 after that.

Thanks!
- Peter