I have a simple but fairly large flow whose tasks I want to run in parallel using `prefect_dask.DaskTaskRunner`. When there are only a few tasks, `.map` works well. But as the number of mapped tasks grows (e.g., to 500–1000), the runs fail in what looks like an interaction with a locked database. I could run the tasks in a loop with delays between calls to `.submit`, but is there a better option (e.g., having `.map` wait for the database to release the lock)?
import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner
@task
def return_x(x: int) -> int:
    """Identity task: hand back ``x`` unchanged.

    Used as a trivially cheap task body so that any failure in the flows
    below comes from orchestration overhead, not from the task itself.
    """
    result = x
    return result
@flow(task_runner=DaskTaskRunner)
def loop_flow(n: int, delay: float):
    """Submit ``return_x`` once for each integer in ``range(n)``, one at a time.

    Args:
        n: Number of task runs to submit (inputs ``0`` .. ``n - 1``).
        delay: Seconds to sleep before *each* submission, including the
            first; spacing submissions out throttles load on the backend.

    The futures returned by ``.submit`` are not kept or returned.
    """
    for value in range(n):
        # Pause first, then submit, so consecutive submissions are at least
        # ``delay`` seconds apart.
        time.sleep(delay)
        return_x.submit(value)
@flow(task_runner=DaskTaskRunner)
def map_flow(n: int, batch_size: int | None = None):
    """Run ``return_x`` over ``range(n)`` using ``.map``.

    Mapping all ``n`` inputs in a single call appears to make Prefect create
    every task run at once. With the default SQLite-backed API database,
    that can fail with ``sqlite3.OperationalError: database is locked`` once
    ``n`` reaches the hundreds. Passing ``batch_size`` maps the inputs in
    chunks of at most that size and waits for each chunk to finish before
    submitting the next, which bounds how many task runs are in flight at
    any moment.

    Args:
        n: Number of inputs (``0`` .. ``n - 1``) to map over.
        batch_size: ``None`` (the default) maps everything in one ``.map``
            call, exactly as before. Otherwise this is the maximum number of
            task runs submitted per ``.map`` call; it must be positive.

    Returns:
        The list of futures, one per input, in input order.

    Raises:
        ValueError: If ``batch_size`` is given and is less than 1.
    """
    if batch_size is None:
        # Original behaviour: a single unbounded fan-out.
        return return_x.map(range(n))
    if batch_size < 1:
        raise ValueError(
            f"batch_size must be a positive integer, got {batch_size!r}"
        )

    inputs = range(n)
    futures = []
    for start in range(0, n, batch_size):
        batch = return_x.map(inputs[start:start + batch_size])
        # Block until this batch has finished so the next batch does not add
        # more concurrent writes on top of it.
        for future in batch:
            future.wait()
        futures.extend(batch)
    return futures
if __name__ == "__main__":
    # Reproduce the failure at increasing submission pressure.
    num_tasks = 1000

    # Submissions spaced 0.1 s apart: succeeds (if a bit slow to start).
    loop_flow(num_tasks, delay=0.1)

    # Back-to-back submissions: starts, then usually fails before long.
    loop_flow(num_tasks, delay=0)

    # Everything mapped at once: fails almost immediately.
    map_flow(num_tasks)
