How to rerun all flows in a project?

View in #prefect-community on Slack

Zach_Schumacher @Zach_Schumacher: Is there an easy way to rerun all the flows in a project for a cloud backend?

@Anna_Geller: Unfortunately, there isn’t. The easiest way would be to write a GraphQL query to list all active flows in the project (taking only the most recent version of each flow), and then return the flow names as a list. Then you could start a flow run for each in parallel using mapping and create_flow_run. Here is an incomplete code snippet you can start with:

from prefect.tasks.prefect import create_flow_run
from prefect import Flow, unmapped, task, Client


@task
def get_all_flows_for_project():
    query = """query {
              flow(where: { project: { name: {_eq: "your_project_name"} } }) {
                    id
                    name
                    version
                  }
                }"""
    client = Client()
    response = client.graphql(query)
    return response


with Flow("xxx") as flow:
    flows = get_all_flows_for_project()
    create_flow_run.map(flow_name=flows, project_name=unmapped("your_project_name"))

may I ask what happened that you need to rerun all flows in a project? did you have some sort of DB outage?

Zach_Schumacher @Zach_Schumacher: we had a k8s issue last night
i settled on this

import logging

from prefect import Client
from typing import TypedDict, List
from logging import getLogger


logger = getLogger(__name__)


class Flow(TypedDict):
    name: str
    id: str


def get_flows_by_project_name(c: Client, project_name: str) -> List[Flow]:
    flows = c.graphql(
        """
        query {
          flow (
              where: { 
              archived: { _eq: false },
              project: { name: { _eq: "%s" } } }
          ) 
          { name, id }
        }
        """ % project_name
    )["data"]["flow"]
    logger.info(f"{len(flows)} flows found in {project_name}")
    return flows


def create_flow_run(c: Client, flow: Flow):
    logger.info(f"Creating flow run for {flow}")
    c.create_flow_run(flow["id"])


def start_all_flows_in_project(token: str, project_name: str):
    c = Client(api_token=token)
    flows = get_flows_by_project_name(c, project_name)
    for flow in flows:
        create_flow_run(c, flow)

@Anna_Geller: nice, thanks for sharing! :raised_hands:

and sorry to hear you need to troubleshoot Kubernetes cluster issues over the weekend, hope you’ll get it resolved quickly