I am facing issues with checkpointing Dask dataframe and restarting from failure

Given your original message with S3Result, I tested this using the following flow:

"""
prefect register --project xyz -p checkpointing_dd_for_restarts.py
prefect agent local start

In a separate terminal window:
prefect run --name checkpointing_dd_for_restarts --watch
"""
import dask.dataframe as dd
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import S3Result
from random import random


@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({"col_1": ["1", "2", "3"], "col_2": [1, 2, 3]})
    return dd.from_pandas(df, npartitions=1)


@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def accept_checkpointed_data(ddf: dd.DataFrame) -> pd.Series:
    print(ddf.dtypes)
    nr = random()
    print(f"Number deciding whether task fails or not: {nr} - if nr > 0.5, task fails")
    if nr > 0.5:
        raise ValueError("Too big number!")
    print("Nr was < 0.5 - task succeeded")
    return ddf.dtypes


with Flow("checkpointing_dd_for_restarts") as flow:
    ddf = checkpoint_data()
    accept_checkpointed_data(ddf)

I’m getting results as shown in the images

The result is also visible in S3, so checkpointing a Dask DataFrame using S3Result and the PickleSerializer seems to work fine.

Regarding the original question: I can see the checkpointed result in S3, but I don't see any log indicating that on restart the data will be picked up from S3 rather than from cached_inputs.
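To make the expected restart behaviour concrete, here is a minimal sketch of the general checkpoint-then-reload pattern using only the standard library. It is not Prefect's actual implementation: the helper name run_with_checkpoint and the result.pkl file name are hypothetical, and a plain dict stands in for the Dask DataFrame.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable


def run_with_checkpoint(location: Path, compute: Callable[[], Any]) -> Any:
    """Return the value persisted at `location`; compute and persist it if absent.

    Hypothetical helper illustrating the pattern, not a Prefect API.
    """
    if location.exists():
        # Restart path: reuse the persisted result instead of recomputing it.
        with location.open("rb") as f:
            return pickle.load(f)
    # First run: compute the value, then persist it for later restarts.
    value = compute()
    location.parent.mkdir(parents=True, exist_ok=True)
    with location.open("wb") as f:
        pickle.dump(value, f)
    return value


if __name__ == "__main__":
    calls = []

    def expensive_step() -> dict:
        calls.append("ran")
        return {"col_1": ["1", "2", "3"], "col_2": [1, 2, 3]}

    with tempfile.TemporaryDirectory() as tmp:
        location = Path(tmp) / "checkpoint_data" / "result.pkl"
        first = run_with_checkpoint(location, expensive_step)
        second = run_with_checkpoint(location, expensive_step)  # read from disk
        print(first == second, len(calls))  # prints: True 1
```

The second call returns the pickled value without invoking expensive_step again, which is the behaviour one would hope to see for an upstream task when a downstream task is restarted.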

@Vadym_Dytyniak I'm now reproducing option #3, i.e. the same flow but using LocalResult with a local flow run.

"""
prefect agent local start
checkpointing_dd_for_restarts_local_results.py
"""
import dask.dataframe as dd
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import LocalResult
from random import random

FLOW_NAME = "checkpointing_dd_for_restarts_local_results"


@task(
    log_stdout=True,
    result=LocalResult(dir=f"~/.prefect/results/{FLOW_NAME}/checkpoint_data_task/"),
    checkpoint=True,
)
def checkpoint_data() -> dd.DataFrame:
    df = pd.DataFrame({"col_1": ["1", "2", "3"], "col_2": [1, 2, 3]})
    return dd.from_pandas(df, npartitions=1)


@task(
    log_stdout=True,
    result=LocalResult(dir=f"~/.prefect/results/{FLOW_NAME}/accept_checkpointed_data/"),
    checkpoint=True,
)
def accept_checkpointed_data(ddf: dd.DataFrame) -> pd.Series:
    print(ddf.dtypes)
    nr = random()
    print(f"Number deciding whether task fails or not: {nr} - if nr > 0.5, task fails")
    if nr > 0.5:
        raise ValueError("Too big number!")
    print("Nr was < 0.5 - task succeeded")
    return ddf.dtypes


with Flow(FLOW_NAME) as flow:
    ddf = checkpoint_data()
    accept_checkpointed_data(ddf)

if __name__ == "__main__":
    flow.run()

Running from the terminal, with checkpointing enabled explicitly because it is off by default for local flow.run() calls:

export PREFECT__FLOWS__CHECKPOINTING=true
python checkpointing_dd_for_restarts_local_results.py

generates the results as expected.

It works when running from the terminal. Really strange. Thanks!

Regarding the missing log about where the checkpointed data is written and read: you may need to enable DEBUG logs to see this:

export PREFECT__LOGGING__LEVEL=DEBUG

Output:

(prefectCloudFlows) ➜  000_Community export PREFECT__LOGGING__LEVEL=DEBUG
(prefectCloudFlows) ➜  000_Community python checkpointing_dd_for_restarts_local_results.py
[2022-03-09 13:44:55+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'checkpointing_dd_for_restarts_local_results'
[2022-03-09 13:44:55+0100] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2022-03-09 13:44:55+0100] DEBUG - prefect.FlowRunner | Flow 'checkpointing_dd_for_restarts_local_results': Handling state change from Scheduled to Running
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'checkpoint_data': Starting task run...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'checkpoint_data': Handling state change from Pending to Running
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'checkpoint_data': Calling task.run() method...
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Starting to upload result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/checkpoint_data_task/prefect-result-2022-03-09t12-44-55-780785-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Finished uploading result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/checkpoint_data_task/prefect-result-2022-03-09t12-44-55-780785-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'checkpoint_data': Handling state change from Running to Success
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'checkpoint_data': Finished task run for task with final state: 'Success'
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'accept_checkpointed_data': Starting task run...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'accept_checkpointed_data': Handling state change from Pending to Running
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'accept_checkpointed_data': Calling task.run() method...
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | col_1    object
col_2     int64
dtype: object
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Number deciding whether task fails or not: 0.32289249772504836 - if nr > 0.5, task fails
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Nr was < 0.5 - taks succeeded
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Starting to upload result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/accept_checkpointed_data/prefect-result-2022-03-09t12-44-55-815810-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Finished uploading result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/accept_checkpointed_data/prefect-result-2022-03-09t12-44-55-815810-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'accept_checkpointed_data': Handling state change from Running to Success
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'accept_checkpointed_data': Finished task run for task with final state: 'Success'
[2022-03-09 13:44:55+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-03-09 13:44:55+0100] DEBUG - prefect.FlowRunner | Flow 'checkpointing_dd_for_restarts_local_results': Handling state change from Running to Success
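To check where each result was persisted without reading the whole DEBUG output, the log lines can be filtered. This is a minimal sketch that assumes the prefect.LocalResult message format shown in the output above; the function name extract_result_locations is hypothetical and not part of Prefect.

```python
import re
from typing import Iterable, List

# Matches the "Finished uploading result to <path>..." lines from the output above.
_UPLOADED = re.compile(
    r"prefect\.LocalResult \| Finished uploading result to (?P<location>.+?)\.\.\.$"
)


def extract_result_locations(log_lines: Iterable[str]) -> List[str]:
    """Return the result paths reported by prefect.LocalResult, in log order."""
    locations = []
    for line in log_lines:
        match = _UPLOADED.search(line.rstrip())
        if match:
            locations.append(match.group("location"))
    return locations


if __name__ == "__main__":
    sample = [
        "[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Finished uploading result to "
        "/Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/"
        "checkpoint_data_task/prefect-result-2022-03-09t12-44-55-780785-00-00...",
    ]
    for location in extract_result_locations(sample):
        print(location)
```

Each returned path should correspond to a pickled file on disk, which makes it easy to confirm that both tasks were checkpointed.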