Can Prefect 1.x perform distributed GROUP BY operations? If not, can Prefect 2.0 perform them?
As a simple example, consider Sinkhorn iteration: we have a large distributed matrix and want to successively normalize its rows and columns:
```python
import random
from collections import defaultdict

from prefect import Flow, flatten, task, unmapped


@task
def create_rows(num_rows):
    return [i for i in range(num_rows)]


@task
def create_column(i, num_cols):
    return [(i, j, random.random()) for j in range(num_cols)]


with Flow("sinkhorn") as sinkhorn:
    # We can create a distributed matrix in Prefect 1.0.
    num_rows = num_cols = 1000000  # Some big number.
    num_steps = 10  # Number of Sinkhorn iterations (illustrative value).
    matrix = flatten(create_column.map(create_rows(num_rows), unmapped(num_cols)))

    # If this were pure Python (i.e. if `matrix` were an in-memory list of
    # (i, j, value) triples), we could successively group by rows, then columns:
    for _ in range(num_steps):
        # Normalize rows.
        row_sums = defaultdict(float)
        for i, j, value in matrix:
            row_sums[i] += value
        matrix = [(i, j, value / row_sums[i]) for i, j, value in matrix]
        # Normalize columns.
        col_sums = defaultdict(float)
        for i, j, value in matrix:
            col_sums[j] += value
        matrix = [(i, j, value / col_sums[j]) for i, j, value in matrix]
```
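Written out, with $a_{ij}$ standing for the `value` in triple `(i, j, value)`, one pass of the loop above computes:

```latex
\begin{aligned}
a'_{ij}  &= \frac{a_{ij}}{\sum_{k} a_{ik}}   && \text{(normalize rows: GROUP BY } i\text{)} \\
a''_{ij} &= \frac{a'_{ij}}{\sum_{k} a'_{kj}} && \text{(normalize columns: GROUP BY } j\text{)}
\end{aligned}
```

Each denominator is a sum keyed by `i` or by `j`; that keyed sum is where the GROUP BY enters.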
I’d like to first GROUP BY row, then GROUP BY column, without ever materializing the large matrix on a single machine.
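For this particular computation a full GROUP BY may not strictly be needed: each normalization only needs one sum per row or per column, and sums can be computed piecewise per chunk and then merged. The sketch below uses that substitute technique (partial aggregation with a pairwise tree reduction), not a shuffle-based GROUP BY. It is plain Python; each element of `chunks` stands in for the output of one mapped task, and all function names are hypothetical. Every merge holds only two partial-sum dicts with at most one entry per row or column, so no step sees the whole matrix.

```python
import random


def make_chunks(num_rows, num_cols, seed=0):
    """One chunk per row i, mirroring create_column in the question."""
    rng = random.Random(seed)
    return [[(i, j, rng.random()) for j in range(num_cols)] for i in range(num_rows)]


def partial_sums(chunk, axis):
    """Map step: sum this chunk's values per row (axis=0) or column (axis=1)."""
    sums = {}
    for i, j, value in chunk:
        key = (i, j)[axis]
        sums[key] = sums.get(key, 0.0) + value
    return sums


def merge(a, b):
    """Add two partial-sum dicts key by key."""
    out = dict(a)
    for key, s in b.items():
        out[key] = out.get(key, 0.0) + s
    return out


def tree_reduce(partials):
    """Pairwise reduction: every merge sees only two partial-sum dicts."""
    while len(partials) > 1:
        merged = [merge(partials[k], partials[k + 1])
                  for k in range(0, len(partials) - 1, 2)]
        if len(partials) % 2:
            merged.append(partials[-1])  # odd one out moves up a level
        partials = merged
    return partials[0] if partials else {}


def divide(chunk, sums, axis):
    """Second map step: rescale each triple by the global sum for its key."""
    return [(i, j, value / sums[(i, j)[axis]]) for i, j, value in chunk]


def sinkhorn_step(chunks):
    """One iteration: normalize rows (axis=0), then columns (axis=1)."""
    for axis in (0, 1):
        sums = tree_reduce([partial_sums(c, axis) for c in chunks])
        chunks = [divide(c, sums, axis) for c in chunks]
    return chunks
```

In Prefect 1.x these functions could presumably be decorated with `@task` and wired together with `.map()` and `unmapped()`, with each level of the tree reduction as its own mapped stage; I have not checked how well that scales to a million mapped tasks.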
One way to view a GROUP BY is as “unflattening” the input: it would be the inverse of the initial `flatten()`. This might generalize if we could also permute mapped tasks, but I’m not sure how to do that either.
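The “unflatten” / “permute mapped tasks” idea corresponds to a classic shuffle: hash-partition, exchange, then group. The sketch below is plain Python with hypothetical function names, not a Prefect API. Each chunk (one mapped task's output) splits its triples into `num_buckets` lists by hashed key; `transpose` is the permutation step, routing bucket `b` from every chunk to the `b`-th reduce input; `group` then builds the per-key groups. With `key_index=0` on row chunks, the result simply recovers the rows, which is the sense in which this GROUP BY inverts `flatten()`.

```python
from collections import defaultdict


def partition(chunk, key_index, num_buckets):
    """Map step: split one chunk's (i, j, value) triples into buckets by key."""
    buckets = [[] for _ in range(num_buckets)]
    for triple in chunk:
        buckets[hash(triple[key_index]) % num_buckets].append(triple)
    return buckets


def transpose(partitioned):
    """The permutation: gather bucket b from every chunk into reduce input b."""
    return [list(bucket_group) for bucket_group in zip(*partitioned)]


def group(bucket_lists, key_index):
    """Reduce step: group the triples that were routed to this bucket by key."""
    groups = defaultdict(list)
    for bucket in bucket_lists:
        for triple in bucket:
            groups[triple[key_index]].append(triple)
    return dict(groups)


def group_by(chunks, key_index, num_buckets):
    """GROUP BY over chunked triples: partition, transpose, then group."""
    partitioned = [partition(c, key_index, num_buckets) for c in chunks]
    return [group(b, key_index) for b in transpose(partitioned)]
```

The open question is whether Prefect can express `transpose` as a rewiring between mapped tasks. Implemented naively as one ordinary task, it would receive every bucket from every chunk, which is exactly the single-machine materialization to be avoided.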
If this is impossible in Prefect 1.0, is it possible in Prefect 2.0 instead? (If so, that would give my team a reason to switch.)
Thanks!