Connection to Database in Task

Hey everyone!

I recently switched to Prefect Orion for a project of mine.

Since I had used Prefect 1.0 (with pleasure) before, I thought I could refactor my data pipeline. I planned to connect to the database and pass around the connection object as was possible with Prefect 1.0.

So I defined my pipeline as follows (please note that I omitted some more sensitive information):

import toml
import sqlalchemy as sql
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner


@task(name="load config")
def load_config(path: str) -> dict:
    with open(path) as f:
        config = toml.load(f)
    return config

@task(name="make db connection string")
def make_connection_string(config: dict) -> str:
    connection_string = f"""mssql+pyodbc://{config["connection_info"]["user_id"]}:{config["connection_info"]["password"]}@{config["connection_info"]["server_address"]}/{config["connection_info"]["database"]}?{config["connection_info"]["params"]}"""
    return connection_string

@task(name="connect to database")
def connect_to_db(connection_string: str):
    engine = sql.create_engine(connection_string)
    return engine

@flow(name="ETL", task_runner=SequentialTaskRunner())
def run_pipeline(config_path: str):

    config = load_config(config_path)

    connection_string = make_connection_string(config, wait_for=[config])

    conn = connect_to_db(connection_string, wait_for=[connection_string])

    data = get_data_from_db(conn, wait_for=[conn])

However, after setting up work queues, agents, and the deployment (including the conda env), I get the following errors from the agent (again, I removed the other, more sensitive tasks, which all happen after connecting to the DB):

Agent started! Looking for work from queue '145bf05e-7862-4242-9299-0150e5ab5d80'...
16:36:20.055 | INFO    | prefect.agent - Submitting flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'
16:36:20.088 | INFO    | prefect.flow_runner.subprocess - Opening subprocess for flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'...
16:36:20.096 | INFO    | prefect.agent - Completed submission of flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'
16:36:22.567 | INFO    | Flow run 'loose-mantis' - Using task runner 'SequentialTaskRunner'
16:36:22.640 | INFO    | Flow run 'loose-mantis' - Created task run 'load config-c1cc6188-0' for task 'load config'
16:36:22.706 | INFO    | Task run 'load config-c1cc6188-0' - Finished in state Completed()
16:36:22.737 | INFO    | Flow run 'loose-mantis' - Created task run 'make db connection string-803ced0c-0' for task 'make db connection string'
16:36:22.793 | INFO    | Task run 'make db connection string-803ced0c-0' - Finished in state Completed()
16:36:23.521 | INFO    | Flow run 'loose-mantis' - Created task run 'make db connection string-803ced0c-1' for task 'make db connection string'
16:36:23.577 | INFO    | Task run 'make db connection string-803ced0c-1' - Finished in state Completed()
16:36:23.605 | INFO    | Flow run 'loose-mantis' - Created task run 'connect to database-3aa5bcdc-1' for task 'connect to database'
16:36:23.752 | INFO    | Task run 'connect to database-3aa5bcdc-0' - Crash detected! Execution was interrupted by an unexpected exception.
16:36:23.905 | INFO    | Task run 'connect to database-3aa5bcdc-1' - Crash detected! Execution was interrupted by an unexpected exception.

I was wondering what I could do to successfully connect to the database. Thank you for your help!

2 Likes

What happens if you comment out all the @task and @flow decorators, as well as the wait_for arguments, like below? Does it still error?

import toml
import sqlalchemy as sql
from prefect import task, flow
from prefect.task_runners import SequentialTaskRunner


def load_config(path: str) -> dict:
    with open(path) as f:
        config = toml.load(f)
    return config

def make_connection_string(config: dict) -> str:
    connection_string = f"""mssql+pyodbc://{config["connection_info"]["user_id"]}:{config["connection_info"]["password"]}@{config["connection_info"]["server_address"]}/{config["connection_info"]["database"]}?{config["connection_info"]["params"]}"""
    return connection_string

def connect_to_db(connection_string: str):
    engine = sql.create_engine(connection_string)
    return engine

def run_pipeline(config_path: str):

    config = load_config(config_path)

    connection_string = make_connection_string(config)

    conn = connect_to_db(connection_string)

    data = get_data_from_db(conn)

Also, I don’t think you need wait_for, since Orion (Prefect 2.0) should automatically figure out the dependencies when the result of one task is used as an argument to another task.
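
For example, your flow body from above could be written without any wait_for, and the ordering would stay the same:

@flow(name="ETL", task_runner=SequentialTaskRunner())
def run_pipeline(config_path: str):

    config = load_config(config_path)

    # each call consumes the previous task's result, so Orion infers the order
    connection_string = make_connection_string(config)

    conn = connect_to_db(connection_string)

    data = get_data_from_db(conn)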

Lastly, I think prefect-sqlalchemy should be released in the coming week; I’m just waiting for this to be merged: External dialects, sync driver by ahuang11 · Pull Request #5 · PrefectHQ/prefect-sqlalchemy · GitHub

2 Likes

Thank you for your response! Yes, if I comment out the decorators and run this script, there are no errors. I was using wait_for because of something I read in the Orion documentation on enforcing explicit task dependencies: Task dependencies - Prefect 2.0.

Your work on the sqlalchemy integration looks very promising! Looking forward to using it.

For now, I will probably resort to using Prefect 1 and try to refactor it later.

I’d be curious to find out what the root cause is here, though. Can you try defining the DB connection in the task that needs it, rather than passing it between tasks, and see if the error goes away? There could be some bug preventing DB connections from being shared between tasks.

Basically, moving the connection into the get_data_from_db task.
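
Something like this sketch (I’m guessing at the body of get_data_from_db, since the original task was omitted, and "SELECT * FROM my_table" is just a placeholder query):

@task(name="get data from db")
def get_data_from_db(connection_string: str):
    # create the engine inside the task that uses it, so the engine
    # object never has to be passed (and pickled) between tasks
    engine = sql.create_engine(connection_string)
    with engine.connect() as conn:
        return conn.execute(sql.text("SELECT * FROM my_table")).fetchall()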

1 Like

I checked your proposal to move the connection inside the data retrieval task, and this does indeed work!
If I move the sqlalchemy connection creation inside the task and connect through that, I am able to connect to the DB without problems. The task results are also what I would expect.

11:41:01.588 | INFO    | prefect.agent - Submitting flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'
11:41:01.610 | INFO    | prefect.flow_runner.subprocess - Opening subprocess for flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'...
11:41:01.615 | INFO    | prefect.agent - Completed submission of flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'
11:41:02.971 | INFO    | Flow run 'incredible-cuscus' - Using task runner 'SequentialTaskRunner'
11:41:03.029 | INFO    | Flow run 'incredible-cuscus' - Created task run 'load config-c1cc6188-0' for task 'load config'
11:41:03.073 | INFO    | Task run 'load config-c1cc6188-0' - Finished in state Completed()
11:41:03.094 | INFO    | Flow run 'incredible-cuscus' - Created task run 'make connection string-ba31a0b9-0' for task 'make connection string'
11:41:03.130 | INFO    | Task run 'make connection string-ba31a0b9-0' - Finished in state Completed()
11:41:03.148 | INFO    | Flow run 'incredible-cuscus' - Created task run 'retrieve pull data-7d097610-0' for task 'retrieve pull data'
11:41:05.482 | INFO    | Task run 'retrieve pull data-7d097610-0' - Finished in state Completed()
11:41:05.500 | INFO    | Flow run 'incredible-cuscus' - Finished in state Completed('All states completed.')
11:41:05.751 | INFO    | prefect.flow_runner.subprocess - Subprocess for flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0' exited cleanly.
2 Likes

Awesome, thanks for confirming that - so it looks like a bug or some issue with passing DB connections between tasks. I’ll open an issue for tracking.

2 Likes

I think the reason is that an engine is not serializable (not picklable).
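
You can check that outside of Prefect with a minimal sketch - this mirrors the cloudpickle.dumps call Orion makes when persisting task results (the exact error message depends on your SQLAlchemy version):

import cloudpickle
import sqlalchemy as sql

engine = sql.create_engine("sqlite:///foo.db")
try:
    cloudpickle.dumps(engine)  # what Orion does to persist a task result
except TypeError as exc:
    print(exc)  # e.g. "cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object"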

Thanks, @ahuang11 - that’s true, but it would be nice not to pickle task run results when it’s not needed, i.e., to be able to turn off checkpointing.

prefect-sqlalchemy is now released! Looking forward to your feedback!
https://prefecthq.github.io/prefect-sqlalchemy/
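
To give a flavor, usage looks roughly like this (a sketch based on the docs above - treat the class/task names and all connection values as assumptions, and check the linked documentation for the exact API):

from prefect import flow
from prefect_sqlalchemy import DatabaseCredentials
from prefect_sqlalchemy.database import sqlalchemy_query


@flow
def query_flow():
    # all connection values are placeholders
    credentials = DatabaseCredentials(driver="sqlite", database="foo.db")
    return sqlalchemy_query("SELECT 1;", credentials)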

3 Likes

Hi Anna / other prefect members,

I am migrating from Prefect 1 to Prefect 2.0. I have a similar issue: I am trying to pass database sessions between different tasks. In Prefect 1 I used the checkpoint=False option for these tasks (roughly sketched below). Is there a similar option in Prefect 2? I looked in the source code but couldn’t find anything.
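
For context, the Prefect 1 pattern I mean looked roughly like this:

from sqlalchemy import create_engine
from prefect import task  # Prefect 1.x API


@task(checkpoint=False)  # Prefect 1: don't checkpoint (serialize) this task's result
def init_db():
    return create_engine("sqlite:///foo.db")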

Thanks in advance,
Bram

I actually tested that and was able to pass a database connection between tasks without any issues - can you try that and report back if you see any issues with it?

Hi, thank you for your quick response. When I return a Session from sqlalchemy.orm.session in a task, I get:

  File "/Users/bram/.pyenv/versions/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'weakref' object

or when returning an Engine from sqlalchemy.engine:

  File "/Users/bram/.pyenv/versions/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object

Or, for something completely different, returning a PostgresContainer from testcontainers.postgres:

File "/Users/bram/.pyenv/versions/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
  File "/Users/bram/.pyenv/versions/3.9.7/lib/python3.9/socket.py", line 273, in __getstate__
    raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'socket' object

So it seems to error when the object is not serialisable, which makes sense, but I don’t need to pickle or cache these objects. I would, however, like to share them between tasks.
Do you have any suggestions on how to do that? Or should I create a bug/feature request somewhere?

Many thanks!

Oh, sorry, I see now that this is the same issue and that it is still open.

Can you please cross-check using a new environment and the latest Prefect version?

If this still doesn’t work, could you share a minimal reproducible example?

Morning Anna, thanks for the suggestion. I tried both 2.0.0 and 2.0.1, on Python 3.9 and 3.10; all result in the same error with this minimal example.

With a fresh environment:

pyenv virtualenv 3.10.5 mlflow-poc-310
pyenv shell mlflow-poc-310
pip install prefect sqlalchemy

pipeline.py

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from prefect import task, flow


@task
def init_db() -> Engine:
    engine = create_engine("sqlite:///foo.db")

    return engine


@flow(name="pipeline-1")
def pipeline_1():
    init_db()


if __name__ == "__main__":
    pipeline_1()

python pipeline.py

result:

  File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/prefect/orion/schemas/data.py", line 42, in encode
    blob = lookup_serializer(encoding).dumps(data, **kwargs)
  File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/prefect/serializers.py", line 59, in dumps
    data_bytes = cloudpickle.dumps(data)
  File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object

Is there a way to disable pickling of the returned objects?

I was able to reproduce the issue and brought it up to the team - I will update here once we know more.

1 Like