Save data during a flow run to the local file system

For educational purposes I am trying to set up a local data pipeline using Prefect 2. I am looking for a way to save a pandas dataframe or a JSON file during the execution of a flow.

Any help is appreciated.


Sure, do you already have your script written in Python with Pandas? Can you share your script so far?

Thanks for your fast reply. This is my script with the flow consisting of multiple tasks.

import yaml
import json

from minio import Minio
from prefect import flow

from src.etl import (
    extract_sub_prefix, 
    get_object_paths, 
    download_files, 
    get_checkpoint,
    load_image_batch
)
from src.etl import dummy_transform, model_predict
from src.etl import update_processed_data


@flow(name='test-flow')
def etl_flow(BATCHSIZE, TEST_DATA, IS_TEST):
    # get checkpoint from SQL before loading next
    # Load Configurations
    with open('bucket_config.yaml', 'rb') as yaml_file:
        bucket_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

    df_ckp = get_checkpoint(
        is_test=IS_TEST, 
        path=TEST_DATA
    )
    # --------------------------------
    # Extract
    # --------------------------------
    df_batch = load_image_batch(
        data=df_ckp,
        size=BATCHSIZE
    )
    # initiate s3 client
    client = Minio(
        bucket_config['HOST'], 
        access_key=bucket_config['ACCESS_KEY'],
        secret_key=bucket_config['SECRET_KEY']
    )
    download_files(
        client=client,
        bucket_name=bucket_config['BUCKET_NAME'],
        filenames=df_batch['object_name'].to_list(),
        n_threads=8,
    )
    
    # --------------------------------
    # Transform
    # --------------------------------
    with open('model_config.yaml', 'rb') as yaml_file:
        model_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

    predictions = model_predict(
        data=df_ckp,
        cfg=model_config
    )

    # Here I would like to save the resulting JSON file
    with open('results.json', 'w') as json_file:
        json.dump(predictions, json_file)
        

    # --------------------------------
    # Update
    # --------------------------------
    df_ckp = update_processed_data(
        df=df_ckp, 
        processed_ids=df_batch['object_name'].to_list(),
        path=TEST_DATA
    )
   
    # In this step I would like to save the df_ckp locally.


    
if __name__ == '__main__':
    # example values; replace with your real batch size and checkpoint path
    etl_flow(BATCHSIZE=8, TEST_DATA='data/checkpoint.csv', IS_TEST=True)

Nice work! If you run Orion locally with prefect orion start and start this flow directly from your terminal or Python console, all that is left to accomplish your goal is to add:

df_ckp.to_json("filename.json", orient="records")
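A quick round-trip sanity check of that call (the checkpoint columns below are made up for illustration; the real df_ckp will differ):

```python
import pandas as pd

# stand-in for the real df_ckp checkpoint dataframe; columns are invented
df_ckp = pd.DataFrame({
    "object_name": ["img_001.png", "img_002.png"],
    "processed": [True, False],
})

# write the dataframe as a JSON array of records
df_ckp.to_json("filename.json", orient="records")

# read it back to confirm the file is a faithful copy
restored = pd.read_json("filename.json", orient="records")
```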

And how can I manage this when I start it via a work queue and the Orion UI?


Check out these resources:

It can be as simple as:

prefect deployment build -n dev -q default -a flows/etl_flow.py:etl_flow
prefect agent start -q default

Thank you for the references. I am probably missing something.

My deployment works fine, but what if I want to transform, for example, a pandas dataframe from my local file structure during a flow run via an agent, and then save the transformed dataframe back to the local file structure?
If I run this flow locally by calling the script, as mentioned at the beginning, it works fine and the changes made to the dataframe are visible. However, if I start it via the Orion interface, the changes do not persist.
How can I achieve this persistence of the transformed dataframe or the generated JSON?

You can persist all results by default by leveraging the command:

prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true

All results will be stored by default as pickle files in ~/.prefect/storage; you can override this with:

prefect config set PREFECT_LOCAL_STORAGE_PATH='${PREFECT_HOME}/storage'
prefect config set PREFECT_RESULTS_DEFAULT_SERIALIZER='json'
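For context on that serializer setting: the default pickle serializer writes binary files, while the json serializer writes human-readable text you can inspect on disk. A stdlib-only sketch of the difference (the result dict is invented):

```python
import json
import pickle

result = {"predictions": [0.1, 0.9]}

pickled = pickle.dumps(result)   # default: binary bytes, not human-readable
as_json = json.dumps(result)     # json serializer: plain, inspectable text

# both round-trip back to the same object
assert pickle.loads(pickled) == json.loads(as_json) == result
```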

It works fine when you run this flow locally by calling the script, but not via the Orion interface (the local agent, actually), because the local agent defaults to a temporary working directory.
Set infrastructure: working_dir: in the deployment YAML file to the path where you want 'results.json' to be saved.
For example, I set it to

...
infrastructure:
  working_dir: /Users/myfolder/projects/prefect-test
...

After setting working_dir and applying the deployment, 'results.json' is properly saved to /Users/myfolder/projects/prefect-test/results.json and visible.
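A complementary approach is to build output paths from an explicit absolute directory inside the flow itself, so results never depend on the agent's current working directory. A minimal sketch; save_results and base_dir are hypothetical helpers for illustration, not Prefect API:

```python
import json
import tempfile
from pathlib import Path

def save_results(predictions, base_dir, filename="results.json"):
    """Write predictions as JSON under an explicit absolute directory,
    instead of the agent's (possibly temporary) working directory."""
    path = Path(base_dir) / filename
    with open(path, "w") as f:
        json.dump(predictions, f)
    return path

# a throwaway directory stands in for /Users/myfolder/projects/prefect-test
out = save_results({"img_001.png": 0.9}, tempfile.mkdtemp())
```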
