For educational purposes I am trying to set up a local data pipeline using Prefect 2. I am looking for a solution to save a pandas dataframe or JSON during the execution of a flow.
Any help is appreciated.
Sure, do you already have your script written in Python with Pandas? Can you share your script so far?
Thanks for your fast reply. This is my script with the flow consisting of multiple tasks.
import os
import yaml
import json
from minio import Minio
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect.results import PersistedResult
from src.etl import (
    extract_sub_prefix,
    get_object_paths,
    download_files,
    get_checkpoint,
    load_image_batch
)
from src.etl import dummy_transform, model_predict
from src.etl import update_processed_data


@flow(name='test-flow')
def etl_flow(BATCHSIZE, TEST_DATA, IS_TEST):
    # get checkpoint from SQL before loading next
    # Load Configurations
    with open('bucket_config.yaml', 'rb') as yaml_file:
        bucket_config = yaml.load(yaml_file, yaml.FullLoader)

    df_ckp = get_checkpoint(
        is_test=IS_TEST,
        path=TEST_DATA
    )

    # --------------------------------
    # Extract
    # --------------------------------
    df_batch = load_image_batch(
        data=df_ckp,
        size=BATCHSIZE
    )

    # initiate s3 client
    client = Minio(
        bucket_config['HOST'],
        access_key=bucket_config['ACCESS_KEY'],
        secret_key=bucket_config['SECRET_KEY']
    )
    download_files(
        client=client,
        bucket_name=bucket_config['BUCKET_NAME'],
        filenames=df_batch['object_name'].to_list(),
        n_threads=8,
    )

    # --------------------------------
    # Transform
    # --------------------------------
    with open('model_config.yaml', 'rb') as yaml_file:
        model_config = yaml.load(yaml_file, Loader=yaml.FullLoader)

    predictions = model_predict(
        data=df_ckp,
        cfg=model_config
    )

    # Here I would like to save the resulting JSON file
    with open('results.json', 'w') as json_file:
        json.dump(predictions, json_file)

    # --------------------------------
    # Update
    # --------------------------------
    df_ckp = update_processed_data(
        df=df_ckp,
        processed_ids=df_batch['object_name'].to_list(),
        path=TEST_DATA
    )
    # In this step I would like to save the df_ckp locally.


if __name__ == '__main__':
    etl_flow()
Nice work! If you run Orion locally with: prefect orion start
and start this flow just from your terminal/Python console, all that is left to accomplish your goal is to add:
df_ckp.to_json("filename.json", orient="records")
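If you want the save to show up as its own step in the flow run, you could also wrap the write in a task. This is just a minimal sketch; the save_checkpoint task name and the tiny dataframe are made up for illustration:

import pandas as pd
from prefect import flow, task

@task
def save_checkpoint(df: pd.DataFrame, path: str) -> str:
    # write the dataframe as JSON relative to the flow's working directory
    df.to_json(path, orient="records")
    return path

@flow
def demo_flow():
    df_ckp = pd.DataFrame({"object_name": ["a.jpg", "b.jpg"]})  # stand-in for the real df_ckp
    save_checkpoint(df_ckp, "filename.json")

if __name__ == "__main__":
    demo_flow()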
And how can I manage this when I start it via a work queue and the Orion UI?
Check out those resources:
It can be as simple as:
prefect deployment build -n dev -q default -a flows/etl_flow.py:etl_flow
prefect agent start -q default
Thank you for the references. I am probably missing something.
My deployment works fine, but what if I want to transform, e.g., a pandas dataframe from my local file structure during a flow run over an agent and then save the transformed dataframe back to the local file structure?
If I run this flow locally by calling the script with the flow, as mentioned at the beginning, it works fine and the changes made to the dataframe are visible. However, if I start it via the Orion interface, the changes do not persist.
How is it possible to achieve this persistence of the transformed dataframe or the generated JSON?
You can persist all results by default by leveraging the command:
prefect config set PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
All results will then be stored by default as pickle files in ~/.prefect/storage; you can override the storage path and the serializer with:
prefect config set PREFECT_LOCAL_STORAGE_PATH='${PREFECT_HOME}/storage'
prefect config set PREFECT_RESULTS_DEFAULT_SERIALIZER='json'
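With persistence enabled, any value returned from a task or flow is written to the configured storage path. A minimal sketch (the flow and task here are made up for illustration, they are not your ETL code):

from prefect import flow, task

@task
def make_predictions():
    # the return value is persisted because PREFECT_RESULTS_PERSIST_BY_DEFAULT=true
    return {"image_1": "cat", "image_2": "dog"}

@flow
def persisted_flow():
    return make_predictions()

if __name__ == "__main__":
    persisted_flow()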
It works fine when you run this flow locally by calling the script, but not via the Orion interface (the local agent, actually), because the local agent defaults to a temporary working directory.
Set working_dir under infrastructure in the deployment YAML file to the path where you want "results.json" to be saved.
For example, I set it to
...
infrastructure:
  working_dir: /Users/myfolder/projects/prefect-test
...
After setting working_dir and applying it, "results.json" is properly saved in /Users/myfolder/projects/prefect-test/results.json and visible.
@eshyun your solution looks great.
I'd like to do the same as above, but using Deployment.build_from_flow instead of the CLI. I cannot find fitting parameters in the documentation. Any ideas?
# not working:
Deployment.build_from_flow(..., working_dir="/Users/myfolder/projects/prefect-test")
If you want to use Deployment.build_from_flow, then create an infrastructure object as shown below.
from prefect.infrastructure import Process
# The two lines below can be removed once the infrastructure block has been created by running the Python script once. The block can also be created in the Prefect Cloud UI.
my_infra = Process(working_dir="path/to/dir")
my_infra.save(name="infra-name")
Now run the Python script, then add the line below to load the my_infra block into your deployment:
my_infra = Process.load("infra-name")
Deployment.build_from_flow(
    name="",                  # deployment name
    flow="",                  # pass the flow object here, not a string
    infrastructure=my_infra,  # the infrastructure object loaded above
)
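Putting it together, the deployment returned by Deployment.build_from_flow still has to be registered before an agent can pick it up. A minimal sketch, assuming the etl_flow from earlier in this thread lives in flows/etl_flow.py (as in the CLI example) and the Process block was saved as "infra-name":

from prefect.deployments import Deployment
from prefect.infrastructure import Process

from flows.etl_flow import etl_flow  # your flow object

my_infra = Process.load("infra-name")

deployment = Deployment.build_from_flow(
    flow=etl_flow,
    name="dev",                 # matches the -n dev from the CLI example
    work_queue_name="default",  # matches the -q default from the CLI example
    infrastructure=my_infra,
)
deployment.apply()  # register the deployment with the Orion API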
Import LocalFileSystem with:
from prefect.filesystems import LocalFileSystem
Create your local storage block:
my_storage_block = LocalFileSystem(basepath="path/to/save/dir")
Save your storage block:
my_storage_block.save(name="image-storage")
Then you can set it in your @flow decorator like this:
@flow(name="retrieval flow", persist_result=True, result_storage=LocalFileSystem.load("image-storage"))
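For completeness, here is a minimal end-to-end sketch of the storage-block approach; the basepath, the "image-storage" block name, and the tiny transform task are illustrative placeholders. The returned dataframe is persisted (pickled by default) into the block's basepath because persist_result=True:

import pandas as pd
from prefect import flow, task
from prefect.filesystems import LocalFileSystem

# one-time setup: create and save the storage block (could also be created in the UI)
LocalFileSystem(basepath="path/to/save/dir").save(name="image-storage", overwrite=True)

@task
def transform(df: pd.DataFrame) -> pd.DataFrame:
    # placeholder transformation standing in for the real model step
    return df.assign(processed=True)

@flow(
    name="retrieval flow",
    persist_result=True,
    result_storage=LocalFileSystem.load("image-storage"),
)
def retrieval_flow():
    df = pd.DataFrame({"object_name": ["a.jpg", "b.jpg"]})
    return transform(df)

if __name__ == "__main__":
    retrieval_flow()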
I thought that just specifying the path when building a Deployment would be enough, but specifying the infrastructure with the same path solved a similar issue I had. Thanks for your answer!