on_completion and on_failure hooks for flows and tasks
With this release, you can add client-side hooks that are called when your flow or task enters a Completed or Failed state. This is useful whenever you want to run code in response to a state change without involving the Prefect API.
Both flows and tasks include on_completion and on_failure options where a list of callable hooks can be provided. Each callable will receive three arguments:
- flow, flow_run, and state in the case of a flow hook
- task, task_run, and state in the case of a task hook
For example, here we add completion hooks to a flow and a task:
from prefect import task, flow

def my_completion_task_hook_1(task, task_run, state):
    print("This is the first hook - Task completed!!!")

def my_completion_task_hook_2(task, task_run, state):
    print("This is the second hook - Task completed!!!")

def my_completion_flow_hook(flow, flow_run, state):
    print("Flow completed!!!")

@task(on_completion=[my_completion_task_hook_1, my_completion_task_hook_2])
def my_task():
    print("This is the task!")

@flow(on_completion=[my_completion_flow_hook])
def my_flow():
    my_task()

if __name__ == "__main__":
    my_flow()
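The hooks above only print fixed messages, but each hook also has access to its three arguments. As a minimal sketch, the flow hook below builds a one-line summary from them. It assumes the flow and flow run objects expose a name attribute and that the state exposes a name such as "Completed"; summarize_flow_run and the stand-in objects are hypothetical and exist only so the sketch runs without a Prefect installation.

```python
from types import SimpleNamespace


def summarize_flow_run(flow, flow_run, state):
    """Flow hook: build and print a one-line summary from the three arguments."""
    summary = f"Flow {flow.name!r} run {flow_run.name!r} finished in state {state.name}"
    print(summary)
    return summary  # returned only so the sketch is easy to check


# Stand-in objects (assumptions, not Prefect classes) used to exercise the hook.
# With Prefect installed, the hook would instead be attached with
# @flow(on_completion=[summarize_flow_run]).
fake_flow = SimpleNamespace(name="my-flow")
fake_flow_run = SimpleNamespace(name="curious-otter")
fake_state = SimpleNamespace(name="Completed")

result = summarize_flow_run(fake_flow, fake_flow_run, fake_state)
```

Keeping the hook a plain function of its three arguments makes it easy to try out in isolation before attaching it to a flow.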
Next, we’ll include a failure hook as well. It’s worth noting that you can supply both on_completion and on_failure hooks to a flow or task. Only the hooks that are relevant to the final state of the flow or task will be called.
from prefect import task, flow

def my_task_completion_hook(task, task_run, state):
    print("Our task completed successfully!")

def my_task_failure_hook(task, task_run, state):
    print("Our task failed :(")

@task(on_completion=[my_task_completion_hook], on_failure=[my_task_failure_hook])
def my_task():
    raise Exception("Oh no!")

@flow
def my_flow():
    my_task.submit()

if __name__ == "__main__":
    my_flow()
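The example above attaches its hooks to a task; on_failure is available on flows as well. Since a hook receives only its three arguments, one way to give it extra context is to build it with a small factory function. The following is a hedged sketch of that pattern rather than anything prescribed by Prefect: make_failure_hook and failed_runs are hypothetical names, the state is assumed to carry a message attribute, and stand-in objects are used so the code runs without a Prefect installation.

```python
from types import SimpleNamespace


def make_failure_hook(failures):
    """Return a flow hook that appends (run name, state message) to `failures`."""

    def record_failure(flow, flow_run, state):
        failures.append((flow_run.name, state.message))

    return record_failure


failed_runs = []
record_failure = make_failure_hook(failed_runs)

# With Prefect installed, the hook would be attached with
# @flow(on_failure=[record_failure]). Here it is called directly
# with stand-in objects (assumptions, not Prefect classes).
record_failure(
    flow=SimpleNamespace(name="my-flow"),
    flow_run=SimpleNamespace(name="brave-lynx"),
    state=SimpleNamespace(name="Failed", message="Oh no!"),
)
```

After the call, failed_runs holds one entry for the simulated failed run, and the same factory could produce hooks that write to different lists.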
Key enhancements and fixes
Other highlights of this week’s enhancements and fixes include:
- We added light and dark mode color and contrast enhancements to the UI (#8629)
- The Task.map type hint now provides type-checker compatibility for async tasks (#8607)
- The Kubernetes Job block now correctly handles timeouts when streaming logs (#8618)
- We fixed date range filter selection on the flow runs UI page (#8616)
See the release notes on GitHub for a full list of the changes in this release!
Contributors
Thanks to the external contributors to Prefect 2.8.3!
- New contributor @jefflaporte added support for log streaming from multiple containers in a K8s job.
- New contributor @AzemaBaptiste added the prefect-sifflet collection to our collections catalog.
- Veteran contributor @darrida improved the Docker infrastructure block’s handling of Docker version names.