How can I send a Slack alert on failure that includes the dbt output?

I'm trying to get the error message of a dbt run into Slack, and I have set up a state_handler that only handles the Failed state. However, the DbtShellTask raises the FAIL signal with the first message that occurs ("Command failed with exit code 2"), so the state handler never sees the error I actually want — dbt's own error output. How can I handle this?

Welcome to the community!

You can solve this by creating a task that is only triggered on failure, which you do by assigning it the any_failed trigger.

@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    """Send the output of a failed upstream task to Slack.

    The ``any_failed`` trigger means this task only runs on upstream failure.

    Args:
        output: Result of the upstream task. A list or tuple of output lines
            is joined into one newline-separated string; any other value is
            passed to ``SlackTask`` unchanged.
    """
    # A Slack message text must be a single string, while a shell task that
    # returns all of its output lines presumably hands over a list.
    if isinstance(output, (list, tuple)):
        output = "\n".join(str(line) for line in output)
    SlackTask(message=output).run()

Here is what it would look like in a flow:

And here is a full code snippet based on code from this article.

import prefect
from prefect import task, Flow, Parameter
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.storage import GitHub
from prefect.triggers import all_finished, any_failed
from prefect.run_configs import LocalRun
import pygit2
import shutil
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.notifications import SlackTask


# Name of the dbt project; also used below as the local clone directory,
# the dbt profile name and the target schema.
DBT_PROJECT = "jaffle_shop"
# Name of this flow; also determines the flow file path inside the storage repo.
FLOW_NAME = "02_dbt"
# The flow's code is stored in a GitHub repository; the access token is looked
# up by the secret name given here.
STORAGE = GitHub(
    repo="anna-geller/flow-of-flows",
    path=f"flows/{FLOW_NAME}.py",
    access_token_secret="GITHUB_ACCESS_TOKEN",
)


@task(name="Clone DBT repo")
def pull_dbt_repo(repo_url: str, branch: str = None):
    """Clone the repository at ``repo_url`` into the ``DBT_PROJECT`` directory.

    Args:
        repo_url: URL of the git repository holding the dbt project.
        branch: Forwarded to pygit2 as ``checkout_branch``; defaults to ``None``.
    """
    clone_options = {
        "url": repo_url,
        "path": DBT_PROJECT,
        "checkout_branch": branch,
    }
    pygit2.clone_repository(**clone_options)


@task(name="Delete DBT folder if exists", trigger=all_finished)
def delete_dbt_folder_if_exists():
    """Remove the local ``DBT_PROJECT`` directory tree, ignoring any errors."""
    target_dir = DBT_PROJECT
    shutil.rmtree(target_dir, ignore_errors=True)


@task
def get_dbt_credentials(user_name: str, password: str):
    """Bundle the database credentials into a dict.

    Args:
        user_name: Database user name, stored under the ``"user"`` key.
        password: Database password, stored under the ``"password"`` key.

    Returns:
        A dict with the keys ``"user"`` and ``"password"``.
    """
    credentials = dict(user=user_name, password=password)
    return credentials


# Shared dbt shell task; the flow below calls it once for "dbt run" and once
# for "dbt test", supplying the command and credentials at call time.
dbt = DbtShellTask(
    # The print and Slack tasks below consume the task result as output lines.
    return_all=True,
    profile_name=DBT_PROJECT,
    # NOTE(review): machine-specific absolute path — adjust for your environment.
    profiles_dir="/Users/anna/.dbt",
    environment="dev",
    overwrite_profiles=True,
    log_stdout=True,
    # Change into the cloned project directory before the dbt command runs.
    helper_script=f"cd {DBT_PROJECT}",
    log_stderr=True,
    # Connection settings for the profile; user and password are supplied
    # separately when the task is called in the flow.
    dbt_kwargs={
        "type": "postgres",
        "host": "localhost",
        "port": 5432,
        "dbname": "postgres",
        "schema": DBT_PROJECT,
        "threads": 4,
        "client_session_keep_alive": False,
    },
)


@task(trigger=all_finished)
def print_dbt_output(output):
    """Log every item of ``output`` at INFO level with the Prefect context logger.

    Args:
        output: Iterable of output lines from an upstream task.
    """
    run_logger = prefect.context.get("logger")
    for entry in output:
        run_logger.info(entry)


@task(trigger=any_failed)
def send_slack_alert_on_failure(output):
    """Send the output of a failed upstream task to Slack.

    Uses the ``any_failed`` trigger, so it is meant to run only when an
    upstream task has failed.

    Args:
        output: Result of the upstream task. A list or tuple of output lines
            is joined into one newline-separated string; any other value is
            passed to ``SlackTask`` unchanged.
    """
    # A Slack message text must be a single string, while a shell task that
    # returns all of its output lines presumably hands over a list.
    if isinstance(output, (list, tuple)):
        output = "\n".join(str(line) for line in output)
    SlackTask(message=output).run()


# Flow definition: clean up -> clone repo -> dbt run -> dbt test -> clean up,
# with a log-output task and a Slack-alert task attached to each dbt step.
with Flow(FLOW_NAME, storage=STORAGE, run_config=LocalRun(labels=["dev"])) as flow:

    # Remove any leftover checkout, then clone the dbt repository.
    del_task = delete_dbt_folder_if_exists()
    dbt_repo = Parameter(
        "dbt_repo_url", default="https://github.com/anna-geller/jaffle_shop"
    )
    dbt_repo_branch = Parameter("dbt_repo_branch", default=None)
    pull_task = pull_dbt_repo(dbt_repo, dbt_repo_branch)
    del_task.set_downstream(pull_task)

    # Database credentials come from Prefect secrets and are passed to dbt
    # as call-time dbt_kwargs.
    postgres_user = PrefectSecret("POSTGRES_USER")
    postgres_pass = PrefectSecret("POSTGRES_PASS")
    db_credentials = get_dbt_credentials(postgres_user, postgres_pass)
    dbt_run = dbt(
        command="dbt run", task_args={"name": "DBT Run"}, dbt_kwargs=db_credentials
    )
    dbt_run_out = print_dbt_output(dbt_run, task_args={"name": "DBT Run Log Output"})
    # "dbt run" must wait for the repository clone.
    pull_task.set_downstream(dbt_run)

    dbt_test = dbt(
        command="dbt test", task_args={"name": "DBT Test"}, dbt_kwargs=db_credentials
    )
    dbt_test_out = print_dbt_output(dbt_test, task_args={"name": "DBT Test Log Output"})
    # "dbt test" runs after "dbt run".
    dbt_run.set_downstream(dbt_test)

    # Final cleanup happens after the test output has been logged.
    del_again = delete_dbt_folder_if_exists()
    dbt_test_out.set_downstream(del_again)
    # One alert task per dbt step; each uses the any_failed trigger.
    send_slack_alert_on_failure(dbt_run)
    send_slack_alert_on_failure(dbt_test)

# NOTE(review): only dbt_run is a reference task, so presumably a failing
# "dbt test" alone does not mark the flow run as failed — confirm this is intended.
flow.set_reference_tasks([dbt_run])
# Runs whenever this module is executed or imported, not only on demand.
flow.visualize()