I am facing issues with checkpointing a Dask DataFrame and restarting from failure.

Given your original message with S3Result, I tested this as follows:

Using this flow:

"""
prefect register --project xyz -p checkpointing_dd_for_restarts.py
prefect agent local start

In a separate terminal window:
prefect run --name checkpointing_dd_for_restarts --watch
"""
import dask.dataframe as dd
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import S3Result
from random import random


@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({"col_1": ["1", "2", "3"], "col_2": [1, 2, 3]})
    return dd.from_pandas(df, npartitions=1)


@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def accept_checkpointed_data(ddf: dd.DataFrame) -> pd.Series:
    print(ddf.dtypes)
    nr = random()
    print(f"Number deciding whether task fails or not: {nr} - if nr > 0.5, task fails")
    if nr > 0.5:
        raise ValueError("Too big number!")
    print(f"Nr was < 0.5 - taks succeeded")
    return ddf.dtypes


with Flow("checkpointing_dd_for_restarts") as flow:
    ddf = checkpoint_data()
    accept_checkpointed_data(ddf)
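
For completeness, the tasks above rely on the result's default serializer. A minimal sketch of spelling out the PickleSerializer explicitly (my assumption, using Prefect 1.x's prefect.engine.serializers module and the serializer argument accepted by Result classes) would be:

from prefect.engine.results import S3Result
from prefect.engine.serializers import PickleSerializer

# Sketch: same bucket as above, but with the pickle serializer made explicit
# instead of relying on the default.
explicit_result = S3Result(bucket="prefectdata", serializer=PickleSerializer())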

I'm getting results as shown in the images.

And the result can be seen in S3, so checkpointing a Dask DataFrame using S3Result and the PickleSerializer seems to work fine.
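
To double-check the checkpoint outside of Prefect, the pickled object can also be pulled straight from the bucket and loaded back. This is only a sketch, assuming boto3 and cloudpickle are installed and AWS credentials are configured; the object key is a placeholder that has to be replaced with the actual result location written by S3Result:

import boto3
import cloudpickle

s3 = boto3.client("s3")
# Placeholder key - substitute the key of the checkpointed result in the bucket.
obj = s3.get_object(Bucket="prefectdata", Key="<result-key>")
ddf = cloudpickle.loads(obj["Body"].read())
print(type(ddf))      # expected: dask.dataframe.core.DataFrame
print(ddf.compute())  # materialize the single-partition dataframe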