DaskTaskRunner creating a separate client

I have been spending a lot of time configuring Prefect with the DaskTaskRunner on Kubernetes, and I have a bunch of closely related questions to ask.

I am creating the cluster and client instances before starting the Prefect flow. For example:

from dask import config as dask_config
from dask.distributed import Client, get_client
from dask_kubernetes.operator import KubeCluster
from prefect import task, flow
from prefect.context import get_run_context
from prefect_dask import DaskTaskRunner

custom_dask_config = {
    "distributed.worker.daemon": False,
    "distributed.comm.timeouts.connect": 500,
    "distributed.comm.timeouts.tcp": 500,
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
}

@task
def my_task():
    return 2 + 5

@flow
def my_flow():
    # The global default client, i.e. the one created in __main__ below.
    dask_client = get_client()
    # The client created internally by the DaskTaskRunner (private attribute).
    prefect_client = get_run_context().task_runner._client

    my_task.submit().result()

if __name__ == '__main__':
    dask_config.set(custom_dask_config)
    cluster = KubeCluster(
        # `spec` includes n_workers, threads_per_worker, memory_per_worker, docker and
        # credential information
        custom_cluster_spec=spec, 
        namespace="temp-operator",
    )
    dask_client = Client(cluster)
    runner = DaskTaskRunner(address=dask_client.scheduler.address)
    my_flow.with_options(task_runner=runner)()

With the set-up above, I have noticed that two clients exist: one created by dask_client = Client(cluster), and another created inside the flow to submit tasks (which I call prefect_client). When I fetch the global client via the get_client method, it returns the dask_client, which carries more information than the prefect_client (for instance, the prefect_client is missing the cluster information, etc.). Correct me if I am wrong.
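
For completeness, here is roughly how I verified this (run with the same runner as above; task_runner._client is a private prefect-dask attribute, so this is purely for inspection):

@flow
def compare_clients():
    dask_client = get_client()
    prefect_client = get_run_context().task_runner._client
    # Two distinct Client objects talking to the same scheduler:
    print(dask_client is prefect_client)                                       # False
    print(dask_client.scheduler.address == prefect_client.scheduler.address)  # True
    # Only the client built directly from the cluster object knows about it:
    print(dask_client.cluster)     # the KubeCluster instance
    print(prefect_client.cluster)  # None

compare_clients.with_options(task_runner=runner)()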

Question - 1) Is there a viable reason why Prefect creates a separate client instead of reusing the dask_client?

If I pass an extra client argument, such as:

runner = DaskTaskRunner(
    address=dask_client.scheduler.address,
    client_kwargs={"set_as_default": True}
)

then the prefect_client replaces the dask_client as the default client, and I lose the cluster information, as mentioned before.

Question - 2) If I follow the above method (keeping only one global client), what possible problems am I not aware of?

Question - 3) The reason I am asking the above questions is that, when the tasks are submitted on Kubernetes, the DaskTaskRunner schedules many tasks on a single worker while the rest of the workers do nothing. To avoid this issue, I had to write my own queue manager that submits the larger computation tasks to different workers, so that at least the resources are utilized well.
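
For illustration, what my queue manager does is conceptually equivalent to pinning tasks via dask annotations. Note the assumptions here: prefect-dask forwards dask.annotate contexts to the scheduler (it documents this for priorities), and every worker in the cluster spec would have to be started with a worker resource such as slot=1 (a hypothetical name, not in my spec above):

import dask

@flow
def spread_flow(n: int):
    futures = []
    for _ in range(n):
        # Each task claims one "slot"; with slot=1 per worker, at most one
        # of these tasks can run on a given worker at a time.
        with dask.annotate(resources={"slot": 1}):
            futures.append(my_task.submit())
    return [f.result() for f in futures]

spread_flow.with_options(task_runner=runner)(10)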

Question - 4) Another issue I have: when one of the tasks pushes memory consumption above 95%, the scheduler automatically restarts that worker to avoid having it killed by an external watchdog. However, once that worker is restarted, it stays idle and no computation happens on it afterwards, regardless of whether I use my own queue manager or not. Also, the flow does not close the pods/workers properly; they linger on Kubernetes doing nothing but consuming resources, so I have to clean them up manually. How can I deal with this situation? The Dask documentation says that one can set the number of allowed failures ("distributed.scheduler.allowed-failures") in the dask_config, but setting this also does not make any task rerun when something goes wrong.
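
In case it helps the discussion, my manual clean-up currently amounts to an explicit try/finally around the flow run; a minimal sketch, assuming Client.close() and KubeCluster.close() tear down the pods as documented:

if __name__ == "__main__":
    dask_config.set({
        **custom_dask_config,
        # let a task survive up to 3 worker deaths before it is marked failed
        "distributed.scheduler.allowed-failures": 3,
    })
    cluster = KubeCluster(custom_cluster_spec=spec, namespace="temp-operator")
    dask_client = Client(cluster)
    try:
        runner = DaskTaskRunner(address=dask_client.scheduler.address)
        my_flow.with_options(task_runner=runner)()
    finally:
        # Close the client first, then the cluster, so the scheduler and
        # worker pods are removed even if the flow raises.
        dask_client.close()
        cluster.close()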

Question - 5) Coming back to question 2 in light of question 4: does the prefect_client also obey the dask_config, whether the settings relate to the scheduler and/or the workers?
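
To make the question concrete, I would expect to be able to check the effective settings on both sides with something like this (Client.run executes a function on every worker, Client.run_on_scheduler on the scheduler):

import dask

def effective_config():
    # Runs remotely; returns the values that process actually sees.
    return {
        "allowed-failures": dask.config.get("distributed.scheduler.allowed-failures", None),
        "memory-target": dask.config.get("distributed.worker.memory.target", None),
    }

print(dask_client.run(effective_config))               # {worker address: {...}, ...}
print(dask_client.run_on_scheduler(effective_config))  # scheduler-side view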

I am posting all of the above questions in a single thread; however, if you think it is better, I am happy to create separate threads.

I used Python 3.10.12 with the packages below and their dependencies.

prefect==2.14.21
prefect-dask==0.2.6
dask==2024.1.1
distributed==2024.1.1
dask-kubernetes==2024.1.0