Prefect provides a
.map() implementation that automatically creates a task run for each element of its input data. Mapped tasks represent the computations of many individual children tasks.
The simplest Prefect map takes a tasks and applies it to each element of its inputs.
from prefect import flow, task @task def print_nums(nums): for n in nums: print(n) @task def square_num(num): return num**2 @flow def map_flow(nums): print_nums(nums) squared_nums = square_num.map(nums) print_nums(squared_nums) map_flow([1,2,3,5,8,13])
Prefect also supports
unmapped arguments, allowing you to pass static values that don’t get mapped over.
from prefect import flow, task @task def add_together(x, y): return x + y @flow def sum_it(numbers, static_value): futures = add_together.map(numbers, static_value) return futures sum_it([1, 2, 3], 5)
If your static argument is an iterable, you’ll need to wrap it with
unmapped to tell Prefect that it should be treated as a static value.
from prefect import flow, task, unmapped @task def sum_plus(x, static_iterable): return x + sum(static_iterable) @flow def sum_it(numbers, static_iterable): futures = sum_plus.map(numbers, static_iterable) return futures sum_it([4, 5, 6], unmapped([1, 2, 3]))
Prefect 1.0 uses a flexible map/reduce model for dynamically executing parallel tasks.
from prefect import Flow, task from prefect.executors import LocalDaskExecutor numbers = [1, 2, 3] map_fn = task(lambda x: x + 1) reduce_fn = task(lambda x: sum(x)) with Flow('Map Reduce', executor=LocalDaskExecutor()) as flow: mapped_result = map_fn.map(numbers) reduced_result = reduce_fn(mapped_result)