Snippet from the mapped run (`map_flow`):
[...]
--- Orion logging error ---
Traceback (most recent call last):
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
self._adapt_connection._handle_exception(error)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
raise error
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
self.await_(_cursor.execute(operation, parameters))
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
await self._execute(self._cursor.execute, sql, parameters)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
return await self._conn._execute(fn, *args, **kwargs)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
return await future
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
result = function()
sqlite3.OperationalError: database is locked
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/logging/handlers.py", line 146, in send_logs
await client.create_logs(self._pending_logs)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/client/orion.py", line 1691, in create_logs
await self._client.post(f"/logs/", json=serialized_logs)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
return await self.request(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
await super().send(*args, **kwargs)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
response = await self._send_handling_auth(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
response = await self._send_handling_redirects(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
response = await self._send_single_request(request)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
response = await transport.handle_async_request(request)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
await self.app(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
await super().__call__(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/cors.py", line 84, in __call__
await self.app(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
raise exc
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
await self.app(scope, receive, sender)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
await route.handle(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 427, in handle
await self.app(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
await super().__call__(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
await self.middleware_stack(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
raise exc
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
await self.app(scope, receive, sender)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
await route.handle(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
await self.app(scope, receive, send)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
response = await func(request)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
response = await default_handler(request)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
raw_response = await run_endpoint_function(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
return await dependant.call(**values)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/api/logs.py", line 27, in create_logs
await models.logs.create_logs(session=session, logs=batch)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
return await fn(*args, **kwargs)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/models/logs.py", line 45, in create_logs
await session.execute(log_insert.values([log.dict() for log in logs]))
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 214, in execute
result = await greenlet_spawn(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
result = context.throw(*sys.exc_info())
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
return connection._execute_clauseelement(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self._handle_dbapi_exception(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
util.raise_(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
raise exception
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
self._adapt_connection._handle_exception(error)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
raise error
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
self.await_(_cursor.execute(operation, parameters))
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
return current.driver.switch(awaitable)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
value = await result
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
await self._execute(self._cursor.execute, sql, parameters)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
return await self._conn._execute(fn, *args, **kwargs)
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
return await future
File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
result = function()
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO log (id, created, updated, name, level, flow_run_id, task_run_id, message, timestamp) VALUES (:id, :created, :updated, :name_m0, :level_m0, :flow_run_id_m0, :task_run_id_m0, :message_m0, :timestamp_m0)]
[parameters: {'id': '6ec70409-1641-4f7d-a8a5-9d97c4c351e9', 'created': '2022-10-26 12:48:02.915298', 'updated': '2022-10-26 12:48:02.915333', 'name_m0': 'prefect.flow_runs', 'level_m0': 40, 'flow_run_id_m0': '323b9eee-0808-4b9e-8db8-33029a6c9119', 'task_run_id_m0': None, 'message_m0': 'Crash detected! Execution was interrupted by an unexpected exception: anyio._backends._asyncio.ExceptionGroup: 22 exceptions were raised in the task ... (296157 characters truncated) ... 3.OperationalError) unable to open database file\n[SQL: PRAGMA journal_mode = WAL;]\n(Background on this error at: https://sqlalche.me/e/14/e3q8)\n\n', 'timestamp_m0': '2022-10-26 12:47:02.367979'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
Worker information:
Approximate queue length: 0
Pending log batch length: 1
Pending log batch size: 360
The log worker is stopping and these logs will not be sent.
Environment
name: map
channels:
- conda-forge
dependencies:
- _libgcc_mutex=0.1=conda_forge
- _openmp_mutex=4.5=2_gnu
- aiofiles=22.1.0=pyhd8ed1ab_0
- aiohttp=3.8.3=py310h5764c6d_0
- aiosignal=1.2.0=pyhd8ed1ab_0
- aiosqlite=0.17.0=pyhd8ed1ab_0
- alembic=1.8.1=pyhd8ed1ab_0
- anyio=3.6.2=pyhd8ed1ab_0
- apprise=1.1.0=pyhd8ed1ab_0
- asgi-lifespan=1.0.1=pyhd8ed1ab_5
- async-exit-stack=1.0.1=pyhd8ed1ab_0
- async-timeout=4.0.2=pyhd8ed1ab_0
- asyncpg=0.26.0=py310h5764c6d_0
- attrs=22.1.0=pyh71513ae_1
- bcrypt=3.2.2=py310h5764c6d_0
- blinker=1.5=pyhd8ed1ab_0
- brotlipy=0.7.0=py310h5764c6d_1004
- bzip2=1.0.8=h7f98852_4
- ca-certificates=2022.9.24=ha878542_0
- cached-property=1.5.2=hd8ed1ab_1
- cached_property=1.5.2=pyha770c72_1
- cachetools=5.2.0=pyhd8ed1ab_0
- certifi=2022.9.24=pyhd8ed1ab_0
- cffi=1.15.1=py310h255011f_1
- charset-normalizer=2.1.1=pyhd8ed1ab_0
- click=8.1.3=py310hff52083_0
- cloudpickle=2.2.0=pyhd8ed1ab_0
- colorama=0.4.6=pyhd8ed1ab_0
- commonmark=0.9.1=py_0
- coolname=2.0.0=pyhd8ed1ab_0
- croniter=1.3.7=pyhd8ed1ab_0
- cryptography=38.0.2=py310h600f1e7_1
- cytoolz=0.12.0=py310h5764c6d_0
- dask-core=2022.10.0=pyhd8ed1ab_2
- dataclasses=0.8=pyhc8e2a94_3
- distributed=2022.10.0=pyhd8ed1ab_2
- docker-py=6.0.0=pyhd8ed1ab_0
- fastapi=0.85.1=pyhd8ed1ab_0
- frozenlist=1.3.1=py310h5764c6d_0
- fsspec=2022.10.0=pyhd8ed1ab_0
- future=0.18.2=py310hff52083_5
- google-auth=2.13.0=pyh1a96a4e_0
- greenlet=1.1.3=py310hd8f1fbe_0
- griffe=0.21.0=pyhd8ed1ab_1
- h11=0.12.0=pyhd8ed1ab_0
- h2=4.1.0=pyhd8ed1ab_0
- heapdict=1.0.1=py_0
- hpack=4.0.0=pyh9f0ad1d_0
- httpcore=0.15.0=pyhd8ed1ab_0
- httpx=0.23.0=py310hff52083_1
- hyperframe=6.0.1=pyhd8ed1ab_0
- idna=3.4=pyhd8ed1ab_0
- importlib-metadata=4.11.4=py310hff52083_0
- importlib_metadata=4.11.4=hd8ed1ab_0
- importlib_resources=5.10.0=pyhd8ed1ab_0
- jinja2=3.1.2=pyhd8ed1ab_1
- jsonpatch=1.32=pyhd8ed1ab_0
- jsonpointer=2.0=py_0
- ld_impl_linux-64=2.39=hc81fddc_0
- libffi=3.4.2=h7f98852_5
- libgcc-ng=12.2.0=h65d4601_19
- libgomp=12.2.0=h65d4601_19
- libnsl=2.0.0=h7f98852_0
- libsodium=1.0.18=h36c2ea0_1
- libsqlite=3.39.4=h753d276_0
- libstdcxx-ng=12.2.0=h46fd767_19
- libuuid=2.32.1=h7f98852_1000
- libzlib=1.2.13=h166bdaf_4
- locket=1.0.0=pyhd8ed1ab_0
- mako=1.2.3=pyhd8ed1ab_0
- markdown=3.4.1=pyhd8ed1ab_0
- markupsafe=2.1.1=py310h5764c6d_1
- msgpack-python=1.0.4=py310hbf28c38_0
- multidict=6.0.2=py310h5764c6d_1
- ncurses=6.3=h27087fc_1
- oauthlib=3.2.2=pyhd8ed1ab_0
- openssl=3.0.5=h166bdaf_2
- orjson=3.8.1=py310h0ed1b42_1
- packaging=21.3=pyhd8ed1ab_0
- paramiko=2.11.0=pyhd8ed1ab_0
- partd=1.3.0=pyhd8ed1ab_0
- pathspec=0.10.1=pyhd8ed1ab_0
- pendulum=2.1.2=py310h5764c6d_4
- pip=22.3=pyhd8ed1ab_0
- prefect=2.6.4=pyhd8ed1ab_0
- prefect-dask=0.2.1=pyhd8ed1ab_0
- psutil=5.9.3=py310h5764c6d_0
- pyasn1=0.4.8=py_0
- pyasn1-modules=0.2.7=py_0
- pycparser=2.21=pyhd8ed1ab_0
- pydantic=1.10.2=py310h5764c6d_0
- pygments=2.13.0=pyhd8ed1ab_0
- pyjwt=2.6.0=pyhd8ed1ab_0
- pynacl=1.5.0=py310h5764c6d_1
- pyopenssl=22.1.0=pyhd8ed1ab_0
- pyparsing=3.0.9=pyhd8ed1ab_0
- pysocks=1.7.1=pyha2e5f31_6
- python=3.10.6=ha86cf86_0_cpython
- python-dateutil=2.8.2=pyhd8ed1ab_0
- python-kubernetes=24.2.0=pyhd8ed1ab_0
- python-slugify=6.1.2=pyhd8ed1ab_0
- python_abi=3.10=2_cp310
- pytz=2022.5=pyhd8ed1ab_0
- pytzdata=2020.1=pyh9f0ad1d_0
- pyu2f=0.1.5=pyhd8ed1ab_0
- pywin32-on-windows=0.1.0=pyh1179c8e_3
- pyyaml=6.0=py310h5764c6d_4
- readchar=4.0.3=pyhd8ed1ab_0
- readline=8.1.2=h0f457ee_0
- requests=2.28.1=pyhd8ed1ab_1
- requests-oauthlib=1.3.1=pyhd8ed1ab_0
- rfc3986=1.5.0=pyhd8ed1ab_0
- rich=12.6.0=pyhd8ed1ab_0
- rsa=4.9=pyhd8ed1ab_0
- setuptools=65.5.0=pyhd8ed1ab_0
- shellingham=1.5.0=pyhd8ed1ab_0
- six=1.16.0=pyh6c4a22f_0
- sniffio=1.3.0=pyhd8ed1ab_0
- sortedcontainers=2.4.0=pyhd8ed1ab_0
- sqlalchemy=1.4.42=py310h5764c6d_0
- starlette=0.20.4=pyhd8ed1ab_1
- tblib=1.7.0=pyhd8ed1ab_0
- text-unidecode=1.3=py_0
- tk=8.6.12=h27826a3_0
- toml=0.10.2=pyhd8ed1ab_0
- toolz=0.12.0=pyhd8ed1ab_0
- tornado=6.1=py310h5764c6d_3
- typer=0.6.1=pyhd8ed1ab_0
- typing-extensions=4.4.0=hd8ed1ab_0
- typing_extensions=4.4.0=pyha770c72_0
- tzdata=2022e=h191b570_0
- unidecode=1.3.6=pyhd8ed1ab_0
- urllib3=1.26.11=pyhd8ed1ab_0
- uvicorn=0.19.0=py310hff52083_0
- websocket-client=1.4.1=pyhd8ed1ab_0
- wheel=0.37.1=pyhd8ed1ab_0
- xz=5.2.6=h166bdaf_0
- yaml=0.2.5=h7f98852_2
- yarl=1.7.2=py310h5764c6d_2
- zict=2.2.0=pyhd8ed1ab_0
- zipp=3.10.0=pyhd8ed1ab_0