I don’t think this is a Prefect bug, per se, but I am struggling to narrow down why this is happening and could use some guidance on how to troubleshoot this – and perhaps general suggestions for why I might be doing something wrong.
Our Setup
- We are using Prefect v2.11.4 (we upgraded to the latest release to rule out a version-specific bug).
- For the server, we have this installed in a custom Docker image based on Rocky Linux 9.2 (x86_64), and have also tried the vanilla prefecthq/prefect:2.11.4-python3.11 image with the same results.
- We have a dedicated PostgreSQL container (Postgres 15, based on Rocky Linux) for Prefect’s persistence.
- We are using a single agent (based on same Docker image) – have not yet changed to using “workers”.
- We have set up a concurrency limit for the tasks.
- We are using Docker Swarm for all services (with local volumes, as this is a single node swarm).
Issue Summary
When attempting to run tasks concurrently using the task.submit() pattern, the server gets a TimeoutError while attempting to create a task run in the database, which then yields a 500 error to the agent and crashes the job. This doesn’t happen 100% of the time, but it happens most of the time.
- Originally we were using DaskTaskRunner. At first it seemed that just switching to the ConcurrentTaskRunner was a solution, but it only succeeded once and then started failing in much the same way.
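For illustration only, here is a minimal sketch of capping how many submissions are in flight at once, which is one way to test whether the burst of simultaneous task-run creations is what triggers the timeouts. It uses the standard library's ThreadPoolExecutor as a stand-in for the task runner; `chunked`, `run_in_bounded_batches`, and `fake_process_file` are hypothetical names, and nothing here is Prefect API.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def chunked(items: Iterable[str], size: int) -> Iterator[List[str]]:
    """Yield successive lists of at most `size` items."""
    it = iter(items)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def run_in_bounded_batches(
    fn: Callable[[str], int], object_paths: Iterable[str], max_in_flight: int
) -> int:
    """Submit work in chunks so that at most `max_in_flight` calls are pending."""
    total = 0
    with ThreadPoolExecutor(max_workers=max_in_flight) as pool:
        for chunk in chunked(object_paths, max_in_flight):
            # Submit one chunk, then wait for all of it before submitting more.
            futures = [pool.submit(fn, path) for path in chunk]
            for fut in futures:
                total += fut.result()
    return total


def fake_process_file(object_path: str) -> int:
    """Hypothetical stand-in for the real per-file task; returns a dummy count."""
    return len(object_path)
```

In a flow, the same shape would mean calling `.submit()` for one chunk of object paths and collecting those results before submitting the next chunk. Whether that avoids the server-side timeouts is an open question, not a confirmed fix.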
Code Snippets
This flow is for processing network scan results. The files themselves are large JSONL files (each file is around 7 GB compressed).
I don’t yet have a simple reproduction case, but the general flow and task invocation look like this:
@task(tags=["process-file", "minio-read"])
def process_file(object_path: str, tmp_dir_name: Path, verbose: bool = False) -> int:
    # Note: original design had read_and_split_file and process_batch tasks being called by the flow,
    # but in this simplified version, we're just calling those as functions -- hence the .fn()
    batch_files = read_and_split_file.fn(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose)
    banner_count = 0
    logger.info(f"Created {len(batch_files)} for {object_path} source file.")
    for batch_file in batch_files:
        try:
            banner_count += process_batch.fn(batch_file=batch_file)
        except Exception as x:
            logger.exception(f"Error processing batch {batch_file} of {object_path}")
            continue
    return banner_count


@flow(log_prints=True, task_runner=ConcurrentTaskRunner())
def banner_pipeline(file_glob: str = "*.json*", verbose: bool = False):
    """
    Import a jsonl file (possibly compressed) from minio and ingest.
    :param file_glob: The glob pattern to match files in minio.
    :param verbose: Whether to emit verbose logs (could be significant).
    """
    setup_environment()
    object_paths = all_matching_object_paths(file_glob=file_glob, verbose=verbose)
    banner_count = 0
    # All tasks in a flow get executed by same agent, so we can rely on this tmpdir being available.
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        if settings.CONCURRENT_PIPELINES:
            logger.info(f"Processing input files concurrently.")
            process_file_futures = list[PrefectFuture]()
            for object_path in object_paths:
                process_file_futures.append(
                    process_file.submit(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose)
                )
            for fut in process_file_futures:
                banner_count += fut.result()
        else:
            logger.info(f"Processing matched files sequentially.")
            for object_path in object_paths:
                try:
                    banner_count += process_file(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose)
                except Exception as x:
                    logger.exception(f"Error processing file: {object_path}")
                    continue
    logger.info(f"Processed {banner_count} banners from {len(object_paths)} source files.")
    return banner_count
Note that this works fine if settings.CONCURRENT_PIPELINES is False (but it is too slow to be a viable processing solution). For reference, each batch of 10k rows currently takes ~20s to ingest.
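To put the ~20s per 10k-row batch figure in perspective, here is a small worked calculation. The batch size and timing come from the numbers above; the total row count in the usage note is purely hypothetical, since the actual number of rows per file is not stated here.

```python
ROWS_PER_BATCH = 10_000     # from the post: each batch is 10k rows
SECONDS_PER_BATCH = 20.0    # from the post: roughly 20s per batch


def rows_per_second(rows: int = ROWS_PER_BATCH, seconds: float = SECONDS_PER_BATCH) -> float:
    """Sequential ingest throughput implied by the batch timing."""
    return rows / seconds


def sequential_hours(total_rows: int) -> float:
    """Hours needed to ingest `total_rows` one batch at a time."""
    return total_rows / rows_per_second() / 3600
```

The implied sequential rate is 500 rows per second, so a hypothetical file of 18 million rows would take about 10 hours on its own, which is why running files concurrently matters.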
Details and Stack Traces
On the server the task fails with this pattern:
Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 289, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 83, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
raise e
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 443, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 289, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 24, in __call__
await responder(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 44, in __call__
await self.app(scope, receive, self.send_with_gzip)
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
raise e
File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 69, in app
await response(scope, receive, send)
File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 174, in __call__
await self.background()
File "/usr/local/lib/python3.11/site-packages/starlette/background.py", line 43, in __call__
await task()
File "/usr/local/lib/python3.11/site-packages/starlette/background.py", line 26, in __call__
await self.func(*self.args, **self.kwargs)
File "/usr/local/lib/python3.11/site-packages/prefect/server/api/work_queues.py", line 165, in _record_work_queue_polls
await models.work_queues.update_work_queue(
File "/usr/local/lib/python3.11/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
return await fn(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/prefect/server/models/work_queues.py", line 217, in update_work_queue
result = await session.execute(update_stmt)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 454, in execute
result = await greenlet_spawn(
^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
result = context.throw(*sys.exc_info())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2262, in execute
return self._execute_internal(
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2144, in _execute_internal
result: Result[Any] = compile_state_cls.orm_execute_statement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/bulk_persistence.py", line 1620, in orm_execute_statement
return super().orm_execute_statement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
result = conn.execute(
^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1412, in execute
return meth(
^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1635, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1844, in _execute_context
return self._exec_single_context(
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1984, in _exec_single_context
self._handle_dbapi_exception(
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2342, in _handle_dbapi_exception
raise exc_info[1].with_traceback(exc_info[2])
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1965, in _exec_single_context
self.dialect.do_execute(
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 921, in do_execute
cursor.execute(statement, parameters)
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 585, in execute
self._adapt_connection.await_(
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 125, in await_only
return current.driver.switch(awaitable) # type: ignore[no-any-return]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 185, in greenlet_spawn
value = await result
^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 564, in _prepare_and_execute
self._handle_exception(error)
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 515, in _handle_exception
self._adapt_connection._handle_exception(error)
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 804, in _handle_exception
raise error
File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 552, in _prepare_and_execute
self._rows = await prepared_stmt.fetch(*parameters)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
data = await self.__bind_execute(args, 0, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
data, status, _ = await self.__do_execute(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
return await executor(protocol)
^^^^^^^^^^^^^^^^^^^^^^^^
File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
TimeoutError
Sometimes the server stack trace also points to a method related to checking or updating the concurrency limit.
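When comparing long traces like the one above, it can help to strip away the framework frames and look only at the Prefect ones. The following is a rough sketch of doing that with a regular expression; `prefect_frames` is a hypothetical helper, and the sample text is copied from the server trace above.

```python
import re
from typing import List, Tuple

# Matches the standard CPython traceback frame header.
FRAME_RE = re.compile(r'File "(?P<path>[^"]+)", line (?P<line>\d+), in (?P<func>\S+)')


def prefect_frames(traceback_text: str) -> List[Tuple[str, int, str]]:
    """Return (module path, line number, function) for frames inside the prefect package."""
    frames = []
    for match in FRAME_RE.finditer(traceback_text):
        path = match.group("path")
        if "/site-packages/prefect/" in path:
            # Keep only the part of the path after site-packages for readability.
            module = path.split("/site-packages/", 1)[1]
            frames.append((module, int(match.group("line")), match.group("func")))
    return frames


SAMPLE = '''\
File "/usr/local/lib/python3.11/site-packages/starlette/background.py", line 26, in __call__
await self.func(*self.args, **self.kwargs)
File "/usr/local/lib/python3.11/site-packages/prefect/server/api/work_queues.py", line 165, in _record_work_queue_polls
await models.work_queues.update_work_queue(
File "/usr/local/lib/python3.11/site-packages/prefect/server/models/work_queues.py", line 217, in update_work_queue
result = await session.execute(update_stmt)
'''

for frame in prefect_frames(SAMPLE):
    print(frame)
```

Applied to the server trace shown above, the only Prefect frames are `_record_work_queue_polls` and `update_work_queue`, so that particular timeout occurred in the background work-queue poll update rather than in the task-run creation itself.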
On the agent this looks like:
Crash details:
Traceback (most recent call last):
File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1839, in report_flow_run_crashes
yield
File "/usr/lib64/python3.11/contextlib.py", line 716, in __aexit__
cb_suppress = await cb(*exc_details)
^^^^^^^^^^^^^^^^^^^^^^
File "/opt/app-root/lib64/python3.11/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
raise exceptions[0]
File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1303, in create_task_run_then_submit
task_run = await create_task_run(
^^^^^^^^^^^^^^^^^^^^^^
File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1348, in create_task_run
task_run = await flow_run_context.client.create_task_run(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/orchestration.py", line 1872, in create_task_run
response = await self._client.post(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1848, in post
return await self.request(
^^^^^^^^^^^^^^^^^^^
File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1530, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 280, in send
response.raise_for_status()
File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 138, in raise_for_status
raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://prefect-server:4200/api/task_runs/'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://httpstatuses.com/500
Any pointers on how to troubleshoot or mitigate these server timeouts would be much appreciated!