How to build conditional logic within mapped tasks?

TL;DR - use the apply_map function.

More details about apply_map

Sometimes you want to express a more complex structure in your mapped pipelines, e.g. adding conditional tasks using prefect.case. This can be done using prefect.apply_map.

apply_map takes a function that adds multiple scalar tasks to a flow, and converts those tasks to run as parallel mapped pipelines.

For example, here we create a function with the following logic:

  • If x is even, increment it
  • If x is odd, negate it

Note that inc_or_negate is not a task itself - it’s a “normal” function that creates several tasks. Just as we can map single tasks like inc using inc.map, we can map functions that create multiple tasks using apply_map.

from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge

@task
def inc(x):
    return x + 1

@task
def negate(x):
    return -x

@task
def is_even(x):
    return x % 2 == 0

def inc_or_negate(x):
    cond = is_even(x)
    # If x is even, increment it
    with case(cond, True):
        res1 = inc(x)
    # If x is odd, negate it
    with case(cond, False):
        res2 = negate(x)
    return merge(res1, res2)

with Flow("apply-map example") as flow:
    result = apply_map(inc_or_negate, range(4))

Running the above flow, we get four parallel, conditional mapped pipelines, one per element of range(4). The computed value of result is [1, -1, 3, -3].
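As a quick sanity check on that result, the per-element logic can be reproduced in plain Python, with no Prefect involved. This is only a sketch of what each mapped pipeline computes; the helper name inc_or_negate_plain is hypothetical and not part of Prefect.

```python
# Plain-Python equivalent of one mapped pipeline (hypothetical helper,
# used only to check the expected values by hand).
def inc_or_negate_plain(x):
    if x % 2 == 0:
        # Even input: same as the inc task
        return x + 1
    # Odd input: same as the negate task
    return -x

# One "pipeline" per element, mirroring apply_map over range(4)
expected = [inc_or_negate_plain(x) for x in range(4)]
print(expected)
```

Each element is handled independently, which is why apply_map can run the four pipelines in parallel.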

Just as with task.map, arguments to apply_map can be wrapped with unmapped, allowing certain arguments to avoid being mapped. While not always necessary, apply_map can be quite useful when you want to create complex mapped pipelines with conditional logic within them.