Prefect 2.0
Prefect 2.0 (Orion) lets you run arbitrary Python code within your flow, so you can use any context manager to handle setup and cleanup — for example, restoring global state, acquiring and releasing locks, or closing open files. The example below uses a context manager to open a MySQL connection and close it automatically when the `with` block exits.
@flow
def main_flow():
    """Extract sales and customers from MySQL, join them, and load the result.

    The MySQL connection is scoped to the two extraction tasks only, so it
    is closed before the merge and the Snowflake load (neither uses it).
    Assumes Prefect 2.x semantics, where calling a task directly returns its
    result rather than a future.
    """
    with get_mysql_conn(db="mytestdb") as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    # The connection has been released here; the remaining steps work on
    # in-memory DataFrames.
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)
Full example:
import os
import contextlib
import pandas as pd
from prefect import flow, task
@contextlib.contextmanager
def get_mysql_conn(db):
    """Yield a MySQL connection to database *db* and close it on exit.

    Host, user and password are read from the MYSQL_HOST, MYSQL_USER and
    MYSQL_PWD environment variables (``None`` is passed for any that are
    unset). The connection is closed even if the ``with`` body raises.
    """
    # Imported inside the function, so the driver is only needed when a
    # connection is actually opened.
    import mysql.connector

    settings = {
        "host": os.environ.get("MYSQL_HOST"),
        "user": os.environ.get("MYSQL_USER"),
        "password": os.environ.get("MYSQL_PWD"),
        "database": db,
    }
    # closing() calls .close() when the block exits, normally or via error.
    with contextlib.closing(mysql.connector.connect(**settings)) as connection:
        yield connection
@task
def extract_sales_from_db(conn) -> pd.DataFrame:
    """Read every row of the ``sales`` table through *conn* into a DataFrame."""
    query = "SELECT * FROM sales"
    return pd.read_sql(query, con=conn)
@task
def extract_customers_from_db(conn) -> pd.DataFrame:
    """Read every row of the ``customers`` table through *conn* into a DataFrame."""
    query = "SELECT * FROM customers"
    return pd.read_sql(query, con=conn)
@task
def merge_data(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """Left-join *customers* onto *sales* on the ``CUSTOMER_ID`` column.

    Every sales row is kept; customer columns are left missing where no
    customer matches. Duplicate ``CUSTOMER_ID`` values in *customers* would
    duplicate the matching sales rows.
    """
    joined = sales.merge(customers, how="left", on="CUSTOMER_ID")
    return joined
@task
def load_to_snowflake_dwh(data: pd.DataFrame):
    """Placeholder for loading *data* into the Snowflake data warehouse.

    Not implemented in this example: the body does nothing and returns None.
    """
@flow
def main_flow():
    """Extract sales and customers from MySQL, join them, and load the result.

    The MySQL connection is scoped to the two extraction tasks only, so it
    is closed before the merge and the Snowflake load (neither uses it).
    Assumes Prefect 2.x semantics, where calling a task directly returns its
    result rather than a future.
    """
    with get_mysql_conn(db="mytestdb") as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    # The connection has been released here; the remaining steps work on
    # in-memory DataFrames.
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)
Prefect 1.0
Because code inside a Prefect 1.0 `Flow` block only builds the task graph rather than running arbitrary Python at execution time, Prefect 1.0 provides a special kind of task, the resource manager, for custom setup and cleanup steps. Resource managers let you reuse database connections and other resources across tasks. They are especially useful when expensive resources, such as temporary Spark clusters, must be torn down once the work is done. Below is the same example as before, rewritten with the Prefect 1.0 resource manager abstraction.
Resource manager example using Prefect 1.0 syntax:
import os
import mysql.connector
import pandas as pd
from prefect import Flow, task
from prefect import resource_manager
@resource_manager
class MySQLConnection:
    """Prefect 1.0 resource manager that opens and closes a MySQL connection.

    Args:
        database: Name of the database to connect to. The default
            ``"DB_NAME"`` appears to be a placeholder; pass a real name.
    """

    def __init__(self, database: str = "DB_NAME"):
        self.database = database

    def setup(self):
        """Open and return a connection configured from MYSQL_* env vars."""
        settings = dict(
            host=os.environ.get("MYSQL_HOST"),
            user=os.environ.get("MYSQL_USER"),
            password=os.environ.get("MYSQL_PWD"),
            database=self.database,
        )
        return mysql.connector.connect(**settings)

    @staticmethod
    def cleanup(conn):
        """Close the connection returned by ``setup``."""
        conn.close()
@task
def extract_sales_from_db(conn) -> pd.DataFrame:
    """Read every row of the ``sales`` table through *conn* into a DataFrame."""
    query = "SELECT * FROM sales"
    return pd.read_sql(query, con=conn)
@task
def extract_customers_from_db(conn) -> pd.DataFrame:
    """Read every row of the ``customers`` table through *conn* into a DataFrame."""
    query = "SELECT * FROM customers"
    return pd.read_sql(query, con=conn)
@task
def merge_data(sales: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """Left-join *customers* onto *sales* on the ``CUSTOMER_ID`` column.

    Every sales row is kept; customer columns are left missing where no
    customer matches. Duplicate ``CUSTOMER_ID`` values in *customers* would
    duplicate the matching sales rows.
    """
    joined = sales.merge(customers, how="left", on="CUSTOMER_ID")
    return joined
@task
def load_to_snowflake_dwh(data: pd.DataFrame):
    """Placeholder for loading *data* into the Snowflake data warehouse.

    Not implemented in this example: the body does nothing and returns None.
    """
with Flow("resource_manager_example") as flow:
    # Connect to the same database as the Prefect 2.0 example; the class
    # default "DB_NAME" is only a placeholder.
    with MySQLConnection(database="mytestdb") as conn:
        sales = extract_sales_from_db(conn)
        customers = extract_customers_from_db(conn)
    # Tasks created outside the resource-manager block are not awaited by its
    # cleanup task, so the connection can be closed once both extracts finish
    # instead of being held through the merge and the Snowflake load.
    df = merge_data(sales, customers)
    load_to_snowflake_dwh(df)
For more information, check out: