How to schedule and orchestrate a flow-of-flows for a data warehousing (ETL) use case?

View in #prefect-community on Slack

Yeachan_Park @Yeachan_Park: I have a question regarding flow-of-flows dependencies. I had a look at the documentation and as far as I understand the recommendation is to create a parent flow that creates flow_runs from pre-defined flows and make use of the wait_for_flow_run task to essentially orchestrate flow runs. We have quite a large number of ETL flows and a large number of downstream flows, and further downstream flows which may depend on an ETL again and/or the output of a downstream flow in the middle and so on. This dependency relationship is a many-to-many relationship, i.e. 1 ETL can be upstream to many downstream flows, and 1 downstream flow can be upstream to many downstream flows. We’re responsible for the ETLs and our users are free to create templated downstream flows and specify whatever flows already available as dependencies.

I’m just wondering what the best thing to do is here. In Airflow there is the concept of execution date which can be passed to a sensor task that will wait for whatever upstream flow run to finish.

Based on what I have read, I’m a bit unsure how to approach this in prefect. Would you recommendation be to still use a parent flow that orchestrates the flowruns by creating and waiting? It would be quite a large flow, and something that is constantly changing based on what the user deploys and specifies as their upstream dependency. I was maybe wondering whether it’s also possible to create a sensor that waits for the creation of a flow_run_id of an upstream flow based on something like its scheduled_start_time and pass that into wait_for_flow_run? Then we’d still have individual flows consisting of just tasks and a task at the beginning to wait for a flow run of an upstream task at schedule X to finish running. What would be your advice?

Anna_Geller @Anna_Geller: You talk to the right person :sweat_smile: I struggled with exactly the same problem already 2 years ago and back then I wrote a comparison blog post of how this use case can be approached in both Airflow and Prefect:

Prefect handles it quite elegantly because the wait_for_flow_run polls directly for the end state of the child flow run and allows you to even raise_final_state=True so that if the child flow run fails, the parent flow run fails as well. And you can even stream child flow run logs so that those are displayed directly in the Prefect UI.

I plan to write a blog post about a similar data warehousing use case, you can subscribe to the Tutorials category on Discourse to get notified once it’s out:

Yeachan_Park @Yeachan_Park: Thanks for your help and the article! I understand that prefect’s current handling of flow-on-flow dependencies is quite like the TriggerDagRunOperator with an additional WaitForCompletion sensor in Airflow.

On one hand it’s nicer than how Airflow does it, since the actual flowrun does not start until the upstream flow has finished (nicer than the sensor inside a flow run/the TriggerDagRunOperator not waiting in airflow). On the other hand, I’m wondering whether it should be the other way round, i.e. the downstream flow should just check whether the upstream flowrun is done before executing it’s flowrun, instead of downstream flows being triggered directly once the upstream flowrun is done. In a way this leads to quite enormous flows for us which feel quite coupled as all downstream flows/upstream flows must be orchestrated in one flow (and kept updated) regardless of their function, instead of autonomous flows with their own execution schedule (in airflow the ExternalTaskSensor can be supplied with a different execution_date in case the heartbeats are different, i.e. hourly flow run waiting for a daily flow run) - we do have these cases, I’m also not entirely sure how those cases would be handled in prefect?

Quite curious what your thoughts are on this. Really appreciate your help!

Anna_Geller @Anna_Geller: > since the actual flowrun does not start until the upstream flow has finished
This is configurable. If you use the create_flow_run task without the wait_for_flow_run, you are triggering them in a fire and forget way without waiting and polling for the final state of the triggered child flow run.

On the other hand, I’m wondering whether it should be the other way round, i.e. the downstream flow should just check whether the upstream flowrun is done before executing it’s flowrun

This would be a rather weird separation of concerns, don’t you think? A flow run shouldn’t be evaluating the upstream flow run’s state - Prefect API should! Prefect is orchestrating the child flow runs from a parent flow run and manages the execution based on the states of your workflow components.

can be supplied with a different execution_date in case the heartbeats are different, i.e. hourly flow run waiting for a daily flow run

Did you see the part of the article discussing scheduling? Only the parent flow needs to be scheduled, other child flow runs are simply triggered (or called) from it. This is why it’s called “the orchestrator pattern” since the parent flow run serves as the orchestrator of child flows and this parent orchestrator decides when to trigger each child flow run.

Having schedules attached to child flow runs and synchronizing their execution based on that leads you back to CRON when you schedule one job at 1 AM and another one at 2 AM and hope :crossed_fingers: that this will work. The orchestrator pattern allows you to do it much more reliably because downstream child flow runs are triggered directly once the upstream flow runs are finished (or only once they are successful - configurable using the raise_final_state argument).

Yeachan_Park @Yeachan_Park:

since the parent flow run serves as the orchestrator of child flows and this parent orchestrator decides when to trigger each child flow run.

Sorry, I think my use case example was perhaps not clear, so we have flow C (needs to run every hour), which has upstream dependencies on flow B (needs to run every day) and flow A (also runs every hour). If I schedule this in a parent flow, then I’d need to schedule the parent flow as an hourly run and execute A & C every hour, but flow B should once execute every 24 hours. I think that case is not covered in the scheduling part of the article? I see StartFlowRun takes a scheduled_start_time , but that is a datetime object. Is this something that’s possible?

 A flow run shouldn't be evaluating the upstream flow run's state

