[UPDATED 2023-06-21] Here is an example of how to bulk delete flow runs by using Prefect’s Python API client. By default, the example deletes flow runs in state Pending
.
import anyio
from prefect import get_client
from prefect.server.schemas.filters import FlowRunFilter, FlowRunFilterState, FlowRunFilterStateName
async def list_flow_runs_with_state(state):
async with get_client() as client:
flow_runs = await client.read_flow_runs(
flow_run_filter=FlowRunFilter(
state=FlowRunFilterState(
name=FlowRunFilterStateName(any_=[state])
)
)
)
return flow_runs
async def delete_flow_runs(flow_runs):
async with get_client() as client:
for flow_run in flow_runs:
await client.delete_flow_run(flow_run_id=flow_run.id)
async def bulk_delete_flow_runs(state: str = "Pending"):
flow_runs = await list_flow_runs_with_state(state)
if len(flow_runs) == 0:
print(f"There are no flow runs in state '{state}'")
return
print(f"There are {len(flow_runs)} flow runs with state {state}\n")
for idx, flow_run in enumerate(flow_runs):
print(f"[{idx + 1}] Flow '{flow_run.name}' with ID '{flow_run.id}'")
if input("\n[Y/n] Do you wish to proceed: ") == "Y":
print(f"Deleting {len(flow_runs)} flow runs...")
await delete_flow_runs(flow_runs)
print("Done.")
else:
print("Aborting...")
if __name__ == "__main__":
anyio.run(bulk_delete_flow_runs)