Hi all! First up - a huge thanks for this API. I am trying it out on some problems in the office as a demonstration, and I can see some real use for it.
One of these problems is a long-ish pipeline that leverages:
- A Postgres database running remotely in a Singularity container
- A SLURM cluster (`dask_jobqueue.SLURMCluster`) with a `DaskTaskRunner`, with 10 separate compute nodes being pulled into the distributed Dask scheduler
- A single large Flow with ~7 tasks
In this example Flow, each task imports a set of external Python scripts and executes their `main` with appropriate values. I can verify that these values are correct and do not raise any unexpected runtime errors, and that all file outputs are correctly formed. One of these tasks is also, essentially, a map applied over 40 inputs. This map does not use the `task.map` interface; instead, a simple list comprehension generates a corresponding set of `PrefectFuture`s, which I then iterate over and `wait()` on.
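For readers unfamiliar with the pattern, the manual map described above can be sketched with the standard library. This is only a stand-in: `concurrent.futures` replaces the Prefect/Dask machinery so the snippet runs anywhere, and `process_one` and `run_manual_map` are hypothetical names, not the real pipeline code. In the actual flow the futures would presumably come from submitting a Prefect task rather than from a thread pool.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import List


def process_one(index: int) -> int:
    """Hypothetical per-input work; the real task calls an external script's main."""
    return index * index


def run_manual_map(inputs: List[int], max_workers: int = 10) -> List[int]:
    """Submit one job per input, wait for all of them, then collect the results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One future per input, built with a list comprehension.
        futures: List[Future] = [pool.submit(process_one, i) for i in inputs]
        # Block until every future has finished before moving on.
        wait(futures)
        return [f.result() for f in futures]


# 40 inputs, mirroring the size of the map in the post.
results = run_manual_map(list(range(40)))
```

The point of the sketch is the shape of the control flow: all 40 jobs are in flight at once, and the caller only proceeds once every one of them has completed.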
So far I have been making incremental progress towards a working solution. The remaining issue is a `TimeoutError` raised as the flow is closing. I have tried setting `PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT=20`, but I still keep getting these `TimeoutError`s as the flow is wrapping up. This timeout variable has actually already helped resolve similar `TimeoutError`s that were being raised inconsistently while the Flow was running. But now I just can't seem to get around it.
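One way to rule out the variable simply not reaching the process is to set it from Python before anything from Prefect is imported. The sketch below is an assumption about how one might do that, not a description of how the pipeline is actually configured; `configure_timeout` is a hypothetical helper, and only the variable name and the value of 20 come from the post.

```python
import os

# Variable name taken from the post; the value is interpreted as seconds.
TIMEOUT_VAR = "PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT"


def configure_timeout(seconds: int = 20) -> str:
    """Store the timeout in this process's environment and return what was stored.

    Hypothetical helper: it is meant to be called before `import prefect`,
    on the assumption that the setting is read from the environment.
    """
    os.environ[TIMEOUT_VAR] = str(seconds)
    return os.environ[TIMEOUT_VAR]


configured = configure_timeout(20)
```

Note that this only affects the process that runs it. Workers started on the SLURM compute nodes are separate processes, so they would need the same variable exported in their own job environment.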
My feeling is that the network latency between the SLURM compute nodes and the remote Postgres server is likely the ultimate root cause, and that this `PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT` variable might not be used when the flow is closing. What also prompts this thinking is that the `TimeoutError` raised from `fastapi` / `asyncpg` consistently arrives 5 seconds after my last logged message at the very end of the flow. I believe this 5-second timeout is a default set somewhere; I am just not sure where.
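To illustrate why raising a connection timeout might not help here, the sketch below simulates a client that guards the connect phase and the query phase with two independent timeouts. Everything in it is a stand-in: `fake_connect`, `fake_query` and `run_once` are hypothetical, the latencies are scaled down so it runs quickly, and it makes no claim about how Prefect actually wires its settings. (asyncpg's `connect()` does accept separate `timeout` and `command_timeout` arguments, which is what makes this hypothesis plausible.)

```python
import asyncio


async def fake_connect(latency: float) -> str:
    """Stand-in for opening a database connection."""
    await asyncio.sleep(latency)
    return "connection"


async def fake_query(latency: float) -> str:
    """Stand-in for running one statement on an open connection."""
    await asyncio.sleep(latency)
    return "row"


async def run_once(
    connect_timeout: float,
    query_timeout: float,
    connect_latency: float,
    query_latency: float,
) -> str:
    """Connect, then query, with each phase guarded by its own timeout."""
    try:
        await asyncio.wait_for(fake_connect(connect_latency), timeout=connect_timeout)
    except asyncio.TimeoutError:
        return "connect timed out"
    try:
        await asyncio.wait_for(fake_query(query_latency), timeout=query_timeout)
    except asyncio.TimeoutError:
        return "query timed out"
    return "ok"


# A generous connect timeout does not rescue a slow query...
with_short_query_timeout = asyncio.run(
    run_once(connect_timeout=0.5, query_timeout=0.05,
             connect_latency=0.01, query_latency=0.2)
)
# ...only raising the query timeout itself does.
with_long_query_timeout = asyncio.run(
    run_once(connect_timeout=0.5, query_timeout=0.5,
             connect_latency=0.01, query_latency=0.2)
)
```

If the real stack behaves like this, the traceback ending in a query call rather than a connect call would be consistent with a second, untouched timeout being the one that fires.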
Below are the versions reported by `prefect version`, followed by the error log and tracebacks:
prefect version
Version: 2.6.4
API version: 0.8.2
Python version: 3.9.13
Git commit: 51e92dda
Built: Thu, Oct 20, 2022 3:11 PM
OS/Arch: linux/x86_64
Profile: default
Server type: ephemeral
Server:
Database: postgresql
21:51:08.251 | ERROR | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
raise exc
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
await self.app(scope, receive, sender)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
await route.handle(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 231, in app
raw_response = await run_endpoint_function(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/api/flow_runs.py", line 245, in set_flow_run_state
orchestration_result = await models.flow_runs.set_flow_run_state(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 403, in set_flow_run_state
run = await models.flow_runs.read_flow_run(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 149, in read_flow_run
return await session.get(db.FlowRun, flow_run_id)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 298, in get
return await greenlet_spawn(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2848, in get
return self._get_impl(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2955, in _get_impl
return db_load_fn(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
session.execute(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
return connection._execute_clauseelement(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
util.raise_(exc_info[1], with_traceback=exc_info[2])
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
await adapt_connection._start_transaction()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
self._handle_exception(error)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 684, in _handle_exception
raise error
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
await self._transaction.start()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/transaction.py", line 138, in start
await self._connection.execute(query)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/connection.py", line 318, in execute
return await self._protocol.query(query, timeout)
File "asyncpg/protocol/protocol.pyx", line 338, in query
asyncio.exceptions.TimeoutError
/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
21:51:08.899 | ERROR | Flow run 'purring-mustang' - Crash detected! Execution was interrupted by an unexpected exception: asyncio.exceptions.TimeoutError
Traceback (most recent call last):
File "/group/askap/miniconda3/envs/acesprefect2/bin/process_holography.py", line 413, in <module>
sys.exit(cli())
File "/group/askap/miniconda3/envs/acesprefect2/bin/process_holography.py", line 405, in cli
main(
File "/group/askap/miniconda3/envs/acesprefect2/bin/process_holography.py", line 346, in main
result = holography_flow()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/flows.py", line 439, in __call__
return enter_flow_run_engine_from_flow_call(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 150, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
return await fn(*args, **kwargs)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 222, in create_then_begin_flow_run
state = await begin_flow_run(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 353, in begin_flow_run
terminal_state = await orchestrate_flow_run(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 635, in orchestrate_flow_run
state = await propose_state(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 1467, in propose_state
response = await client.set_flow_run_state(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/orion.py", line 1489, in set_flow_run_state
response = await self._client.post(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/base.py", line 159, in send
await super().send(*args, **kwargs)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/applications.py", line 270, in __call__
await super().__call__(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/cors.py", line 84, in __call__
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
raise exc
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
await self.app(scope, receive, sender)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
await route.handle(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 427, in handle
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/applications.py", line 270, in __call__
await super().__call__(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
raise exc
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
await self.app(scope, receive, sender)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
await route.handle(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
await self.app(scope, receive, send)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 231, in app
raw_response = await run_endpoint_function(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/api/flow_runs.py", line 245, in set_flow_run_state
orchestration_result = await models.flow_runs.set_flow_run_state(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 403, in set_flow_run_state
run = await models.flow_runs.read_flow_run(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 149, in read_flow_run
return await session.get(db.FlowRun, flow_run_id)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 298, in get
return await greenlet_spawn(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2848, in get
return self._get_impl(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2955, in _get_impl
return db_load_fn(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
session.execute(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
return connection._execute_clauseelement(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
util.raise_(exc_info[1], with_traceback=exc_info[2])
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
self._adapt_connection.await_(
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
await adapt_connection._start_transaction()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
self._handle_exception(error)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 684, in _handle_exception
raise error
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
await self._transaction.start()
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/transaction.py", line 138, in start
await self._connection.execute(query)
File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/connection.py", line 318, in execute
return await self._protocol.query(query, timeout)
File "asyncpg/protocol/protocol.pyx", line 338, in query
asyncio.exceptions.TimeoutError