How to deserialize flow run results generated by default S3 storage?

Prefect storage blocks are used for both flow code and flow run results.

Given the following flow and DeploymentSpec:

from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec

@task
def get_name():
    return "Marvin"

@task
def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)

@flow
def marvin_flow():
    user = get_name()
    say_hi(user)

DeploymentSpec(
    name="dev",
    flow=marvin_flow,
    tags=["local"]
)

You can create and run it using:

prefect deployment create your_file.py
prefect deployment run marvin-flow/dev

If you are using S3 as your default storage, you can then download your result objects using:

aws s3 cp s3://yourbucketname . --recursive
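If you would rather script the download than use the AWS CLI, here is a minimal sketch using boto3. This is an assumption on top of the original post, which only uses the CLI; `download_bucket` and `local_path_for_key` are hypothetical helper names. It lists every object with the `list_objects_v2` paginator and saves each one under `dest_dir`, mirroring the key layout the way `aws s3 cp --recursive` does. It needs boto3 installed and AWS credentials configured.

```python
from pathlib import Path
from typing import List


def local_path_for_key(key: str, dest_dir: str) -> Path:
    # Mirror the S3 key layout under dest_dir; strip a leading "/" so the
    # key cannot resolve to an absolute path outside dest_dir.
    return Path(dest_dir) / key.lstrip("/")


def download_bucket(bucket: str, dest_dir: str = ".") -> List[Path]:
    # Imported lazily so this module can be loaded without boto3 installed.
    import boto3

    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    downloaded = []
    for page in paginator.paginate(Bucket=bucket):
        # "Contents" is absent when a page (or the whole bucket) is empty.
        for obj in page.get("Contents", []):
            key = obj["Key"]
            if key.endswith("/"):
                continue  # skip zero-byte "folder" placeholder objects
            target = local_path_for_key(key, dest_dir)
            target.parent.mkdir(parents=True, exist_ok=True)
            s3.download_file(bucket, key, str(target))
            downloaded.append(target)
    return downloaded
```

Usage: `download_bucket("yourbucketname")` saves the result files into the current directory and returns their local paths.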

Then, to load the pickled flow run result object, use:

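import json
import cloudpickle
from prefect.orion.schemas.data import DataDocument

# Path to one of the result files downloaded from your bucket
path = "/Users/your/path/to/s3/object/b5d3346d-6253-417f-9e76-a4b62a779195"

with open(path, "rb") as buffered_reader:
    # The file holds a JSON-serialized DataDocument
    dict_obj = json.load(buffered_reader)
    # Rebuild the DataDocument and decode its payload back into Python objects
    flow_run_result = DataDocument.parse_obj(dict_obj).decode()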
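As an optional cross-check, you can peek at a downloaded result file with the standard library alone. This sketch rests on assumptions not confirmed by this post: that the file is a JSON document with an `encoding` field and a `blob` field, and that for the `"cloudpickle"` encoding the blob is base64-encoded pickle bytes. `inspect_result_file` is a hypothetical helper name. Unpickling real Prefect states still requires Prefect to be importable, and you should only unpickle files you trust, since unpickling can execute arbitrary code.

```python
import base64
import json
import pickle
from pathlib import Path


def inspect_result_file(path):
    """Return (encoding, decoded_object) for a JSON result document.

    Assumes the layout {"encoding": "cloudpickle", "blob": "<base64 text>"};
    this is an assumption about the on-disk format, not a documented API.
    """
    doc = json.loads(Path(path).read_text())
    encoding = doc["encoding"]
    if encoding != "cloudpickle":
        raise ValueError(f"unsupported encoding: {encoding!r}")
    # b64decode discards newlines by default, so line-wrapped base64 is fine.
    raw = base64.b64decode(doc["blob"])
    # cloudpickle writes standard pickle streams; pickle.loads can read them
    # as long as every referenced module (e.g. prefect, cloudpickle) is importable.
    return encoding, pickle.loads(raw)
```

Usage: `inspect_result_file(path)` with the same `path` as above returns the encoding name and the decoded object.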

The flow_run_result object should look similar to this:

[Completed(message=None, type=COMPLETED, result='Marvin', task_run_id=b85fd60f-0c06-4bed-9bb4-f5a311d47ce0),
 Completed(message=None, type=COMPLETED, result=None, task_run_id=1eb7400f-569e-4304-a334-d60cbbb2e2d4)]

You can see that it includes the states of all task runs and their return values:

  • the first task returned the string "Marvin"
  • the second task returned nothing (None)
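To work with these values programmatically, a small helper can map each task run to its return value. This is a sketch under assumptions: the name `results_by_task_run` is hypothetical, and it assumes each state exposes its task run ID as `state.state_details.task_run_id` and its return value via `state.result()`. That matches the fields printed above, but check the attribute names against your Prefect version.

```python
from typing import Any, Dict, Iterable


def results_by_task_run(states: Iterable[Any]) -> Dict[str, Any]:
    """Map each task run ID (as a string) to that task run's return value.

    Assumes (not confirmed by the post) that each state has
    `state_details.task_run_id` and a zero-argument `result()` method.
    """
    return {
        str(state.state_details.task_run_id): state.result()
        for state in states
    }
```

Usage: `results_by_task_run(flow_run_result)` would give a dict with one entry per task run, e.g. the first task run's ID mapped to `'Marvin'` and the second's mapped to `None`.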