Write-Audit-Publish pattern with interactive workflows

Write-Audit-Publish patterns make sense in a lot of cases, and they work nicely together with blue-green deployments, so I wanted to make a little demo of how this could work. The repo is at https://github.com/radbrt/prefect-warehouse, but I'll give a walkthrough here:

We implement a write-audit-publish pattern with Prefect, dbt and Snowflake. We run our dbt models via Prefect, but into schemas with a `develop_` prefix. After the dbt run is finished, Prefect compiles a small report of the differences between the new data now residing in the prefixed `develop_dwh` schema and the old data currently in the unprefixed `dwh` schema, and registers it as an artifact.

Then it pauses the flow and notifies Slack that new data is in the pipeline and must be approved for publishing. The notification includes a link to the report artifact and a link to the now-paused flow run, where the run can be continued by clicking “Resume” and choosing “Yes” in the approval dropdown.

Approving will invoke a function that runs a schema swap between `develop_dwh` and `dwh`, essentially switching the contents of the two schemas. Note that the swap also exchanges access grants, so make sure the two schemas have identical permissions or that the necessary grants/revokes are run afterwards.
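Putting it together, the flow looks roughly like this. This is a condensed sketch rather than the actual flow in the repo (linked at the bottom), and `notify_slack` here just stands in for the Slack snippet shown further down:

    from prefect import flow, pause_flow_run

    @flow
    async def wap_flow():
        # Write: run dbt into the develop_-prefixed schemas
        trigger_dbt_flow()

        # Audit: report on the differences, register the report as an artifact
        report = difference_report()
        await notify_slack(report)

        # Wait until someone resumes the run and approves or rejects
        decision = await pause_flow_run(wait_for_input=Approve)

        # Publish: swap develop_dwh and dwh
        if decision == Approve.YES:
            publish()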

First, we have a task that triggers dbt:

    from prefect import task
    from prefect_dbt.cli import DbtCliProfile, DbtCoreOperation

    @task
    def trigger_dbt_flow() -> str:
        # Load the saved block holding our dbt connection config
        dbt_cli_profile = DbtCliProfile.load("dbt")

        # Run `dbt run` against the dev target, which writes to the
        # develop_-prefixed schemas
        dbt_init = DbtCoreOperation(
            commands=["dbt run"],
            project_dir="/dbt",
            target="dev",
            dbt_cli_profile=dbt_cli_profile,
        )
        result = dbt_init.run()
        return result

This is nothing special, really: we have a block containing our dbt config, we use that to create the connection, and we trigger a simple dbt run.
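For completeness, the "dbt" block loaded above can be created along these lines. This is a sketch following the prefect-dbt docs rather than the repo's actual setup, with placeholder credentials and a hypothetical profile name; note the `develop_`-prefixed schema on the dev target:

    from prefect_dbt.cli import DbtCliProfile
    from prefect_dbt.cli.configs import SnowflakeTargetConfigs
    from prefect_snowflake.credentials import SnowflakeCredentials
    from prefect_snowflake.database import SnowflakeConnector

    connector = SnowflakeConnector(
        schema="develop_dwh",  # the prefixed schema the WAP run writes to
        database="<database>",
        warehouse="<warehouse>",
        credentials=SnowflakeCredentials(
            user="<user>", password="<password>", account="<account>"
        ),
    )
    DbtCliProfile(
        name="warehouse",  # hypothetical profile name
        target="dev",
        target_configs=SnowflakeTargetConfigs(connector=connector),
    ).save("dbt")  # the block name loaded by trigger_dbt_flow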

When that run is finished, we can create a small report detailing the difference between the old data and the new data about to be published.

    query = """
<some-sql>
    """
    try:
        engine = create_engine(url)
        with engine.connect() as conn:
            df = pd.read_sql(query, conn)

        pd_table = df.head(10).to_markdown(index=False)
        markdown_content = dedent(
f"""
# Difference Report

Total new rows: {len(df)}

First 10 new rows in `sysselsetting_forholdstall`:
{pd_table}

This report is generated by the `difference_report` task.
"""
        )
    except Exception as e:
        
        markdown_content = dedent(f"""
        # Unable to generate difference report
        """)

The report we made, in the Artifacts UI:

Now it is time to notify the team that new data is in the pipeline. We can also generate a link to the new artifact and a link to the flow run:

    artifact_url = f"{settings.PREFECT_UI_URL.value()}/artifacts/artifact/{report['id']}"
    flow_run = context.get_run_context().flow_run
    flow_run_url = (
                f"{settings.PREFECT_UI_URL.value()}/flow-runs/flow-run/{flow_run.id}"
            )

    slack_webhook_block = await SlackWebhook.load("radbrt")
    _ = await slack_webhook_block.notify(
        dedent(
            f"""There is new data waiting to be published.
            Review the difference report: {artifact_url}.

            Approve or reject the publication at {flow_run_url}
            """)
    )

The resulting Slack message:

Now it is time to pause the flow and wait for approval. The `Approve` class passed to `wait_for_input` is what gives the Resume form its dropdown; its definition isn't shown here, but assuming an Enum (the flow compares the result against `Approve.YES`), a minimal sketch:
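    from enum import Enum

    class Approve(Enum):
        # Rendered as a dropdown choice when resuming the paused run
        YES = "Yes"
        NO = "No"

With that in place, pausing and waiting looks like this: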

    from prefect import get_run_logger, pause_flow_run

    logger = get_run_logger()

    # Block here until someone resumes the run and submits the form
    decision = await pause_flow_run(wait_for_input=Approve)
    if decision == Approve.YES:
        logger.info("Publishing new data to the world!")
        publish()
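The `publish()` function is where the schema swap from the introduction happens. A minimal sketch, assuming Snowflake's `ALTER SCHEMA ... SWAP WITH` and the same SQLAlchemy connection URL as in the report task (the repo's actual implementation lives in wap.py, linked below):

    from sqlalchemy import create_engine, text

    def publish():
        # Atomically exchange the two schemas, contents and grants alike
        engine = create_engine(url)
        with engine.connect() as conn:
            conn.execute(text("ALTER SCHEMA develop_dwh SWAP WITH dwh"))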

In the Prefect UI, this looks as follows:

And the finished flow:

The flow in its entirety: https://github.com/radbrt/prefect-warehouse/blob/main/flows/write-audit-publish/wap.py

The Dockerfile that runs the flow: https://github.com/radbrt/prefect-warehouse/blob/main/Dockerfile

The dbt part of the project: https://github.com/radbrt/prefect-warehouse/tree/main/dbt
