Graceful shutdown of Dask Scheduler

Hi,

I have a Prefect flow that uses a DaskExecutor to distribute task runs across a Kubernetes cluster. Once all work finishes, the Dask worker pods are stopped as expected; however, the Dask scheduler pod does not seem to terminate gracefully.

2022-06-20 19:39:51,183 - distributed.scheduler - INFO - Remove client Client-aafb0f74-f0d0-11ec-8007-fea8ebc15be7
2022-06-20 19:39:51,187 - distributed.scheduler - INFO - Remove client Client-aafb0f74-f0d0-11ec-8007-fea8ebc15be7
2022-06-20 19:39:51,188 - distributed.scheduler - INFO - Close client connection: Client-aafb0f74-f0d0-11ec-8007-fea8ebc15be7
2022-06-20 19:39:51,247 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://...', name: 1, status: running, memory: 0, processing: 0>
2022-06-20 19:39:51,247 - distributed.core - INFO - Removing comms to tcp://...
2022-06-20 19:39:51,260 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://...', name: 0, status: running, memory: 0, processing: 0>
2022-06-20 19:39:51,260 - distributed.core - INFO - Removing comms to tcp://...
2022-06-20 19:39:51,283 - distributed.scheduler - INFO - Remove worker <WorkerState 'tcp://...', name: 2, status: running, memory: 0, processing: 0>
2022-06-20 19:39:51,284 - distributed.core - INFO - Removing comms to tcp://...
2022-06-20 19:39:51,284 - distributed.scheduler - INFO - Lost all workers
2022-06-20 19:39:51,341 - distributed.scheduler - INFO - End scheduler at 'tcp://...
Traceback (most recent call last):
  File "/usr/local/bin/dask-scheduler", line 8, in <module>
    sys.exit(main())
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python3.9/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/distributed/cli/dask_scheduler.py", line 208, in main
    loop.run_sync(run)
  File "/usr/local/lib/python3.9/site-packages/tornado/ioloop.py", line 529, in run_sync
    raise TimeoutError("Operation timed out after %s seconds" % timeout)
tornado.util.TimeoutError: Operation timed out after None seconds
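If I read tornado's IOLoop.run_sync correctly, this TimeoutError is raised whenever the IOLoop stops before the wrapped coroutine's future has finished. With timeout=None no timer is ever armed, so "after None seconds" most likely means the loop was stopped during scheduler shutdown, not that anything actually timed out. The sketch below models that completion check with plain asyncio; run_sync_model, RunSyncTimeout and the two coroutines are hypothetical stand-ins, not tornado or distributed code.

```python
import asyncio
from typing import Any, Callable, Coroutine, Optional


class RunSyncTimeout(Exception):
    """Stand-in for tornado.util.TimeoutError in this simplified model."""


def run_sync_model(
    make_coro: Callable[[], Coroutine[Any, Any, str]],
    stop_early: bool,
    timeout: Optional[float] = None,
) -> str:
    """Simplified model of the completion check in tornado's IOLoop.run_sync."""
    loop = asyncio.new_event_loop()
    try:
        task = loop.create_task(make_coro())

        def stop_when_done(_: "asyncio.Future[str]") -> None:
            loop.stop()  # normal path: stop once the coroutine has finished

        task.add_done_callback(stop_when_done)
        if stop_early:
            # Shutdown path: something else stops the loop first.
            loop.call_soon(loop.stop)
        loop.run_forever()

        if task.cancelled() or not task.done():
            # Tidy up the unfinished task, then report it the way run_sync does.
            task.remove_done_callback(stop_when_done)
            task.cancel()
            try:
                loop.run_until_complete(task)
            except asyncio.CancelledError:
                pass
            raise RunSyncTimeout(f"Operation timed out after {timeout} seconds")
        return task.result()
    finally:
        loop.close()


async def quick_job() -> str:
    await asyncio.sleep(0)
    return "finished"


async def long_running_server() -> str:
    await asyncio.sleep(3600)  # e.g. a scheduler serving until it is told to close
    return "finished"
```

In this model the timeout value only appears in the message, which is why a run with no timeout at all can still report "timed out after None seconds".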

Additionally, the Prefect flow-run pod ends with unexpected Python exceptions, even though all reference tasks are marked as successful (which is also reflected in Prefect Cloud).

[2022-06-20 19:39:51+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-06-20 19:39:51+0000] DEBUG - prefect.CloudFlowRunner | Flow 'prefect-dask-executor-test': Handling state change from Running to Success
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f27af629eb0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f27aeb61ee0>, 379.994944061), (<aiohttp.client_proto.ResponseHandler object at 0x7f27aeb61e80>, 380.004234712), (<aiohttp.client_proto.ResponseHandler object at 0x7f27aeb618e0>, 380.084172218)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f27af629c10>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7f27aebb02b0>
Unclosed connector
connections: ['[(<aiohttp.client_proto.ResponseHandler object at 0x7f27aeb61700>, 380.063919012)]']
connector: <aiohttp.connector.TCPConnector object at 0x7f27aebb0040>
Fatal error on SSL transport
protocol: <asyncio.sslproto.SSLProtocol object at 0x7f27af664f40>
transport: <_SelectorSocketTransport closing fd=10>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/asyncio/selector_events.py", line 915, in write
    n = self._sock.send(data)
OSError: [Errno 9] Bad file descriptor

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/asyncio/sslproto.py", line 690, in _process_write_backlog
    self._transport.write(chunk)
  File "/usr/local/lib/python3.9/asyncio/selector_events.py", line 921, in write
    self._fatal_error(exc, 'Fatal write error on socket transport')
  File "/usr/local/lib/python3.9/asyncio/selector_events.py", line 716, in _fatal_error
    self._force_close(exc)
  File "/usr/local/lib/python3.9/asyncio/selector_events.py", line 728, in _force_close
    self._loop.call_soon(self._call_connection_lost, exc)
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 751, in call_soon
    self._check_closed()
  File "/usr/local/lib/python3.9/asyncio/base_events.py", line 515, in _check_closed
    raise RuntimeError('Event loop is closed')
RuntimeError: Event loop is closed
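The last frames of this traceback show the mechanism: while cleaning up an SSL transport, asyncio calls loop.call_soon on an event loop that has already been closed, and _check_closed raises. The "Unclosed client session" / "Unclosed connector" lines are, as far as I know, aiohttp warning that ClientSession objects were garbage-collected without being closed. A minimal stdlib sketch of both sides follows, assuming the usual remedy is to close async clients while the loop is still running; HypotheticalSession is an illustrative stand-in, not aiohttp's API.

```python
import asyncio


def schedule_on_closed_loop() -> str:
    """Mimic the traceback: a transport schedules connection_lost on a closed loop."""
    loop = asyncio.new_event_loop()
    loop.close()
    try:
        loop.call_soon(lambda: None)  # call_soon checks the loop first and raises
    except RuntimeError as exc:
        return str(exc)
    return "no error"


class HypotheticalSession:
    """Stand-in for an async client such as an HTTP session (not a real API)."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        # A real client would release its sockets here, while the loop still runs.
        await asyncio.sleep(0)
        self.closed = True


async def use_session_cleanly() -> HypotheticalSession:
    session = HypotheticalSession()
    try:
        await asyncio.sleep(0)  # ... do work with the session ...
    finally:
        await session.close()  # close before asyncio.run() closes the loop
    return session


if __name__ == "__main__":
    print(schedule_on_closed_loop())
    print(asyncio.run(use_session_cleanly()).closed)
```

The point of the second half is ordering: resources are closed inside the coroutine, so nothing is left for the garbage collector to clean up after the loop is gone.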

I am using Dask version 2022.1.0 because newer versions do not work for me (due to this issue). As an aside, version 2022.4.0 doesn’t seem to work with Prefect either, though the error I get is slightly different:

Unexpected error: ConnectionError('kubectl port forward failed')

What could be causing these errors? While researching the Dask errors I came across this issue, which could explain the Dask scheduler error, but I don’t understand the errors occurring in the Prefect flow-run pod. I can also reproduce this by running the example flow from Prefect’s DaskExecutor docs.

Has anyone encountered these issues?

Thank you!
– Nicoleta


Can you share a bit more about your setup?

  1. Where does your Dask cluster run? Is it a dynamically created, on-demand cluster for the flow run or a long-running cluster on Kubernetes? Where does your K8s cluster run?
  2. Can you share how you set up your DaskExecutor in your flow code?
  3. Can you share the output of prefect diagnostics?

Hi Anna,

Thank you for your reply.

The Dask cluster is dynamically created for the flow run. The executor is set up very similarly to the one in this blog post:

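import prefect
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

# num_workers and the K8S_*/DASK_WORKER_* constants are defined elsewhere in the flow module.
executor = DaskExecutor(
    # A callable, so the cluster is only created when the executor starts
    # inside the flow run, where prefect.context.image is set.
    cluster_class=lambda: KubeCluster(
        n_workers=num_workers,
        name='dask-executor',
        pod_template=make_pod_spec(
            image=prefect.context.image,
            extra_pod_config={
                'serviceAccountName': K8S_SERVICE_ACCOUNT
            },
            cpu_limit=DASK_WORKER_CPU_LIMIT,
            cpu_request=DASK_WORKER_CPU_REQUEST,
            memory_limit=DASK_WORKER_MEMORY_LIMIT,
            memory_request=DASK_WORKER_MEMORY_REQUEST,
        ),
    ),
    debug=True
)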

The K8s cluster is managed by AWS EKS.

Running prefect diagnostics from within the flow-run pod returns:

{
  "config_overrides": {},
  "env_vars": [
    ...
  ],
  "system_information": {
    "platform": "Linux-5.4.156-83.273.amzn2.x86_64-x86_64-with-glibc2.31",
    "prefect_backend": "cloud",
    "prefect_version": "0.15.5",
    "python_version": "3.9.12"
  }
}

I find this odd, because the agent uses image: prefecthq/prefect:1.2.0-python3.9, and running prefect diagnostics from within the Prefect agent pod does return the expected prefect_version:

{
  "platform": "Linux-5.4.156-83.273.amzn2.x86_64-x86_64-with-glibc2.31",
  "prefect_backend": "cloud",
  "prefect_version": "1.2.0",
  "python_version": "3.9.12"
}

Any idea where this discrepancy could come from? We deployed the Kubernetes agent by following the docs.
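As far as I know, in Prefect 1.x the flow-run pod is started from the image configured for the flow (its run config or storage), not from the agent's own image, so the two can diverge. One way to see what the flow-run pod (or a Dask worker) actually has installed is to log package versions from inside a task. A minimal stdlib sketch, assuming Python 3.8+ for importlib.metadata; installed_versions and version_mismatches are hypothetical helper names, and the expected versions are simply the ones mentioned in this thread.

```python
from importlib import metadata
from typing import Dict, Iterable, Optional, Tuple


def installed_versions(packages: Iterable[str]) -> Dict[str, Optional[str]]:
    """Return the installed version of each distribution, or None if it is missing."""
    versions: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


def version_mismatches(
    expected: Dict[str, str], actual: Dict[str, Optional[str]]
) -> Dict[str, Tuple[str, Optional[str]]]:
    """Map each package whose installed version differs to (expected, actual)."""
    return {
        name: (want, actual.get(name))
        for name, want in expected.items()
        if actual.get(name) != want
    }


if __name__ == "__main__":
    # Hypothetical expectations, taken from the versions mentioned in this thread.
    expected = {"prefect": "1.2.0", "dask": "2022.1.0"}
    actual = installed_versions(expected)
    print(actual)
    print(version_mismatches(expected, actual))
```

Running the same helper in the agent pod and in the flow-run pod gives a side-by-side view similar to the two prefect diagnostics outputs above, extended to Dask.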

Thank you again for your reply. :slight_smile: