How to bulk delete flow runs in a certain state with the API

[UPDATED 2023-06-21] Here is an example of how to bulk delete flow runs by using Prefect’s Python API client. By default, the example deletes flow runs in state Pending.

import anyio

from prefect import get_client
from prefect.server.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName

async def list_flow_runs_with_state(state):
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=[state])
                )
            )
        )
    return flow_runs


async def delete_flow_runs(flow_runs):
    async with get_client() as client:
        for flow_run in flow_runs:
            await client.delete_flow_run(flow_run_id=flow_run.id)


async def bulk_delete_flow_runs(state: str = "Pending"):
    flow_runs = await list_flow_runs_with_state(state)

    if len(flow_runs) == 0:
        print(f"There are no flow runs in state '{state}'")
        return

    print(f"There are {len(flow_runs)} flow runs with state {state}\n")

    for idx, flow_run in enumerate(flow_runs):
        print(f"[{idx + 1}] Flow '{flow_run.name}' with ID '{flow_run.id}'")

    if input("\n[Y/n] Do you wish to proceed: ") == "Y":
        print(f"Deleting {len(flow_runs)} flow runs...")
        await delete_flow_runs(flow_runs)
        print("Done.")
    else:
        print("Aborting...")


if __name__ == "__main__":
    anyio.run(bulk_delete_flow_runs)
5 Likes

This was very helpful for me, thanks!

Prefect 2.10+ got rid of using Orion in their codebase from my understanding. I am just uploading the same prefect code with those small changes and I am using asyncio instead(I have nothing against anyio but was already using asyncio).

import asyncio

from prefect.client.orchestration import get_client
from prefect.server.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName

async def list_flow_runs_with_state(state):
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                state=FlowRunFilterState(
                    name=FlowRunFilterStateName(any_=[state])
                )
            )
        )
    return flow_runs


async def delete_flow_runs(flow_runs):
    async with get_client() as client:
        for flow_run in flow_runs:
            await client.delete_flow_run(flow_run_id=flow_run.id)


async def bulk_delete_flow_runs(state: str = "Failed"):
    flow_runs = await list_flow_runs_with_state(state)

    if len(flow_runs) == 0:
        print(f"There are no flow runs in state '{state}'")
        return

    print(f"There are {len(flow_runs)} flow runs with state {state}\n")

    for idx, flow_run in enumerate(flow_runs):
        print(f"[{idx + 1}] Flow '{flow_run.name}' with ID '{flow_run.id}'")

    if input("\n[Y/n] Do you wish to proceed: ") == "Y":
        print(f"Deleting {len(flow_runs)} flow runs...")
        await delete_flow_runs(flow_runs)
        print("Done.")
    else:
        print("Aborting...")


if __name__ == "__main__":
    asyncio.run(bulk_delete_flow_runs())

3 Likes