Prefect storage blocks are used for both flow code and results. Given the following flow and DeploymentSpec:
from prefect import task, flow
from prefect import get_run_logger
from prefect.deployments import DeploymentSpec


@task
def get_name():
    return "Marvin"


@task
def say_hi(user_name: str):
    logger = get_run_logger()
    logger.info("Hello %s!", user_name)


@flow
def marvin_flow():
    user = get_name()
    say_hi(user)


DeploymentSpec(
    name="dev",
    flow=marvin_flow,
    tags=["local"],
)
You can create and run it using:
prefect deployment create your_file.py
prefect deployment run marvin-flow/dev
If you are using S3 as your default storage, you can then download your result objects using:
aws s3 cp s3://yourbucketname . --recursive
Then, to load the pickled flow run result object, use:
import json

import cloudpickle  # must be importable so the pickled payload can be deserialized
from prefect.orion.schemas.data import DataDocument

path = "/Users/your/path/to/s3/object/b5d3346d-6253-417f-9e76-a4b62a779195"
with open(path, "rb") as buffered_reader:
    dict_obj = json.load(buffered_reader)
flow_run_result = DataDocument.parse_obj(dict_obj).decode()
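Under the hood, the persisted document wraps a pickled payload. As a rough illustration only (not Prefect's actual implementation), here is a minimal sketch of the same serialize/decode round trip using the standard library's json, base64, and pickle modules:

```python
import base64
import json
import pickle

# Serialize a result the way a pickle-based data document conceptually does:
# pickle the object, base64-encode it, and wrap it in a JSON envelope.
result = ["Marvin", None]
envelope = {
    "encoding": "pickle",
    "blob": base64.b64encode(pickle.dumps(result)).decode("ascii"),
}
serialized = json.dumps(envelope)

# Decoding reverses the steps: parse the JSON, base64-decode, then unpickle.
loaded = json.loads(serialized)
decoded = pickle.loads(base64.b64decode(loaded["blob"]))
print(decoded)  # ['Marvin', None]
```

The JSON envelope is what you see when you open the downloaded S3 object in a text editor; the binary result itself lives inside the encoded blob.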
The flow_run_result object should have a structure as follows:
[Completed(message=None, type=COMPLETED,
result='Marvin', task_run_id=b85fd60f-0c06-4bed-9bb4-f5a311d47ce0),
Completed(message=None, type=COMPLETED,
result=None, task_run_id=1eb7400f-569e-4304-a334-d60cbbb2e2d4)]
You can see that it includes the states of all task runs and their return values:
- the first task returned the string "Marvin"
- the second task returned nothing, i.e. None
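With the decoded list of task run states in hand, the return values can be pulled out of each state's result attribute. A minimal stand-in sketch (using a hypothetical dataclass in place of Prefect's actual state objects, since only the attribute access matters here):

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Completed:
    """Stand-in for a Prefect task run state carrying a result."""
    message: Optional[str]
    type: str
    result: Any
    task_run_id: str


flow_run_result = [
    Completed(None, "COMPLETED", "Marvin", "b85fd60f-0c06-4bed-9bb4-f5a311d47ce0"),
    Completed(None, "COMPLETED", None, "1eb7400f-569e-4304-a334-d60cbbb2e2d4"),
]

# Collect each task run's return value in execution order.
results = [state.result for state in flow_run_result]
print(results)  # ['Marvin', None]
```

This mirrors the structure shown above: one entry per task run, with None for tasks that return nothing.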