How to clean up resources used in a flow?

Prefect 2.0

Orion allows you to run any Python code within your flow. Therefore, you can use any context manager to handle cleanup, such as restoring global state, locking and unlocking resources, or closing open files.

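@flow
def main_flow():
    # The connection is open only inside the with block and is closed on exit,
    # even if one of the extract tasks raises.
    with get_mysql_conn(db="mytestdb") as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    # Both DataFrames are already in memory here, so no connection is needed.
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)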

Full example:

import os
import contextlib
import pandas as pd
from prefect import flow, task

@contextlib.contextmanager
def get_mysql_conn(db):
    import mysql.connector

    conn = mysql.connector.connect(
        host=os.environ.get("MYSQL_HOST"),
        user=os.environ.get("MYSQL_USER"),
        password=os.environ.get("MYSQL_PWD"),
        database=db,
    )
    try:
        yield conn
    finally:
        conn.close()

@task
def extract_sales_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM sales", conn)

@task
def extract_customers_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM customers", conn)

@task
def merge_data(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    return sales.merge(customers, on="CUSTOMER_ID", how="left")

@task
def load_to_snowflake_dwh(data: pd.DataFrame):
    pass

@flow
def main_flow():
    with get_mysql_conn(db="mytestdb") as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)
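
The cleanup guarantee this flow relies on comes from plain Python rather than from Prefect: the `finally` clause of a `contextlib.contextmanager` generator runs even when the body of the `with` block raises. Here is a minimal sketch that checks this in isolation. `tracked_resource` and `failing_step` are hypothetical names standing in for the DB connection and a failing task, and no Prefect or MySQL is involved:

```python
import contextlib


@contextlib.contextmanager
def tracked_resource(name, events):
    """Record when the resource is opened and closed (stand-in for a DB connection)."""
    events.append(f"open {name}")
    try:
        yield name
    finally:
        # Runs whether the with-block body finishes normally or raises.
        events.append(f"close {name}")


def failing_step(resource):
    """Hypothetical unit of work that fails while holding the resource."""
    raise RuntimeError(f"step failed while using {resource}")


def run_demo():
    events = []
    try:
        with tracked_resource("conn", events) as resource:
            failing_step(resource)
    except RuntimeError:
        events.append("error handled")
    return events
```

Calling `run_demo()` returns `["open conn", "close conn", "error handled"]`: the resource is closed before the exception reaches the caller.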

Prefect 1.0

Since you can’t run arbitrary Python code in a Prefect 1.0 flow, Prefect 1.0 provides a special kind of task called a resource manager for custom setup and cleanup steps. A resource manager lets you reuse database connections and other resources across tasks. It’s especially useful when expensive resources, such as temporary Spark clusters, need to be terminated upon completion. Below is the same example as before, using the Prefect 1.0 resource manager abstraction.

Resource manager example using Prefect 1.0 syntax:

import os
import mysql.connector
import pandas as pd
from prefect import Flow, task
from prefect import resource_manager

@resource_manager
class MySQLConnection:
    def __init__(self, database: str = "DB_NAME"):
        self.database = database

    def setup(self):
        return mysql.connector.connect(
            host=os.environ.get("MYSQL_HOST"),
            user=os.environ.get("MYSQL_USER"),
            password=os.environ.get("MYSQL_PWD"),
            database=self.database,
        )

    @staticmethod
    def cleanup(conn):
        conn.close()

@task
def extract_sales_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM sales", conn)

@task
def extract_customers_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM customers", conn)

@task
def merge_data(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    return sales.merge(customers, on="CUSTOMER_ID", how="left")

@task
def load_to_snowflake_dwh(data: pd.DataFrame):
    pass

with Flow("resource_manager_example") as flow:
    with MySQLConnection() as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)

The Prefect 2.0 example uses “called” tasks within the context manager scope. What if I want to use “submitted” tasks? Then the scope will be exited before the task completes.

In that case, I’d recommend moving the DB connection logic into a task and adding a concurrency limit through tags on the task decorator, so that many concurrent runs don’t overwhelm the DB by connecting to it simultaneously.

There is no easy way to share a DB connection between distributed task runs because they may end up running on entirely different worker nodes, e.g. with Dask or Ray.
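
To make the advice above concrete, here is a rough sketch of both shapes: work that is submitted while the caller owns the connection, and work that opens and closes its own connection. It is not Prefect code. As assumptions for the sake of a runnable example, `sqlite3` stands in for MySQL, a `ThreadPoolExecutor` stands in for submitted task runs, and the pool size plays the role that a tag-based concurrency limit would play. All function names are hypothetical.

```python
import contextlib
import os
import sqlite3
import tempfile
import threading
from concurrent.futures import ThreadPoolExecutor


@contextlib.contextmanager
def get_conn(db_path):
    """Open a connection and always close it on exit (sqlite3 stands in for MySQL)."""
    # check_same_thread=False only exists so the problem case below can share
    # one connection across threads, which is what the thread advises against.
    conn = sqlite3.connect(db_path, check_same_thread=False)
    try:
        yield conn
    finally:
        conn.close()


def count_rows_shared(conn, table, start):
    """Problem shape: the work depends on a connection owned by the caller."""
    start.wait()  # the work only begins after the caller's with block has exited
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


def count_rows_owned(db_path, table):
    """Suggested shape: the work opens and closes its own connection."""
    with get_conn(db_path) as conn:
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


def demo():
    with tempfile.TemporaryDirectory() as tmp:
        db_path = os.path.join(tmp, "demo.db")
        with get_conn(db_path) as conn:
            conn.execute("CREATE TABLE sales (id INTEGER)")
            conn.executemany("INSERT INTO sales VALUES (?)", [(1,), (2,), (3,)])
            conn.commit()

        start = threading.Event()
        # max_workers caps how many connections can be open at once.
        with ThreadPoolExecutor(max_workers=2) as pool:
            with get_conn(db_path) as conn:
                shared = pool.submit(count_rows_shared, conn, "sales", start)
            # The with block has exited, so conn is closed before the work runs.
            start.set()
            try:
                shared.result()
                shared_outcome = "ok"
            except sqlite3.ProgrammingError:
                shared_outcome = "connection already closed"

            owned = pool.submit(count_rows_owned, db_path, "sales").result()
        return shared_outcome, owned
```

`demo()` returns `("connection already closed", 3)`. In Prefect 2.0 terms, `count_rows_owned` corresponds to the function you would decorate with `@task(tags=[...])`; because it receives only a path (or credentials) rather than a live connection, it can run on any worker.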