How can I cancel several Submitted flow runs at once?

View in #prefect-community on Slack

@Matt_Alhonte: How do I delete a bunch of Submitted runs at once? Deleting the Flow didn’t seem to do it.

@Anna_Geller: Stopping your agent would immediately have the effect of preventing all runs from completion :slightly_smiling_face: it’s a drastic measure for sure.

Are you on Prefect Cloud? If so and if you are a tenant admin, you could also perform a system-wide cancel or pause - under the running flows tab, there is a button STOP ALL, but only if there is at least 1 running flow

image

can you explain your end goal here or the reason for this action? were those runs accidentally created? do you know the root cause of that?
Btw this is waaaaay easier in Prefect 2.0 - you would just pause your work queue. Perhaps one reason to switch to Prefect 2.0 already? :slightly_smiling_face:

@Matt_Alhonte: A bunch of runs were submitted for a Flow that weren’t getting picked up by the Agent (but I also don’t want these flows to run, it’s old code that got out of hand).
Ha, I just upgraded to 1.2 on my dev branch! Once I push it all to main, I’ll think about 2.0!
So yeah, just wanna kill all the Submitted runs from a particular Flow

@Anna_Geller: Gotcha. If those flows were submitted, the only way is to either delete those in the execution layer, stop the agent or cancel the flow runs. E.g. if you use a KubernetesAgent, then deleting the related prefect flow run pods would likely be the most efficient way to kill those runs.

Alternatively, you could query the GraphQL API for all flow runs of this flow which are in a Submitted state and cancel those in a for loop - LMK if you need a query template for that

@Matt_Alhonte: Shall go the GraphQL route I think! Thanks! (someone else helping me just sent some code for that, I shall bug you for yours if that doesn’t work)
Actually, could ya post it either way? Minimally, probably handy to have that in the Slack’s searchable history!

@Anna_Geller: need to build this query, let me dig into that :smile:

@Matt_Alhonte: ah, k!

@Anna_Geller: something like this:

query {
  flow_run(
    where: {_and: [{flow_id: {_eq: "eb45ed0a-a76c-445b-b0b8-1fdc05c1f54c"}}, 
      {state: {_eq: "Submitted"}}]}
  ) {
    id
  }
}

just manually look up the flow-id from the flow page :stuck_out_tongue: easier than doing nested queries

@Matt_Alhonte: Awesome, thanks!

@Anna_Geller: then:

from prefect import Client

client = Client()

list_of_flow_run_ids_to_cancel = []  # result of the query
for id_ in list_of_flow_run_ids_to_cancel:
    client.cancel_flow_run(flow_run_id=id_)

@Matt_Alhonte: made it into a little function:

from prefect.client import Client
import prefect
import time

def cancel_runs_for_flow(version_group_id: str) -> None:
    c = Client()
    query = c.graphql(
        "\n".join(
            [
                "query {",
                "      flow_run(where: {",
                "          _and: {",
                f'            flow: {{version_group_id: {{_eq: "{version_group_id}"}}}}',
                "            state: {",
                "              _in: [",
                '                  "Pending", "Running", "Submitted"]',
                "            }",
                "          }",
                "          }",
                "        ) {",
                "      id",
                "    }}",
            ]
        )
    )

    result = query.get("data").get("flow_run")
    id_list = list()
    print("Fetched flows")
    for i in result:
        flow_id = i["id"]
        state = prefect.engine.state.Cancelled()
        version = None
        print(i)
        time.sleep(1)
        c.set_flow_run_state(flow_run_id=flow_id, state=state)

@Anna_Geller: thank you so much for sharing! :raised_hands: will cross-post on Discourse for posterity