Why are my mapped tasks running out of expected order when using multiple levels of mapped tasks?

TL;DR: wrap the upstream task in unmapped when passing it to upstream_tasks of a mapped task.

Constantino_Schillebeeckx @Constantino_Schillebeeckx: I'm having an issue with multiple layers of mapped tasks running out of the expected order.

from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor


@task
def m1(x):
    time.sleep(x)
    print(x)


@task
def m2(x):
    print(x)


with Flow('foo') as f:
    l1 = m1.map([1, 2, 3, 4, 5])
    l2 = m2.map(['a', 'b'], upstream_tasks=[l1])

f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)

For the example above, I would like (and would expect) all children of the mapped task m1 to finish before m2 begins, since m1 is listed in m2's upstream_tasks. However, that's not what I'm seeing (see screenshot).

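Ah, I think I found the issue. This GitHub issue helped: "Upstream tasks issue with Task.map" (Issue #2752, PrefectHQ/prefect). By default, map also maps over the tasks passed in upstream_tasks, so each m2 child only waits for the m1 child at the same index. I needed to wrap the upstream task in unmapped: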


from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor
from prefect import unmapped


@task
def m1(x):
    time.sleep(x)
    print(x)


@task
def m2(x):
    print(x)


@task
def red(x):
    print('dummy reduce')


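with Flow('foo') as f:
    l1 = m1.map([1, 2, 3, 4, 5])
    # unmapped() stops map from iterating over l1, so every m2 child
    # waits for all m1 children to finish
    l2 = m2.map(['a', 'b'], upstream_tasks=[unmapped(l1)])

# run the mapped children in parallel across local processes
f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)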
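
To make the difference easier to see, here is a rough plain-Python sketch of which m1 children each m2 child would wait for in the two flows above. It is only an illustrative model and does not call Prefect: mapped_dependencies and the child labels such as "m1[0]" are hypothetical names, and the pairing by index is an assumption based on the behavior described in the linked issue.

```python
from itertools import repeat


def mapped_dependencies(inputs, upstream_children, unmap_upstream):
    """Return, per downstream child, the upstream children it waits for.

    Hypothetical model of the behavior discussed above, not Prefect code.
    """
    if unmap_upstream:
        # Like upstream_tasks=[unmapped(l1)]: each child sees the whole upstream
        upstream_per_child = repeat(list(upstream_children))
    else:
        # Like upstream_tasks=[l1]: the upstream is paired element by element
        upstream_per_child = ([child] for child in upstream_children)
    # zip stops at the shorter sequence, so two inputs yield two children
    return dict(zip(inputs, upstream_per_child))


m1_children = ["m1[0]", "m1[1]", "m1[2]", "m1[3]", "m1[4]"]
m2_inputs = ["a", "b"]

without_unmapped = mapped_dependencies(m2_inputs, m1_children, unmap_upstream=False)
with_unmapped = mapped_dependencies(m2_inputs, m1_children, unmap_upstream=True)

print(without_unmapped)  # 'a' waits only for m1[0], 'b' only for m1[1]
print(with_unmapped)     # 'a' and 'b' each wait for all five m1 children
```

In this model, the first flow lets m2 start on 'a' as soon as the first m1 child finishes, which would explain the interleaved output, while the unmapped version holds every m2 child back until all of m1 is done.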

Anna_Geller @Anna_Geller: Nice work figuring this out and thanks for sharing the solution!