can prefect see my type annotations and, for any function that returns a pd.DataFrame, automatically use a PandasSerializer that i define somewhere?
Here’s some example code. In this example, I want to (1) avoid @task decorators if possible, (2) preserve task result caching, and (3) persist all DataFrame results as Parquet files in a Google Cloud Storage bucket.
import logging
import time

import numpy as np
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash

logger = logging.getLogger(__name__)


@task(cache_key_fn=task_input_hash)
def make_df(seed: int = 0) -> pd.DataFrame:
    logger.debug("sleeping in make_df...")
    time.sleep(1)
    logger.debug("done sleeping")
    rng = np.random.default_rng(seed)
    df = pd.DataFrame(
        {
            "A": [1, 1, 2, 2],
            "B": [1, 2, 3, 4],
            "C": rng.standard_normal(4),
        }
    )
    return df


@task(cache_key_fn=task_input_hash)
def aggregate(df: pd.DataFrame) -> pd.DataFrame:
    logger.debug("sleeping in aggregate...")
    time.sleep(1)
    logger.debug("done sleeping")
    groups = df.groupby("A")
    aggregations = groups.agg({"B": ["min", "max"], "C": "sum"})
    return aggregations


@flow
def do_stuff(seed: int = 0) -> pd.DataFrame:
    df = make_df(seed)
    aggregations = aggregate(df)
    return aggregations


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logger.setLevel("DEBUG")
    print(do_stuff(seed=0))
    print(do_stuff(seed=1))
Serializers existed in Prefect 1.0, but the same feature doesn’t exist in Prefect 2.0.
There’s no Prefect-specific way of doing that, and that’s intentional: Prefect’s focus is on the coordination spectrum (orchestration and observability) rather than on building a library of connectors/ETL integrations.
I’m curious, have you tried using the Google Cloud SDK for that? You could also check whether this functionality already exists in prefect-gcp, and if not, you could open a feature request or even contribute it.
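For simple reads and writes you may not even need the SDK directly: with gcsfs installed, pandas can write Parquet straight to a gs:// URI. A small sketch (persist_df and the bucket name are made up for illustration):

import pandas as pd

def persist_df(df: pd.DataFrame, uri: str) -> str:
    # pandas hands gs:// URIs to gcsfs via fsspec, so no explicit
    # Google Cloud client is needed for a simple write.
    df.to_parquet(uri)
    return uri

# e.g. persist_df(aggregations, "gs://my-bucket/results/aggregations.parquet")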
I think I might have misunderstood what problems Prefect solves. It also sounds like the focus of 2.0 is different in some key ways from 1.0? And tbh I don’t think I understand what those ways are. (I don’t work in large-scale data engineering / ops, just rapid prototyping of stats / ML modeling pipelines in an academic lab with lots of abandonware.)
Re: the Google Cloud thing, are you referring to Cloud Run? I have no experience with it, and I’m not sure whether I should use it (totally open to it! I just don’t know how to evaluate whether it fits my use case and workflow).
I’m starting to wonder if I should just use Python’s functools.lru_cache with some custom serializers that save/read my DataFrames as Parquet, maybe using fsspec for I/O (sometimes I save things locally, other times remotely in cloud storage). Then, if I want to manually re-run a task, I can just delete the cached files on disk / in my bucket. I was hoping to use Prefect for this, with the serialization API and the nice UI.
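A minimal sketch of that idea, swapping the in-memory lru_cache for a disk-backed decorator (parquet_cache, the md5-of-repr key scheme, and the bucket path below are all illustrative assumptions, and base_path is assumed to already exist):

import functools
import hashlib

import fsspec
import pandas as pd


def parquet_cache(base_path: str):
    """Disk-backed cache for DataFrame-returning functions.

    base_path may be local ("./df-cache") or remote ("gs://my-bucket/df-cache");
    fsspec picks the right backend (gs:// needs gcsfs).
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Illustrative cache key: hash of the function name plus repr'd
            # arguments. Fine for simple scalar args; not as robust as a
            # real input-hashing scheme.
            raw = f"{func.__name__}:{args!r}:{kwargs!r}".encode()
            key = hashlib.md5(raw).hexdigest()
            path = f"{base_path}/{func.__name__}-{key}.parquet"
            fs, _, _ = fsspec.get_fs_token_paths(path)
            if fs.exists(path):
                # Cache hit: read the persisted result instead of recomputing.
                return pd.read_parquet(path)
            result = func(*args, **kwargs)
            # pandas delegates gs:// I/O to gcsfs via fsspec, so this writes
            # straight to the bucket.
            result.to_parquet(path)
            return result
        return wrapper
    return decorator

# usage (bucket name is made up):
# @parquet_cache("gs://my-bucket/df-cache")
# def make_df(seed: int = 0) -> pd.DataFrame: ...

Deleting a cached .parquet file then forces a recompute on the next call, which matches the manual re-run workflow described above.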
Again, thank you SO MUCH for your help. Prefect seems very powerful, just trying to understand how to use it effectively.
ah ok! sorry, I misunderstood you. Yeah, I love fsspec & gcsfs for their magically easy I/O, and everything else the fsspec team is working on, including the pathlib-like universal_pathlib.
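For anyone unfamiliar, universal_pathlib’s UPath wraps fsspec backends in a pathlib-style API. A tiny sketch (the bucket name is made up):

from upath import UPath

# UPath dispatches to the right fsspec backend based on the URI scheme,
# so the same code works for local paths and gs:// paths (with gcsfs installed).
cache = UPath("gs://my-bucket/df-cache")
for path in cache.glob("*.parquet"):
    print(path)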
thanks for sharing. Sorry for not being more familiar; I’m trying to learn as much as possible in the work hours I’m cannibalizing to read about this, lol. I’ll check out Chris’s talk.
whoa! Thanks @awinter. I was looking into implementing some persistent caching logic myself and learning more about cache_key_fn, but this looks promising.
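For reference, a Prefect 2 cache_key_fn receives the task run context plus the call’s parameters and returns a string key. A minimal custom variant might look like this sketch (version_scoped_hash is an illustrative name, not a Prefect API):

from prefect.context import TaskRunContext
from prefect.tasks import task_input_hash

def version_scoped_hash(context: TaskRunContext, parameters: dict) -> str:
    # Combine the built-in input hash with the task's version string, so that
    # bumping @task(version=...) invalidates previously cached results.
    return f"{context.task.version}:{task_input_hash(context, parameters)}"

# usage: @task(cache_key_fn=version_scoped_hash, version="1")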