Server TimeoutError executing DB statement while creating a task run when the flow submits tasks for concurrent execution

I don’t think this is a Prefect bug, per se, but I am struggling to narrow down why it is happening and could use some guidance on how to troubleshoot it, as well as general suggestions for what I might be doing wrong.

Our Setup

  • We are using Prefect v2.11.4 (upgraded to the latest release to rule that out).
  • For the server, we have Prefect installed in a custom Docker image based on Rocky Linux 9.2 (x86_64); we have also tried the vanilla prefecthq/prefect:2.11.4-python3.11 image with the same results.
  • We have a dedicated PostgreSQL container (PostgreSQL 15, also based on Rocky Linux) for Prefect’s persistence.
  • We are using a single agent (built from the same Docker image); we have not yet switched to “workers”.
  • We have set up a tag-based concurrency limit for the tasks (see the sketch after this list).
  • We are using Docker Swarm for all services (with local volumes, as this is a single-node swarm).
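
For reference, the limit is a tag-based task concurrency limit. A minimal sketch of how such a limit can be created with the Prefect client is below; the process-file tag matches the task code later in this post, and the limit of 4 is illustrative rather than our exact value.

import asyncio

from prefect import get_client


async def create_process_file_limit() -> None:
    # Sketch only, not our exact code: create a tag-based task concurrency limit.
    # "process-file" matches the tag on the process_file task shown below;
    # the limit of 4 is just an illustrative value.
    async with get_client() as client:
        await client.create_concurrency_limit(tag="process-file", concurrency_limit=4)


if __name__ == "__main__":
    asyncio.run(create_process_file_limit())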

Issue Summary

When attempting to run tasks concurrently using the task.submit() pattern, the server gets a TimeoutError while attempting to create a task run in the database, which then yields a 500 error to the agent and crashes the job. This doesn’t happen 100% of the time, but it happens most of the time.

  • Originally we were using the DaskTaskRunner. At first it seemed like simply switching to the ConcurrentTaskRunner was a solution, but it only succeeded once and then started failing in much the same way.

Code Snippets

This flow is for processing network scan results. The files themselves are large JSONL files (compressed, each file is around 7 GB).

I don’t yet have a simple reproduction case, but the general flow & task invocation looks like this:


import logging
import tempfile
from pathlib import Path

from prefect import flow, task
from prefect.futures import PrefectFuture
from prefect.task_runners import ConcurrentTaskRunner

# read_and_split_file, process_batch, setup_environment, all_matching_object_paths,
# and settings are our own helpers/config, imported from project modules not shown here.

logger = logging.getLogger(__name__)


@task(tags=["process-file", "minio-read"])
def process_file(object_path: str, tmp_dir_name: Path, verbose: bool = False) -> int:
    # Note: the original design had read_and_split_file and process_batch as tasks
    # called from the flow, but in this simplified version we call them as plain
    # functions -- hence the .fn()
    batch_files = read_and_split_file.fn(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose)
    banner_count = 0
    logger.info(f"Created {len(batch_files)} batch files for source file {object_path}.")
    for batch_file in batch_files:
        try:
            banner_count += process_batch.fn(batch_file=batch_file)
        except Exception:
            logger.exception(f"Error processing batch {batch_file} of {object_path}")
            continue

    return banner_count

@flow(log_prints=True, task_runner=ConcurrentTaskRunner())
def banner_pipeline(file_glob: str = "*.json*", verbose: bool = False):
    """
    Import a jsonl file (possibly compressed) from minio and ingest.

    :param file_glob: The glob pattern to match files in minio.
    :param verbose: Whether to emit verbose logs (could be significant).
    """
    setup_environment()
    object_paths = all_matching_object_paths(file_glob=file_glob, verbose=verbose)

    banner_count = 0

    # All tasks in a flow get executed by the same agent, so we can rely on this tmpdir being available.
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        if settings.CONCURRENT_PIPELINES:
            logger.info(f"Processing input files concurrently.")

            process_file_futures = list[PrefectFuture]()

            for object_path in object_paths:
                process_file_futures.append(
                    process_file.submit(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose)
                )

            for fut in process_file_futures:
                banner_count += fut.result()

        else:
            logger.info(f"Processing matched files sequentially.")
            for object_path in object_paths:
                try:
                    banner_count += process_file(object_path=object_path, tmp_dir_name=tmp_dir_name, verbose=verbose)
                except Exception:
                    logger.exception(f"Error processing file: {object_path}")
                    continue

        logger.info(f"Processed {banner_count} banners from {len(object_paths)} source files.")

    return banner_count

Note that this works fine if settings.CONCURRENT_PIPELINES is False (but it is too slow to be a viable processing solution). For reference, each batch of 10k rows currently takes ~20s to ingest.

Details and Stack Traces

On the server the task fails with this pattern:

Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/uvicorn/protocols/http/h11_impl.py", line 408, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 84, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 289, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/cors.py", line 83, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 443, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/fastapi/applications.py", line 289, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 24, in __call__
    await responder(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/gzip.py", line 44, in __call__
    await self.app(scope, receive, self.send_with_gzip)
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 20, in __call__
    raise e
  File "/usr/local/lib/python3.11/site-packages/fastapi/middleware/asyncexitstack.py", line 17, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/routing.py", line 69, in app
    await response(scope, receive, send)
  File "/usr/local/lib/python3.11/site-packages/starlette/responses.py", line 174, in __call__
    await self.background()
  File "/usr/local/lib/python3.11/site-packages/starlette/background.py", line 43, in __call__
    await task()
  File "/usr/local/lib/python3.11/site-packages/starlette/background.py", line 26, in __call__
    await self.func(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.11/site-packages/prefect/server/api/work_queues.py", line 165, in _record_work_queue_polls
    await models.work_queues.update_work_queue(
  File "/usr/local/lib/python3.11/site-packages/prefect/server/database/dependencies.py", line 119, in async_wrapper
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/prefect/server/models/work_queues.py", line 217, in update_work_queue
    result = await session.execute(update_stmt)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/ext/asyncio/session.py", line 454, in execute
    result = await greenlet_spawn(
             ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 190, in greenlet_spawn
    result = context.throw(*sys.exc_info())
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2262, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 2144, in _execute_internal
    result: Result[Any] = compile_state_cls.orm_execute_statement(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/bulk_persistence.py", line 1620, in orm_execute_statement
    return super().orm_execute_statement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/orm/context.py", line 293, in orm_execute_statement
    result = conn.execute(
             ^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1412, in execute
    return meth(
           ^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 515, in _execute_on_connection
    return connection._execute_clauseelement(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1635, in _execute_clauseelement
    ret = self._execute_context(
          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1844, in _execute_context
    return self._exec_single_context(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1984, in _exec_single_context
    self._handle_dbapi_exception(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2342, in _handle_dbapi_exception
    raise exc_info[1].with_traceback(exc_info[2])
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1965, in _exec_single_context
    self.dialect.do_execute(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 921, in do_execute
    cursor.execute(statement, parameters)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 585, in execute
    self._adapt_connection.await_(
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 125, in await_only
    return current.driver.switch(awaitable)  # type: ignore[no-any-return]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 185, in greenlet_spawn
    value = await result
            ^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 564, in _prepare_and_execute
    self._handle_exception(error)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 515, in _handle_exception
    self._adapt_connection._handle_exception(error)
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 804, in _handle_exception
    raise error
  File "/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 552, in _prepare_and_execute
    self._rows = await prepared_stmt.fetch(*parameters)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 176, in fetch
    data = await self.__bind_execute(args, 0, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 241, in __bind_execute
    data, status, _ = await self.__do_execute(
                      ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/asyncpg/prepared_stmt.py", line 230, in __do_execute
    return await executor(protocol)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "asyncpg/protocol/protocol.pyx", line 201, in bind_execute
TimeoutError

Sometimes the server stack trace also points to a method related to checking or updating the concurrency limit.

On the agent this looks like:

Crash details:
Traceback (most recent call last):
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1839, in report_flow_run_crashes
    yield
  File "/usr/lib64/python3.11/contextlib.py", line 716, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1303, in create_task_run_then_submit
    task_run = await create_task_run(
               ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/engine.py", line 1348, in create_task_run
    task_run = await flow_run_context.client.create_task_run(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/orchestration.py", line 1872, in create_task_run
    response = await self._client.post(
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1848, in post
    return await self.request(
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/httpx/_client.py", line 1530, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 280, in send
    response.raise_for_status()
  File "/opt/app-root/lib64/python3.11/site-packages/prefect/client/base.py", line 138, in raise_for_status
    raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__
prefect.exceptions.PrefectHTTPStatusError: Server error '500 Internal Server Error' for url 'http://prefect-server:4200/api/task_runs/'
Response: {'exception_message': 'Internal Server Error'}
For more information check: https://httpstatuses.com/500

Any pointers for how to troubleshoot or mitigate these server timeouts would be very appreciated!

Interestingly, when digging in on the Postgres side, I see a number of locks that appear to be stuck open (or at least are held for long durations):

Query | Lock Mode | Lock Type | State | State Change | Query Duration
UPDATE work_queue SET last_polled=$1::TIMESTAMP WITH TIME ZONE, updated=CURRENT_TIMESTAMP WHERE work_queue.id = $2::UUID | RowExclusiveLock | relation | active | 2023-08-22 18:22:38.288 -0400 | 00:02:38.810187
UPDATE work_queue SET last_polled=$1::TIMESTAMP WITH TIME ZONE, updated=CURRENT_TIMESTAMP WHERE work_queue.id = $2::UUID | RowExclusiveLock | relation | active | 2023-08-22 18:22:38.288 -0400 | 00:02:38.810187
UPDATE work_queue SET last_polled=$1::TIMESTAMP WITH TIME ZONE, updated=CURRENT_TIMESTAMP WHERE work_queue.id = $2::UUID | ExclusiveLock | transactionid | active | 2023-08-22 18:22:38.288 -0400 | 00:02:38.810187

(And there are a number more.)
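
Those columns come from joining pg_locks with pg_stat_activity; a sketch of that kind of query is below, in case someone wants to look at the same thing. The DSN, database name, and the one-minute threshold are placeholders, and psycopg2 (or psycopg2-binary) is assumed to be available.

import psycopg2

LOCK_QUERY = """
SELECT a.query,
       l.mode                 AS lock_mode,
       l.locktype             AS lock_type,
       a.state,
       a.state_change,
       now() - a.query_start  AS query_duration
FROM pg_locks l
JOIN pg_stat_activity a ON a.pid = l.pid
WHERE a.datname = %s
  AND now() - a.query_start > interval '1 minute'
ORDER BY query_duration DESC;
"""

# Placeholder DSN -- point this at the Prefect database container.
with psycopg2.connect("postgresql://prefect:***@prefect-postgres:5432/prefect") as conn:
    with conn.cursor() as cur:
        cur.execute(LOCK_QUERY, ("prefect",))
        for row in cur.fetchall():
            print(row)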

It may be worth mentioning that each flow run on the agent starts with CPU-intensive gunzip operations that last for several minutes. Perhaps because these never “yield”, they are effectively blocking agent-server communication(?). That would seem odd, though, since these are not async flows/tasks, and I would expect Prefect to already know how to run them in the background. A rough sketch of what I mean is below.
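
To make the “does not yield” part concrete, this is a simplified, reconstructed illustration of the shape of that initial work. It is not our actual read_and_split_file implementation (the real code first pulls the object down from MinIO), but it captures the point: a tight gunzip-and-split loop over a ~7 GB compressed file that runs for minutes without awaiting anything.

# Rough illustration only -- not our real read_and_split_file.
# The loop below is pure CPU + disk work for several minutes and never yields
# control, which is my (unverified) suspicion for why agent<->server traffic
# stalls while it runs.
import gzip
from pathlib import Path


def split_into_batches(local_gz_path: Path, tmp_dir_name: Path, batch_size: int = 10_000) -> list[Path]:
    batch_files: list[Path] = []
    batch: list[str] = []
    with gzip.open(local_gz_path, "rt") as fh:          # ~7 GB compressed JSONL input
        for line_no, line in enumerate(fh, start=1):
            batch.append(line)
            if line_no % batch_size == 0:               # flush every 10k rows
                out_path = Path(tmp_dir_name) / f"batch_{line_no // batch_size:06d}.jsonl"
                out_path.write_text("".join(batch))
                batch_files.append(out_path)
                batch.clear()
    if batch:                                           # trailing partial batch
        out_path = Path(tmp_dir_name) / "batch_final.jsonl"
        out_path.write_text("".join(batch))
        batch_files.append(out_path)
    return batch_files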