Indeed, I definitely agree. My question was more trying to understand the reasoning behind how this orchestration is handled. Prefect’s solution is to create this parent orchestrator flow. I guess another way to do it is for for prefect (somewhere, e.g. scheduler, or via a sensor outside of the flows themselves) to determine that flow B has a dependency on flow A, therefore schedule flow B after the flow_run of A has completed. I.e. flow B is executed after flow A, but doesn’t have knowledge of flow A’s state. In this situation there’d be no need to create and maintain parent flows. If I had to guess the reason is that prefect’s scheduler is set up to be much leaner/have no access to the actual flow code to infer this kind of stuff? Or is there perhaps another reason?

Anna_Geller @Anna_Geller:

I think that case is not covered in the scheduling part of the article?

I understand your problem, and frankly, I had very similar use cases in my previous job! Back then we used a legacy job scheduler and it had a feature called “job chains” and we were able to specify the link between those in an XML-based job chain - but I digress :smile:

Here is how I would approach it in Prefect:

Option 1) My preferred choice
My preferred solution would be to have only the parent flow being scheduled and this parent flow would orchestrate all child flow runs. The issue with one task that needs to be executed only once every 24 hours could be solved using caching. A simple flow code and flow diagram will explain that better than words:

from datetime import timedelta
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PARENT_FLOW_NAME = "parent_flow_example"
PREFECT_PROJECT_NAME = "community"


with Flow(PARENT_FLOW_NAME) as parent_flow:
    flow_a_run_id = create_flow_run(
        flow_name="Flow_A",
        project_name=PREFECT_PROJECT_NAME,
        task_args=dict(name="Flow A"),
    )
    flow_a_flowrunview = wait_for_flow_run(
        flow_a_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow A"),
    )

    flow_b_run_id = create_flow_run(
        flow_name="Flow_B",
        project_name=PREFECT_PROJECT_NAME,
        upstream_tasks=[flow_a_flowrunview],
        task_args=dict(name="Flow B", cache_for=timedelta(hours=24)),
    )
    flow_b_flowrunview = wait_for_flow_run(
        flow_b_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow B", cache_for=timedelta(hours=24)),
    )
    flow_c_run_id = create_flow_run(
        flow_name="Flow_C",
        project_name=PREFECT_PROJECT_NAME,
        upstream_tasks=[flow_b_flowrunview],
        task_args=dict(name="Flow C"),
    )
    flow_c_flowrunview = wait_for_flow_run(
        flow_c_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow C"),
    )
if __name__ == "__main__":
    parent_flow.visualize()

I think this is the cleanest approach but please judge for yourself.

Option 2)
Your flow B can still have a daily schedule. The parent flow, orchestrating flow A, B, and C, can have an hourly schedule that should trigger only flow A and C, and it can additionally have a task making an API call to the Prefect API to check the final state of the most recent flow run of flow B to ensure that this flow run was successful and was executed not later than 24 hours ago. To avoid unnecessary computation, this task making an API call to check on the daily flow’s state doesn’t need to run every hour (since you need to check this state of a daily flow run only once per day), you could cache this task for 24 hours using the cache_for task decorator argument.

even better - since you mentioned that flow C depends on A and B but A and B don’t need to run sequentially, you could even do it this way in Prefect:

from datetime import timedelta
from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

PARENT_FLOW_NAME = "parent_flow_example"
PREFECT_PROJECT_NAME = "community"


with Flow(PARENT_FLOW_NAME) as parent_flow:
    flow_a_run_id = create_flow_run(
        flow_name="Flow_A",
        project_name=PREFECT_PROJECT_NAME,
        task_args=dict(name="Flow A"),
    )
    flow_a_flowrunview = wait_for_flow_run(
        flow_a_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow A"),
    )

    flow_b_run_id = create_flow_run(
        flow_name="Flow_B",
        project_name=PREFECT_PROJECT_NAME,
        task_args=dict(name="Flow B", cache_for=timedelta(hours=24)),
    )
    flow_b_flowrunview = wait_for_flow_run(
        flow_b_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow B", cache_for=timedelta(hours=24)),
    )
    flow_c_run_id = create_flow_run(
        flow_name="Flow_C",
        project_name=PREFECT_PROJECT_NAME,
        upstream_tasks=[flow_a_flowrunview, flow_b_flowrunview],
        task_args=dict(name="Flow C"),
    )
    flow_c_flowrunview = wait_for_flow_run(
        flow_c_run_id,
        raise_final_state=True,
        stream_logs=True,
        task_args=dict(name="Wait for Flow C"),
    )
if __name__ == "__main__":
    parent_flow.visualize()

Yeachan_Park @Yeachan_Park: Thank you for the detailed ideas, I will try those out!

Would you also have some insight regarding the second question about flow-on-flow dependencies?

Anna_Geller @Anna_Geller:

schedule flow B after the flow_run of A has completed. I.e. flow B is executed after flow A, but doesn’t have knowledge of flow A’s state

This is possible. You would just set raise_final_state=False
in this case, it will continue downstream even if this child flow run fails

prefect’s scheduler is set up to be much leaner/have no access to the actual flow code to infer this kind of stuff

The flow code has no knowledge of the execution state, the backend API has