Is it possible to take action on flow run Cancellation?

Short answer, yes! You can build a state handler that takes action on flow run cancellation. However, there are certain things that you can and cannot do upon cancellation:

  • If you have e.g. EMR cluster on AWS and you want to make sure it gets shut down if the flow run gets cancelled, reacting to a Cancelled state would work because the cluster lives separately from the flow run execution/resources
  • if you want to get notified about a flow run cancellation, reacting to the Cancelled state will solve the problem - you will get the message informing you that the flow run has been cancelled - here is an example code you could use to accomplish that:
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
import time
from typing import cast


def post_to_slack_on_cancellation(flow, old_state, new_state):
    if isinstance(new_state, prefect.engine.state.Cancelled):
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The flow `{prefect.context.flow_name}` was cancelled "
            f"in a flow run `{prefect.context.flow_run_id}` "
            f"with a message: `{value}`"
        )
        # do something here to cleanup resources when flow was cancelled
        SlackTask(message=msg).run()
    return new_state


@task(log_stdout=True)
def hello_world():
    print("Doing something and giving you 30 seconds to cancel the flow run from the UI...")
    time.sleep(30)


with Flow("canc", state_handlers=[post_to_slack_on_cancellation]) as flow:
    hw = hello_world()
  • But if you want to stop the pod in which this flow run is running, then you can’t do that from that pod - i.e. the pod cannot commit suicide :smile:

In short: reacting to a Cancelled state allows to send notifications on a flow run cancellation or to shut down remote execution separate from the agent, but it doesn’t allow to clean up the agent’s / flow run’s compute resources.

E.g. This thread shows how a user leverages a similar state handler logic reacting to a Cancelled state in order to shut down Databricks cluster and it works flawlessly. So it depends on what type of action you want to take and what problem needs to be solved on flow run cancellation.

From my experience, having it on the Flow level works, but it doesn’t on the task. For example,

This was tested on Kubernetes so I needed a remote storage (Git)

from prefect import Flow, task
import time
from prefect.run_configs import KubernetesRun
from prefect.engine.state import State, Cancelled
import prefect
from prefect.storage import GitHub

def post_to_slack_handler_fq(obj: Flow, old_state: State, new_state: State) -> State:
    if isinstance(new_state, Cancelled):
        prefect.context.logger.info("HIT THE CANCELLED STATE HANDLER")
    return new_state

@task()
def abc(x):
    time.sleep(10)
    return 1

with Flow("sleep", state_handlers=[post_to_slack_handler_fq]) as flow:
    abc.map([1,2,3])

storage = GitHub(repo="kvnkho/demos", 
            path="/prefect/git_storage3.py",
            ref="main")

flow.storage = storage
flow.run_config = KubernetesRun(image="prefecthq/prefect:latest-python3.8")
flow.register("databricks")

1 Like