How do I restart a child flow run in a flow-of-flows?

When a flow fails, restarting it in the UI is easy: you just press Restart. When one or more subflows fail, there is a bit more setup involved because of how child flow runs are created. Let's say we have three subflows; for this example they are identical:

with Flow('subflow_1') as subflow:
    a = test_task1()
    b = test_task2()

The parent flow runs them like so:

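start = StartFlowRun(project_name="Project", wait=True)

# storage is the parent flow's storage object (see the GitHub example further down).
# Pass it by keyword: the second positional argument of Flow is the schedule.
with Flow('main flow', storage=storage) as flow:
    first_flow = start(flow_name="subflow_1")
    second_flow = start(flow_name="subflow_2")
    third_flow = start(flow_name="subflow_3")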

If any of the subflows fail, pressing Restart on the parent flow will only restart the parent flow, and it will reuse the failed results of the child flow runs. Why is that? When StartFlowRun creates a child flow run, it passes an idempotency key, which by default is the ID of the StartFlowRun task run. Restarting the parent reuses those same task runs, so the key doesn't change, and Prefect assumes the child flow already ran: it hands back the existing (failed) flow run instead of creating a new one.
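To illustrate why an unchanged key means no new child run, here is a toy, stdlib-only model of that deduplication rule. ToyBackend, create_flow_run and the run IDs are hypothetical; this is not how the Prefect backend is implemented, it only mimics the behavior described above, assuming the default key stays the same across restarts of the parent.

```python
class ToyBackend:
    """Hypothetical stand-in for the Prefect backend (NOT Prefect's API).

    It models one rule only: creating a run for a flow with an idempotency
    key that was already used returns the existing run instead of a new one.
    """

    def __init__(self):
        self._runs = {}  # (flow_name, idempotency_key) -> flow run ID

    def create_flow_run(self, flow_name, idempotency_key):
        key = (flow_name, idempotency_key)
        if key not in self._runs:
            # Unseen key: create a brand-new run
            self._runs[key] = f"flow-run-{len(self._runs) + 1}"
        # Seen key: hand back the run that already exists
        return self._runs[key]


backend = ToyBackend()

# Default behavior (assumed): the key is tied to the parent's task run,
# which stays the same when the parent flow is restarted.
parent_task_run_id = "task-run-abc"  # hypothetical ID
first_try = backend.create_flow_run("subflow_1", parent_task_run_id)
restart = backend.create_flow_run("subflow_1", parent_task_run_id)
# The restart gets the old (failed) run back
print(first_try, restart)  # flow-run-1 flow-run-1

# A key that changes on every attempt produces a fresh child run
fresh = backend.create_flow_run("subflow_1", "06/01/2021:14:09:42")
print(fresh)  # flow-run-2
```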

To fix this, we have to give each subflow a key that is calculated at run time, which takes two separate changes. First, give each subflow a dynamic key; an easy way to do this is with datetime:

from datetime import datetime

start(flow_name="subflow_1", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
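To make the key format concrete, here is a stdlib-only sketch that feeds fixed datetimes into the same strftime format in place of datetime.now(). The helper timestamp_key is hypothetical. It also shows that the key only has one-second resolution, so two attempts started within the same second would get the same key.

```python
from datetime import datetime

KEY_FORMAT = "%m/%d/%Y:%H:%M:%S"  # the format used in this post


def timestamp_key(now):
    """Hypothetical helper: turn a datetime into an idempotency key."""
    return now.strftime(KEY_FORMAT)


# The original run and a restart a few minutes later get different keys
first_attempt = timestamp_key(datetime(2021, 6, 1, 14, 3, 7))
restart = timestamp_key(datetime(2021, 6, 1, 14, 9, 42))
print(first_attempt)  # 06/01/2021:14:03:07
print(restart)        # 06/01/2021:14:09:42

# Microseconds are dropped, so attempts within the same second collide
same_second = timestamp_key(datetime(2021, 6, 1, 14, 3, 7, 999999))
print(same_second == first_attempt)  # True
```

In practice a restart rarely happens within the same second as the original run, so this resolution is usually enough.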

But that's not all! Because Prefect's default storage is pickle-based, that idempotency key won't actually change from one flow run to the next. datetime.now() is evaluated once, when the flow is built at registration, and the resulting string is pickled along with the rest of the flow. Thankfully, a quick fix is to use script-based storage, which stores the flow's source file and re-runs it for every flow run:

from prefect.storage.github import GitHub

storage = GitHub(repo="myaccount/myrepo", path="flow.py")

With this, Prefect re-runs the flow file, and therefore datetime.now(), at the start of every flow run. Each restart gives the subflows a fresh key, so you can restart them from the parent flow. Happy engineering!
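For the curious, the difference between the two storage types comes down to when datetime.now() is evaluated. The sketch below is a toy model in plain Python, not Prefect's storage code: the "flow" is just a dict, fake_now is a hypothetical deterministic stand-in for datetime.now(), pickling stands in for pickle-based storage, and re-running the source with exec stands in for script-based storage.

```python
import itertools
import pickle

# Deterministic fake clock standing in for datetime.now():
# every call returns a new, distinct "timestamp"
_ticks = itertools.count(1)


def fake_now():
    return f"tick-{next(_ticks)}"


# Hypothetical "flow file". Building the flow evaluates fake_now() immediately,
# the same way datetime.now() is evaluated inside a `with Flow(...)` block.
FLOW_SCRIPT = "flow = {'name': 'main flow', 'idempotency_key': fake_now()}"


def build_flow_from_script():
    namespace = {"fake_now": fake_now}
    exec(FLOW_SCRIPT, namespace)  # running the source re-evaluates the key
    return namespace["flow"]


# Pickle-like storage: build once at "registration", keep the bytes,
# and deserialize those same bytes for every run
stored_bytes = pickle.dumps(build_flow_from_script())
pickled_keys = [pickle.loads(stored_bytes)["idempotency_key"] for _ in range(2)]

# Script-like storage: keep the source and rebuild the flow for every run
script_keys = [build_flow_from_script()["idempotency_key"] for _ in range(2)]

print(pickled_keys)  # ['tick-1', 'tick-1']
print(script_keys)   # ['tick-2', 'tick-3']
```

Every "run" of the pickled flow sees the key computed at registration, while every rebuild from source computes a new one.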

Full code:


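import random
from datetime import datetime

from prefect import task, Flow
from prefect.tasks.prefect.flow_run import StartFlowRun
from prefect.storage.github import GitHub


@task
def test_task1():
    # Fails roughly half the time so there is something to restart
    if random.random() > 0.5:
        raise ValueError("test_task1 failed at random")


@task
def test_task2():
    # Fails roughly half the time so there is something to restart
    if random.random() > 0.5:
        raise ValueError("test_task2 failed at random")


# Three identical child flows
with Flow('subflow_1') as subflow:
    a = test_task1()
    b = test_task2()

with Flow('subflow_2') as subflow2:
    a = test_task1()
    b = test_task2()

with Flow('subflow_3') as subflow3:
    a = test_task1()
    b = test_task2()

# wait=True makes each StartFlowRun task wait for its child flow run to finish
start = StartFlowRun(project_name="Project", wait=True)

# Script-based storage: this file is re-executed for every flow run,
# so datetime.now() below is evaluated at run time, not at registration
storage = GitHub(repo="myaccount/myrepo", path="flow.py")

# storage must be passed by keyword (the second positional argument is the schedule)
with Flow('main flow', storage=storage) as flow:
    # Run the subflows one after another, each with a fresh idempotency key
    first_flow = start(flow_name="subflow_1", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"))
    second_flow = start(flow_name="subflow_2", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"), upstream_tasks=[first_flow])
    third_flow = start(flow_name="subflow_3", idempotency_key=datetime.now().strftime("%m/%d/%Y:%H:%M:%S"), upstream_tasks=[second_flow])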
