TL;DR - use the apply_map
task.
More details about apply_map
Sometimes you want to express a more complex structure in your mapped pipelines, e.g. adding conditional tasks using prefect.case
. This can be done using prefect.apply_map
.
apply_map
takes a function that adds multiple scalar tasks to a flow, and converts those tasks to run as parallel mapped pipelines.
For example, here we create a function with the following logic:
- If
x
is even, increment it - If
x
is odd, negate it
Note that inc_or_negate
is not a task itself - it’s a “normal” function that creates several tasks. Just as we can map single tasks like inc
using inc.map
, we can map functions that create multiple tasks using apply_map
.
from prefect import Flow, task, case, apply_map
from prefect.tasks.control_flow import merge
@task
def inc(x):
return x + 1
@task
def negate(x):
return -x
@task
def is_even(x):
return x % 2 == 0
def inc_or_negate(x):
cond = is_even(x)
# If x is even, increment it
with case(cond, True):
res1 = inc(x)
# If x is odd, negate it
with case(cond, False):
res2 = negate(x)
return merge(res1, res2)
with Flow("apply-map example") as flow:
result = apply_map(inc_or_negate, range(4))
Running the above flow we get four parallel, conditional mapped pipelines. The computed value of result
is [1, -1, 3, -3]
.
Just as with task.map
, arguments to apply_map
can be wrapped with unmapped
, allowing certain arguments to avoid being mapped. While not always necessary, apply_map
can be quite useful when you want to create complex mapped pipelines with conditional logic within them.