Imagine the following flow configuration:
import pandas as pd
from prefect import task, flow, get_run_logger
from dataplatform.blocks import BigQueryPandas
from typing import List


@task
def extract(dataset: str) -> pd.DataFrame:
    # Read one seed CSV from the public jaffle_shop repository
    file = f"https://raw.githubusercontent.com/dbt-labs/jaffle_shop/main/seeds/{dataset}.csv"
    return pd.read_csv(file)


@task
def load(df: pd.DataFrame, tbl: str, **kwargs) -> None:
    logger = get_run_logger()
    block = BigQueryPandas.load("default")
    # Extra keyword arguments (e.g. if_exists) are passed straight to the block
    block.load_data(dataframe=df, table_name=tbl, **kwargs)
    ref = block.credentials.get_bigquery_client().get_table(tbl)
    logger.info(
        "Df loaded ✅ table %s has now %d rows and %s MB",
        tbl,
        ref.num_rows,
        ref.num_bytes / 1_000_000,
    )


@flow
def ingestion_bigquery(
    dataset: str = "jaffle_shop2",
    tables: List[str] = ["raw_customers", "raw_orders", "raw_payments"],
    **kwargs,
) -> None:
    block = BigQueryPandas.load("default")
    block.create_dataset_if_not_exists(dataset)
    for table in tables:
        bq_table = f"{dataset}.{table}"
        df = extract.with_options(name=f"extract_{table}")(table)
        load.with_options(name=f"load_{table}")(df, bq_table, **kwargs)


if __name__ == "__main__":
    ingestion_bigquery(if_exists="replace")
You can see here that we are using **kwargs on the flow function, which allows us to provide extra arguments when calling that flow, as in the last line (if_exists="replace").
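The forwarding mechanism can be illustrated without Prefect or BigQuery. The following is a minimal sketch in plain Python: `load` and `ingestion` here are hypothetical stand-ins for the task and flow above, and they only record what they receive instead of writing any data.

```python
from typing import Any, Dict, List


def load(table: str, **kwargs: Any) -> Dict[str, Any]:
    # Stand-in for the real load task: report the arguments it received.
    return {"table": table, **kwargs}


def ingestion(tables: List[str], **kwargs: Any) -> List[Dict[str, Any]]:
    # Any keyword that is not a named parameter is collected in kwargs
    # and forwarded unchanged to every load call.
    return [load(table, **kwargs) for table in tables]


calls = ingestion(["raw_customers", "raw_orders"], if_exists="replace")
print(calls)
```

Each recorded call carries the extra `if_exists` keyword, even though `ingestion` never names it in its signature.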
Ad-hoc runs
This behavior is fully supported for ad-hoc runs, and the UI even renders the extra arguments nicely in the Parameters tab.
Deployments
When you are ready to deploy your flow, it is best practice to replace any **kwargs on the flow function with explicit parameters and corresponding default values.
As the Zen of Python says:
Explicit is better than implicit.
In the above example, consider changing the flow to:
@flow
def ingestion_bigquery(
    dataset: str = "jaffle_shop",
    tables: List[str] = ["raw_customers", "raw_orders", "raw_payments"],
    if_exists: str = "replace",
) -> None:
    ...  # same body as above, passing if_exists=if_exists to load()
You can change this value to "append" at runtime when needed, but define the parameter explicitly.
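One way to see what the explicit version buys you is to inspect the two signatures. The sketch below uses only the standard library's `inspect` module and is not Prefect's actual implementation; `with_kwargs`, `explicit`, and `named_defaults` are hypothetical names used for illustration. It assumes that a tool deriving a parameter form from a function signature can only list parameters that have a name and a default.

```python
import inspect
from typing import Any, Callable, Dict


def with_kwargs(dataset: str = "jaffle_shop", **kwargs: Any) -> None:
    ...


def explicit(dataset: str = "jaffle_shop", if_exists: str = "replace") -> None:
    ...


def named_defaults(func: Callable[..., Any]) -> Dict[str, Any]:
    # Keep only named parameters and their defaults; *args and **kwargs
    # have no fixed name or default value to report.
    variadic = (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD)
    return {
        name: param.default
        for name, param in inspect.signature(func).parameters.items()
        if param.kind not in variadic
    }


print(named_defaults(with_kwargs))
print(named_defaults(explicit))
```

With `**kwargs`, `if_exists` is invisible in the signature; with the explicit parameter, both its name and its default are discoverable, and a caller can still override it, for example `explicit(if_exists="append")`.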