Flow run states work differently in a local run than in a backend run: state results can't be used in state handlers the same way

Hi everyone, I’m developing a state handler that sends Slack notifications to our team on flow failures, with detailed information.
It works so far and runs flawlessly locally.
The issue comes when running it on our server: the handler sends a default notification that just says "Some reference tasks failed."
While investigating, I found that the state contains the results of all tasks when running locally (state.result), while on the server it is empty (I printed it using the logger). Any idea how to address this?

The issue was cross-posted and resolved via Slack.

View in #prefect-community on Slack

@Anna_Geller: I understand your problem. Unfortunately, the flow run state behaves a bit differently in a backend run than in a local run. There is currently no easy way to get the results of all task run states at the flow level and pass them to a flow-level state handler that alerts you about all task runs that failed within the failed flow run. Perhaps you could tackle it using FlowRunView? I'm not sure what the best approach is on the backend side:

import json
import requests
import prefect
from prefect import task, Flow
from prefect.backend.flow_run import FlowRunView


def send_report_on_success(task, old_state, new_state):
    if new_state.is_successful():
        # Look up this task run in the backend to fetch its stored result
        flow_run = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id"))
        task_run = flow_run.get_task_run(task_run_id=prefect.context.get("task_run_id"))
        result = task_run.get_result()
        # "webhook_url" is a placeholder for your Slack incoming webhook URL
        requests.post(url="webhook_url", data=json.dumps({"text": result}))
    return new_state


@task(state_handlers=[send_report_on_success])
def return_some_data():
    return "Some result"


with Flow(name="state-handler-demo-flow") as flow:
    result = return_some_data()

The easiest solution on Prefect Server would be using Cloud Hooks:

Cloud Hooks | Prefect Docs

I definitely heard a similar request in the past - @Kevin_Kho did you see some community contribution about that?

@Andres: The thing is that the Slack notifications become somewhat useless… On our side we have different sorts of errors, and it would be really useful to see the reason for the failure in the Slack message. Currently, with "Some reference tasks failed.", I get no context about the failure at all…
I’ll have a look at your suggestion :slightly_smiling_face:

@Anna_Geller: If the problem you are trying to solve is seeing the error message and exception details in your Slack message for each task run that failed, you could use this logic:

import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
from typing import cast


def post_to_slack_on_failure(task, old_state, new_state):
    if new_state.is_failed():
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {prefect.context.flow_run_id} "
            f"with an exception {value}"
        )
        SlackTask(message=msg).run()
    return new_state


@task(state_handlers=[post_to_slack_on_failure])
def divide_numbers(a, b):
    return 1 / (b - a)


with Flow(name="state-inspection-handler") as flow:
    result = divide_numbers(1, 1)


if __name__ == "__main__":
    flow.run()

But you would need to attach this state handler to every task that needs this type of notification, so it does not work at the flow level.

@Kevin_Kho: Anna is right that the state handler attached to the flow does not keep track of results, because they could be too big to store in memory. You can get the error, though, by attaching the state handler at the task level.

To get the state across all task runs, you’d need to query the GraphQL API in the flow-level state handler.
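As a rough illustration of the GraphQL route, here is a minimal sketch of a flow-level state handler for Prefect 1.x. The query shape (the task_run table with state, state_message and task { name } fields) is an assumption about the backend schema and should be checked in the Interactive API first. The helper names build_failed_task_runs_query, summarize_failures and notify_on_flow_failure are made up for this example. Prefect is imported lazily so the two helpers can be tried without a backend.

```python
import json

# ASSUMPTION: Hasura-style Prefect 1.x schema; verify the field names in the
# Interactive API before relying on this query.
FAILED_TASK_RUNS_QUERY = """
query {
  task_run(where: {flow_run_id: {_eq: %s}, state: {_eq: "Failed"}}) {
    id
    state_message
    task { name }
  }
}
"""


def build_failed_task_runs_query(flow_run_id):
    """Return the query text for the failed task runs of one flow run."""
    # json.dumps adds the surrounding double quotes and escapes the ID
    return FAILED_TASK_RUNS_QUERY % json.dumps(flow_run_id)


def summarize_failures(task_runs):
    """Turn the task_run rows of the response into one line per failed task."""
    lines = [f"- {tr['task']['name']}: {tr['state_message']}" for tr in task_runs]
    return "\n".join(lines) if lines else "No failed task runs found."


def notify_on_flow_failure(flow, old_state, new_state):
    """Hypothetical flow-level state handler that logs the failed task runs."""
    if new_state.is_failed():
        import prefect  # lazy import: only needed inside a real flow run

        query = build_failed_task_runs_query(prefect.context.get("flow_run_id"))
        response = prefect.Client().graphql(query)
        # Swap the logger call for your Slack notification of choice
        prefect.context.get("logger").error(summarize_failures(response.data.task_run))
    return new_state
```

The handler would be attached with Flow(name="...", state_handlers=[notify_on_flow_failure]).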

@Andres: Thanks guys, I’ve managed to implement it by fetching the data from the backend, as Anna suggested in her first answer :slightly_smiling_face:

@Anna_Geller: Nice work! Would you be open to sharing your solution? I’m sure many users could benefit from it (doesn’t have to be perfect)

@Andres: Sure, I’ll just share a small portion of it.
I didn't want to put the handler on every task, so I attached it at the flow level. When the new state is failed, the handler fetches the flow run info and its task runs from the backend using this:

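import prefect
from prefect.backend.flow_run import FlowRunView
from prefect.engine.state import Failed

id_ = prefect.context.get("flow_run_id")
tasks = FlowRunView.from_flow_run_id(id_).get_all_task_runs()
failed_tasks = [task for task in tasks if isinstance(task.state, Failed)]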

Now failed_tasks holds the failed task runs with their information, so it’s just a matter of playing around with it and building the message you want :slightly_smiling_face:
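For readers who want to see how the pieces might fit together, here is a minimal sketch of a complete flow-level handler built around the snippet above. It is not Andres's actual implementation: the names format_failure_report and report_failed_tasks are hypothetical, the message layout is invented, and the use of task_slug to label each task run is an assumption about what the task run objects expose. SlackTask is used as in Anna's earlier example and needs a configured Slack webhook secret.

```python
def format_failure_report(flow_name, failed_task_runs):
    """Build a plain-text Slack message from the failed task runs."""
    lines = [f"Flow `{flow_name}` failed. Failed task runs:"]
    for task_run in failed_task_runs:
        # ASSUMPTION: task runs expose task_slug; fall back to a placeholder
        name = getattr(task_run, "task_slug", "<unknown task>")
        lines.append(f"- {name}: {task_run.state.message}")
    return "\n".join(lines)


def report_failed_tasks(flow, old_state, new_state):
    """Hypothetical flow-level state handler for backend (Server/Cloud) runs."""
    if new_state.is_failed():
        # Lazy imports: these are only needed inside a real flow run
        import prefect
        from prefect.backend.flow_run import FlowRunView
        from prefect.tasks.notifications import SlackTask

        flow_run = FlowRunView.from_flow_run_id(prefect.context.get("flow_run_id"))
        failed = [tr for tr in flow_run.get_all_task_runs() if tr.state.is_failed()]
        SlackTask(message=format_failure_report(flow.name, failed)).run()
    return new_state
```

It would be attached once at the flow level, for example with Flow(name="state-handler-demo-flow", state_handlers=[report_failed_tasks]), instead of on every task.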

@Anna_Geller: Fantastic, thanks so much for sharing! :raised_hands: