I have a self-hosted Prefect server that deploys flow runs on Kubernetes. I need to be able to cancel flow runs through some API (Python, CLI, REST, whatever) in a way that mimics what happens when I click Cancel in the UI.
I’ve tried a couple of things without success:
Send a DELETE request using the REST API. This simply deletes the flow run, but doesn’t stop it.
Set the run state to Cancelled using the Python API, following this response. This changes the state of the flow run, but does not stop the k8s job.
Is there anything else I could be doing? Perhaps a hook function?
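Just found the answer to my own question while digging through the Python API docs. The trick is to set the flow run's state to Cancelling rather than Cancelled. As far as I can tell, that is what the UI's Cancel button does: the run is flagged for cancellation, and the worker then tears down the k8s job and moves the run to Cancelled.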
from prefect import get_client
from prefect.client.schemas.objects import StateType
from prefect import State
from prefect.exceptions import ObjectNotFound
from prefect.client.schemas.responses import SetStateStatus
import logging
import asyncio


async def cancel(id):
    """Cancel a flow run by ID."""
    async with get_client() as client:
        # Request CANCELLING, not CANCELLED, so the infrastructure gets stopped too.
        cancelling_state = State(type=StateType.CANCELLING)
        try:
            result = await client.set_flow_run_state(
                flow_run_id=id, state=cancelling_state
            )
        except ObjectNotFound:
            logging.error(f"Flow run '{id}' not found!")
            return  # `result` was never assigned, so stop here
        if result.status == SetStateStatus.ABORT:
            logging.error(f"Flow run '{id}' was unable to be cancelled. Reason: '{result.details.reason}'")
            return  # do not fall through to the success message
        logging.info(f"Flow run '{id}' was successfully scheduled for cancellation.")


async def main():
    flow_run_id = "MY-RUN-ID"
    await cancel(flow_run_id)


if __name__ == "__main__":
    asyncio.run(main())
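Since the question also asked about REST: the same state change should be possible without the Python client, by POSTing to the flow run's `set_state` endpoint instead of sending a DELETE. The following is only a sketch using the standard library. The base URL is an assumption (the default local server address), `build_cancel_request` and `cancel_via_rest` are hypothetical helper names, and I have not checked the exact response shape against every Prefect version.

```python
import json
import urllib.request

# Assumption: default address of a local Prefect server; replace with your own API URL.
PREFECT_API_URL = "http://127.0.0.1:4200/api"


def build_cancel_request(flow_run_id, api_url=PREFECT_API_URL):
    """Build a POST request asking the server to move a flow run to CANCELLING."""
    body = json.dumps({"state": {"type": "CANCELLING"}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{api_url.rstrip('/')}/flow_runs/{flow_run_id}/set_state",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def cancel_via_rest(flow_run_id, api_url=PREFECT_API_URL):
    """Send the request and return the server's decoded JSON response."""
    request = build_cancel_request(flow_run_id, api_url)
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.loads(response.read())


if __name__ == "__main__":
    # "MY-RUN-ID" is a placeholder, as in the script above.
    print(cancel_via_rest("MY-RUN-ID"))
```

For the CLI, Prefect 2 also has a `prefect flow-run cancel <id>` command, which I believe does the same thing, though I have only tested the Python route.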