Given your original message with S3Result I tested this as follows:
Using this flow:
"""
prefect register --project xyz -p checkpointing_dd_for_restarts.py
prefect agent local start
In a separate terminal window:
prefect run --name checkpointing_dd_for_restarts --watch
"""
import dask.dataframe as dd
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import S3Result
from random import random
@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def checkpoint_data() -> dd.DataFrame:
df = pd.DataFrame({"col_1": ["1", "2", "3"], "col_2": [1, 2, 3]})
return dd.from_pandas(df, npartitions=1)
@task(log_stdout=True, result=S3Result(bucket="prefectdata"), checkpoint=True)
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
print(ddf.dtypes)
nr = random()
print(f"Number deciding whether task fails or not: {nr} - if nr > 0.5, task fails")
if nr > 0.5:
raise ValueError("Too big number!")
print(f"Nr was < 0.5 - taks succeeded")
return ddf.dtypes
with Flow("checkpointing_dd_for_restarts") as flow:
ddf = checkpoint_data()
accept_checkpointed_data(ddf)
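Because the failure above depends on `random()`, it can take a few runs before there is a failed run to restart. A minimal sketch of a deterministic alternative, assuming the restarted run executes on the same machine (as it does with a local agent): the hypothetical helper `fail_on_first_attempt` drops a marker file on the first call and raises, so the original run fails and the restarted run gets past that point. The helper and marker file name are made up for illustration.

```python
import tempfile
from pathlib import Path


def fail_on_first_attempt(marker: Path) -> None:
    """Hypothetical helper: raise on the first call, return normally afterwards."""
    if not marker.exists():
        # Record that one attempt has already happened, then fail on purpose
        marker.touch()
        raise ValueError("Failing on purpose so the flow run can be restarted")


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        marker = Path(tmp) / "accept_checkpointed_data.failed"
        try:
            fail_on_first_attempt(marker)
        except ValueError as exc:
            print(f"first attempt: {exc}")
        fail_on_first_attempt(marker)  # marker exists now, so no exception
        print("second attempt: succeeded")
```

Inside `accept_checkpointed_data`, a call like this would take the place of the `random()` check; deleting the marker file resets the experiment.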
I’m getting the results shown in the screenshots, and the result can also be seen in S3, so checkpointing a Dask DataFrame using S3Result and the default PickleSerializer seems to work fine.
Regarding the original question: I can see the checkpointed result in S3, but I don’t see any log indicating that, on restart, the task will pick up the data from S3 rather than from cached_inputs.
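To make the distinction in the question concrete, here is a toy model, not Prefect's implementation: when a failed task is restarted in a new process, the in-memory value of its upstream result is gone, so it can only use either a persisted checkpoint location (such as the S3 key written by S3Result) or a previously cached input. The function `resolve_input`, its arguments and the fallback order are assumptions made for illustration; storage is faked with a dict.

```python
from typing import Any, Callable, Dict, Optional


def resolve_input(
    name: str,
    checkpoint_location: Optional[str],
    read: Callable[[str], Any],
    cached_inputs: Dict[str, Any],
) -> Any:
    """Toy model: choose the value of one upstream input for a restarted task."""
    # A checkpoint location (e.g. an S3 key) survives a process restart,
    # so the value can be re-read from storage.
    if checkpoint_location is not None:
        return read(checkpoint_location)
    # Without a checkpoint, only a previously cached copy can be used.
    if name in cached_inputs:
        return cached_inputs[name]
    raise LookupError(f"no checkpoint or cached input for {name!r}")


if __name__ == "__main__":
    storage = {"s3://prefectdata/example-key": "checkpointed ddf"}  # fake object store
    print(resolve_input("ddf", "s3://prefectdata/example-key", storage.__getitem__, {}))
    print(resolve_input("ddf", None, storage.__getitem__, {"ddf": "cached ddf"}))
```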
@Vadym_Dytyniak Now reproducing option #3, i.e. the same flow but using LocalResult with a local flow run:
"""
prefect agent local start
checkpointing_dd_for_restarts_local_results.py
"""
import dask.dataframe as dd
import pandas as pd
from prefect import task, Flow
from prefect.engine.results import LocalResult
from random import random
FLOW_NAME = "checkpointing_dd_for_restarts_local_results"
@task(
log_stdout=True,
result=LocalResult(dir=f"~/.prefect/results/{FLOW_NAME}/checkpoint_data_task/"),
checkpoint=True,
)
def checkpoint_data() -> dd.DataFrame:
df = pd.DataFrame({"col_1": ["1", "2", "3"], "col_2": [1, 2, 3]})
return dd.from_pandas(df, npartitions=1)
@task(
log_stdout=True,
result=LocalResult(dir=f"~/.prefect/results/{FLOW_NAME}/accept_checkpointed_data/"),
checkpoint=True,
)
def accept_checkpointed_data(ddf: dd.DataFrame) -> None:
print(ddf.dtypes)
nr = random()
print(f"Number deciding whether task fails or not: {nr} - if nr > 0.5, task fails")
if nr > 0.5:
raise ValueError("Too big number!")
print(f"Nr was < 0.5 - taks succeeded")
return ddf.dtypes
with Flow(FLOW_NAME) as flow:
ddf = checkpoint_data()
accept_checkpointed_data(ddf)
if __name__ == "__main__":
flow.run()
Running from the terminal:

```bash
export PREFECT__FLOWS__CHECKPOINTING=true
python checkpointing_dd_for_restarts_local_results.py
```

generates the results as expected:
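As far as I know, in Prefect 1.x checkpointing is switched on automatically for runs orchestrated by a backend, while a plain `flow.run()` needs it enabled explicitly, which is what the `export PREFECT__FLOWS__CHECKPOINTING=true` line does. Prefect 1.x maps `PREFECT__`-prefixed environment variables onto nested config keys, with double underscores separating the levels, so that variable sets `flows.checkpointing`. The sketch below only mimics that naming convention with a hypothetical `env_to_nested_config` helper; it is not Prefect's config loader, and its `true`/`false` coercion is an assumption.

```python
import os
from typing import Any, Dict, Mapping


def env_to_nested_config(env: Mapping[str, str], prefix: str = "PREFECT__") -> Dict[str, Any]:
    """Hypothetical helper: {"PREFECT__A__B": "v"} -> {"a": {"b": "v"}}."""
    config: Dict[str, Any] = {}
    for name, raw in env.items():
        if not name.startswith(prefix):
            continue  # not a Prefect-style variable
        # "PREFECT__FLOWS__CHECKPOINTING" -> ["flows", "checkpointing"]
        keys = name[len(prefix):].lower().split("__")
        node = config
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        # Assumed coercion: "true"/"false" become booleans, anything else stays a string
        lowered = raw.lower()
        node[keys[-1]] = (lowered == "true") if lowered in ("true", "false") else raw
    return config


if __name__ == "__main__":
    print(env_to_nested_config(os.environ))
```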
It works when running from the terminal. Really strange. Thanks!
Regarding not seeing any log showing that the restarted task picks up the checkpointed data: you may need to enable DEBUG logs to see this:

```bash
export PREFECT__LOGGING__LEVEL=DEBUG
```
Output:

```
(prefectCloudFlows) ➜ 000_Community export PREFECT__LOGGING__LEVEL=DEBUG
(prefectCloudFlows) ➜ 000_Community python checkpointing_dd_for_restarts_local_results.py
[2022-03-09 13:44:55+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'checkpointing_dd_for_restarts_local_results'
[2022-03-09 13:44:55+0100] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2022-03-09 13:44:55+0100] DEBUG - prefect.FlowRunner | Flow 'checkpointing_dd_for_restarts_local_results': Handling state change from Scheduled to Running
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'checkpoint_data': Starting task run...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'checkpoint_data': Handling state change from Pending to Running
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'checkpoint_data': Calling task.run() method...
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Starting to upload result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/checkpoint_data_task/prefect-result-2022-03-09t12-44-55-780785-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Finished uploading result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/checkpoint_data_task/prefect-result-2022-03-09t12-44-55-780785-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'checkpoint_data': Handling state change from Running to Success
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'checkpoint_data': Finished task run for task with final state: 'Success'
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'accept_checkpointed_data': Starting task run...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'accept_checkpointed_data': Handling state change from Pending to Running
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'accept_checkpointed_data': Calling task.run() method...
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | col_1 object
col_2 int64
dtype: object
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Number deciding whether task fails or not: 0.32289249772504836 - if nr > 0.5, task fails
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Nr was < 0.5 - taks succeeded
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Starting to upload result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/accept_checkpointed_data/prefect-result-2022-03-09t12-44-55-815810-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.LocalResult | Finished uploading result to /Users/anna/.prefect/results/checkpointing_dd_for_restarts_local_results/accept_checkpointed_data/prefect-result-2022-03-09t12-44-55-815810-00-00...
[2022-03-09 13:44:55+0100] DEBUG - prefect.TaskRunner | Task 'accept_checkpointed_data': Handling state change from Running to Success
[2022-03-09 13:44:55+0100] INFO - prefect.TaskRunner | Task 'accept_checkpointed_data': Finished task run for task with final state: 'Success'
[2022-03-09 13:44:55+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-03-09 13:44:55+0100] DEBUG - prefect.FlowRunner | Flow 'checkpointing_dd_for_restarts_local_results': Handling state change from Running to Success
```
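In the output above, the "Starting to upload result" and "Finished uploading result" lines from `prefect.LocalResult` are logged at DEBUG level, so at the default INFO level they are simply filtered out. A small stdlib-only demonstration of that filtering; the logger name `demo.LocalResult` and the `ListHandler` class are made up for the demo and are not part of Prefect.

```python
import logging
from typing import List


class ListHandler(logging.Handler):
    """Keeps every record that reaches it, so the effect of the level is visible."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(f"{record.levelname} - {record.name} | {record.getMessage()}")


def messages_at(level: int) -> List[str]:
    """Log one INFO and one DEBUG message with the logger set to `level`."""
    logger = logging.getLogger("demo.LocalResult")  # hypothetical logger name
    logger.setLevel(level)
    logger.propagate = False  # keep the demo away from the root logger
    handler = ListHandler()
    logger.addHandler(handler)
    try:
        logger.info("Task 'checkpoint_data': Starting task run...")
        logger.debug("Starting to upload result to /tmp/example-result")
    finally:
        logger.removeHandler(handler)
    return handler.messages


if __name__ == "__main__":
    print(messages_at(logging.INFO))   # only the INFO message
    print(messages_at(logging.DEBUG))  # the INFO and the DEBUG message
```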
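The "Finished uploading result" DEBUG lines are where the checkpoint files show up, and the filenames carry a UTC timestamp: `12-44-55` in the name corresponds to the `13:44:55+0100` local time in the log prefix. A small sketch, assuming the log format stays exactly as printed above, that pulls those locations out of a saved log. The second helper rebuilds the `prefect-result-<timestamp>` name by slugifying a UTC ISO timestamp; that matches the names seen here, but it is my assumption about how they are generated, not Prefect's code.

```python
import re
import sys
from datetime import datetime, timezone
from typing import List

# LocalResult DEBUG line emitted once a checkpoint has been written;
# the location is everything before the trailing "..."
UPLOADED = re.compile(r"prefect\.LocalResult \| Finished uploading result to (\S+?)\.\.\.$")


def checkpoint_locations(log_text: str) -> List[str]:
    """Return the result locations reported as written, in log order."""
    locations = []
    for line in log_text.splitlines():
        match = UPLOADED.search(line)
        if match:
            locations.append(match.group(1))
    return locations


def result_filename(ts: datetime) -> str:
    """Rebuild a 'prefect-result-<slugified UTC ISO timestamp>' name (assumed pattern)."""
    iso = ts.astimezone(timezone.utc).isoformat()
    # Lowercase, then collapse every run of non-alphanumerics into a single "-"
    return "prefect-result-" + re.sub(r"[^a-z0-9]+", "-", iso.lower())


if __name__ == "__main__":
    if len(sys.argv) > 1:
        with open(sys.argv[1], encoding="utf-8") as fh:
            for location in checkpoint_locations(fh.read()):
                print(location)
```

For example, save a run with `python checkpointing_dd_for_restarts_local_results.py > run.log 2>&1` and pass `run.log` as the first argument.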