How to GROUP BY in Prefect 1.0 or 2.0?

Can Prefect 1.x perform distributed GROUP BY operations? If not, can Prefect 2.0 perform them?

As a simple example consider Sinkhorn iteration: we have a large distributed matrix and want to successively normalize columns and rows:

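import random
from collections import defaultdict

from prefect import Flow, task, flatten, unmapped

@task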
def create_rows(num_rows):
    return [i for i in range(num_rows)]

@task
def create_column(i, num_cols):
    return [(i, j, random.random()) for j in range(num_cols)]

with Flow("sinkhorn") as sinkhorn:
    # We can create a distributed matrix in Prefect 1.0.
    num_rows = num_cols = 1000000   # Some big number.
    matrix = flatten(create_column.map(create_rows(num_rows), unmapped(num_cols)))

    # If this were pure Python we could successively group by rows then columns
    for _ in range(num_steps):
        # Normalize rows.
        row_sums = defaultdict(float)
        for i, j, value in matrix:
            row_sums[i] += value
        matrix = [(i, j, value / row_sums[i]) for i, j, value in matrix]
        
        # Normalize columns.
        col_sums = defaultdict(float)
        for i, j, value in matrix:
            col_sums[j] += value
        matrix = [(i, j, value / col_sums[j]) for i, j, value in matrix]

I’d like to first GROUP BY row, then GROUP BY column, without ever materializing the large matrix on a single machine.
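To make the target concrete, here is a minimal sketch of one Sinkhorn step written as per-partition work plus a small reduction, which is roughly the shape a distributed version would need. It is plain Python, not Prefect API. Partitioning the matrix by row and every helper name (`make_partition`, `normalize_rows`, `partial_col_sums`, `merge_sums`, `normalize_cols`, `sinkhorn_step`) are assumptions made for illustration.

```python
import random
from collections import defaultdict

def make_partition(i, num_cols):
    # Assumption: one partition holds one complete row of (row, col, value) triples.
    return [(i, j, random.random()) for j in range(num_cols)]

def normalize_rows(partition):
    # A partition holds complete rows, so row sums need no communication.
    row_sums = defaultdict(float)
    for i, _, value in partition:
        row_sums[i] += value
    return [(i, j, value / row_sums[i]) for i, j, value in partition]

def partial_col_sums(partition):
    # Each partition contributes a partial sum per column.
    sums = defaultdict(float)
    for _, j, value in partition:
        sums[j] += value
    return dict(sums)

def merge_sums(partials):
    # The reduction only ever holds one number per column, never the matrix.
    total = defaultdict(float)
    for partial in partials:
        for j, s in partial.items():
            total[j] += s
    return dict(total)

def normalize_cols(partition, col_sums):
    return [(i, j, value / col_sums[j]) for i, j, value in partition]

def sinkhorn_step(partitions):
    partitions = [normalize_rows(p) for p in partitions]
    col_sums = merge_sums(partial_col_sums(p) for p in partitions)
    return [normalize_cols(p, col_sums) for p in partitions]

random.seed(0)
partitions = [make_partition(i, 4) for i in range(3)]
for _ in range(5):
    partitions = sinkhorn_step(partitions)
```

In this sketch each list comprehension over `partitions` stands in for a mapped task. Only the merged column sums (one float per column) would need to be gathered in one place.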

One way to view groupby is as “unflattening” the input, where the initial flatten()'s inverse would be a GROUP BY. This might be generalizable if we could also permute mapped tasks, but I’m not sure how to do that either.
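To illustrate what I mean by "unflattening", here is a tiny plain-Python sketch. The helpers `flatten_lists` and `group_by` are hypothetical names of my own, not Prefect functions: grouping the flattened triples by row index recovers the original nesting, and grouping by column index gives the permuted nesting.

```python
from collections import defaultdict

def flatten_lists(nested):
    # Concatenate a list of lists into one flat list.
    return [item for group in nested for item in group]

def group_by(items, key):
    # Collect items that share a key, returning groups in sorted key order.
    groups = defaultdict(list)
    for item in items:
        groups[key(item)].append(item)
    return [groups[k] for k in sorted(groups)]

rows = [[(0, 0, 0.1), (0, 1, 0.2)], [(1, 0, 0.3), (1, 1, 0.4)]]
flat = flatten_lists(rows)

by_row = group_by(flat, key=lambda t: t[0])  # inverse of the flatten above
by_col = group_by(flat, key=lambda t: t[1])  # same data, regrouped by column
```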

If this is impossible in Prefect 1.0, is it instead possible in Prefect 2.0? (If so, this would give my team a reason to switch).

Thanks!

Almost nothing is impossible if you try hard enough and are willing to use hacky workarounds :smile:

But this is way easier in Prefect 2.0: you can take your Sinkhorn code example, wrap it in a Prefect 2.0 flow, and it will just work, because a flow can run any Python code. Using tasks is optional.

import random

from prefect import flow, task

@task
def create_rows(num_rows):
    return [i for i in range(num_rows)]

@task
def create_column(i, num_cols):
    return [(i, j, random.random()) for j in range(num_cols)]

@flow
def sinkhorn_flow():
    # Your Sinkhorn implementation in plain Python, without map, flatten, or unmapped.
    ...

Thanks @anna_geller. Do I understand correctly that Prefect 2 materializes the entire dataflow graph on the orchestrator machine? The pseudocode you’ve suggested evaluating eagerly includes a loop over 10^12 elements, which sounds infeasible unless you’re doing some sort of static code analysis and replacing iterators with distributed maps. It’s perfectly acceptable to answer “Prefect cannot perform distributed GROUP BY”; I’m just trying to figure out which dataflow framework to use.

A useful way of answering that question would be: can you express your workflow logic in Python? If the answer is yes, then you can schedule and orchestrate that using Prefect. This is true especially with Prefect 2.0.

Prefect 2.0 is able to dynamically evaluate and execute arbitrary Python code as long as you import Prefect and add the flow decorator to your function. Happy to clarify if this doesn’t answer your question.