Avoiding locked database with larger flows -- recommendations for something better than a delay?

I have a simple but largish flow whose tasks I want to run in parallel, executed by the prefect_dask.DaskTaskRunner. When there are only a few tasks, .map works well, but as the number of mapped tasks increases (e.g., to 500 - 1000), the runs fail during what seems like some interaction with a locked database. I could do something like run the tasks in a loop with delays between calls to .submit, but is there a better option (e.g., to wait for the database to release the lock during .map)?

import time
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def return_x(x: int) -> int:
    return x

@flow(task_runner=DaskTaskRunner)
def loop_flow(n: int, delay: float):
    for i in range(n):
        time.sleep(delay)
        return_x.submit(i)

@flow(task_runner=DaskTaskRunner)
def map_flow(n: int):
    return_x.map(range(n))


if __name__ == "__main__":
    n = 1000
    # This is fine (if a bit slow to start)
    loop_flow(n, delay=0.1)
    # starts, then fails before too long (usually)
    loop_flow(n, delay=0)
    # fails almost immediately
    map_flow(n)
Snippet from mapped run
[...]
--- Orion logging error ---
Traceback (most recent call last):
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
    self._adapt_connection._handle_exception(error)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
    raise error
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlite3.OperationalError: database is locked

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/logging/handlers.py", line 146, in send_logs
    await client.create_logs(self._pending_logs)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/client/orion.py", line 1691, in create_logs
    await self._client.post(f"/logs/", json=serialized_logs)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/client/base.py", line 159, in send
    await super().send(*args, **kwargs)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
    await self.app(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 427, in handle
    await self.app(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/api/logs.py", line 27, in create_logs
    await models.logs.create_logs(session=session, logs=batch)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/prefect/orion/models/logs.py", line 45, in create_logs
    await session.execute(log_insert.values([log.dict() for log in logs]))
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/ext/asyncio/session.py", line 214, in execute
    result = await greenlet_spawn(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2124, in _handle_dbapi_exception
    util.raise_(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 108, in execute
    self._adapt_connection._handle_exception(error)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 236, in _handle_exception
    raise error
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 90, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/home/psadil/mambaforge/envs/map/lib/python3.10/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked
[SQL: INSERT INTO log (id, created, updated, name, level, flow_run_id, task_run_id, message, timestamp) VALUES (:id, :created, :updated, :name_m0, :level_m0, :flow_run_id_m0, :task_run_id_m0, :message_m0, :timestamp_m0)]
[parameters: {'id': '6ec70409-1641-4f7d-a8a5-9d97c4c351e9', 'created': '2022-10-26 12:48:02.915298', 'updated': '2022-10-26 12:48:02.915333', 'name_m0': 'prefect.flow_runs', 'level_m0': 40, 'flow_run_id_m0': '323b9eee-0808-4b9e-8db8-33029a6c9119', 'task_run_id_m0': None, 'message_m0': 'Crash detected! Execution was interrupted by an unexpected exception: anyio._backends._asyncio.ExceptionGroup: 22 exceptions were raised in the task  ... (296157 characters truncated) ... 3.OperationalError) unable to open database file\n[SQL: PRAGMA journal_mode = WAL;]\n(Background on this error at: https://sqlalche.me/e/14/e3q8)\n\n', 'timestamp_m0': '2022-10-26 12:47:02.367979'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
Worker information:
    Approximate queue length: 0
    Pending log batch length: 1
    Pending log batch size: 360
The log worker is stopping and these logs will not be sent.

Environment
name: map
channels:
  - conda-forge
dependencies:
  - _libgcc_mutex=0.1=conda_forge
  - _openmp_mutex=4.5=2_gnu
  - aiofiles=22.1.0=pyhd8ed1ab_0
  - aiohttp=3.8.3=py310h5764c6d_0
  - aiosignal=1.2.0=pyhd8ed1ab_0
  - aiosqlite=0.17.0=pyhd8ed1ab_0
  - alembic=1.8.1=pyhd8ed1ab_0
  - anyio=3.6.2=pyhd8ed1ab_0
  - apprise=1.1.0=pyhd8ed1ab_0
  - asgi-lifespan=1.0.1=pyhd8ed1ab_5
  - async-exit-stack=1.0.1=pyhd8ed1ab_0
  - async-timeout=4.0.2=pyhd8ed1ab_0
  - asyncpg=0.26.0=py310h5764c6d_0
  - attrs=22.1.0=pyh71513ae_1
  - bcrypt=3.2.2=py310h5764c6d_0
  - blinker=1.5=pyhd8ed1ab_0
  - brotlipy=0.7.0=py310h5764c6d_1004
  - bzip2=1.0.8=h7f98852_4
  - ca-certificates=2022.9.24=ha878542_0
  - cached-property=1.5.2=hd8ed1ab_1
  - cached_property=1.5.2=pyha770c72_1
  - cachetools=5.2.0=pyhd8ed1ab_0
  - certifi=2022.9.24=pyhd8ed1ab_0
  - cffi=1.15.1=py310h255011f_1
  - charset-normalizer=2.1.1=pyhd8ed1ab_0
  - click=8.1.3=py310hff52083_0
  - cloudpickle=2.2.0=pyhd8ed1ab_0
  - colorama=0.4.6=pyhd8ed1ab_0
  - commonmark=0.9.1=py_0
  - coolname=2.0.0=pyhd8ed1ab_0
  - croniter=1.3.7=pyhd8ed1ab_0
  - cryptography=38.0.2=py310h600f1e7_1
  - cytoolz=0.12.0=py310h5764c6d_0
  - dask-core=2022.10.0=pyhd8ed1ab_2
  - dataclasses=0.8=pyhc8e2a94_3
  - distributed=2022.10.0=pyhd8ed1ab_2
  - docker-py=6.0.0=pyhd8ed1ab_0
  - fastapi=0.85.1=pyhd8ed1ab_0
  - frozenlist=1.3.1=py310h5764c6d_0
  - fsspec=2022.10.0=pyhd8ed1ab_0
  - future=0.18.2=py310hff52083_5
  - google-auth=2.13.0=pyh1a96a4e_0
  - greenlet=1.1.3=py310hd8f1fbe_0
  - griffe=0.21.0=pyhd8ed1ab_1
  - h11=0.12.0=pyhd8ed1ab_0
  - h2=4.1.0=pyhd8ed1ab_0
  - heapdict=1.0.1=py_0
  - hpack=4.0.0=pyh9f0ad1d_0
  - httpcore=0.15.0=pyhd8ed1ab_0
  - httpx=0.23.0=py310hff52083_1
  - hyperframe=6.0.1=pyhd8ed1ab_0
  - idna=3.4=pyhd8ed1ab_0
  - importlib-metadata=4.11.4=py310hff52083_0
  - importlib_metadata=4.11.4=hd8ed1ab_0
  - importlib_resources=5.10.0=pyhd8ed1ab_0
  - jinja2=3.1.2=pyhd8ed1ab_1
  - jsonpatch=1.32=pyhd8ed1ab_0
  - jsonpointer=2.0=py_0
  - ld_impl_linux-64=2.39=hc81fddc_0
  - libffi=3.4.2=h7f98852_5
  - libgcc-ng=12.2.0=h65d4601_19
  - libgomp=12.2.0=h65d4601_19
  - libnsl=2.0.0=h7f98852_0
  - libsodium=1.0.18=h36c2ea0_1
  - libsqlite=3.39.4=h753d276_0
  - libstdcxx-ng=12.2.0=h46fd767_19
  - libuuid=2.32.1=h7f98852_1000
  - libzlib=1.2.13=h166bdaf_4
  - locket=1.0.0=pyhd8ed1ab_0
  - mako=1.2.3=pyhd8ed1ab_0
  - markdown=3.4.1=pyhd8ed1ab_0
  - markupsafe=2.1.1=py310h5764c6d_1
  - msgpack-python=1.0.4=py310hbf28c38_0
  - multidict=6.0.2=py310h5764c6d_1
  - ncurses=6.3=h27087fc_1
  - oauthlib=3.2.2=pyhd8ed1ab_0
  - openssl=3.0.5=h166bdaf_2
  - orjson=3.8.1=py310h0ed1b42_1
  - packaging=21.3=pyhd8ed1ab_0
  - paramiko=2.11.0=pyhd8ed1ab_0
  - partd=1.3.0=pyhd8ed1ab_0
  - pathspec=0.10.1=pyhd8ed1ab_0
  - pendulum=2.1.2=py310h5764c6d_4
  - pip=22.3=pyhd8ed1ab_0
  - prefect=2.6.4=pyhd8ed1ab_0
  - prefect-dask=0.2.1=pyhd8ed1ab_0
  - psutil=5.9.3=py310h5764c6d_0
  - pyasn1=0.4.8=py_0
  - pyasn1-modules=0.2.7=py_0
  - pycparser=2.21=pyhd8ed1ab_0
  - pydantic=1.10.2=py310h5764c6d_0
  - pygments=2.13.0=pyhd8ed1ab_0
  - pyjwt=2.6.0=pyhd8ed1ab_0
  - pynacl=1.5.0=py310h5764c6d_1
  - pyopenssl=22.1.0=pyhd8ed1ab_0
  - pyparsing=3.0.9=pyhd8ed1ab_0
  - pysocks=1.7.1=pyha2e5f31_6
  - python=3.10.6=ha86cf86_0_cpython
  - python-dateutil=2.8.2=pyhd8ed1ab_0
  - python-kubernetes=24.2.0=pyhd8ed1ab_0
  - python-slugify=6.1.2=pyhd8ed1ab_0
  - python_abi=3.10=2_cp310
  - pytz=2022.5=pyhd8ed1ab_0
  - pytzdata=2020.1=pyh9f0ad1d_0
  - pyu2f=0.1.5=pyhd8ed1ab_0
  - pywin32-on-windows=0.1.0=pyh1179c8e_3
  - pyyaml=6.0=py310h5764c6d_4
  - readchar=4.0.3=pyhd8ed1ab_0
  - readline=8.1.2=h0f457ee_0
  - requests=2.28.1=pyhd8ed1ab_1
  - requests-oauthlib=1.3.1=pyhd8ed1ab_0
  - rfc3986=1.5.0=pyhd8ed1ab_0
  - rich=12.6.0=pyhd8ed1ab_0
  - rsa=4.9=pyhd8ed1ab_0
  - setuptools=65.5.0=pyhd8ed1ab_0
  - shellingham=1.5.0=pyhd8ed1ab_0
  - six=1.16.0=pyh6c4a22f_0
  - sniffio=1.3.0=pyhd8ed1ab_0
  - sortedcontainers=2.4.0=pyhd8ed1ab_0
  - sqlalchemy=1.4.42=py310h5764c6d_0
  - starlette=0.20.4=pyhd8ed1ab_1
  - tblib=1.7.0=pyhd8ed1ab_0
  - text-unidecode=1.3=py_0
  - tk=8.6.12=h27826a3_0
  - toml=0.10.2=pyhd8ed1ab_0
  - toolz=0.12.0=pyhd8ed1ab_0
  - tornado=6.1=py310h5764c6d_3
  - typer=0.6.1=pyhd8ed1ab_0
  - typing-extensions=4.4.0=hd8ed1ab_0
  - typing_extensions=4.4.0=pyha770c72_0
  - tzdata=2022e=h191b570_0
  - unidecode=1.3.6=pyhd8ed1ab_0
  - urllib3=1.26.11=pyhd8ed1ab_0
  - uvicorn=0.19.0=py310hff52083_0
  - websocket-client=1.4.1=pyhd8ed1ab_0
  - wheel=0.37.1=pyhd8ed1ab_0
  - xz=5.2.6=h166bdaf_0
  - yaml=0.2.5=h7f98852_2
  - yarl=1.7.2=py310h5764c6d_2
  - zict=2.2.0=pyhd8ed1ab_0
  - zipp=3.10.0=pyhd8ed1ab_0

I see, this looks like a similar issue to this one https://linen.prefect.io/t/2716896/Hey-guys-I-m-running-a-flow-with-a-DaskTaskRunner-that-spawn

We are currently working on improving this situation. There’s no dedicated issue to track, as far as I know, but you can follow the release notes (topics tagged release-notes); we’ll share more as we publish releases.


Oh, that might be.

FWIW, this simple mapped case works fine for me with a PostgreSQL database configured as a Docker container, as described in the Prefect 2 Database documentation. In my case, I may need to stick with a SQLite database, and I do need multiprocessing, so I’ll look forward to whatever improvements come along.
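
For context on the `database is locked` error in the traceback: SQLite allows only one writer at a time, and a second writer that waits longer than its busy timeout fails with that message. The following is a minimal sketch that reproduces this with the standard-library `sqlite3` module. It is not Prefect's code; the function name `demo_locked`, the table layout, and the 0.1 s timeout are assumptions chosen for illustration.

```python
import os
import sqlite3
import tempfile


def demo_locked(db_path: str):
    """Show a second writer failing while the first holds the write lock."""
    # isolation_level=None puts the connection in autocommit mode, so the
    # transaction boundaries below are exactly the ones written out.
    writer = sqlite3.connect(db_path, isolation_level=None)
    writer.execute("CREATE TABLE IF NOT EXISTS log (message TEXT)")

    # BEGIN IMMEDIATE takes the write lock right away and keeps it until COMMIT.
    writer.execute("BEGIN IMMEDIATE")
    writer.execute("INSERT INTO log VALUES ('first writer')")

    # The second connection gives up after 0.1 s of waiting for the lock.
    other = sqlite3.connect(db_path, timeout=0.1, isolation_level=None)
    try:
        other.execute("INSERT INTO log VALUES ('second writer')")
        outcome = "ok"
    except sqlite3.OperationalError as exc:
        outcome = str(exc)
    finally:
        writer.execute("COMMIT")  # release the write lock

    # Once the lock is released, the same kind of insert goes through.
    other.execute("INSERT INTO log VALUES ('second writer, retry')")
    count = other.execute("SELECT COUNT(*) FROM log").fetchone()[0]

    writer.close()
    other.close()
    return outcome, count


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(demo_locked(os.path.join(tmp, "demo.db")))
```

With hundreds of task runs writing state and logs at nearly the same moment, waits like this are more likely to exceed the timeout, which would be consistent with the delay-based loop succeeding where the undelayed runs fail.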


I believe Dask is the issue. Do you need Dask?

I just tried this and it worked perfectly well:

import time
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner


@task
def return_x(x: int) -> int:
    return x


@flow(task_runner=ConcurrentTaskRunner)
def loop_flow(n: int, delay: float):
    for i in range(n):
        time.sleep(delay)
        return_x.submit(i)


@flow(task_runner=ConcurrentTaskRunner)
def map_flow(n: int):
    return_x.map(range(n))


if __name__ == "__main__":
    n = 1000
    map_flow(n)

Alternatively, with Dask, did you consider Prefect Cloud? There is a completely free tier, and we use a much more powerful backend in Cloud, which will be able to handle that run even with Dask: https://app.prefect.cloud/

Thanks for following up. I think that I need Dask. At least, the tasks that I’m running are somewhat long-running, mainly Python, and without much I/O. It’s my impression that the ConcurrentTaskRunner uses threading, and so given those tasks I could expect that extra cores thrown at the flow will mostly end up sleeping. That was my motivation for jumping to Dask, which I understand to offer parallelism through multiprocessing.
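
To illustrate the threads-versus-processes point, here is a rough sketch using only the standard library's `concurrent.futures`, not Prefect or Dask. The helper names `cpu_bound` and `timed_run`, the workload size, and the worker count are made up for the example. On a standard (GIL-enabled) CPython build, the thread pool is expected to gain little on pure-Python CPU work, while the process pool can use several cores.

```python
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def cpu_bound(n: int) -> int:
    """Pure-Python arithmetic with no I/O, so threads contend for the GIL."""
    total = 0
    for i in range(n):
        total += i * i
    return total


def timed_run(executor_cls, inputs, workers: int = 4):
    """Run cpu_bound over inputs with the given executor class; return (results, seconds)."""
    start = time.perf_counter()
    with executor_cls(max_workers=workers) as pool:
        results = list(pool.map(cpu_bound, inputs))
    return results, time.perf_counter() - start


if __name__ == "__main__":
    inputs = [1_000_000] * 8
    thread_results, thread_s = timed_run(ThreadPoolExecutor, inputs)
    process_results, process_s = timed_run(ProcessPoolExecutor, inputs)
    # Both executors compute the same values; only the wall-clock time differs.
    assert thread_results == process_results
    print(f"threads:   {thread_s:.2f}s")
    print(f"processes: {process_s:.2f}s")
```

The exact timings depend on the machine and Python build, so none are quoted here.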

Prefect Cloud seems great, and I’ll probably explore that more in other situations. But this flow needs to run in a rather isolated environment, and I suspect that using Prefect Cloud might carry a substantial administrative burden.

It actually might be the case that a PostgreSQL database could work. I’d need to do more reading to see how that could work (I can’t actually use Docker but would instead need to rely on Singularity).

You’re spot on, and you did everything right. Great writeup of the issue btw!

It’s actually the other way around - self-hosting carries an administrative burden, using Cloud is as simple as signing up, and the hybrid model works with the most restrictive environments.

Fingers crossed! Keep us posted on how it goes.


> It’s actually the other way around

That makes sense. I was thinking less about the admin of maintaining a database and more about the admin of studying exactly what data would be sent to Prefect Cloud, and then convincing the powers that be that this excludes anything sensitive. If I explore that option more, I’ll open another thread.

For using Singularity, the following seems to work.

$ mkdir oriondb

$ singularity instance start \
  -B $(pwd)/oriondb:/var/lib/postgresql/data \
  --writable-tmpfs \
  docker://postgres:latest postgres

$ singularity run \
  --env POSTGRES_PASSWORD=yourTopSecretPassword \
  --env POSTGRES_DB=orion \
  instance://postgres -p 5432 -h localhost &> postgres.log &

$ export PREFECT_ORION_DATABASE_CONNECTION_URL=postgresql+asyncpg://postgres:yourTopSecretPassword@localhost:5432/orion

$ prefect orion start &> orion.log &

$ python test.py
[...mapped flow runs...]
$ singularity instance stop postgres

(sorry that, given the thread’s title, this will be a pain to search for in Discourse!)
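
One possible refinement, offered as a sketch rather than something tested in this setup: because the PostgreSQL instance is started in the background, a small script could wait until port 5432 accepts connections before `prefect orion start` and the flow are launched. The helper name `wait_for_port` and its defaults are hypothetical, and an open TCP port only suggests that the server process is listening, not that the database has finished initializing.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False after `timeout` seconds."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError (e.g. ConnectionRefusedError)
            # while nothing is listening on the port yet.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# Example use before starting Orion (host and port as in the commands above):
#     if not wait_for_port("localhost", 5432):
#         raise SystemExit("PostgreSQL did not start listening in time")
```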

Thanks!

this should help:

Thanks so much for sharing the solution!

I seem to be doing something almost identical to you.
I’m also using Dask and ran into issues scaling mapped tasks.

Prefect Cloud is a non-starter for me, as my data is sensitive.

Currently exploring the other issues mentioned in this thread.