trigger_dbt_cli_command does not return logs when the dbt command fails

Hi, I am stuck with an issue when running dbt test with trigger_dbt_cli_command. I hope someone can share some insights.

In Prefect 1, I used DbtShellTask to run dbt test; it always returned the full log, even when some tests failed. That returned log feeds a downstream task that populates a performance-tracking table, which is important to us. In Prefect 2 we have to switch to trigger_dbt_cli_command, and when any test fails it raises a RuntimeError (snippet in the attached screenshot). If I put the task in a try/except block the flow can continue, but I still cannot get the log. Worse, when dbt tests fail, the try/except skips the task entirely, so the following task that takes its result as an argument raises another error because that result is missing.
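To make the try/except idea concrete, here is a minimal sketch of the pattern I mean. It assumes (which I have not verified) that the RuntimeError's message carries the captured shell output; run_dbt is a hypothetical stand-in for the actual trigger_dbt_cli_command call, so the helper stays testable outside a flow:

```python
def run_dbt_and_capture(run_dbt, command: str) -> str:
    """Run a dbt command; on failure return the error text instead of raising.

    `run_dbt` is a stand-in callable for the real trigger_dbt_cli_command.
    Assumption: the RuntimeError message contains the command's output, so
    returning it still gives the downstream task something to parse.
    """
    try:
        return run_dbt(command)
    except RuntimeError as exc:
        # Swallow the failure so the flow continues, but keep the text.
        return str(exc)
```

This at least guarantees the downstream task always receives a string argument instead of a missing result.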

In Prefect 1 with DbtShellTask, I could simply grab lines like the ones below from the logs in a task, build a dataframe from them, and push it to a table in our warehouse:

19:21:05  Failure in test bundle_breakdown_error_test (tests/bundle_breakdown_error_test.sql)
19:21:05    Got 101 results, configured to fail if != 0
19:21:05  
19:21:05    compiled Code at target/compiled/drsquatch_edw/tests/bundle_breakdown_error_test.sql

12:21:05.858 | INFO    | Task run 'trigger_dbt_cli_command-1' - 19:21:05  
19:21:05  Warning in test amazon_bundle_breakdown_nulls (tests/amazon_bundle_breakdown_nulls.sql)
19:21:05    Got 94 results, configured to warn if >1
19:21:05  
19:21:05    compiled Code at target/compiled/drsquatch_edw/tests/amazon_bundle_breakdown_nulls.sql
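For context, this is roughly how I turn those lines into records before loading them. A minimal sketch, assuming the "Failure/Warning in test ..." line format shown above (the regexes are derived from that sample, not from any official dbt format):

```python
import re

# Matches e.g. "Failure in test my_test (tests/my_test.sql)"
TEST_LINE = re.compile(r"(Failure|Warning) in test (\S+) \((\S+)\)")
# Matches e.g. "Got 101 results, configured to fail if != 0"
RESULT_LINE = re.compile(r"Got (\d+) results")

def parse_dbt_log(lines):
    """Extract (status, test, path, results) records from dbt log lines."""
    records, current = [], None
    for line in lines:
        m = TEST_LINE.search(line)
        if m:
            current = {"status": m.group(1), "test": m.group(2),
                       "path": m.group(3), "results": None}
            records.append(current)
            continue
        m = RESULT_LINE.search(line)
        if m and current is not None:
            current["results"] = int(m.group(1))
            current = None  # result count belongs to the preceding test line
    return records
```

The resulting list of dicts goes straight into a dataframe and then the warehouse table.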

Hi, I am having the same problem. I used a not-very-clean workaround (which also has the drawback that the logs are not streamed, only shown at the end of the flow):

import logging
import re

from prefect import flow

log = logging.getLogger(__name__)

# Strips ANSI color/control sequences from the captured output
ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


def log_output(log_input: list) -> None:
    """Formats a listed output to be served as lines"""
    for line in log_input:
        log.info(ansi_escape.sub('', line))


@flow(name='Update src from raw_table data')
def update_src():
    """Updates the src tables from the raw tables schema"""
    return generate_dbt_core_operation('shared', "dbt run -s src")


src = update_src()
log_output(src)

(The code is not complete; generate_dbt_core_operation is a custom function that wraps the dbt configs.)
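For anyone copying the regex above, a quick standalone check of what the ANSI stripping does (the colored sample line is made up for illustration):

```python
import re

ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')

# "\x1b[31m" turns the text red in a terminal; sub() removes both the
# color-on and color-off sequences, leaving plain text for the logger.
colored = "\x1b[31mFailure in test my_test\x1b[0m"
print(ansi_escape.sub('', colored))  # -> Failure in test my_test
```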