How to clean up resources used in a flow?

Prefect 2.0

Orion allows you to run any Python code within your flow. Therefore, you can use any context manager to facilitate a cleanup process, such as restoring various kinds of a global state, locking and unlocking resources, closing opened files, etc.

@flow
def main_flow():
    with get_mysql_conn(db="mytestdb") as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)

Full example:

import os
import contextlib
import pandas as pd
from prefect import flow, task

@contextlib.contextmanager
def get_mysql_conn(db):
    import mysql.connector

    conn = mysql.connector.connect(
        host=os.environ.get("MYSQL_HOST"),
        user=os.environ.get("MYSQL_USER"),
        password=os.environ.get("MYSQL_PWD"),
        database=db,
    )
    try:
        yield conn
    finally:
        conn.close()

@task
def extract_sales_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM sales", conn)

@task
def extract_customers_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM customers", conn)

@task
def merge_data(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    return sales.merge(customers, on="CUSTOMER_ID", how="left")

@task
def load_to_snowflake_dwh(data: pd.DataFrame):
    pass

@flow
def main_flow():
    with get_mysql_conn(db="mytestdb") as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)

Prefect 1.0

Since you can’t run arbitrary Python code in Prefect 1.0, it leverages a special task called resource managers to allow for custom setup and cleanup steps. It allows you to reuse database connections and other resources across tasks. It’s especially beneficial when some expensive resources such as temporary Spark clusters need to be terminated upon completion. Below is the same example as before but using the Prefect 1.0 resource manager abstraction.

Resource manager example using Prefect 1.0 syntax:

import os
import mysql.connector
import pandas as pd
from prefect import Flow, task
from prefect import resource_manager

@resource_manager
class MySQLConnection:
    def __init__(self, database: str = "DB_NAME"):
        self.database = database

    def setup(self):
        return mysql.connector.connect(
            host=os.environ.get("MYSQL_HOST"),
            user=os.environ.get("MYSQL_USER"),
            password=os.environ.get("MYSQL_PWD"),
            database=self.database,
        )

    @staticmethod
    def cleanup(conn):
        conn.close()

@task
def extract_sales_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM sales", conn)

@task
def extract_customers_from_db(conn) -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM customers", conn)

@task
def merge_data(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    return sales.merge(customers, on="CUSTOMER_ID", how="left")

@task
def load_to_snowflake_dwh(data: pd.DataFrame):
    pass

with Flow("resource_manager_example") as flow:
    with MySQLConnection() as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)

For more information, check out:

The Prefect 2.0 example uses “called” tasks within the context manager scope. What if I want to use “submitted” tasks? Then the scope will be exited before the task completes.

in that case, I’d recommend moving that DB connection logic to a task and additionally adding a concurrency limit through tags on the task decorator to avoid overwhelming the DB with tons of concurrent runs connecting to that DB simultaneously

there is no easy way to share DB connection between distributed task runs because they may end up running on totally different worker nodes e.g. with Dask/Ray

Thanks. My case doesn’t involve a DB and there’s no concern regarding concurrency limit, nor concerns about distributed access to the resource. I’m still not sure how to arrange for guaranteed cleanup of a resource when tasks are submitted (i.e. they run asynchronously). The prefect 1.x resource_manager did the trick but that’s gone now, and there’s no “all finished” trigger to apply to a cleanup task. I posted to the community slack channel but there were no responses. I’m struggling to build up a DAG that has resource allocation and cleanup tasks amongst the computation tasks, where some computation might fail and yet all cleanup has to be done unconditionally. I’ve resorted to an async subflows per resource and use a completion barrier at the end of the subflow such as calling:

def wait_all(*futures):
    task(lambda *_: None)(*futures)

This seems to wait properly, but throws a MissingResult exception.

1 Like

gotcha, have you seen the allow_failure annotation?

example:

from prefect import task, flow, get_run_logger, allow_failure
import random


@task
def extract_data():
    return 42


@task
def extract_data_2():
    return 2


@task
def transform_data(x: int, y: int = 2) -> int:
    if random.random() > 0.5:
        raise ValueError("Non-deterministic error has occured.")
    else:
        return (x + 42) * y


@task
def clean_up_task():
    logger = get_run_logger()
    logger.info("Cleaning up 🧹")


@flow
def allow_flaky_transformation_to_pass():
    data = extract_data.submit()
    data_2 = extract_data_2.submit()
    result = transform_data.submit(data, data_2)
    clean_up_task.submit(wait_for=[allow_failure(result)])


if __name__ == "__main__":
    allow_flaky_transformation_to_pass()