I have been spending a lot of time configuring Prefect with the DaskTaskRunner on Kubernetes, and I have a bunch of closely related questions to ask.
I am creating the cluster and client instances before starting the Prefect flow. For example:
from dask import config as dask_config
from dask.distributed import Client, get_client
from dask_kubernetes.operator import KubeCluster
from prefect import task, flow
from prefect.context import get_run_context
from prefect_dask import DaskTaskRunner
custom_dask_config = {
    "distributed.worker.daemon": False,
    "distributed.comm.timeouts.connect": 500,
    "distributed.comm.timeouts.tcp": 500,
    "distributed.worker.memory.target": False,
    "distributed.worker.memory.spill": False,
}

@task
def my_task():
    return 2 + 5

@flow
def my_flow():
    dask_client = get_client()
    prefect_client = get_run_context().task_runner._client
    my_task.submit().result()

if __name__ == "__main__":
    dask_config.set(custom_dask_config)
    cluster = KubeCluster(
        # `spec` includes n_workers, threads_per_worker, memory_per_worker, docker and
        # credential information
        custom_cluster_spec=spec,
        namespace="temp-operator",
    )
    dask_client = Client(cluster)
    runner = DaskTaskRunner(address=dask_client.scheduler.address)
    my_flow.with_options(task_runner=runner)()
Based on the above set-up, I have noticed that it creates two clients: one with dask_client = Client(cluster), and another one created inside the flow to submit tasks (which I named prefect_client). When I fetch the global client via the get_client method, it returns the dask_client, which holds more information than the prefect_client (for instance, prefect_client is missing the cluster information, etc.). Correct me if I am wrong.
Question - 1) Is there any viable reason why Prefect creates a separate client instead of using the dask_client?
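For context, this is roughly how I compared the two clients inside the flow (just a debugging sketch on my side; task_runner._client is a private attribute):

from dask.distributed import get_client
from prefect import flow
from prefect.context import get_run_context

@flow
def inspect_clients():
    # global/default client registered by Client(cluster) in __main__
    dask_client = get_client()
    # client created internally by DaskTaskRunner (private attribute, debugging only)
    prefect_client = get_run_context().task_runner._client
    print("global client:", dask_client, dask_client.cluster)
    print("runner client:", prefect_client, prefect_client.cluster)  # cluster is None here
    print("same object? ", dask_client is prefect_client)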
If I pass an extra client argument, such as:
runner = DaskTaskRunner(
    address=dask_client.scheduler.address,
    client_kwargs={"set_as_default": True},
)
then the prefect_client overrides the dask_client as the default client, and I lose the cluster information, as I mentioned before.
Question - 2) If I follow the above method (having only one global client), what possible problems am I not aware of?
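One alternative I considered, to avoid the second client entirely, is letting the DaskTaskRunner own the KubeCluster itself via its cluster_class/cluster_kwargs arguments (a sketch, assuming these kwargs are passed straight through to KubeCluster):

# Sketch: let DaskTaskRunner create and manage the KubeCluster, so there is only
# one client. `spec` is the same custom cluster spec as above.
runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.operator.KubeCluster",
    cluster_kwargs={
        "custom_cluster_spec": spec,
        "namespace": "temp-operator",
    },
)
my_flow.with_options(task_runner=runner)()

But then I am not sure how to reach the cluster object from outside the flow, which is why I went with the external Client(cluster) in the first place.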
Question - 3) The reason I am asking the above questions is that when tasks are submitted on Kubernetes, the DaskTaskRunner submits many tasks to a single worker while the rest of the workers do nothing. To avoid this issue, I had to create my own queue manager which submits the larger computation tasks to different workers, so that at least the resources are utilized well.
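To make this concrete, here is a greatly simplified sketch of what my queue manager does (not my real code): submit the work in small batches and wait, instead of handing everything to the scheduler at once.

@flow
def batched_flow(items, batch_size=4):
    # Submit at most `batch_size` heavy tasks at a time and wait for them,
    # rather than submitting everything up front.
    results = []
    for i in range(0, len(items), batch_size):
        futures = [my_task.submit() for _ in items[i:i + batch_size]]
        results.extend(f.result() for f in futures)  # block until this batch finishes
    return results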
Question - 4) Another issue I have is that when some task's memory consumption exceeds 95%, the scheduler automatically restarts that worker to avoid it being killed by the external watchdog. However, once that worker is restarted, it stays idle and no computation happens on it afterwards, regardless of whether I use my own queue manager or not. Also, the flow does not close the pods/workers properly; they remain on Kubernetes doing nothing but consuming resources, so I have to do manual clean-up. How can I deal with this situation? The Dask documentation says that one can set the number of allowed failures ("distributed.scheduler.allowed-failures") inside the dask_config, but setting this does not make any task rerun if something goes wrong either.
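For reference, this is roughly what I tried for question 4 (the allowed-failures setting from the Dask docs, plus Prefect-level retries as a guess on my side), and neither makes the lost work rerun:

custom_dask_config = {
    # ... the settings shown at the top, plus:
    "distributed.scheduler.allowed-failures": 10,  # tolerate worker deaths before a task is marked failed
}

# Prefect-level retries, guessing that they might also cover worker restarts:
@task(retries=2, retry_delay_seconds=30)
def my_task():
    return 2 + 5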
Question - 5) Coming back to question 2 in light of question 4: does the prefect_client also obey the dask_config settings, whether they relate to the scheduler and/or the workers?
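The kind of check I have in mind is something like this (a sketch; Client.run executes a function on every worker, so it should show which configuration the workers actually see):

import dask

@flow
def check_worker_config():
    prefect_client = get_run_context().task_runner._client
    # Ask every worker which value it actually sees for a given config key.
    per_worker = prefect_client.run(dask.config.get, "distributed.worker.memory.target")
    print(per_worker)  # {worker_address: value, ...}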
I am posting all of the above questions in a single thread; however, if you prefer, I am open to creating separate threads.
I used Python 3.10.12 with the packages below and their dependencies.
prefect==2.14.21
prefect-dask==0.2.6
dask==2024.1.1
distributed==2024.1.1
dask-kubernetes==2024.1.0