So, I would like the flow_success to be the final validation before the run_success task is triggered. Is there a way to change the final flow run state to failure if the on_success hook for the flow fails? Example code:
@task(name="run_failed")
def run_failed(**kwargs):
print("run_failed")
return ValueError("failed")
@task(name="run_success")
def run_success(slack_context, result, **kwargs):
print("run_success")
def flow_failure(flow, flow_run, state):
run_failed.fn(**flow_run.parameters)
def flow_success(flow, flow_run, state):
try:
#if a validation fails happens here, fail the run
raise ValueError("Fail purposely")
except Exception as err:
logger.exception(err)
run_failed.fn(**flow_run.parameters)
@flow(name= "test", flow_run_name="test flow", on_failure=[flow_failure], on_completion=[flow_success])
def test(query):
return {
"results": query["message"]
}
if __name__ == "__main__":
test(**{
"query": {
"message": "my message"
}
})
The logs continue to mark the flow run as a success:
14:34:13.738 | INFO | prefect.engine - Created flow run 'incredible-dormouse' for flow 'test'
14:34:16.352 | INFO | Flow run 'test flow' - Running hook 'flow_success' in response to entering state 'Completed'
14:34:16.365 | ERROR | prefect.flows - Fail purposely
Traceback (most recent call last):
File "/app/flows/test.py", line 22, in flow_success
raise ValueError("Fail purposely")
ValueError: Fail purposely
run_failed
14:34:16.366 | INFO | Flow run 'test flow' - Hook 'flow_success' finished running successfully
14:34:16.367 | INFO | Flow run 'incredible-dormouse' - Finished in state Completed()