I recently switched to Prefect Orion for a project of mine.
Since I had used Prefect 1.0 (with pleasure) before, I thought I could refactor my data pipeline. I planned to connect to the database and pass around the connection object as was possible with Prefect 1.0.
So I defined my pipeline as follows (please note that I omitted some more sensitive information):
However, after setting up work queues, agents and the deployment (including the condaenv), I get the following errors from the agent (again, I removed other, more sensitive tasks that all happen after connecting to the DB):
Agent started! Looking for work from queue '145bf05e-7862-4242-9299-0150e5ab5d80'...
16:36:20.055 | INFO | prefect.agent - Submitting flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'
16:36:20.088 | INFO | prefect.flow_runner.subprocess - Opening subprocess for flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'...
16:36:20.096 | INFO | prefect.agent - Completed submission of flow run '43be6b57-47a7-40f8-bc07-0495d1036e53'
16:36:22.567 | INFO | Flow run 'loose-mantis' - Using task runner 'SequentialTaskRunner'
16:36:22.640 | INFO | Flow run 'loose-mantis' - Created task run 'load config-c1cc6188-0' for task 'load config'
16:36:22.706 | INFO | Task run 'load config-c1cc6188-0' - Finished in state Completed()
16:36:22.737 | INFO | Flow run 'loose-mantis' - Created task run 'make db connection string-803ced0c-0' for task 'make db connection string'
16:36:22.793 | INFO | Task run 'make db connection string-803ced0c-0' - Finished in state Completed()
16:36:23.521 | INFO | Flow run 'loose-mantis' - Created task run 'make db connection string-803ced0c-1' for task 'make db connection string'
16:36:23.577 | INFO | Task run 'make db connection string-803ced0c-1' - Finished in state Completed()
16:36:23.605 | INFO | Flow run 'loose-mantis' - Created task run 'connect to database-3aa5bcdc-1' for task 'connect to database'
16:36:23.752 | INFO | Task run 'connect to database-3aa5bcdc-0' - Crash detected! Execution was interrupted by an unexpected exception.
16:36:23.905 | INFO | Task run 'connect to database-3aa5bcdc-1' - Crash detected! Execution was interrupted by an unexpected exception.
I was wondering what I could do to successfully connect to the database. Thank you for your help!
Also, I don’t think you need wait_for, since Orion (Prefect 2.0) should automatically infer the dependencies when the result of one task is used as an argument to another task.
Thank you for your response! Yes, if I comment out the decorators and run this script, there are no errors. I was using wait_for because of something I read in the Orion documentation on enforcing explicit task dependencies: Task dependencies - Prefect 2.0.
Your work on the SQLAlchemy integration looks very promising! Looking forward to using it.
For now, I will probably resort to using Prefect 1 and try to refactor it later.
I’d be curious to find out the root cause here, though - can you try defining the DB connection inside the task that needs it, instead of passing it between tasks, and see whether the error goes away? There could be a bug preventing a DB connection from being shared between tasks.
Basically, move the connection into the get_data_from_db task.
I checked your proposal to move the connection inside the data retrieval task, and this does indeed work!
If I create the SQLAlchemy connection inside the task and connect through that, I can connect to the DB without problems. The task results are also what I would expect.
11:41:01.588 | INFO | prefect.agent - Submitting flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'
11:41:01.610 | INFO | prefect.flow_runner.subprocess - Opening subprocess for flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'...
11:41:01.615 | INFO | prefect.agent - Completed submission of flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0'
11:41:02.971 | INFO | Flow run 'incredible-cuscus' - Using task runner 'SequentialTaskRunner'
11:41:03.029 | INFO | Flow run 'incredible-cuscus' - Created task run 'load config-c1cc6188-0' for task 'load config'
11:41:03.073 | INFO | Task run 'load config-c1cc6188-0' - Finished in state Completed()
11:41:03.094 | INFO | Flow run 'incredible-cuscus' - Created task run 'make connection string-ba31a0b9-0' for task 'make connection string'
11:41:03.130 | INFO | Task run 'make connection string-ba31a0b9-0' - Finished in state Completed()
11:41:03.148 | INFO | Flow run 'incredible-cuscus' - Created task run 'retrieve pull data-7d097610-0' for task 'retrieve pull data'
11:41:05.482 | INFO | Task run 'retrieve pull data-7d097610-0' - Finished in state Completed()
11:41:05.500 | INFO | Flow run 'incredible-cuscus' - Finished in state Completed('All states completed.')
11:41:05.751 | INFO | prefect.flow_runner.subprocess - Subprocess for flow run 'c3a0e7b7-cc2d-4e52-a26f-f1c3656d58b0' exited cleanly.
I am migrating from Prefect 1 to Prefect 2.0 and have a similar issue: I am trying to pass database sessions between different tasks. In Prefect 1 I used the checkpoint=False option for these tasks. Is there a similar option in Prefect 2? I looked in the source code but couldn’t find anything.
I actually tested that and was able to pass database connection between tasks without any issues - can you try that and report back if you see any issues with it?
Hi, thank you for your quick response. When I return a Session from sqlalchemy.orm.session in a task, I get:
File "/Users/bram/.pyenv/versions/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'weakref' object
or when returning an Engine from sqlalchemy.engine:
File "/Users/bram/.pyenv/versions/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
Or, for something completely different, when returning a PostgresContainer from testcontainers.postgres:
File "/Users/bram/.pyenv/versions/prefect/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
File "/Users/bram/.pyenv/versions/3.9.7/lib/python3.9/socket.py", line 273, in __getstate__
raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'socket' object
So it seems to error when the object is not serialisable, which makes sense, but I don’t need to pickle or cache these objects. I would, however, like to share them between tasks.
Do you have any suggestions how to do that? Or should I create a bug/request ticket somewhere?
Morning Anna, thanks for the suggestion. I tried both 2.0.0 and 2.0.1 on Python 3.9 and 3.10.
All result in the same error in this minimal example.
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

from prefect import task, flow


@task
def init_db() -> Engine:
    engine = create_engine("sqlite:///foo.db")
    return engine


@flow(name="pipeline-1")
def pipeline_1():
    init_db()


if __name__ == "__main__":
    pipeline_1()
Running python pipeline.py results in:
File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/bram/.pyenv/versions/prefect-310/lib/python3.10/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
TypeError: cannot pickle 'sqlalchemy.cprocessors.UnicodeResultProcessor' object
Is there a way to disable pickling of the returned objects?