Prefect 2.8.3 is here with completion and failure hooks for flows and tasks!

on_completion and on_failure hooks for flows and tasks

With this release, you can now add client-side hooks that will be called when your flow or task enters a Completed or Failed state. This is useful whenever you want to run code in response to one of these state changes without involving the Prefect API.

Both flows and tasks accept on_completion and on_failure options, each of which takes a list of callable hooks. Each hook will receive three arguments:

  • flow, flow_run, and state in the case of a flow hook
  • task, task_run, and state in the case of a task hook

For example, here we add completion hooks to a flow and a task:

from prefect import task, flow

def my_completion_task_hook_1(task, task_run, state):
    print("This is the first hook - Task completed!!!")

def my_completion_task_hook_2(task, task_run, state):
    print("This is the second hook - Task completed!!!")

def my_completion_flow_hook(flow, flow_run, state):
    print("Flow completed!!!")

@task(on_completion=[my_completion_task_hook_1, my_completion_task_hook_2])
def my_task():
    print("This is the task!")

@flow(on_completion=[my_completion_flow_hook])
def my_flow():
    my_task()

if __name__ == "__main__":
    my_flow()

Next, we’ll include a failure hook as well. It’s worth noting that you can supply both on_completion and on_failure hooks to a flow or task. Only the hooks that are relevant to the final state of the flow or task will be called.

from prefect import task, flow

def my_task_completion_hook(task, task_run, state):
    print("Our task completed successfully!")

def my_task_failure_hook(task, task_run, state):
    print("Our task failed :(")

@task(on_completion=[my_task_completion_hook], on_failure=[my_task_failure_hook])
def my_task():
    raise Exception("Oh no!")

@flow
def my_flow():
    my_task.submit()

if __name__ == "__main__":
    my_flow()

Key enhancements and fixes

Other highlights of this week’s enhancements and fixes include:

  • We added light and dark mode color and contrast enhancements to the UI #8629
  • The Task.map type hint now provides type-checker compatibility for async tasks — #8607
  • The Kubernetes Job block now correctly handles timeouts when streaming logs — #8618
  • We fixed date range filter selection on the flow runs UI page — #8616

See the release notes on GitHub for a full list of the changes in this release!

Contributors

Thanks to the external contributors to Prefect 2.8.3!

Hi Ryan,

Do you have a concrete example of the task hook and flow hook? I cannot find the documentation related to this topic.

Is this new hook function an alternative to the below topic?

Thanks,
Jason

Hi Jason,

It’s not in the docs yet, but we will add it soon.

These hooks can sometimes be an alternative to the approach in the docs you linked to, especially for tasks. But there will be times when you want or need to handle success or failure separately.

For example, without a hook, a flow wouldn’t be able to catch its own failure. But adding a failure hook allows you to clean up resources after a flow failure occurs.
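
As a rough sketch of that cleanup case, assuming the three-argument hook signature from the announcement above: the hook below removes a scratch directory when a flow run fails. The names SCRATCH_DIR and remove_scratch_dir are hypothetical, and the hook is called by hand with stand-in objects so the snippet runs without Prefect installed.

```python
import shutil
import tempfile
from pathlib import Path
from types import SimpleNamespace

# Hypothetical scratch location that a flow might write intermediate files to.
SCRATCH_DIR = Path(tempfile.mkdtemp(prefix="flow_scratch_"))


def remove_scratch_dir(flow, flow_run, state):
    """Failure hook: delete the scratch directory left behind by a failed run."""
    shutil.rmtree(SCRATCH_DIR, ignore_errors=True)
    print(f"Cleaned up {SCRATCH_DIR} after run {flow_run.name!r} entered {state.name}")


# With Prefect installed, the hook would be attached like this:
#
#   @flow(on_failure=[remove_scratch_dir])
#   def my_flow():
#       ...

# Stand-in objects (not real Prefect classes) used to exercise the hook directly.
fake_flow = SimpleNamespace(name="my_flow")
fake_flow_run = SimpleNamespace(name="demo-run")
fake_state = SimpleNamespace(name="Failed")
remove_scratch_dir(fake_flow, fake_flow_run, fake_state)
```

Because the hook is an ordinary function, it can be tested on its own like this before being attached to a flow.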

Will you be able to pass arguments to the hooks?
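
One possible approach, not confirmed anywhere in this thread: since a hook is just a callable taking three arguments, extra values could be bound in with a closure. The names make_notify_hook and channel are hypothetical, and the hook is invoked directly with placeholder values so the snippet runs without Prefect.

```python
def make_notify_hook(channel):
    """Return a task hook with `channel` bound in through a closure."""

    def notify_hook(task, task_run, state):
        # `channel` comes from the enclosing call, not from Prefect.
        message = f"[{channel}] task finished in state {state}"
        print(message)
        return message

    return notify_hook


alerts_hook = make_notify_hook("alerts")

# With Prefect installed, this would be attached as:
#
#   @task(on_failure=[alerts_hook])
#   def my_task():
#       ...

# Direct call with placeholder arguments to show the calling convention.
result = alerts_hook(None, None, "Failed")
```

A factory function like this keeps the hook a plain named function, which may be safer than wrappers that lack a function name.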

Hi, are we able to trigger external flows from this?

I’ve tried, and it causes an asyncio.exceptions.CancelledError.

Is there anything similar to on_completion that would run if the task finished in a Cached state?