@Jean-Baptiste_Six: Hey
I tried to implement a subflow in my main flow (like this):
@task
def subtask():
    return 1

with Flow("subflow") as subflow:
    subtask()

@task
def main_task():
    subflow.run()

with Flow("main_flow") as main_flow:
    main_task()
But I ran into this error:
Unexpected error while running flow: KeyError('Task slug init_dirs-1 is not found in the current Flow. This is usually caused by a mismatch between the flow version stored in the Prefect backend and the flow that was loaded from storage.
- Did you change the flow without re-registering it?
- Did you register the flow without updating it in your storage location (if applicable)?')
Note that I had already registered the main_flow (without the subflow inside) and it worked, but then I updated it and registered the subflow, and it failed. Could you help me please?
The task "init_dirs" is in the main_flow and finishes in a Success state, but this error happens when subflow.run() is called (and init_dirs is not in the subflow).
@Anna_Geller: There is no true concept of subflows in Prefect 1.0, but in Orion you can do something very similar to what you described.
In Prefect 1.0 there is an orchestrator pattern: a create_flow_run task that triggers child flow runs via API calls. So you would have to replace subflow.run() with a create_flow_run task.
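For example, a minimal sketch of that pattern (the project name is a placeholder, and it assumes the child flow "subflow" is already registered):
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

# orchestrator (parent) flow that triggers the registered child flow via the API
with Flow("main_flow") as main_flow:
    child_run_id = create_flow_run(
        flow_name="subflow",               # name of the registered child flow
        project_name="your_project_name",  # placeholder
    )
    # optionally block until the child flow run finishes and propagate failures
    wait_for_flow_run(child_run_id, raise_final_state=True)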
To find out more, check out those Discourse topics:
• How can I create a subflow?
• How can I create a subflow and block until it’s completed?
@Jean-Baptiste_Six: Hi @Anna_Geller, I tried, but it seems impossible to call create_flow_run inside a task; I get the following error:
ValueError: Could not infer an active Flow context while creating edge to <Task: create_flow_run>. This often means you called a task outside a with Flow(...) block. If you're trying to run this task outside of a Flow context, you need to call create_flow_run.run(...)
I also tried with .run() but it didn't work (same error), help please
@Anna_Geller: That’s true, you shouldn’t be calling this task within another task. Can you explain the problem a bit more? Why are you trying to call tasks within other tasks?
Also, can you share your flow(s)?
@Jean-Baptiste_Six: Indeed, because I need the result of the previous task to iterate in a loop
I can share, I’ll try to simplify them
with Flow("main flow") as main_flow:
root_dir = init_dirs("sec", archive=False)
download_dir = download(root_dir, False)
metadatas = metadata(download_dir)
index(metadatas) # Error
with Flow("index_flow") as index_flow:
# Imports
metadatas = Parameter("metadatas_batch", required=True)
imports = build_import(metadatas)
index_weaviate(imports)
@task()
def index(metadatas: List[dict]) -> None:
    metadata_batches = list(batch(metadatas, BATCH_SIZE_INDEX))
    for i, metadata_batch in enumerate(metadata_batches):
        print(f"Process batch {i+1}/{len(metadata_batches)} ...")
        flow_id = create_flow_run(flow_name="index_flow", project_name="Document Pipeline", parameters={"metadatas_batch": metadata_batch})  # Error here
        wait_for_flow_run(flow_id, raise_final_state=True, stream_logs=True)
        print(f"Done !")
    print('Success: All the indexations are done !')
And I have the tasks:
@task()
def build_import(metadatas: List[dict]):
    return weaviate.build_import(metadatas)

@task()
def index_weaviate(imports: dict) -> None:
    client = get_client()
    weaviate.index_multi_documents(client, imports)
My issue is that I need the metadatas result to iterate over, so I think it needs to be inside a task, doesn't it?
@Anna_Geller: instead of using a for loop, you can leverage mapping and call create_flow_run and wait_for_flow_run as shown here:
from prefect import Flow, unmapped
from prefect.tasks.prefect import create_flow_run
from prefect.executors import LocalDaskExecutor

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=["flow_name_1", "flow_name_2", "flow_name_3"],
        project_name=unmapped("your_project_name"),
    )
so you need to wrap this into a task and return the list of parameter values; ideally convert this task to return a list of dictionaries {"metadatas_batch": metadata_batch}:
@task
def get_parameter_values():
    return list(batch(metadatas, BATCH_SIZE_INDEX))
and then:
with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
params = get_parameter_values()
mapped_flow_run_ids = create_flow_run.map(
flow_name=unmapped("index_flow"),
project_name=unmapped("Document Pipeline")
parameters=params,
)
An additional benefit is that you'll get parallelism, while the for loop would trigger those flow runs sequentially.
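And if you also want the parent flow to block until each child flow run finishes (the way your for loop did), you can map wait_for_flow_run over the returned run IDs, e.g. a sketch building on the flow above:
from prefect.tasks.prefect import wait_for_flow_run

with Flow("parent_flow", executor=LocalDaskExecutor()) as parent_flow:
    params = get_parameter_values()
    mapped_flow_run_ids = create_flow_run.map(
        flow_name=unmapped("index_flow"),
        project_name=unmapped("Document Pipeline"),
        parameters=params,
    )
    # wait for every child flow run; raise_final_state propagates child failures to the parent
    wait_for_flow_run.map(mapped_flow_run_ids, raise_final_state=unmapped(True))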
@Jean-Baptiste_Six: Thank you
I did something like this but I still have an issue (KeyError[0]), where did I make a mistake?
with Flow("sec daily") as sec_daily_flow:
root_dir = init_dirs("sec", archive=False)
download_dir = download(root_dir, False)
metadatas = metadata(download_dir)
upload(metadatas)
metadatas_batches = split_batch(metadatas)
mapped_flow_run_ids = create_flow_run.map(
flow_name=unmapped("index_flow"),
project_name=unmapped("Document Pipeline"),
parameters={"metadatas_batch": metadatas_batches},
upstream_tasks=[unmapped(metadatas_batches)]
)
@task(state_handlers=[fail_handler], log_stdout=True)
def split_batch(metadatas: List[dict]) -> List[List[dict]]:
    return list(batch(metadatas, BATCH_SIZE_INDEX))
@Anna_Geller: convert this task to return a list of dictionaries in the format {"metadatas_batch": metadata_batch}:
@task
def get_parameter_values():
    return list(batch(metadatas, BATCH_SIZE_INDEX))
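Applied to your split_batch task, that could look something like this (keeping your batch helper and BATCH_SIZE_INDEX):
@task(state_handlers=[fail_handler], log_stdout=True)
def split_batch(metadatas: List[dict]) -> List[dict]:
    # one parameters dict per child flow run
    return [
        {"metadatas_batch": metadata_batch}
        for metadata_batch in batch(metadatas, BATCH_SIZE_INDEX)
    ]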
@Jean-Baptiste_Six: Mhh I see
@Anna_Geller: metadatas_batches should be a list of dicts; then you can pass it to the parameters with no modification. Additionally, you don't need to set an upstream task dependency, since metadatas_batches is already passed as a data dependency via the parameters.
with Flow("sec daily") as sec_daily_flow:
root_dir = init_dirs("sec", archive=False)
download_dir = download(root_dir, False)
metadatas = metadata(download_dir)
upload(metadatas)
metadatas_batches = split_batch(metadatas)
mapped_flow_run_ids = create_flow_run.map(
flow_name=unmapped("index_flow"),
project_name=unmapped("Document Pipeline"),
parameters=metadata_batches,
)
@Jean-Baptiste_Six: It works!! Thank you so much for your time
@Anna_Geller Small question: I have the following error:
Task 'create_flow_run[0]': Exception encountered during task execution!
requests.exceptions.HTTPError: 413 Client Error: Request Entity Too Large for url: https://api.prefect.io/
Is it because of the metadata dict?
@Anna_Geller: Yes, it can be that your Parameter value payload is too large to be stored in the backend. This page provides more info about payload size limits
Prefect Community: What is the API request payload limit in Prefect Cloud?
@Jean-Baptiste_Six: Oh, so this solution won't work in my case
@Anna_Geller: Are you sure there is no other way to e.g. retrieve this data within the respective child flow rather than passing the entire large payload via parameter? How big is this Parameter value?
@Jean-Baptiste_Six: ~50Mb for 1 metadata_batch
I could retrieve the result stored in Cloud Storage
And pass the path as a parameter instead of the data itself
@Anna_Geller: You mean 50 MB? That is too large, yes. Exactly, this is a great idea: your parameter may point to the Cloud Storage location of the data, but it shouldn't pass large data objects themselves.
@Jean-Baptiste_Six: Is it possible to get the location of the Task result inside the Flow itself?
Do you have an example? Because I'm not sure how to use it
I use GCSResult to store my task results:
with Flow("sec daily", state_handlers=[flow_handler], result=GCSResult(BUCKET_RESULTS)) as sec_daily_flow:
@Anna_Geller: I think for your use case it would be much easier to just return the destination GCS path using something like this in your task (i.e. uploading and returning the relevant path yourself):
from google.cloud import storage as gcs
from google.oauth2 import service_account
from prefect.client.secrets import Secret
creds = Secret("GCP_CREDENTIALS").get()
PROJECT_NAME = "your gcp project name"
BUCKET_NAME = "your gcs bucket"
destination_file_name = "path/on/gcs/file.txt"
source_file_name = "local/path/file.txt"
credentials = service_account.Credentials.from_service_account_info(creds)
gcs_client = gcs.Client(project=PROJECT_NAME, credentials=credentials)
bucket = gcs_client.bucket(BUCKET_NAME)
blob = bucket.blob(blob_name=destination_file_name)
blob.upload_from_filename(source_file_name)
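Wrapped in a task that returns the destination path, the sketch above could look roughly like this (upload_batch is just an illustrative name, and the project/bucket values are placeholders); the returned path is then what you'd pass to the child flow as the parameter instead of the data itself:
from prefect import task
from prefect.client.secrets import Secret
from google.cloud import storage as gcs
from google.oauth2 import service_account

@task
def upload_batch(source_file_name: str, destination_file_name: str) -> str:
    creds = Secret("GCP_CREDENTIALS").get()
    credentials = service_account.Credentials.from_service_account_info(creds)
    gcs_client = gcs.Client(project="your gcp project name", credentials=credentials)
    bucket = gcs_client.bucket("your gcs bucket")
    blob = bucket.blob(blob_name=destination_file_name)
    blob.upload_from_filename(source_file_name)
    # return the GCS location so it can be passed as a flow run parameter
    return f"gs://{bucket.name}/{destination_file_name}"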