View in #prefect-community on Slack
TL;DR - wrap the upstream task with the unmapped
keyword.
@Constantino_Schillebeeckx: I’m having issue with multiple layers of mapped tasks running out of expected order
from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor
@task
def m1(x):
time.sleep(x)
print(x)
@task
def m2(x):
print(x)
with Flow('foo') as f:
l1 = m1.map([1, 2, 3, 4, 5])
l2 = m2.map(['a', 'b'], upstream_tasks=[l1])
f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)
For the example above, I would like (and would expect) all children of the mapped task m1
to be executed first before m2
begins (since it’s upstream_task) - however that’s not what I’m seeing (see screenshot)
Ah, I think I found the issue - this Upstream tasks issue with Task.map · Issue #2752 · PrefectHQ/prefect · GitHub post helped - needed to unmapped
the upstream_tasks
from prefect import task, Flow
import time
from prefect.executors import LocalDaskExecutor
from prefect import unmapped
@task
def m1(x):
time.sleep(x)
print(x)
@task
def m2(x):
print(x)
@task
def red(x):
print('dummy reduce')
with Flow('foo') as f:
l1 = m1.map([1, 2, 3, 4, 5])
l2 = m2.map(['a', 'b'], upstream_tasks=[unmapped(l1)])
f.executor = LocalDaskExecutor(scheduler="processes", num_workers=7)
@Anna_Geller: Nice work figuring this out and thanks for sharing the solution!