How to bulk delete flow runs in a certain state with the API

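The example below shows how to bulk delete flow runs using Prefect’s Python API client. By default, it deletes flow runs in the `Pending` state; pass a different state name to `bulk_delete_flow_runs` to target another state. The script lists the matching flow runs and asks for confirmation before deleting anything.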

import anyio

from prefect.client.orion import get_client
from prefect.orion.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName

async def list_flow_runs_with_state(state):
    """Return every flow run whose state name equals ``state``.

    A single ``read_flow_runs`` call only returns one page of results
    (NOTE(review): the server-side cap is believed to be 200 by default in
    Prefect 2 -- confirm against your server settings), so this function
    keeps requesting further pages via ``offset`` until an empty page comes
    back. Without paging, a bulk delete would silently skip every run past
    the first page.

    Args:
        state: Name of the state to match, e.g. ``"Pending"``.

    Returns:
        A list containing all matching flow runs (empty if there are none).
    """
    state_filter = FlowRunFilter(
        state=FlowRunFilterState(
            name=FlowRunFilterStateName(any_=[state])
        )
    )

    flow_runs = []
    async with get_client() as client:
        while True:
            # The number of runs collected so far is the offset of the next page.
            page = await client.read_flow_runs(
                flow_run_filter=state_filter,
                offset=len(flow_runs),
            )
            if not page:
                break
            flow_runs.extend(page)
    return flow_runs


async def delete_flow_runs(flow_runs):
    """Delete each of the given flow runs, one request at a time.

    Args:
        flow_runs: Iterable of flow run objects; only their ``id`` attribute
            is used.
    """
    async with get_client() as api:
        for run in flow_runs:
            run_id = run.id
            await api.delete_flow_run(flow_run_id=run_id)

async def bulk_delete_flow_runs(state: str = "Pending"):
    """Interactively delete all flow runs that are in ``state``.

    Looks up the matching flow runs, prints them as a numbered list and asks
    for confirmation on stdin. The runs are deleted only when the answer is
    ``y`` or ``yes`` (case-insensitive, surrounding whitespace ignored); any
    other answer, including an empty one, aborts without deleting anything.

    Args:
        state: Name of the state whose flow runs should be deleted.
    """
    flow_runs = await list_flow_runs_with_state(state)

    if not flow_runs:
        print(f"There are no flow runs in state '{state}'")
        return

    print(f"There are {len(flow_runs)} flow runs with state {state}\n")

    for idx, flow_run in enumerate(flow_runs, start=1):
        print(f"[{idx}] Flow '{flow_run.name}' with ID '{flow_run.id}'")

    # Deletion is destructive, so "no" is the default: the prompt shows
    # [y/N] and only an explicit yes proceeds.
    answer = input("\n[y/N] Do you wish to proceed: ").strip().lower()
    if answer not in ("y", "yes"):
        print("Aborting...")
        return

    print(f"Deleting {len(flow_runs)} flow runs...")
    await delete_flow_runs(flow_runs)
    print("Done.")


if __name__ == "__main__":
    # Script entry point: hand the coroutine function to anyio.run. No
    # arguments are passed, so bulk_delete_flow_runs uses its default
    # state ("Pending").
    anyio.run(bulk_delete_flow_runs)
2 Likes