How can I use the SqlServerExecute task?

View in #prefect-community on Slack

RAISS_Zineb @RAISS_Zineb: Hello everyone
I am a beginner with Prefect. To build a data pipeline, I want to connect Prefect to a SQL Server database that I have locally, so I followed the documentation: SQL Server Tasks | Prefect Docs
My questions are:
- To configure the connection between SQL Server and Prefect, do I use the task classes, put everything in a .py file, and run it?
- If yes, do I need to import pyodbc in the .py code?
- If not, how should I do it?
- What should I pass for the data and **kwargs arguments?
- How do I pass the user's password: is the argument pwd or something else?

@Anna_Geller: This discussion, "I want to create a global temp table in my SQL Server database. Can I use Prefect SqlServerExecute task for it?", seems to address a similar problem and includes sample code you could use. Let me know if you have any questions after reading it.

I’m moving your code block to this thread just to keep the main channel cleaner:

import prefect
from sqlserver import SqlServerExecute

class prefect.tasks.sql_server.sql_server.SqlServerExecute(db_name="OTI_Djoliba", user="sa", host="EQ-EQ6288793\SQLEXPRESS", port=1433, driver="ODBC Driver 17 for SQL Server", query= None, data=None, commit=False, **kwargs)

If you just need to execute a simple query, e.g. to insert a row, you could do:

from prefect import Flow, Parameter, task
from prefect.tasks.sql_server import SqlServerExecute
from prefect.tasks.secrets import PrefectSecret


execute_query = SqlServerExecute(
    db_name="xxx", user="yyy", host="zzz", port=42, commit=True
)


@task
def define_query_from_param(table_name: str):
    return f"""
    INSERT INTO {table_name} ("index","id","first_name","last_name","email","gender","city","country")
    VALUES
    (1000,1001,'Anna','Geller','anna@example.com','Female','Berlin','Germany');
    """


with Flow("sql_server_example") as flow:
    sql_server_pwd = PrefectSecret("SQL_SERVER_DB_PASSWORD")
    table_name = Parameter("table_name", default="stage.customers")
    query = define_query_from_param(table_name)
    execute_query(password=sql_server_pwd, query=query)

if __name__ == "__main__":
    flow.run()
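
On the data question: per the Prefect docs, data is a tuple of values that fill placeholders in the query string, so the values do not have to be formatted into the SQL text. Here is a minimal sketch of that binding style. It uses the standard-library sqlite3 module purely as a stand-in, since it accepts the same "?" placeholders as pyodbc; the customers table and the insert_customer helper are hypothetical and not part of Prefect.

```python
import sqlite3

# Stand-in for the SQL Server connection: an in-memory SQLite database.
# sqlite3 and pyodbc both accept "?" (qmark) placeholders.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (id INTEGER, first_name TEXT, city TEXT)")

# Query text with placeholders; the values travel separately as a tuple.
INSERT_QUERY = "INSERT INTO customers (id, first_name, city) VALUES (?, ?, ?)"


def insert_customer(connection, data):
    """Hypothetical helper: run INSERT_QUERY with `data` bound to the placeholders."""
    with connection:  # commits on success, rolls back on error
        connection.execute(INSERT_QUERY, data)


insert_customer(conn, (1001, "Anna", "Berlin"))
rows = conn.execute("SELECT id, first_name, city FROM customers").fetchall()
print(rows)
```

With the task from the example above, the equivalent call would presumably pass the same query and tuple as the query and data arguments of execute_query. Table names cannot be bound as placeholders, which is why the flow above builds them into the query string instead.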

RAISS_Zineb @RAISS_Zineb: Hi @Anna_Geller, thank you. First I want to run a simple SELECT query to test the connection, and then I'll move on to an ETL. I'll have a look at what you shared with me, thanks :slightly_smiling_face:
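
For a quick connection check outside of Prefect, one option is to call pyodbc directly with a trivial SELECT 1. This is only a sketch: it assumes pyodbc and the ODBC driver are installed, build_connection_string and check_connection are hypothetical helper names, and the host, database, and credentials shown are placeholders.

```python
def build_connection_string(host, db_name, user, password,
                            driver="ODBC Driver 17 for SQL Server", port=None):
    """Assemble an ODBC connection string for SQL Server."""
    # For a named instance such as HOST\SQLEXPRESS, leave port as None.
    server = f"{host},{port}" if port is not None else host
    return (
        f"DRIVER={{{driver}}};SERVER={server};DATABASE={db_name};"
        f"UID={user};PWD={password}"
    )


def check_connection(connection_string):
    """Return True if `SELECT 1` succeeds against the server."""
    import pyodbc  # imported lazily so the helper above works without it

    conn = pyodbc.connect(connection_string)
    try:
        row = conn.cursor().execute("SELECT 1").fetchone()
        return row[0] == 1
    finally:
        conn.close()


# Raw string so the backslash in the instance name is kept literally.
conn_str = build_connection_string(
    host=r"MY-HOST\SQLEXPRESS", db_name="my_db", user="sa", password="<password>"
)
print(conn_str)
# check_connection(conn_str) would then run the actual test query.
```

In the connection string the password key is PWD, whereas the Prefect task in the example above takes it as the password argument when the task is called.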
