Prefect 2.0
Prefect provides a .map()
implementation that automatically creates a task run for each element of its input data. Mapped tasks represent the computations of many individual children tasks.
The simplest Prefect map takes a tasks and applies it to each element of its inputs.
from prefect import flow, task
@task
def print_nums(nums):
for n in nums:
print(n)
@task
def square_num(num):
return num**2
@flow
def map_flow(nums):
print_nums(nums)
squared_nums = square_num.map(nums)
print_nums(squared_nums)
map_flow([1,2,3,5,8,13])
Prefect also supports unmapped
arguments, allowing you to pass static values that donât get mapped over.
from prefect import flow, task
@task
def add_together(x, y):
return x + y
@flow
def sum_it(numbers, static_value):
futures = add_together.map(numbers, static_value)
return futures
sum_it([1, 2, 3], 5)
If your static argument is an iterable, youâll need to wrap it with unmapped
to tell Prefect that it should be treated as a static value.
from prefect import flow, task, unmapped
@task
def sum_plus(x, static_iterable):
return x + sum(static_iterable)
@flow
def sum_it(numbers, static_iterable):
futures = sum_plus.map(numbers, static_iterable)
return futures
sum_it([4, 5, 6], unmapped([1, 2, 3]))
Prefect 1.0
Prefect 1.0 uses a flexible map/reduce model for dynamically executing parallel tasks.
from prefect import Flow, task
from prefect.executors import LocalDaskExecutor
numbers = [1, 2, 3]
map_fn = task(lambda x: x + 1)
reduce_fn = task(lambda x: sum(x))
with Flow('Map Reduce', executor=LocalDaskExecutor()) as flow:
mapped_result = map_fn.map(numbers)
reduced_result = reduce_fn(mapped_result)