How can I send Slack alert on failure including dbt output?

Trying to get the error message of a dbt run in Slack, and have set up the state_handler to only handle the Failed state. But, the DbtShellTask raises the FAIL signal with the message that first occurs Command failed with exit code 2 and the state handler doesn’t get to the Error that I actually want - dbt’s error. How to handle it?

Welcome to the community!

You can solve it by creating a task that only gets triggered on failure by assigning the any_failed trigger.

@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    SlackTask(message=output).run()

Here is how it would look like in a flow:

And here is a full code snippet based on code from this article.

import prefect
from prefect import task, Flow, Parameter
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.storage import GitHub
from prefect.triggers import all_finished, any_failed
from prefect.run_configs import LocalRun
import pygit2
import shutil
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.notifications import SlackTask


DBT_PROJECT = "jaffle_shop"
FLOW_NAME = "02_dbt"
STORAGE = GitHub(
    repo="anna-geller/flow-of-flows",
    path=f"flows/{FLOW_NAME}.py",
    access_token_secret="GITHUB_ACCESS_TOKEN",
)


@task(name="Clone DBT repo")
def pull_dbt_repo(repo_url: str, branch: str = None):
    pygit2.clone_repository(url=repo_url, path=DBT_PROJECT, checkout_branch=branch)


@task(name="Delete DBT folder if exists", trigger=all_finished)
def delete_dbt_folder_if_exists():
    shutil.rmtree(DBT_PROJECT, ignore_errors=True)


@task
def get_dbt_credentials(user_name: str, password: str):
    return {"user": user_name, "password": password}


dbt = DbtShellTask(
    return_all=True,
    profile_name=DBT_PROJECT,
    profiles_dir="/Users/anna/.dbt",
    environment="dev",
    overwrite_profiles=True,
    log_stdout=True,
    helper_script=f"cd {DBT_PROJECT}",
    log_stderr=True,
    dbt_kwargs={
        "type": "postgres",
        "host": "localhost",
        "port": 5432,
        "dbname": "postgres",
        "schema": DBT_PROJECT,
        "threads": 4,
        "client_session_keep_alive": False,
    },
)


@task(trigger=all_finished)
def print_dbt_output(output):
    logger = prefect.context.get("logger")
    for line in output:
        logger.info(line)


@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    SlackTask(message=output).run()


with Flow(FLOW_NAME, storage=STORAGE, run_config=LocalRun(labels=["dev"])) as flow:

    del_task = delete_dbt_folder_if_exists()
    dbt_repo = Parameter(
        "dbt_repo_url", default="https://github.com/anna-geller/jaffle_shop"
    )
    dbt_repo_branch = Parameter("dbt_repo_branch", default=None)
    pull_task = pull_dbt_repo(dbt_repo, dbt_repo_branch)
    del_task.set_downstream(pull_task)

    postgres_user = PrefectSecret("POSTGRES_USER")
    postgres_pass = PrefectSecret("POSTGRES_PASS")
    db_credentials = get_dbt_credentials(postgres_user, postgres_pass)
    dbt_run = dbt(
        command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Log Output"})
    pull_task.set_downstream(dbt_run)

    dbt_test = dbt(
        command="dbt test", task_args={"name": "DBT Test"}, dbt_kwargs=db_credentials
    )
    dbt_test_out = print_dbt_output(dbt_test, task_args={"name": "DBT Test Log Output"})
    dbt_run.set_downstream(dbt_test)

    del_again = delete_dbt_folder_if_exists()
    dbt_test_out.set_downstream(del_again)
    send_slack_alert_on_failure(dbt_run)
    send_slack_alert_on_failure(dbt_test)

flow.set_reference_tasks([dbt_run])
flow.visualize()

Hi @anna_geller, Could you provide a similar example for Prefect 2.0?
I have dbt jobs and tests running but would like to send more detailed notifications to the slack channel.

hi @Joao_Reis - I actually just posted an example of this this morning here

you can supply whatever text you want to the webhook’s notify method

Thanks for your reply! I got the notifications working. Im not being able to get the results from “dbt test” command result and pass it to the notification.
Do you think you can help?

how are you invoking dbt test? There should be a way to associate that output with the state that the flow run is going to enter (the 3rd argument in my example) such that you could extract and print it within the state hook

Through the trigger_dbt_command function from prefect_dbt package

I’m using something like this:


from prefect_dbt.cli import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command
from prefect import flow,get_run_logger
from prefect_github.repository import GitHubRepository
from .utilities import ENV

TMP_FOLDER = 'tmp-dbt'
LOG_MESSAGE = '[DBT_FLOW]'
DBT_PROFILE = 'dbt-profile-'+ENV

@flow
def trigger_dbt_cmds(dbt_commands=[]):
    trigger_dbt_cli_command(
                "dbt test",
                project_dir=TMP_FOLDER,
                overwrite_profiles=True,
                dbt_cli_profile=dbt_cli_profile
            )
    

okay!

I believe the CLI output that you’re looking for can be found in state.message within the hook, like:

In [1]: from prefect_dbt.cli.commands import trigger_dbt_cli_command

In [2]: from prefect import flow

In [3]: def something(flow, flow_run, state):
   ...:     print(state.message) # replace with your slack notif
   ...:

In [4]: @flow(on_failure=[something])
   ...: def dbt_stuff():
   ...:     trigger_dbt_cli_command("dbt test")
   ...:

In [5]: dbt_stuff()
Flow run encountered an exception. RuntimeError: Command failed with exit code 2:
  Verify that each entry within packages.yml (and their transitive dependencies) contains a file named dbt_project.yml