Save data during a flow run to the local file structure

For educational purposes I am trying to set up a local data pipeline using Prefect 2. I am looking for a way to save a pandas DataFrame or a JSON file during the execution of a flow.

Any help is appreciated.


Sure, do you already have your script written in Python with Pandas? Can you share your script so far?

Thanks for your quick reply. This is my script, with the flow consisting of multiple tasks.

import os
import yaml
import json

from minio import Minio
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect.results import PersistedResult

from src.etl import (
    extract_sub_prefix, 
    get_object_paths, 
    download_files, 
    get_checkpoint,
    load_image_batch
)
from src.etl import dummy_transform, model_predict
from src.etl import update_processed_data


@flow(name='test-flow')
def etl_flow(BATCHSIZE, TEST_DATA, IS_TEST):
    # get checkpoint from SQL before loading next
    # Load Configurations
    with open('bucket_config.yaml', 'rb') as yaml_file:
        bucket_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

    df_ckp = get_checkpoint(
        is_test=IS_TEST, 
        path=TEST_DATA
    )
    # --------------------------------
    # Extract
    # --------------------------------
    df_batch = load_image_batch(
        data=df_ckp,
        size=BATCHSIZE
    )
    # initiate s3 client
    client = Minio(
        bucket_config['HOST'], 
        access_key=bucket_config['ACCESS_KEY'],
        secret_key=bucket_config['SECRET_KEY']
    )
    download_files(
        client=client,
        bucket_name=bucket_config['BUCKET_NAME'],
        filenames=df_batch['object_name'].to_list(),
        n_threads=8,
    )
    
    # --------------------------------
    # Transform
    # --------------------------------
    with open('model_config.yaml', 'rb') as yaml_file:
        model_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

    predictions = model_predict(
        data=df_ckp,
        cfg=model_config
    )

    # Here I would like to save the resulting JSON file
    with open('results.json', 'w') as json_file:
        json.dump(predictions, json_file)
        

    # --------------------------------
    # Update
    # --------------------------------
    df_ckp = update_processed_data(
        df=df_ckp, 
        processed_ids=df_batch['object_name'].to_list(),
        path=TEST_DATA
    )
   
    # In this step I would like to save df_ckp locally.


    
if __name__ == '__main__':
    etl_flow()

Nice work! If you run Orion locally (prefect orion start) and start this flow directly from your terminal/Python console, all that is left to accomplish your goal is to add:

df_ckp.to_json("filename.json", orient="records")

And how can I manage this when I start it via a work queue and the Orion UI?


Check out those resources:

It can be as simple as:

prefect deployment build -n dev -q default -a flows/etl_flow.py:etl_flow
prefect agent start -q default
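
Once the agent is running, you can trigger a run from the Orion UI or from the CLI, for example (assuming the flow name test-flow and the deployment name dev from the build command above):

prefect deployment run test-flow/dev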

Thank you for the references. I am probably missing something.

My deployment works fine, but what if I want to transform, e.g., a pandas DataFrame from my local file structure during a flow run executed by an agent, and then save the transformed DataFrame back to the local file structure?
If I run this flow locally by calling the script with the flow, as mentioned at the beginning, it works fine and the changes made to the DataFrame are visible. However, if I start it via the Orion interface, the changes do not persist.
How is it possible to achieve this persistence of the transformed DataFrame or the generated JSON?

You can persist all results by default by leveraging the command:

prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

All results will be stored by default as pickle files in ~/.prefect/storage; you can override this with:

prefect config set PREFECT_LOCAL_STORAGE_PATH='${PREFECT_HOME}/storage'
prefect config set PREFECT_RESULTS_DEFAULT_SERIALIZER='json'
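
If you prefer not to change the global setting, you can also turn on persistence per flow or per task via the decorators. A minimal sketch (the task and flow names are illustrative):

from prefect import flow, task

@task(persist_result=True, result_serializer="json")
def make_predictions():
    # stand-in for the real model output
    return {"object_1": 0.93, "object_2": 0.12}

@flow(name="persisted-results-demo", persist_result=True)
def demo_flow():
    return make_predictions()

if __name__ == "__main__":
    demo_flow()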

It works fine when you run this flow locally by calling the script, but not via the Orion interface (local agent, actually), because the local agent defaults to a temporary working directory.
Set infrastructure: working_dir: in the deployment YAML file to the path where you want to save ‘results.json’.
For example, I set it to

...
infrastructure:
  working_dir: /Users/myfolder/projects/prefect-test
...

After setting working_dir and applying it, ‘results.json’ is properly saved in /Users/myfolder/projects/prefect-test/results.json and visible.
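
As an alternative (or in addition), you can make the flow independent of the agent’s working directory by writing to an absolute path inside the flow. A minimal sketch, assuming the same output folder as above (save_predictions is a hypothetical helper, not part of the original script):

import json
from pathlib import Path

# adjust this path to your own machine
OUTPUT_DIR = Path("/Users/myfolder/projects/prefect-test")

def save_predictions(predictions: dict) -> None:
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    with open(OUTPUT_DIR / "results.json", "w") as json_file:
        json.dump(predictions, json_file)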


@eshyun your solution looks great.
I’d like to do the same as above, but using Deployment.build_from_flow instead of the CLI. I cannot find fitting parameters in the documentation. Any ideas?

# not working:
Deployment.build_from_flow(…, working_dir="/Users/myfolder/projects/prefect-test")

If you want to use Deployment.build_from_flow, then create an infrastructure object as shown below.

from prefect.infrastructure import Process

# The two lines below can be removed once the infrastructure object has been
# created by running the Python script. The block can also be created in the
# Prefect Cloud UI.
my_infra = Process(working_dir="path/to/dir")
my_infra.save(name="infra-name")

Now run the Python script, and then add the line below to load the my_infra object for your Deployment.

my_infra = Process.load("infra-name")

Deployment.build_from_flow(
    name="",
    flow="",
    infrastructure=my_infra,  # the infrastructure object loaded above
)
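
For reference, a fuller sketch of how this can fit together (the names are illustrative and assume the etl_flow from the script above is importable):

from prefect.deployments import Deployment
from prefect.infrastructure import Process

from flows.etl_flow import etl_flow  # adjust to your module path

deployment = Deployment.build_from_flow(
    flow=etl_flow,
    name="dev",
    work_queue_name="default",
    infrastructure=Process.load("infra-name"),
)
deployment.apply()  # register the deployment with the Orion API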

Import LocalFileSystem with:

from prefect.filesystems import LocalFileSystem

Create your local storage block:

my_storage_block = LocalFileSystem(basepath='path/to/save/dir')

Save your storage block:

my_storage_block.save(name="image-storage")

Then you can set it in your @flow decorator like this:

@flow(name="retrieval flow", persist_result=True, result_storage=LocalFileSystem.load("image-storage"))
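
Putting it together, a minimal sketch of a flow whose results are persisted to that block (the task and DataFrame are illustrative, and the block name assumes the "image-storage" block saved above):

import pandas as pd
from prefect import flow, task
from prefect.filesystems import LocalFileSystem

@task(persist_result=True, result_serializer="json")
def transform(df: pd.DataFrame) -> dict:
    # stand-in for the real transformation
    return {"records": df.to_dict(orient="records")}

@flow(
    name="retrieval flow",
    persist_result=True,
    result_storage=LocalFileSystem.load("image-storage"),
)
def retrieval_flow():
    df = pd.DataFrame({"object_name": ["a.png", "b.png"]})
    return transform(df)

if __name__ == "__main__":
    retrieval_flow()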

I thought that just specifying the path when building a Deployment would be enough, but specifying the infrastructure with the same path solved a similar issue I had. Thanks for your answer!