How do I configure pandas serialization for a flow involving lots of DataFrames?

I’m new to Prefect :nerd_face: and have a DAG of functions that consume and produce pandas.DataFrames. I want to “prefectify” my workflow but am pretty lost…

My approach:

  • i’m wrapping my function calls in a @flow-decorated function
  • i then execute this flow in a script

What I’d like:

  • i want all DataFrames persisted as parquet files in a google bucket.
    • i think i need to use serializers.PandasSerializer, but i don’t know how
  • i want to avoid decorating my functions with @task
    • this seems possible with prefect 2.0…?
  • log messages showing either (1) where a task result was persisted or (2) where an already cached result is
    • e.g. DEBUG | Task run ... Finished in state Cached(, type=COMPLETED, uri="gs://mybucket/foo/bar.parquet")

I’ve seen…

but still don’t know what I need to do…

  • should i set a serializer at the task level?
  • can prefect see my type annotations and, for any function that returns a pd.DataFrame, automatically use a PandasSerializer that i define somewhere?

Here’s some example code. In this example, I want to (1) not have @task decorations if possible (2) preserve task result caching and (3) persist all DataFrame results as parquet files in a google bucket.

import logging
import time

import numpy as np
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash

logger = logging.getLogger(__name__)


@task(cache_key_fn=task_input_hash)
def make_df(seed: int = 0) -> pd.DataFrame:
    logger.debug("sleeping in make_df...")
    time.sleep(1)
    logger.debug("done sleeping")
    rng = np.random.default_rng(seed)
    df = pd.DataFrame(
        {
            "A": [1, 1, 2, 2],
            "B": [1, 2, 3, 4],
            "C": rng.standard_normal(4),
        }
    )
    return df


@task(cache_key_fn=task_input_hash)
def aggregate(df: pd.DataFrame) -> pd.DataFrame:
    logger.debug("sleeping in aggregate...")
    time.sleep(1)
    logger.debug("done sleeping")
    groups = df.groupby("A")
    aggregations = groups.agg({"B": ["min", "max"], "C": "sum"})
    return aggregations


@flow
def do_stuff(seed: int = 0) -> pd.DataFrame:
    df = make_df(seed)
    aggregations = aggregate(df)
    return aggregations


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logger.setLevel("DEBUG")
    print(do_stuff(seed=0))
    print(do_stuff(seed=1))


Serializers were used in Prefect 1.0, but the same feature doesn’t exist in Prefect 2.0.

There’s no Prefect-specific way of doing that, and that’s intentional: Prefect’s focus is on the coordination spectrum, including orchestration and observability, rather than on building a library of connectors/ETL integrations.

I’m curious, have you tried using the Google SDK for that? You could also check whether prefect-gcp already has functionality for this, and if not, you may open a feature request or even contribute :raised_hands:
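For example, once gcsfs is installed, pandas can read and write parquet directly against gs:// URLs. This is only a rough sketch and the bucket name is a placeholder:

import pandas as pd

df = pd.DataFrame({"A": [1, 2], "B": [3.0, 4.0]})

# pandas hands gs:// URLs off to gcsfs (requires gcsfs plus pyarrow or fastparquet);
# credentials come from the environment unless you pass storage_options={"token": ...}
df.to_parquet("gs://mybucket/foo/bar.parquet")

# reading back works the same way
df_roundtrip = pd.read_parquet("gs://mybucket/foo/bar.parquet")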

Thanks so much for your reply!

I think I might have misunderstood what problems Prefect solves. It also sounds like the focus of 2.0 is different in some key ways from 1.0? And tbh I don’t think I understand what those ways are. :rofl: (I don’t work in large-scale data engineering / ops, just rapid prototyping of stats / ML modeling pipelines in an academic lab with lots of abandonware.)

Re: the Google Cloud thing, are you referring to Cloud Run? I have no experience with this and I’m not sure if I should use it (totally open to it! I just don’t know how to evaluate whether it fits my use case and workflow).

I’m starting to wonder if I should just use Python’s functools.lru_cache with some custom serializers that save/read my DataFrames to parquet, maybe using fsspec for IO (sometimes I save things locally, other times remotely in cloud storage). And then if I want to manually re-run a task, I can just delete stuff on disk / in my bucket. I was hoping to use Prefect to do this, with the serialization API and the nice UI :slight_smile:
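Something like this rough, untested sketch is what I have in mind (the decorator name and base path are made up):

import functools
import hashlib

import fsspec
import pandas as pd


def parquet_cache(base_uri: str):
    """Cache a DataFrame-returning function as a parquet file under base_uri,
    which can be a local directory or something like gs://mybucket/results."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # build a cache key from the call (assumes the arguments have stable reprs)
            key = hashlib.sha256(repr((func.__name__, args, kwargs)).encode()).hexdigest()[:16]
            uri = f"{base_uri}/{func.__name__}-{key}.parquet"
            fs, _, _ = fsspec.get_fs_token_paths(uri)
            if fs.exists(uri):
                return pd.read_parquet(uri)  # cache hit: load the persisted result
            fs.makedirs(base_uri, exist_ok=True)  # effectively a no-op on object stores
            df = func(*args, **kwargs)
            df.to_parquet(uri)  # cache miss: run the function and persist the result
            return df

        return wrapper

    return decorator

Deleting the parquet file would then be the manual “re-run this task” knob I mentioned.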

Again, thank you SO MUCH for your help. Prefect seems very powerful, just trying to understand how to use it effectively.

More to GCS, to load data to the bucket.

Yup exactly, you can use gcsfs and Prefect even has a block for that: prefect.filesystems.GCS
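Creating and reusing the block looks roughly like this (the bucket path and block name below are just placeholders):

from prefect.filesystems import GCS

# credentials can also be supplied via the block's service_account_info field
# if the default Google credentials in your environment aren't sufficient
GCS(bucket_path="my-bucket/prefect-results").save("my-gcs", overwrite=True)

# later, load the saved block by name in flow or deployment code
gcs_block = GCS.load("my-gcs")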

We have quite a lot of content on that, check those:


ah ok! sorry, misunderstood you. yeah i love fsspec & gcsfs :slight_smile: magically easy io. and everything the fsspec team is working on, including the pathlib-like universal_pathlib.

thanks for sharing. sorry for not being more familiar. trying to learn as much as possible in the work hours i’m cannibalizing to read about this lol. i’ll check out chris’s talk.


One easy way to serialize/deserialize that I’ve personally used is to convert to/from a dict or a JSON string, i.e.:

# in parent task
return df.to_dict(orient='records')
# in child task
df = pd.DataFrame(parent_data)

OR (using json str)

# in parent task
return df.to_json()
# in child task
df = pd.read_json(parent_data)
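As a self-contained round trip (orient="records" drops the index, while the JSON route keeps it by default; newer pandas prefers a buffer for read_json):

import io

import pandas as pd

df = pd.DataFrame({"A": [1, 2], "B": [3.0, 4.0]})

# dict round trip: a list of row dicts; the index is not preserved
records = df.to_dict(orient="records")
df_from_records = pd.DataFrame(records)

# JSON round trip: the default orient keeps the index;
# StringIO avoids the literal-string deprecation warning in pandas >= 2.1
json_str = df.to_json()
df_from_json = pd.read_json(io.StringIO(json_str))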

hi – it looks like serializers are back as of 2.6.0, per the release announcement? And are documented here in the 2.x docs?

(Correct me if wrong, I’m new to this tool)

One easy way to serialize/deserialize

thanks for sharing @Pk13055 :slight_smile:

it looks like serializers are back as of 2.6.0

whoa! thanks @awinter. i was looking into implementing some persistent caching logic myself, learning more about cache_key_fn, but this looks promising.
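from the release notes, i think the pieces would look roughly like this (untested sketch; it assumes a GCS filesystem block named "my-gcs" was saved beforehand, and as far as i can tell 2.x ships pickle/json serializers rather than a pandas-specific one):

import pandas as pd
from prefect import flow, task
from prefect.filesystems import GCS
from prefect.serializers import PickleSerializer
from prefect.tasks import task_input_hash


@task(
    cache_key_fn=task_input_hash,
    persist_result=True,                   # persist the return value to result storage
    result_storage=GCS.load("my-gcs"),     # the GCS block created earlier; name is a placeholder
    result_serializer=PickleSerializer(),  # no built-in pandas/parquet serializer, so pickle for now
)
def make_df(seed: int = 0) -> pd.DataFrame:
    return pd.DataFrame({"A": [1, 2], "B": [3.0, 4.0]})


@flow
def do_stuff(seed: int = 0) -> pd.DataFrame:
    return make_df(seed)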

will reply here when i figure something out.