Connection to Database in Task

Hey everyone!

I recently switched to Prefect Orion for a project of mine.

Since I had used Prefect 1.0 (with pleasure) before, I thought I could refactor my data pipeline. I planned to connect to the database and pass around the connection object as was possible with Prefect 1.0.

So I defined my pipeline as follows (please note that I omitted some more sensitive information):

import toml
import sqlalchemy as sql
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner


@task(name="load config")
def load_config(path: str) -> dict:
    # toml.load accepts a file path directly, so no file handle is left open
    config = toml.load(path)
    return config

@task(name="make db connection string")
def make_connection_string(config: dict) -> str:
    info = config["connection_info"]
    connection_string = (
        f"mssql+pyodbc://{info['user_id']}:{info['password']}"
        f"@{info['server_address']}/{info['database']}?{info['params']}"
    )
    return connection_string

@task(name="connect to database")
def connect_to_db(connection_string: str):
    engine = sql.create_engine(connection_string)
    return engine

@flow(name="ETL", task_runner=SequentialTaskRunner())
def run_pipeline(config_path: str):

    config = load_config(config_path)

    connection_string = make_connection_string(config, wait_for=[config])

    conn = connect_to_db(connection_string, wait_for=[connection_string])

    data = get_data_from_db(conn, wait_for=[conn])

However, after setting up work queues, agents, and the deployment (including the conda environment), I get the following errors from the agent (again, I removed other, more sensitive tasks that all happen after connecting to the DB):

Agent started! Looking for work from queue '145bf05e-7862-4242-9299-0150e5ab5d80'...
16:36:20.055 | INFO    | prefect.agent - Submitting flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'
16:36:20.088 | INFO    | prefect.flow_runner.subprocess - Opening subprocess for flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'...
16:36:20.096 | INFO    | prefect.agent - Completed submission of flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'
16:36:22.567 | INFO    | Flow run 'loose-mantis' - Using task runner 'SequentialTaskRunner'
16:36:22.640 | INFO    | Flow run 'loose-mantis' - Created task run 'load config-c1cc6188-0' for task 'load config'
16:36:22.706 | INFO    | Task run 'load config-c1cc6188-0' - Finished in state Completed()
16:36:22.737 | INFO    | Flow run 'loose-mantis' - Created task run 'make db connection string-803ced0c-0' for task 'make db connection string'
16:36:22.793 | INFO    | Task run 'make db connection string-803ced0c-0' - Finished in state Completed()
16:36:23.521 | INFO    | Flow run 'loose-mantis' - Created task run 'make db connection string-803ced0c-1' for task 'make db connection string'
16:36:23.577 | INFO    | Task run 'make db connection string-803ced0c-1' - Finished in state Completed()
16:36:23.605 | INFO    | Flow run 'loose-mantis' - Created task run 'connect to database-3aa5bcdc-1' for task 'connect to database'
16:36:23.752 | INFO    | Task run 'connect to database-3aa5bcdc-0' - Crash detected! Execution was interrupted by an unexpected exception.
16:36:23.905 | INFO    | Task run 'connect to database-3aa5bcdc-1' - Crash detected! Execution was interrupted by an unexpected exception.

I was wondering what I could do to successfully connect to the database. Thank you for your help!

What happens if you comment out all the decorators @task and @flow, as well as the wait_for like below? Does it still error?

import toml
import sqlalchemy as sql
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner


def load_config(path: str) -> dict:
    # toml.load accepts a file path directly, so no file handle is left open
    config = toml.load(path)
    return config

def make_connection_string(config: dict) -> str:
    info = config["connection_info"]
    connection_string = (
        f"mssql+pyodbc://{info['user_id']}:{info['password']}"
        f"@{info['server_address']}/{info['database']}?{info['params']}"
    )
    return connection_string

def connect_to_db(connection_string: str):
    engine = sql.create_engine(connection_string)
    return engine

def run_pipeline(config_path: str):

    config = load_config(config_path)

    connection_string = make_connection_string(config)

    conn = connect_to_db(connection_string)

    data = get_data_from_db(conn)

Also, I don’t think you need wait_for: Orion (Prefect 2.0) should automatically figure out the dependencies when the result of one task is used as an argument to another task.

Lastly, in the coming week, I think prefect-sqlalchemy should be released; just waiting for this to be merged: External dialects, sync driver by ahuang11 · Pull Request #5 · PrefectHQ/prefect-sqlalchemy · GitHub

Thank you for your response! Yes, if I comment out the decorators and run this script, there are no errors. I was using the wait_for because of something I read in the documentation for Orion: Task dependencies - Prefect 2.0 on enforcing explicit task dependencies.

Your work on the sqlalchemy integration looks very promising! Looking forward to using it.

For now, I will probably resort to using Prefect 1 and try to refactor it later.

I’d still be curious to find out the root cause here, though. Can you check whether the error goes away if you don’t pass the DB connection between tasks and instead create it inside the task that needs it? There could be a bug that prevents sharing a DB connection between tasks.

Basically, move the connection into the get_data_from_db task.
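
Here is a rough sketch of that idea, not your actual pipeline. To keep it runnable anywhere, it uses the standard library's sqlite3 as a stand-in for SQLAlchemy and MSSQL, and the `pulls` table, its rows, and the simplified config are made up for illustration. In the real flow each function would keep its @task decorator, and `sql.create_engine(connection_string)` would be called inside get_data_from_db. The point is only that the connection string (plain data) is passed between steps, while the connection itself never leaves the step that uses it.

```python
import sqlite3


def make_connection_string(config: dict) -> str:
    # A plain string is safe to hand from one step to the next.
    # (Assumption: for sqlite3 the "connection string" is just the database name.)
    return config["connection_info"]["database"]


def get_data_from_db(connection_string: str) -> list:
    # Open the connection inside the step that needs it and close it before
    # returning, so only plain data (a list of tuples) leaves the function.
    conn = sqlite3.connect(connection_string)
    try:
        # Hypothetical demo table so the query below has something to return.
        conn.execute("CREATE TABLE pulls (id INTEGER, title TEXT)")
        conn.execute("INSERT INTO pulls VALUES (1, 'first'), (2, 'second')")
        return conn.execute("SELECT id, title FROM pulls ORDER BY id").fetchall()
    finally:
        conn.close()


def run_pipeline(config: dict) -> list:
    connection_string = make_connection_string(config)
    return get_data_from_db(connection_string)


rows = run_pipeline({"connection_info": {"database": ":memory:"}})
```

If this variant runs cleanly with the decorators back in place, that would point at the handoff of the connection object between tasks rather than at the connection itself.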

I checked your proposal to move the connection inside the data retrieval task, and this does indeed work!
If I move the sqlalchemy connection creation inside the task and connect through that, I am able to connect to the DB without problems. The task results are also what I would expect.

11:41:01.588 | INFO    | prefect.agent - Submitting flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'
11:41:01.610 | INFO    | prefect.flow_runner.subprocess - Opening subprocess for flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'...
11:41:01.615 | INFO    | prefect.agent - Completed submission of flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'
11:41:02.971 | INFO    | Flow run 'incredible-cuscus' - Using task runner 'SequentialTaskRunner'
11:41:03.029 | INFO    | Flow run 'incredible-cuscus' - Created task run 'load config-c1cc6188-0' for task 'load config'
11:41:03.073 | INFO    | Task run 'load config-c1cc6188-0' - Finished in state Completed()
11:41:03.094 | INFO    | Flow run 'incredible-cuscus' - Created task run 'make connection string-ba31a0b9-0' for task 'make connection string'
11:41:03.130 | INFO    | Task run 'make connection string-ba31a0b9-0' - Finished in state Completed()
11:41:03.148 | INFO    | Flow run 'incredible-cuscus' - Created task run 'retrieve pull data-7d097610-0' for task 'retrieve pull data'
11:41:05.482 | INFO    | Task run 'retrieve pull data-7d097610-0' - Finished in state Completed()
11:41:05.500 | INFO    | Flow run 'incredible-cuscus' - Finished in state Completed('All states completed.')
11:41:05.751 | INFO    | prefect.flow_runner.subprocess - Subprocess for flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0' exited cleanly.

Awesome, thanks for confirming that - so it looks like a bug or some issue with passing DB connections between tasks. I’ll open an issue for tracking.

I think the reason is that an engine is not serializable (not pickleable).
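
A quick way to check this kind of thing outside of Prefect is to try pickling the object directly. The sketch below is only an illustration: it uses a standard-library sqlite3 connection as a stand-in for the SQLAlchemy engine, and `is_picklable` is a helper name I made up. To test the actual hypothesis, pass the engine returned by `sql.create_engine(...)` to the same helper.

```python
import pickle
import sqlite3


def is_picklable(obj) -> bool:
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        # pickle raises different exception types depending on the object,
        # so this helper deliberately catches broadly.
        return False


# A connection string is plain data and pickles without trouble.
string_ok = is_picklable("mssql+pyodbc://user:password@server/database")

# A live connection object holds OS-level resources and cannot be pickled.
conn = sqlite3.connect(":memory:")
connection_ok = is_picklable(conn)
conn.close()
```

If the engine fails this check too, passing the connection string between tasks and creating the engine inside the task that needs it avoids the problem.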

Thanks, @ahuang11 - that’s true, but it would be nice not to pickle every task run result when it isn’t needed, i.e. to be able to turn off checkpointing.

prefect-sqlalchemy is now released! Looking forward to your feedback!
https://prefecthq.github.io/prefect-sqlalchemy/
