I’m new to Prefect and have a DAG of functions that consume and produce `pandas.DataFrame`s. I want to “prefectify” my workflow but am pretty lost…
My approach:
- I’m wrapping my function calls in a `@flow`-decorated function
- I then execute this flow in a script
What I’d like:
- I want all DataFrames persisted as parquet files in a Google Cloud Storage bucket.
- I think I need to use `serializers.PandasSerializer` for this, but I don’t know how; my best guess is sketched right after this list.
- I want to avoid decorating my functions with `@task`; this seems possible with Prefect 2.0…?
- Log messages showing either (1) where a task result was persisted or (2) where an already-cached result is stored, e.g.
  `DEBUG | Task run ... Finished in state Cached(, type=COMPLETED, uri="gs://mybucket/foo/bar.parquet")`
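Here’s my best guess at the persistence piece. Everything in it is speculative: `ParquetSerializer` is a class I made up by copying the shape of the built-in `PickleSerializer`, the bucket path is a placeholder, and I’m not sure `persist_result` / `result_storage` / `result_serializer` are the right knobs to turn.

```python
import io
from typing import Literal

import pandas as pd
from prefect import task
from prefect.filesystems import GCS
from prefect.serializers import Serializer
from prefect.tasks import task_input_hash


class ParquetSerializer(Serializer):
    """Made-up serializer that round-trips DataFrames through parquet bytes."""

    type: Literal["parquet"] = "parquet"

    def dumps(self, obj: pd.DataFrame) -> bytes:
        buffer = io.BytesIO()
        obj.to_parquet(buffer)  # needs pyarrow or fastparquet installed
        return buffer.getvalue()

    def loads(self, blob: bytes) -> pd.DataFrame:
        return pd.read_parquet(io.BytesIO(blob))


# Placeholder bucket path; presumably this would normally be saved as a GCS block.
gcs_storage = GCS(bucket_path="mybucket/prefect-results")


@task(
    cache_key_fn=task_input_hash,
    persist_result=True,
    result_storage=gcs_storage,
    result_serializer=ParquetSerializer(),
)
def make_df(seed: int = 0) -> pd.DataFrame:
    ...
```

Is something along these lines the intended way to do it?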
I’ve seen…
- the docs for “Flow and task configuration → Task caching”
- the docs for `serializers.PandasSerializer`
- the tutorial “Orchestrate Your Data Science Project with Prefect 2.0”

…but I still don’t know what I need to do.
- Should I set a serializer at the `task` level, or maybe at the flow level? (My guess at the flow-level version is sketched right after these questions.)
- Can Prefect see my type annotations and, for any function that returns a `pd.DataFrame`, automatically use a `PandasSerializer` that I define somewhere?
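If task runs inherit result settings from their flow (I don’t know whether they do), maybe I only need to configure the flow once. Another guess, with the same placeholder bucket path as above:

```python
import pandas as pd
from prefect import flow
from prefect.filesystems import GCS


@flow(
    persist_result=True,
    result_storage=GCS(bucket_path="mybucket/prefect-results"),  # placeholder path
    result_serializer="pickle",  # would swap in the parquet serializer from the first sketch
)
def do_stuff(seed: int = 0) -> pd.DataFrame:
    ...
```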
Here’s some example code. In this example, I want to (1) not have `@task` decorations if possible, (2) preserve task result caching, and (3) persist all DataFrame results as parquet files in a Google Cloud Storage bucket. (My guess at a decorator-free variant follows the example.)
```python
import logging
import time

import numpy as np
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash

logger = logging.getLogger(__name__)


@task(cache_key_fn=task_input_hash)
def make_df(seed: int = 0) -> pd.DataFrame:
    logger.debug("sleeping in make_df...")
    time.sleep(1)
    logger.debug("done sleeping")
    rng = np.random.default_rng(seed)
    df = pd.DataFrame(
        {
            "A": [1, 1, 2, 2],
            "B": [1, 2, 3, 4],
            "C": rng.standard_normal(4),
        }
    )
    return df


@task(cache_key_fn=task_input_hash)
def aggregate(df: pd.DataFrame) -> pd.DataFrame:
    logger.debug("sleeping in aggregate...")
    time.sleep(1)
    logger.debug("done sleeping")
    groups = df.groupby("A")
    aggregations = groups.agg({"B": ["min", "max"], "C": "sum"})
    return aggregations


@flow
def do_stuff(seed: int = 0) -> pd.DataFrame:
    df = make_df(seed)
    aggregations = aggregate(df)
    return aggregations


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    logger.setLevel("DEBUG")
    print(do_stuff(seed=0))
    print(do_stuff(seed=1))
```
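And here is my guess at a decorator-free variant of the same flow. I’m relying on `task()` being callable as a plain function to wrap an existing callable (I think that works?), so the business-logic functions stay undecorated; `make_df_task` and `aggregate_task` are just names I made up for the wrapped versions.

```python
import numpy as np
import pandas as pd
from prefect import flow, task
from prefect.tasks import task_input_hash


def make_df(seed: int = 0) -> pd.DataFrame:
    # Plain function: no Prefect decorator.
    rng = np.random.default_rng(seed)
    return pd.DataFrame(
        {"A": [1, 1, 2, 2], "B": [1, 2, 3, 4], "C": rng.standard_normal(4)}
    )


def aggregate(df: pd.DataFrame) -> pd.DataFrame:
    # Plain function: no Prefect decorator.
    return df.groupby("A").agg({"B": ["min", "max"], "C": "sum"})


# Wrap the plain functions once instead of decorating them at definition time.
make_df_task = task(make_df, cache_key_fn=task_input_hash)
aggregate_task = task(aggregate, cache_key_fn=task_input_hash)


@flow
def do_stuff(seed: int = 0) -> pd.DataFrame:
    return aggregate_task(make_df_task(seed))


if __name__ == "__main__":
    print(do_stuff(seed=0))
```

If I’ve understood `cache_key_fn` correctly, calling `do_stuff(seed=0)` a second time should then hit the cached task results; is that right?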