Hello,
We have a local Kubernetes cluster on which we want to run flows using `DaskTaskRunner` and dask-kubernetes. We also have a Docker registry that hosts the custom image the Dask workers should use.
We run our own Prefect server instance on the same Kubernetes cluster.
This setup works fine with a `KubeCluster` when Prefect is not involved, e.g.:
from dask.distributed import LocalCluster, Client
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
import dask.dataframe as dd
import pandas as pd
if __name__ == "__main__":
    # Describe a Dask-operator cluster whose pods run the custom worker image.
    cluster_spec = make_cluster_spec(
        name="dask-kube-cluster",
        image="path.to.registry/dask_workers/worker:latest",
    )

    # Worker and scheduler pods both pull from the private registry, so each
    # pod spec gets the same imagePullSecrets entry (worker first, then
    # scheduler, each with its own list object).
    for role in ("worker", "scheduler"):
        cluster_spec["spec"][role]["spec"]["imagePullSecrets"] = [
            {"name": "registry-creds"}
        ]

    # Entering Client(cluster) makes it the default client, so ddf.compute()
    # below runs on the Kubernetes-hosted cluster.
    with KubeCluster(
        custom_cluster_spec=cluster_spec,
        namespace="dask-operator",
        n_workers=2,
    ) as cluster, Client(cluster):
        frame = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
        ddf = dd.from_pandas(frame, npartitions=2)
        print(ddf.compute().to_markdown())
When using Prefect, I get an `SSLCertVerificationError`:
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
from dask.distributed import LocalCluster, Client
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
@task
def add(x, y):
    """Prefect task: log both operands at INFO level, then return ``x + y``."""
    run_logger = get_run_logger()
    run_logger.info(f"Adding {x} and {y}")
    total = x + y
    return total
@flow
def test_flow(a1, a2):
    """Prefect flow: run ``add(a1, a2)`` on the configured task runner.

    The task is submitted to the flow's task runner (a DaskTaskRunner in the
    ``__main__`` block), so it executes on a Dask worker.

    Returns:
        The sum computed by the ``add`` task. ``PrefectFuture.result()``
        waits for the task run and re-raises its exception if it failed, so a
        worker-side crash surfaces here instead of only as a
        "1/1 states failed" flow state.
    """
    future = add.submit(a1, a2)
    # Previously the future was bound to an unused local and the flow
    # returned None; returning the resolved value exposes the sum to callers.
    return future.result()
if __name__ == "__main__":
    # Describe a Dask-operator cluster whose pods run the custom worker image.
    cluster_spec = make_cluster_spec(
        name="dask-kube-cluster",
        image="path.to.registry/dask_workers/worker:latest",
    )

    # Worker and scheduler pods both pull from the private registry, so each
    # pod spec gets the same imagePullSecrets entry (worker first, then
    # scheduler, each with its own list object).
    for role in ("worker", "scheduler"):
        cluster_spec["spec"][role]["spec"]["imagePullSecrets"] = [
            {"name": "registry-creds"}
        ]

    # NOTE(review): with DaskTaskRunner, each task run executes inside a worker
    # pod and calls the Prefect API over HTTPS from there (the failing frames
    # in the traceback are under /usr/local/lib/python3.10, i.e. the worker
    # image, in set_task_run_state). The worker image must therefore trust the
    # API server's certificate chain; presumably its CA bundle (or the chain
    # the server presents) is incomplete. Confirm before changing code.
    with KubeCluster(
        custom_cluster_spec=cluster_spec,
        namespace="dask-operator",
        n_workers=2,
    ) as cluster, Client(cluster) as client:
        # Point Prefect's task runner at the scheduler this client talks to.
        runner = DaskTaskRunner(address=client.scheduler.address)
        test_flow.with_options(task_runner=runner)(2, 3)
→ Log:
16:41:40.231 | INFO | prefect.engine - Created flow run 'striped-starfish' for flow 'test-flow'
16:41:40.234 | INFO | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://localhost:50965
16:41:40.258 | INFO | prefect.task_runner.dask - The Dask dashboard is available at http://localhost:8787/status
16:41:40.413 | INFO | Flow run 'striped-starfish' - Created task run 'add-0' for task 'add'
16:41:40.427 | INFO | Flow run 'striped-starfish' - Submitted task run 'add-0' for execution.
16:41:43.393 | INFO | Task run 'add-0' - Crash detected! Execution was interrupted by an unexpected exception: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1007)
16:41:43.558 | ERROR | Flow run 'striped-starfish' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
File "/home/kris/workspace/tmp/tmp.py", line 65, in <module>
test_flow.with_options(
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/flows.py", line 468, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
retval = from_sync.wait_for_call_in_loop_thread(
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
return call.result()
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
return self.future.result(timeout=timeout)
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/concurrent/futures/_base.py", line 451, in result
return self.__get_result()
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
result = await coro
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
return await fn(*args, **kwargs)
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/engine.py", line 259, in create_then_begin_flow_run
return await state.result(fetch=True)
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
raise await get_state_exception(state)
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 271, in wait
return await future.result(timeout=timeout)
File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/distributed/client.py", line 329, in _result
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1331, in begin_task_run
File "/usr/local/lib/python3.10/contextlib.py", line 714, in __aexit__
raise exc_details[1]
File "/usr/local/lib/python3.10/contextlib.py", line 697, in __aexit__
cb_suppress = await cb(*exc_details)
File "/usr/local/lib/python3.10/contextlib.py", line 217, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1752, in report_task_run_crashes
File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1894, in set_task_run_state
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1845, in post
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 243, in send
File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 189, in _send_with_retry
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 150, in _connect
File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 78, in start_tls
File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 69, in start_tls
File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 122, in wrap
File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
File "/usr/local/lib/python3.10/ssl.py", line 975, in do_handshake
self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1007)
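So the same worker image and image pull secret work with the plain Dask `KubeCluster` setup, but with Prefect the SSL certificate becomes a problem. The certificate is issued by Let’s Encrypt, so it should be trusted. Note that the traceback frames after the re-raise in `distributed/client.py` (paths under `/usr/local/lib/python3.10`) come from the Dask worker. There the task run calls the Prefect API over HTTPS (`set_task_run_state`), which the non-Prefect example never does from the workers.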
Any pointers?