`SSLCertVerificationError` when using `DaskTaskRunner` with `KubeCluster` and private registry

Hello,

We have a local Kubernetes cluster on which we want to run flows using DaskTaskRunner and dask-kubernetes. We also have a private Docker registry that hosts the custom image the Dask workers should use.
Our own Prefect server instance also runs on the Kubernetes cluster.

This setup works fine with a KubeCluster when Prefect is not involved, e.g.:

```python
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
import dask.dataframe as dd
import pandas as pd

if __name__ == "__main__":
    # Generate the cluster spec, using the custom image from the private registry
    spec = make_cluster_spec(
        name="dask-kube-cluster",
        image="path.to.registry/dask_workers/worker:latest",
    )

    # Set the imagePullSecrets for the scheduler and worker pods
    spec["spec"]["worker"]["spec"]["imagePullSecrets"] = [
        {"name": "registry-creds"}
    ]
    spec["spec"]["scheduler"]["spec"]["imagePullSecrets"] = [
        {"name": "registry-creds"}
    ]

    with KubeCluster(
        custom_cluster_spec=spec,
        namespace="dask-operator",
        n_workers=2,
    ) as cluster:
        with Client(cluster) as client:
            ddf = dd.from_pandas(
                pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}), npartitions=2
            )
            # to_markdown() needs the optional `tabulate` package
            print(ddf.compute().to_markdown())
```

When using Prefect, I get an SSLCertVerificationError:

```python
from prefect import task, flow, get_run_logger
from prefect_dask import DaskTaskRunner
from dask.distributed import Client
from dask_kubernetes.operator import KubeCluster, make_cluster_spec


@task
def add(x, y):
    logger = get_run_logger()
    logger.info(f"Adding {x} and {y}")
    return x + y


@flow
def test_flow(a1, a2):
    x = add.submit(a1, a2)


if __name__ == "__main__":
    # Generate the cluster spec
    spec = make_cluster_spec(
        name="dask-kube-cluster",
        image="path.to.registry/dask_workers/worker:latest",
    )

    # Set the imagePullSecrets for the scheduler and worker pods
    spec["spec"]["worker"]["spec"]["imagePullSecrets"] = [
        {"name": "registry-creds"}
    ]
    spec["spec"]["scheduler"]["spec"]["imagePullSecrets"] = [
        {"name": "registry-creds"}
    ]

    with KubeCluster(
        custom_cluster_spec=spec,
        namespace="dask-operator",
        n_workers=2,
    ) as cluster:
        with Client(cluster) as client:
            # Point the DaskTaskRunner at the scheduler of the running cluster
            test_flow.with_options(
                task_runner=DaskTaskRunner(address=client.scheduler.address)
            )(2, 3)
```

Log:

```text
16:41:40.231 | INFO    | prefect.engine - Created flow run 'striped-starfish' for flow 'test-flow'
16:41:40.234 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://localhost:50965
16:41:40.258 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://localhost:8787/status
16:41:40.413 | INFO    | Flow run 'striped-starfish' - Created task run 'add-0' for task 'add'
16:41:40.427 | INFO    | Flow run 'striped-starfish' - Submitted task run 'add-0' for execution.
16:41:43.393 | INFO    | Task run 'add-0' - Crash detected! Execution was interrupted by an unexpected exception: ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1007)

16:41:43.558 | ERROR   | Flow run 'striped-starfish' - Finished in state Failed('1/1 states failed.')
Traceback (most recent call last):
  File "/home/kris/workspace/tmp/tmp.py", line 65, in <module>
    test_flow.with_options(
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/engine.py", line 182, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/engine.py", line 259, in create_then_begin_flow_run
    return await state.result(fetch=True)
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect/states.py", line 91, in _get_state_result
    raise await get_state_exception(state)
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/prefect_dask/task_runners.py", line 271, in wait
    return await future.result(timeout=timeout)
  File "/home/kris/mambaforge/envs/test_kubecluster/lib/python3.10/site-packages/distributed/client.py", line 329, in _result
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1331, in begin_task_run
  File "/usr/local/lib/python3.10/contextlib.py", line 714, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.10/contextlib.py", line 697, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/usr/local/lib/python3.10/contextlib.py", line 217, in __aexit__
    await self.gen.athrow(typ, value, traceback)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1752, in report_task_run_crashes
  File "/usr/local/lib/python3.10/site-packages/prefect/client/orchestration.py", line 1894, in set_task_run_state
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1845, in post
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1530, in request
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 243, in send
  File "/usr/local/lib/python3.10/site-packages/prefect/client/base.py", line 189, in _send_with_retry
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1617, in send
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1645, in _send_handling_auth
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1682, in _send_handling_redirects
  File "/usr/local/lib/python3.10/site-packages/httpx/_client.py", line 1719, in _send_single_request
  File "/usr/local/lib/python3.10/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 253, in handle_async_request
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection_pool.py", line 237, in handle_async_request
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 86, in handle_async_request
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 63, in handle_async_request
  File "/usr/local/lib/python3.10/site-packages/httpcore/_async/connection.py", line 150, in _connect
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 78, in start_tls
  File "/usr/local/lib/python3.10/site-packages/httpcore/backends/asyncio.py", line 69, in start_tls
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 122, in wrap
  File "/usr/local/lib/python3.10/site-packages/anyio/streams/tls.py", line 130, in _call_sslobject_method
  File "/usr/local/lib/python3.10/ssl.py", line 975, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: unable to get local issuer certificate (_ssl.c:1007)
```

So the worker image with the same image pull secret works with the Dask KubeCluster when Prefect is not involved, but with Prefect the SSL certificate becomes a problem. The registry's certificate is issued by Let's Encrypt, so it should be trusted.

Any pointers?

I have since tried adding a Prefect block for the registry (created in the server UI) and loading it in the script like this:

```python
from prefect.infrastructure.docker import DockerRegistry
# ...

@flow
def test_flow(a1, a2):
    docker_registry_block = DockerRegistry.load("gitlab-docker-registry")
    x = add.submit(a1, a2)

# ...
```

Unfortunately, the same error occurs.

We solved the issue:

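The problem was not the private Docker registry but the self-signed SSL certificate of the address where our Prefect server runs. In hindsight the traceback hints at this: the frames under /usr/local/lib/python3.10/ come from the worker container, and the failing call is the worker's HTTPS request to the Prefect API (set_task_run_state via httpx), not an image pull.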
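To confirm which endpoint fails, one option is to attempt a verified TLS handshake from inside a worker pod. Below is a minimal, standard-library-only sketch; it is not part of Prefect or Dask, and the file name `check_tls.py` is an assumption. With no CA file given it uses OpenSSL's default trust store, which honours `SSL_CERT_FILE`/`SSL_CERT_DIR`. Prefect's httpx client, by contrast, defaults to certifi's bundle, so a pass here with those variables unset does not by itself prove Prefect will succeed.

```python
"""Check whether an HTTPS endpoint's certificate verifies from this environment."""
import socket
import ssl
import sys
from typing import Optional, Tuple
from urllib.parse import urlsplit


def endpoint_from_url(url: str) -> Tuple[str, int]:
    """Return (host, port) for an https:// URL, defaulting the port to 443."""
    parts = urlsplit(url)
    if parts.scheme != "https" or not parts.hostname:
        raise ValueError(f"expected an https:// URL, got {url!r}")
    return parts.hostname, parts.port or 443


def verify_tls(host: str, port: int = 443, cafile: Optional[str] = None,
               timeout: float = 5.0) -> Tuple[bool, str]:
    """Attempt a verified TLS handshake and return (ok, detail).

    With cafile=None the context loads OpenSSL's default verify paths,
    which can be overridden via SSL_CERT_FILE / SSL_CERT_DIR.
    """
    context = ssl.create_default_context(cafile=cafile)
    try:
        with socket.create_connection((host, port), timeout=timeout) as sock:
            with context.wrap_socket(sock, server_hostname=host) as tls:
                # "issuer" is a tuple of RDNs, each a tuple of (key, value) pairs
                issuer = dict(rdn[0] for rdn in tls.getpeercert()["issuer"])
                return True, f"verified, issuer: {issuer.get('commonName', '?')}"
    except ssl.SSLCertVerificationError as exc:
        return False, f"verification failed: {exc.verify_message}"
    except OSError as exc:  # DNS errors, refused connections, timeouts
        return False, f"connection failed: {exc}"


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python check_tls.py https://host/api [path/to/ca.crt]
    host, port = endpoint_from_url(sys.argv[1])
    ca = sys.argv[2] if len(sys.argv) > 2 else None
    ok, detail = verify_tls(host, port, cafile=ca)
    print(f"{host}:{port} -> {detail}")
    sys.exit(0 if ok else 1)
```

After copying the script into a worker pod (e.g. with `kubectl cp`), running it against `https://my-prefect-server-url/api` should report a verification failure, and passing the self-signed CA file as the second argument should make it succeed. Running it against the registry host is a quick way to rule the registry out.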

It can be solved by using a Docker image for the KubeCluster that contains the CA certificate and sets the relevant environment variables, e.g. by adding the following to the Dockerfile (adjust the names starting with `my`):

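```dockerfile
FROM python:3.10.10

# Install prefect and other requirements

# Add the self-signed CA to the system trust store
COPY my-self-signed-ca.crt /usr/local/share/ca-certificates/
RUN chmod 644 /usr/local/share/ca-certificates/my-self-signed-ca.crt && update-ca-certificates
# httpx (used by the Prefect client) falls back to certifi's bundle unless
# SSL_CERT_FILE/SSL_CERT_DIR is set, so point it at the updated system store
ENV SSL_CERT_DIR=/etc/ssl/certs/
# Only relevant for libraries built on requests
ENV REQUESTS_CA_BUNDLE=/etc/ssl/certs/
# `prefect config set` writes to the *active* profile, so switch to it first
RUN prefect profile create my-profile \
    && prefect profile use my-profile \
    && prefect config set PREFECT_API_URL="https://my-prefect-server-url/api"
```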
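If rebuilding the worker image is inconvenient, a similar effect might be achieved by patching the cluster spec, the same way the `imagePullSecrets` were set earlier. This is a minimal sketch under assumptions: the CA bundle lives in a Kubernetes Secret named `prefect-ca` (hypothetical) under the key `ca.crt` in the `dask-operator` namespace, and it is mounted at a hypothetical `/etc/prefect-ca`. Because `SSL_CERT_FILE` replaces the default bundle entirely, the Secret should hold a full bundle (e.g. the `ca-certificates.crt` produced by `update-ca-certificates` after adding your CA), not just the self-signed CA. Only the worker pods run Prefect tasks, so only they are patched.

```python
import copy
from typing import Any, Dict

# Hypothetical names: adjust to your cluster
CA_SECRET_NAME = "prefect-ca"
CA_KEY = "ca.crt"
CA_MOUNT_DIR = "/etc/prefect-ca"


def add_prefect_tls(spec: Dict[str, Any], api_url: str) -> Dict[str, Any]:
    """Return a copy of a dask-kubernetes cluster spec whose worker pods
    talk to the Prefect API at `api_url` and trust the mounted CA bundle."""
    spec = copy.deepcopy(spec)  # leave the caller's spec untouched
    pod_spec = spec["spec"]["worker"]["spec"]

    # Expose the Secret holding the CA bundle as a pod volume
    pod_spec["volumes"] = (pod_spec.get("volumes") or []) + [
        {"name": "prefect-ca", "secret": {"secretName": CA_SECRET_NAME}}
    ]
    for container in pod_spec["containers"]:
        # Mount it read-only into every worker container
        container["volumeMounts"] = (container.get("volumeMounts") or []) + [
            {"name": "prefect-ca", "mountPath": CA_MOUNT_DIR, "readOnly": True}
        ]
        container["env"] = (container.get("env") or []) + [
            {"name": "PREFECT_API_URL", "value": api_url},
            # SSL_CERT_FILE overrides the default CA bundle for OpenSSL and httpx
            {"name": "SSL_CERT_FILE", "value": f"{CA_MOUNT_DIR}/{CA_KEY}"},
        ]
    return spec
```

Usage, assuming the Secret was created with `kubectl create secret generic prefect-ca --from-file=ca.crt=ca-certificates.crt -n dask-operator`: call `spec = add_prefect_tls(spec, "https://my-prefect-server-url/api")` after setting the `imagePullSecrets`, then pass `custom_cluster_spec=spec` to `KubeCluster` as before.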