TimeoutError when Flow is finishing

Hi all! First up - a huge thanks for this API. I am trying it out on some problems in the office as a demonstration, and I can see some real use for it.

One of these problems is a long-ish pipeline that leverages:

  • A postgres running remotely in a singularity container
  • A SLURM cluster (dask_jobqueue.SLURMCluster) with a DaskTaskExecutor being used, with 10 separate compute nodes being pulled into the distributed dask scheduler.
  • A single large Flow with ~7 tasks

In this example Flow, each task imports a set of external python scripts and executes their main with appropriate values. I can verify that these values are correct and do not raise any unexpected runtime errors, and all file outputs are correctly formed. One of these tasks is also, essentially, a map applied over 40 inputs. This map does not use the task.map interface; instead a simple list comprehension generates a corresponding set of PrefectFutures, which I then iterate over and wait() on.
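For readers following along, the submit-then-wait pattern described above looks roughly like the sketch below. It uses the standard library's concurrent.futures as a stand-in so that it runs without a cluster; in the real flow the futures would be PrefectFutures rather than these. The function process_one and its body are hypothetical, only the count of 40 inputs and the 10 workers echo the post.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def process_one(index: int) -> int:
    """Hypothetical stand-in for calling one external script's main()."""
    return index * index


inputs = list(range(40))  # the post mentions a map over 40 inputs

with ThreadPoolExecutor(max_workers=10) as pool:
    # A list comprehension submits every input and collects the futures.
    futures = [pool.submit(process_one, i) for i in inputs]
    # Block until every future has finished before moving on.
    wait(futures)

# Collect the results in the same order as the inputs.
results = [f.result() for f in futures]
```

The point of the sketch is only the shape: all work is submitted up front, and the caller blocks once on the whole set rather than on each item in turn.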

So far I have been making incremental progress towards a working solution. The remaining issue is a TimeoutError raised as the flow is closing. I have tried setting PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT=20, but I still keep getting these TimeoutErrors as the flow is wrapping up. This timeout variable has already helped resolve similar TimeoutErrors that were being raised inconsistently while the Flow was running. But now I just can't seem to get around it.

My feeling is that the network latency between the SLURM compute nodes and the remote postgres server is likely the root cause, and that the PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT variable might not be used when the flow is closing. What prompts this thinking is that the TimeoutError raised from fastapi / asyncpg consistently arrives 5 seconds after my last logged message at the very end of the flow. I believe that this 5 second timeout is a default set somewhere, I am just not sure where.
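To illustrate the kind of mechanism suspected here, the sketch below shows how a fixed time budget around an awaited call surfaces as asyncio.TimeoutError once the call takes longer than the budget. This is a generic asyncio example, not asyncpg's or Prefect's actual code; slow_query and run_with_timeout are hypothetical names, and the delays are scaled down so the example finishes quickly.

```python
import asyncio


async def slow_query(delay: float) -> str:
    """Hypothetical stand-in for a database round trip over a slow link."""
    await asyncio.sleep(delay)
    return "ok"


async def run_with_timeout(delay: float, timeout: float) -> str:
    """Return the query result, or 'timed out' if the budget is exceeded."""
    try:
        return await asyncio.wait_for(slow_query(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


# The round trip fits inside the budget.
fast = asyncio.run(run_with_timeout(delay=0.01, timeout=0.5))
# The round trip exceeds the budget, so the awaiting caller sees a timeout.
slow = asyncio.run(run_with_timeout(delay=0.5, timeout=0.05))
```

If a default budget like this sits somewhere in the stack, the error would appear a constant interval after the request starts, which would match a consistent 5 second gap.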

Below are the versions reported by prefect version

prefect version
Version:             2.6.4
API version:         0.8.2
Python version:      3.9.13
Git commit:          51e92dda
Built:               Thu, Oct 20, 2022 3:11 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          postgresql

21:51:08.251 | ERROR   | prefect.orion - Encountered exception in request:
Traceback (most recent call last):
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/api/flow_runs.py", line 245, in set_flow_run_state
    orchestration_result = await models.flow_runs.set_flow_run_state(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 403, in set_flow_run_state
    run = await models.flow_runs.read_flow_run(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 149, in read_flow_run
    return await session.get(db.FlowRun, flow_run_id)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 298, in get
    return await greenlet_spawn(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2848, in get
    return self._get_impl(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2955, in _get_impl
    return db_load_fn(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
    self._adapt_connection.await_(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
    await adapt_connection._start_transaction()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
    self._handle_exception(error)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 684, in _handle_exception
    raise error
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
    await self._transaction.start()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/transaction.py", line 138, in start
    await self._connection.execute(query)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/connection.py", line 318, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 338, in query
asyncio.exceptions.TimeoutError
/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/dask_jobqueue/slurm.py:49: FutureWarning: project has been renamed to account as this kwarg was used wit -A option. You are still using it (please also check config files). If you did not set account yet, project will be respected for now, but it will be removed in a future release. If you already set account, project is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
21:51:08.899 | ERROR   | Flow run 'purring-mustang' - Crash detected! Execution was interrupted by an unexpected exception: asyncio.exceptions.TimeoutError

Traceback (most recent call last):
  File "/group/askap/miniconda3/envs/acesprefect2/bin/process_holography.py", line 413, in <module>
    sys.exit(cli())
  File "/group/askap/miniconda3/envs/acesprefect2/bin/process_holography.py", line 405, in cli
    main(
  File "/group/askap/miniconda3/envs/acesprefect2/bin/process_holography.py", line 346, in main
    result = holography_flow()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/flows.py", line 439, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 150, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 222, in create_then_begin_flow_run
    state = await begin_flow_run(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 353, in begin_flow_run
    terminal_state = await orchestrate_flow_run(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 635, in orchestrate_flow_run
    state = await propose_state(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/engine.py", line 1467, in propose_state
    response = await client.set_flow_run_state(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/orion.py", line 1489, in set_flow_run_state
    response = await self._client.post(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1842, in post
    return await self.request(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1527, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/client/base.py", line 159, in send
    await super().send(*args, **kwargs)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1614, in send
    response = await self._send_handling_auth(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1642, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1679, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_client.py", line 1716, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/httpx/_transports/asgi.py", line 152, in handle_async_request
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/cors.py", line 84, in __call__
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 427, in handle
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/applications.py", line 270, in __call__
    await super().__call__(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/applications.py", line 124, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 75, in __call__
    raise exc
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/middleware/exceptions.py", line 64, in __call__
    await self.app(scope, receive, sender)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 680, in __call__
    await route.handle(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 275, in handle
    await self.app(scope, receive, send)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/starlette/routing.py", line 65, in app
    response = await func(request)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/utilities/server.py", line 101, in handle_response_scoped_depends
    response = await default_handler(request)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 231, in app
    raw_response = await run_endpoint_function(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/fastapi/routing.py", line 160, in run_endpoint_function
    return await dependant.call(**values)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/api/flow_runs.py", line 245, in set_flow_run_state
    orchestration_result = await models.flow_runs.set_flow_run_state(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 403, in set_flow_run_state
    run = await models.flow_runs.read_flow_run(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/database/dependencies.py", line 117, in async_wrapper
    return await fn(*args, **kwargs)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/prefect/orion/models/flow_runs.py", line 149, in read_flow_run
    return await session.get(db.FlowRun, flow_run_id)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/ext/asyncio/session.py", line 298, in get
    return await greenlet_spawn(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 126, in greenlet_spawn
    result = context.throw(*sys.exc_info())
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2848, in get
    return self._get_impl(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 2955, in _get_impl
    return db_load_fn(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/loading.py", line 530, in load_on_pk_identity
    session.execute(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
    result = conn._execute_20(statement, params or {}, execution_options)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
    return meth(self, args_10style, kwargs_10style, execution_options)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/sql/elements.py", line 333, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
    ret = self._execute_context(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
    self._handle_dbapi_exception(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 2128, in _handle_dbapi_exception
    util.raise_(exc_info[1], with_traceback=exc_info[2])
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
    self.dialect.do_execute(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
    cursor.execute(statement, parameters)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 479, in execute
    self._adapt_connection.await_(
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 408, in _prepare_and_execute
    await adapt_connection._start_transaction()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 716, in _start_transaction
    self._handle_exception(error)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 684, in _handle_exception
    raise error
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/sqlalchemy/dialects/postgresql/asyncpg.py", line 714, in _start_transaction
    await self._transaction.start()
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/transaction.py", line 138, in start
    await self._connection.execute(query)
  File "/group/askap/miniconda3/envs/acesprefect2/lib/python3.9/site-packages/asyncpg/connection.py", line 318, in execute
    return await self._protocol.query(query, timeout)
  File "asyncpg/protocol/protocol.pyx", line 338, in query
asyncio.exceptions.TimeoutError

Perhaps the PREFECT_API_REQUEST_TIMEOUT variable is also one I need to update. I just found it after reading the code and seeing the defaults applied to the httpx connections in the OrionClient. Testing this now, it will take a couple of hours.
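A minimal sketch of raising both timeouts from Python before the flow starts, assuming the settings are picked up from the process environment at startup. Only the two variable names come from this thread; the value 20 is the one tried above, the value 60 is purely illustrative, and apply_timeout_overrides is a hypothetical helper.

```python
import os

# Variable names are from the thread; "60" is an illustrative value only.
TIMEOUT_OVERRIDES = {
    "PREFECT_ORION_DATABASE_CONNECTION_TIMEOUT": "20",
    "PREFECT_API_REQUEST_TIMEOUT": "60",
}


def apply_timeout_overrides(overrides, environ=os.environ):
    """Copy each override into the environment and return what was set."""
    for name, value in overrides.items():
        environ[name] = value
    return {name: environ[name] for name in overrides}


# Run this before the flow (and anything that loads its settings) starts.
applied = apply_timeout_overrides(TIMEOUT_OVERRIDES)
```

The same effect can be had by exporting the variables in the shell or batch script that launches the pipeline, which may be simpler on a SLURM system.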

Hi, thanks so much for this detailed and thorough issue description. I’m not as familiar with Slurm so this is a bit difficult to help reproduce, but I’m curious, have you tried running this on a local cluster without Slurm (perhaps at a smaller scale)? Did that work?

It might help to take it a bit more step by step to identify the issue. If this isn’t working as expected, I believe that opening an issue on the PrefectHQ/prefect-dask GitHub repository (Prefect integrations with the Dask execution framework) would help you more, because you could get help from the integrations engineers and community rather than from me (knowing very little about this).

Another thing worth trying would be to test out the same setup without Prefect to check if the Dask setup is working as expected and only then add Prefect decorators or mapping

Thanks for the reply Anna. I hope my message made sense and it was not a word salad.

I will try to take a step back a little and go through your suggestions. The pipeline is a long running one which makes things hard to test thoroughly. I can get through most of the pipeline with a subset of data, but some scripts (externally defined) currently expect the complete dataset.

I have seen issues with dask distributed clusters (again managed by SLURMCluster) not shutting down correctly under prefect version 1, but I believe those errors are different in nature to the one I am now getting.

I will go and raise an issue on github and see how things go.

Word salad sounds cool! :smile:

sorry I’m not super helpful here but distributed things are hard to troubleshoot, definitely feel free to open a GitHub issue with some small reproducible example and link it here as well

curious, what if you would use .map instead? worth trying too

another thing would be to do it against Prefect Cloud (there is a free tier) - given that those timeouts come from the DB, worth looking at whether this works better with Prefect Cloud DB

I was able to work around the issue by managing my own persistent instance of the orion server on the same system as my postgres database. I am not sure what the initial problem ultimately was, other than perhaps things not shutting down properly or out of order. Given the use of a SLURMCluster across 10 or so compute nodes it does not surprise me.
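A rough sketch of how a client process might be pointed at such a persistent server instead of the ephemeral one, assuming the server was started separately (for example with prefect orion start) and listens on Orion's default port 4200. The hostname is a placeholder, not one from this thread, and orion_api_url is a hypothetical helper.

```python
import os


def orion_api_url(host: str, port: int = 4200) -> str:
    """Build the API URL for an Orion server (4200 assumed as the default port)."""
    return f"http://{host}:{port}/api"


# "db-host.example.org" is a placeholder hostname for the machine that
# runs both the Orion server and the postgres database.
os.environ["PREFECT_API_URL"] = orion_api_url("db-host.example.org")
```

With PREFECT_API_URL set, the flow talks to the long-lived server over HTTP, so database access happens next to postgres rather than from the process that is shutting down the cluster.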

curious, what if you would use .map instead? worth trying too

At first I did have it as an async function, but it was more a case of doing what I know. If I understand prefect well enough, I think I could have made each of my tasks an async task and the net result would have been the same. It was more a case of additional asyncio boilerplate vs a pretty pythonic list comprehension.