Migrating to Prefect 2.0: From Flow of Flows to Subflows

Prefect 2.0 Paradigm Shift: From Flow of Flows to Subflows

Sometimes you’ll have flows that need to run in a specific order, or flows that rely on the output of other flows.

In Prefect 1, running flows from within another flow meant using special orchestration tasks and registering every flow with a backend first. Prefect 2 instead makes subflows a first-class feature. Being able to call flows freely from within other flows is a useful and powerful way to organize your code. Let’s look at an example.

Flow of Flows in Prefect 1

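# Child Flow A (in its own file)
from prefect import Flow, task

# log_stdout=True forwards print() output to the Prefect logger
@task(log_stdout=True)
def my_task():
    print('Hello from Child Flow A')

with Flow('Child Flow A') as flow:
    my_task()

if __name__ == "__main__":
    flow.run()

# Child Flow B (in a separate file)
from prefect import Flow, task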

@task(log_stdout=True)
def my_task():
    print('Hello from Child Flow B')

with Flow('Child Flow B') as flow:
    my_task()
    
if __name__ == "__main__":
    flow.run()

Here we have two independent Prefect 1 flows. We’re calling them child flows because they will be called by this parent flow, our so-called flow of flows.

from prefect import Flow
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run

with Flow("Parent Flow") as flow:

    # assumes you have registered the following flows in a project named "examples"
    flow_a = create_flow_run(flow_name="Child Flow A", project_name="examples")
    wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True)

    flow_b = create_flow_run(flow_name="Child Flow B", project_name="examples")
    wait_for_flow_b = wait_for_flow_run(flow_b, raise_final_state=True)

    # don't start Child Flow B's run until Child Flow A's run has finished
    flow_b.set_upstream(wait_for_flow_a)

if __name__ == "__main__":
    flow.run()

For the parent flow to trigger the other flows, we need two built-in utility tasks provided by Prefect. create_flow_run calls the Prefect API to create a new run of an existing, registered flow; an agent must be running to pick up that run and execute it. wait_for_flow_run polls the state of that flow run until it finishes, and with raise_final_state=True its own state is set to the child run's final state, so a failed child flow surfaces as a failure in the parent. Making wait_for_flow_a an upstream dependency of flow_b means Child Flow B's run is only created once Child Flow A's run has finished, which guarantees the correct order.

Already you can see that this code is a bit verbose, and the problem only gets worse with larger, more complicated DAGs.

Before we can run the parent flow, we must register all of our flows into the same project that is specified in our code. We can do this by running this command in the terminal:

prefect register -p . --project examples

Subflows in Prefect 2

from prefect import flow, task

@task()
def my_a_task():
    print('Hello from Subflow A')

@task()
def my_b_task():
    print('Hello from Subflow B')

@flow(name="Subflow A")
def subflow_a():
    my_a_task()

@flow(name="Subflow B")
def subflow_b():
    my_b_task()

@flow(name="Main Flow")
def parent_flow():
    subflow_a()
    subflow_b()

if __name__ == "__main__":
    parent_flow()

Now let’s look at the same setup of flows in Prefect 2. In this example, all of our flows are in the same file, though we could just as easily import the subflows from other files. The syntax is much simpler and more Pythonic: in essence, it is just a set of Python functions calling each other. There are no utility tasks and no registration step, and Subflow A runs before Subflow B simply because it is called first.

Instead of requiring special tasks, we can call any flow function directly within the body of another flow, much as we would call a task function. This allows much more flexibility, but be aware that there are still important distinctions between tasks and subflows, each with their own strengths and weaknesses. For example, calling a subflow blocks the parent flow until the subflow run finishes, while tasks can be submitted to a task runner with .submit() so that they run concurrently. Check out our docs for a full rundown.
