# How to GROUP BY in Prefect 1.0 or 2.0?

Can Prefect 1.x perform distributed GROUP BY operations? If not, can Prefect 2.0 perform them?

As a simple example consider Sinkhorn iteration: we have a large distributed matrix and want to successively normalize columns and rows:

``````@task
def create_rows(num_rows):
return [i for i in range(num_rows)]

def create_column(i, num_cols):
return [(i, j, random.random()) for j in range(num_cols)]

with Flow() as sinkhorn:
# We can create a distributed matrix in Prefect 1.0.
num_rows = num_cols = 1000000   # Some big number.
matrix = flatten(create_column.map(create_rows(num_rows), unmapped(num_cols)))

# If this were pure Python we could successively group by rows then columns
for _ in range(num_steps):
# Normalize rows.
row_sums = defaultdict(float)
for i, j, value in matrix:
row_sums[i] += value
matrix = [(i, j, value / row_sums[i]) for i, j, value in matrix]

# Normalize columns.
col_sums = defaultdict(float)
for i, j, value in matrix:
col_sums[j] += value
matrix = [(i, j, value / col_sums[j]) for i, j, value in matrix]
``````

Iâ€™d like to first GROUP BY row, then GROUP BY column, without ever materializing the large matrix on a single machine.

One way to view groupby is as â€śunflatteningâ€ť the input, where the initial `flatten()`'s inverse would be a GROUP BY. This might be generalizable if we could also permute mapped tasks, but Iâ€™m not sure how to do that either.

If this is impossible in Prefect 1.0, is it instead possible in Prefect 2.0? (If so, this would give my team a reason to switch).

Thanks!

1 Like

Almost nothing is impossible if you try hard enough and are willing to use hacky workarounds

But this is waaay easier in Prefect 2.0 - you can just use your Sinkhorn code example and wrap it in a Prefect 2.0 flow and it will just work because it can run any Python code. Using tasks is optional.

``````from prefect import flow, task

def create_rows(num_rows):
return [i for i in range(num_rows)]