Given your original message with S3Result, I tested this as follows:
Using this flow:
"""
prefect register --project xyz -p checkpointing_dd_for_restarts.py
prefect agent local start
In a separate terminal window:
prefect run --name checkpointing_dd_for_restarts --watch
"""
from random import random

import dask.dataframe as dd
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import S3Result

@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def checkpoint_data() -> dd.DataFrame:
    # Build a small Dask DataFrame; the return value is checkpointed to S3.
    df = pd.DataFrame({"col_1": ["1", "2", "3"], "col_2": [1, 2, 3]})
    return dd.from_pandas(df, npartitions=1)


@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def accept_checkpointed_data(ddf: dd.DataFrame) -> pd.Series:
    print(ddf.dtypes)
    # Fail roughly half the time so that a restart has to reload the checkpoint.
    nr = random()
    print(f"Number deciding whether task fails or not: {nr} - if nr > 0.5, task fails")
    if nr > 0.5:
        raise ValueError("Too big number!")
    print("Nr was <= 0.5 - task succeeded")
    return ddf.dtypes


with Flow("checkpointing_dd_for_restarts") as flow:
    ddf = checkpoint_data()
    accept_checkpointed_data(ddf)
I'm getting results as shown in the images, and the result can be seen in S3, so checkpointing a Dask DataFrame using S3Result and the PickleSerializer seems to work fine.
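For completeness: PickleSerializer is Prefect's default serializer, so the flow above already uses it implicitly. Below is a minimal sketch of passing it explicitly and reading a checkpoint back from S3 to verify it. The location template and the key passed to read() are hypothetical illustrations; the actual result location for a task run can be copied from its logs or the UI.
from prefect.engine.results import S3Result
from prefect.engine.serializers import PickleSerializer

# Same as the default behavior, just spelled out: pickle the value before upload.
# The location template is a hypothetical example; Prefect formats it from context
# (e.g. {flow_name} and {task_name}) at runtime.
result = S3Result(
    bucket="prefectdata",
    serializer=PickleSerializer(),
    location="checkpoints/{flow_name}/{task_name}.prefect",
)

# Hypothetical key matching the template above; read() returns a new Result
# whose .value is the unpickled Dask DataFrame.
checkpoint = result.read("checkpoints/checkpointing_dd_for_restarts/checkpoint_data.prefect")
print(checkpoint.value.dtypes)
Pinning the location like this also makes restarts easier to reason about, since the checkpoint key is deterministic rather than derived from the run ID.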