# How to GROUP BY in Prefect 1.0 or 2.0?

Can Prefect 1.x perform distributed GROUP BY operations? If not, can Prefect 2.0 perform them?

As a simple example, consider Sinkhorn iteration: we have a large distributed matrix and want to alternately normalize its rows and columns:

``````python
import random
from collections import defaultdict

from prefect import Flow, task, flatten, unmapped


@task
def create_rows(num_rows):
    return [i for i in range(num_rows)]


@task
def create_column(i, num_cols):
    return [(i, j, random.random()) for j in range(num_cols)]


with Flow("sinkhorn") as sinkhorn:
    # We can create a distributed matrix in Prefect 1.0.
    num_rows = num_cols = 1000000  # Some big number.
    matrix = flatten(create_column.map(create_rows(num_rows), unmapped(num_cols)))

    # If this were pure Python we could successively group by rows, then columns.
    # (While the flow is being built, `matrix` is not a concrete list, so this loop only shows the intent.)
    num_steps = 10  # Assumed iteration count, for illustration only.
    for _ in range(num_steps):
        # Normalize rows.
        row_sums = defaultdict(float)
        for i, j, value in matrix:
            row_sums[i] += value
        matrix = [(i, j, value / row_sums[i]) for i, j, value in matrix]

        # Normalize columns.
        col_sums = defaultdict(float)
        for i, j, value in matrix:
            col_sums[j] += value
        matrix = [(i, j, value / col_sums[j]) for i, j, value in matrix]
``````

I’d like to first GROUP BY row, then GROUP BY column, without ever materializing the large matrix on a single machine.
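For reference, here is a minimal plain-Python sketch (no Prefect API involved) of one common way to do this kind of normalization without collecting the whole matrix: each partition computes partial per-row or per-column sums, only those small dictionaries are merged, and each partition is then normalized locally. The `partitions` list stands in for chunks that would live on different workers, and the helper names (`partial_sums`, `merge_sums`, `normalize`, `sinkhorn`) are hypothetical. Note that the merged sums still hold `num_rows` (or `num_cols`) numbers on one machine, which is O(n) rather than O(n²).

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Entry = Tuple[int, int, float]  # (row i, column j, value)


def partial_sums(partition: List[Entry], axis: int) -> Dict[int, float]:
    """Sum values per row (axis=0) or per column (axis=1) within one partition."""
    sums: Dict[int, float] = defaultdict(float)
    for entry in partition:
        sums[entry[axis]] += entry[2]
    return dict(sums)


def merge_sums(partials: List[Dict[int, float]]) -> Dict[int, float]:
    """Combine per-partition sums; only the small per-key totals are merged."""
    total: Dict[int, float] = defaultdict(float)
    for part in partials:
        for key, value in part.items():
            total[key] += value
    return dict(total)


def normalize(partition: List[Entry], sums: Dict[int, float], axis: int) -> List[Entry]:
    """Divide each entry by its row (axis=0) or column (axis=1) total, partition-locally."""
    return [(i, j, v / sums[(i, j)[axis]]) for i, j, v in partition]


def sinkhorn(partitions: List[List[Entry]], num_steps: int) -> List[List[Entry]]:
    """Alternate row and column normalization; the per-partition list comprehensions
    are where a mapped task would go in a distributed setting (assumption)."""
    for _ in range(num_steps):
        for axis in (0, 1):  # rows first, then columns
            sums = merge_sums([partial_sums(p, axis) for p in partitions])
            partitions = [normalize(p, sums, axis) for p in partitions]
    return partitions


# Usage: the 2x2 matrix [[1, 3], [2, 2]] split into two partitions by row.
partitions = [[(0, 0, 1.0), (0, 1, 3.0)], [(1, 0, 2.0), (1, 1, 2.0)]]
result = sinkhorn(partitions, num_steps=5)
```

Because each step ends with a column normalization, every column of `result` sums to 1 (up to floating-point error), while the row sums approach 1 as `num_steps` grows.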

One way to view GROUP BY is as “unflattening” the input: a GROUP BY would be the inverse of the initial `flatten()`. This might generalize if we could also permute mapped tasks, but I’m not sure how to do that either.
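A minimal plain-Python sketch of the “unflattening” view, assuming the matrix is a list of `(i, j, value)` triples: `group_by` buckets triples by row (axis 0) or column (axis 1), and `flatten_groups` undoes it. Both helpers are hypothetical stand-ins; `flatten_groups` is not Prefect's `flatten`. Switching from row groups to column groups is then `group_by(flatten_groups(row_groups), axis=1)`, i.e. the kind of permutation of mapped outputs described above.

```python
from collections import defaultdict
from itertools import chain
from typing import Dict, List, Tuple

Entry = Tuple[int, int, float]  # (row i, column j, value)


def flatten_groups(groups: List[List[Entry]]) -> List[Entry]:
    """Concatenate groups back into one flat list of entries."""
    return list(chain.from_iterable(groups))


def group_by(entries: List[Entry], axis: int) -> List[List[Entry]]:
    """Bucket entries by row index (axis=0) or column index (axis=1), ordered by key."""
    buckets: Dict[int, List[Entry]] = defaultdict(list)
    for entry in entries:
        buckets[entry[axis]].append(entry)
    return [buckets[key] for key in sorted(buckets)]


# Usage: a 2x3 matrix stored as row groups, regrouped by column ("transposed" grouping).
row_groups = [[(i, j, float(3 * i + j)) for j in range(3)] for i in range(2)]
col_groups = group_by(flatten_groups(row_groups), axis=1)
```

Grouping by row and re-flattening returns the original row groups, which is the sense in which GROUP BY inverts `flatten()` here.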

If this is impossible in Prefect 1.0, is it possible in Prefect 2.0? (If so, that would give my team a reason to switch.)

Thanks!


Almost nothing is impossible if you try hard enough and are willing to use hacky workarounds, but this is way easier in Prefect 2.0: you can take your Sinkhorn code example, wrap it in a Prefect 2.0 flow, and it will just work, because a flow can run any Python code. Using tasks is optional.

``````python
from prefect import flow, task

def create_rows(num_rows):
    return [i for i in range(num_rows)]