How can I map over a list of inputs?

Prefect 2.0

Prefect provides a .map() implementation that automatically creates a task run for each element of its input data. A mapped task represents the computations of many individual child tasks.

The simplest Prefect map takes a task and applies it to each element of its inputs.

from prefect import flow, task

@task
def print_nums(nums):
    for n in nums:
        print(n)

@task
def square_num(num):
    return num**2

@flow
def map_flow(nums):
    print_nums(nums)
    squared_nums = square_num.map(nums) 
    print_nums(squared_nums)

map_flow([1,2,3,5,8,13])

Prefect also supports unmapped arguments, allowing you to pass static values that don’t get mapped over.

from prefect import flow, task

@task
def add_together(x, y):
    return x + y

@flow
def sum_it(numbers, static_value):
    futures = add_together.map(numbers, static_value)
    return futures

sum_it([1, 2, 3], 5)

If your static argument is an iterable, you’ll need to wrap it with unmapped to tell Prefect that it should be treated as a static value.

from prefect import flow, task, unmapped

@task
def sum_plus(x, static_iterable):
    return x + sum(static_iterable)

@flow
def sum_it(numbers, static_iterable):
    futures = sum_plus.map(numbers, static_iterable)
    return futures

sum_it([4, 5, 6], unmapped([1, 2, 3]))
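As a rough mental model of the mapped vs. static distinction above, here is a plain-Python sketch. It is not Prefect's implementation: `Unmapped` and `toy_map` are hypothetical names made up for illustration, and the rule "lists and tuples are mapped, everything else is static" is a simplifying assumption.

```python
from itertools import repeat


class Unmapped:
    """Hypothetical stand-in for Prefect's unmapped(): marks a value as static."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, *args):
    """Call fn once per element of the list/tuple args, repeating static args."""
    mapped = [a for a in args if isinstance(a, (list, tuple))]
    if not mapped:
        raise ValueError("at least one argument must be a list or tuple")
    length = len(mapped[0])
    if any(len(m) != length for m in mapped):
        raise ValueError("mapped arguments must all have the same length")

    columns = []
    for a in args:
        if isinstance(a, (list, tuple)):
            columns.append(a)                        # iterated element by element
        elif isinstance(a, Unmapped):
            columns.append(repeat(a.value, length))  # unwrapped, then repeated
        else:
            columns.append(repeat(a, length))        # plain static value, repeated
    return [fn(*row) for row in zip(*columns)]


plain_static = toy_map(lambda x, y: x + y, [1, 2, 3], 5)
wrapped_static = toy_map(lambda x, s: x + sum(s), [4, 5, 6], Unmapped([1, 2, 3]))
print(plain_static)    # [6, 7, 8]
print(wrapped_static)  # [10, 11, 12]
```

Without the wrapper, the second call would pair 4 with 1, 5 with 2, and so on, which is why an iterable static argument needs to be marked explicitly.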

Prefect 1.0

Prefect 1.0 uses a flexible map/reduce model for dynamically executing parallel tasks.

from prefect import Flow, task
from prefect.executors import LocalDaskExecutor

numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))

with Flow('Map Reduce', executor=LocalDaskExecutor()) as flow:
    mapped_result = map_fn.map(numbers)
    reduced_result = reduce_fn(mapped_result)
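For anyone unfamiliar with the pattern, the same map/reduce shape can be sketched with only the standard library. This is an analogy, not Prefect code: a thread pool stands in for the executor, and `map_fn` / `reduce_fn` are ordinary functions rather than tasks.

```python
from concurrent.futures import ThreadPoolExecutor

numbers = [1, 2, 3]


def map_fn(x):
    # Runs once per input element, potentially in parallel.
    return x + 1


def reduce_fn(xs):
    # Runs once, after every mapped result is available.
    return sum(xs)


with ThreadPoolExecutor() as pool:
    mapped_result = list(pool.map(map_fn, numbers))

reduced_result = reduce_fn(mapped_result)
print(mapped_result)   # [2, 3, 4]
print(reduced_result)  # 9
```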

This is one of my most valued features in Prefect 1.0. Any ETA for this in Prefect 2.0?

Thanks,
Peter

No ETA, but you can already do that using a for loop as shown here. The functionality is the same. Curious to hear if you can’t solve it using a for loop? LMK if you have some issues doing that

I actually haven’t tried, since I couldn’t get v.2.0 to work on my work laptop running Windows. But I was excited to see that you have started implementing Windows compatibility in the latest update and since I use the mapping feature a lot I wanted to know the approximate time frame for when I could possibly switch.

My main concern is that tasks looping over another task’s output cannot detect their upstream edges, and I’m not sure how I could work around that. Right now the documentation and this post only state:

Orion will have convenience functions for detecting these associations, and Orion’s .map() operator will automatically track them.

- Peter

Sorry to hear that. As you noticed, we are working on it. For now, perhaps you could explore working from AWS Cloud9 or GitHub Codespaces? Both would give you a Linux-based IDE in the cloud where you could easily install and explore Prefect 2.0.

This is true. Thanks for showing interest in that feature, we’ll keep iterating on that and keep you posted via release notes.

Unfortunately, I haven’t had the time to experiment with Prefect outside of my work setup yet, but when I do I could use one of those alternatives or WSL. For now I’ll settle for looking forward to Windows compatibility and the map function being implemented and see if I can make the switch to v2 after that.

Thanks!
- Peter

Note to anyone using DaskTaskRunner or RayTaskRunner:

From Prefect version 2.0b8 onwards, those task runners were moved to their respective Prefect Collections for better dependency management: the core library no longer requires dask or ray as dependencies, and those can now be installed separately when needed.

The correct imports are now:

from prefect_dask import DaskTaskRunner
from prefect_ray import RayTaskRunner

Has a map() operator been released for Prefect 2 yet? I tried a Prefect 2 flow using DaskTaskRunner, and the map() operator didn’t seem to work properly.

The code below hangs on the transform task. It submits mapped runs of the tasks to workers, but doesn’t actually run the code.

# adapted from https://medium.com/slateco-blog/prefect-x-kubernetes-x-ephemeral-dask-power-without-responsibility-6e10b4f2fe40
# copied directly from https://github.com/tekumara/prefect-example/blob/main/flows/dask_flow.py
from prefect import task, flow, get_run_logger
from prefect.filesystems import S3
from prefect_dask import DaskTaskRunner

s3_block = S3.load("dask-kubernetes-hello-world")


# Define some tasks for us to run in our flow
@task
def extract() -> list:
    logger = get_run_logger()
    logger.info("extract")
    return [1, 2, 3, 4, 5, 6]


@task
def transform(number: int) -> int:
    logger = get_run_logger()
    logger.info(f"transform {number}")
    return number * 2


@task
def load(numbers: list) -> list:
    logger = get_run_logger()
    logger.info("load")
    return [i for i in numbers if i]


@flow(
    task_runner=DaskTaskRunner(
        address="tcp://daskcluster-scheduler:8786",
    )
)
def dask_flow():
    numbers = extract()
    transformed_numbers = transform.map(numbers)
    numbers_twice = transform.map(transformed_numbers)
    result = load(numbers=numbers_twice)

I was able to get the code to run by using a for loop:

@flow(
    task_runner=DaskTaskRunner(
        address="tcp://daskcluster-scheduler:8786",
    )
)
def dask_flow():
    numbers = extract()
    transformed_numbers = []
    for i in numbers:
        transformed_numbers.append(transform(i))
    result = load(numbers=transformed_numbers)

But I was wondering if this is still the recommended way to do this with Prefect 2.
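To make the loop-versus-map comparison concrete without depending on any Prefect version, here is a standard-library analogy. It only shows that submitting each element in a loop and calling a map produce the same results; the thread pool is a stand-in for a task runner, not Prefect's API. In Prefect 2 itself, my understanding is that the loop body would call the task's `.submit()` method to get a future instead of blocking on each call, but check that against the version you run.

```python
from concurrent.futures import ThreadPoolExecutor


def transform(number):
    return number * 2


numbers = [1, 2, 3, 4, 5, 6]

with ThreadPoolExecutor(max_workers=4) as pool:
    # Loop style: submit one call per element, then collect the results.
    loop_futures = [pool.submit(transform, n) for n in numbers]
    loop_results = [f.result() for f in loop_futures]

    # Map style: one call fans out over all elements.
    map_results = list(pool.map(transform, numbers))

print(loop_results)  # [2, 4, 6, 8, 10, 12]
print(map_results)   # [2, 4, 6, 8, 10, 12]
```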

Thanks for the example. We can try to reproduce it. Can you share the output of the prefect version command and the prefect-dask version you use?

I couldn’t reproduce the issue; both examples worked and the transform didn’t hang.

# pip freeze | grep prefect-dask
prefect-dask==0.2.1

# prefect version
Version:             2.6.9
API version:         0.8.3
Python version:      3.10.6
Git commit:          014d093e
Built:               Tue, Nov 22, 2022 2:14 PM
OS/Arch:             darwin/arm64
Profile:             default
Server type:         hosted

After modifying it a bit more, it not only works, but map is even faster.

Hi Nishanth!

Do all the worker nodes in your cluster have Prefect installed? The reason I ask is that I ran into a similar issue when I tried to run exactly the same code.

Based on the address, it looks like you might be using a dask-kubernetes cluster? If so, and you are using the default ghcr.io/dask/dask:latest image, it won’t work because every worker node must have all the dependencies your tasks need to run. If you provide the “EXTRA_PIP_PACKAGES” environment variable to the worker node containers with a value of “prefect”, they will install the latest version of Prefect when they start up.
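As a sketch of what that environment variable could look like in cluster settings: the dictionary below is only a configuration fragment. The cluster name and namespace are placeholders, and it assumes the cluster class you use accepts an `env` keyword (I believe the operator-based KubeCluster does, but verify against your dask-kubernetes version).

```python
# Configuration fragment only; nothing here talks to Kubernetes.
cluster_kwargs = dict(
    name="example-cluster",    # placeholder name
    namespace="default",       # placeholder namespace
    image="ghcr.io/dask/dask:latest",
    # Assumption: the cluster class forwards `env` to the worker containers,
    # where the Dask image installs the listed packages at startup.
    env={"EXTRA_PIP_PACKAGES": "prefect"},
)

print(cluster_kwargs["env"])
```

A dictionary like this would be handed to the task runner through its cluster_kwargs argument, alongside cluster_class.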

Or, you could use a custom Docker image. As a quick test, I used the following Dockerfile:

FROM ghcr.io/dask/dask:2022.11.1
RUN pip install prefect

I built and tagged the image and pushed it to Docker Hub:

docker build -t rpeden/dask-prefect-aws:2.6.9 .
docker push rpeden/dask-prefect-aws:2.6.9

And then when I used that image to start a new Dask cluster in K8s using KubeCluster, the flow ran and worked as expected.

Thank you both for these answers! Ryan, could you please share your python code that you used to start a new Dask cluster in k8s using KubeCluster?

I am trying something like this, but it fails to create the Dask cluster:

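from dask_kubernetes.operator import KubeCluster
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(
        cluster_class=KubeCluster,
        cluster_kwargs=dict(
            name="nishanth-cluster",
            namespace="prefect2",
            image="my_custom_ecr_image:tag",
        ),
    )
)
def dask_flow():
    ...  # same flow body as in the earlier example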

The error I am getting is like this:

16:43:49.117 | INFO | prefect.task_runner.dask - Creating a new Dask cluster with dask_kubernetes.operator.kubecluster.kubecluster.KubeCluster
16:43:49.127 | ERROR | Flow run 'speedy-clam' - Crash detected! Execution was interrupted by an unexpected exception: Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/task_runners.py", line 161, in start
    await self._start(exit_stack)
  File "/usr/local/lib/python3.9/site-packages/prefect_dask/task_runners.py", line 300, in _start
    self._client = await exit_stack.enter_async_context(
  File "/usr/local/lib/python3.9/contextlib.py", line 575, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 1437, in __aenter__
    await self
  File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 1252, in _start
    await self._ensure_connected(timeout=timeout)
  File "/usr/local/lib/python3.9/site-packages/distributed/client.py", line 1315, in _ensure_connected
    comm = await connect(
  File "/usr/local/lib/python3.9/site-packages/distributed/comm/core.py", line 291, in connect
    comm = await asyncio.wait_for(
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 479, in wait_for
    return fut.result()
  File "/usr/local/lib/python3.9/site-packages/distributed/comm/tcp.py", line 487, in connect
    ip, port = parse_host_port(address)
  File "/usr/local/lib/python3.9/site-packages/distributed/comm/addressing.py", line 95, in parse_host_port
    port = _default()
  File "/usr/local/lib/python3.9/site-packages/distributed/comm/addressing.py", line 73, in _default
    raise ValueError(f"missing port number in address {address!r}")
ValueError: missing port number in address ''
During handling of the above exception, another exception occurred:
aiohttp.client_exceptions.InvalidURL: /apis/kubernetes.dask.org/v1/namespaces/prefect2/daskclusters/nishanth-cluster
16:43:49.201 | ERROR | prefect.engine - Engine execution of flow run '58c4a529-4c19-4f3b-af5f-b7266e452f57' exited with unexpected exception
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/dask_kubernetes/operator/kubecluster/kubecluster.py", line 845, in reap_clusters
    loop = asyncio.get_event_loop()
  File "/usr/local/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'MainThread'.

KubeCluster is kind of deprecated by Dask - @ryan_peden knows more