Deploy flow that uses a DaskTaskRunner with a Dask KubeCluster

We use the DaskTaskRunner connected to a Dask KubeCluster to run our flows. The flows are parametrized, and the parameters are passed using pydantic models. Currently, we parse the parameters from the command line or by reading a YAML config file. Here is an example of how it looks:

from prefect import flow, task
from dask_kubernetes.operator import KubeCluster, make_cluster_spec
from dask.distributed import Client
from prefect_dask import DaskTaskRunner
from pydantic import BaseModel
import argparse
from dask import config as dask_config


class FlowParams(BaseModel):
    name: str


@task
def say_hello(name: str):
    print(f"Hello {name}!")


@flow
def flow_hello(params: FlowParams):
    say_hello(params.name)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("name")
    args = parser.parse_args()
    flow_params = FlowParams(name=args.name)

    dask_config_dict = {
        "distributed.worker.daemon": False,
    }
    dask_config.set(dask_config_dict)

    # Ideally, these resources can also be set by parameters
    n_workers = 10
    worker_resources = {
        "requests": {
            "memory": "4G",
            "cpu": "1",
        },
        "limits": {
            "memory": "4G",
            "cpu": "1",
        },
    }

    spec = make_cluster_spec(
        name="my-dask-cluster",
        image="custom_image",
        resources=worker_resources,
    )

    # Set other kubernetes spec params, such as mounting nfs volumes
    # ...

    with KubeCluster(
        custom_cluster_spec=spec,
        namespace="dask-operator",
        scheduler_forward_port=54321,
        # These have to be explicitly set as of dask v2023.9.0
        resources=spec["spec"]["worker"]["spec"]["containers"][0]["resources"],
    ) as cluster:
        cluster.scale(n_workers)

        with Client(cluster) as client:
            flow_hello.with_options(
                task_runner=DaskTaskRunner(
                    address=client.scheduler.address,
                ),
            )(flow_params)

We are considering using Prefect deployments for our flows, but I did not find any documentation or examples covering this use case. What I specifically don't understand is how I can initialize the Dask KubeCluster for the deployed flow. For each flow run, a KubeCluster has to be spun up and then torn down after the run is finished.

I think you can let Prefect manage the cluster lifecycle through the DaskTaskRunner by passing the runner instance to the @flow decorator, something along these lines:

runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.operator.KubeCluster",
    cluster_kwargs={
        "name": "my-dask-cluster",
        "image": "custom_image",
        "n_workers": 10,
    },
)


@flow(task_runner=runner)
def flow_hello():
    ...

So, instead of instantiating the KubeCluster yourself, you let Prefect do it automatically at flow submission and close it at flow completion. More information in the docs.

The more difficult part would be configuring the runner based on input provided from the command line, but that doesn't seem to be your core use case, if I understood correctly. Did I?

Thank you for this, I'll try this approach! If I understand correctly, this makes Prefect spin up a cluster when a flow is started and wind it down after it finishes.

Ideally, we would also want to set the resources for the Dask workers (CPU, memory, container tag) from parameters, e.g. from the command line or otherwise.

The pydantic config in the example is actually larger; it also has parameters for the Dask resources, with reasonable defaults.
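
For context, a rough sketch of what that larger model could look like (the field names and defaults below are placeholders for illustration, not our actual config):

from pydantic import BaseModel


class DaskResources(BaseModel):
    # Hypothetical resource section with reasonable defaults
    n_workers: int = 10
    memory: str = "4G"
    cpu: str = "1"
    image: str = "custom_image"


class FlowParams(BaseModel):
    name: str
    dask_resources: DaskResources = DaskResources()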

I currently see two ways that might work:

  • A deployment that uses the default resources. Perhaps we will have two or three deployments with different defaults, e.g. low, medium, and high compute requirements. That should cover most of our use cases, but it introduces some redundancy, and the deployments could then only be used with the defaults, not with custom Kubernetes parameters.
  • Add a wrapper flow that runs the actual flow as a subflow. We can give resource parameters to the wrapper, which in turn could use them as arguments to the KubeCluster when starting the actual flow (see the sketch after this list).
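
A minimal sketch of the second option, assuming the FlowParams/DaskResources layout sketched above and the flow_hello from the original example (names and defaults are placeholders):

from prefect import flow
from prefect_dask import DaskTaskRunner


# Hypothetical wrapper flow: it reads the resource settings from its
# parameters and builds a task runner for the actual flow at run time.
@flow
def flow_hello_wrapper(params: FlowParams):
    res = params.dask_resources
    runner = DaskTaskRunner(
        cluster_class="dask_kubernetes.operator.KubeCluster",
        cluster_kwargs={
            "name": "my-dask-cluster",
            "image": res.image,
            "n_workers": res.n_workers,
            "resources": {
                "requests": {"memory": res.memory, "cpu": res.cpu},
                "limits": {"memory": res.memory, "cpu": res.cpu},
            },
        },
    )
    # Calling the re-optioned flow here runs it as a subflow with its own
    # task runner, so a KubeCluster is created for this run and closed
    # again when the subflow finishes.
    return flow_hello.with_options(task_runner=runner)(params)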

I will play with this and report back here.

My opinion is that infrastructure-related parameters should not be part of the flow parameters, and therefore should not be part of the pydantic model.
Also, your use of with_options (which I didn't know about) seems correct.
I think a viable solution would be the following:


from prefect import flow
from prefect_dask import DaskTaskRunner


@flow
def flow_hello(params: FlowParams):
    ...


if __name__ == "__main__":
    # Read the arguments and parse both the cluster resource options and the
    # flow parameters. I will assume you parse all values and create a
    # 'resources' dictionary and a 'flow_params' pydantic model.

    runner = DaskTaskRunner(
        cluster_class="dask_kubernetes.operator.KubeCluster",
        cluster_kwargs={
            "name": "my-dask-cluster",
            "image": "custom_image",
            "n_workers": 10,
            "resources": resources,  # dictionary created from the script arguments
        },
    )

    flow_hello.with_options(task_runner=runner)(flow_params)

This should do the trick, let me know 🙂

This works from the command line. I still have to figure out how best to create a deployment, but using cluster_class to set up the KubeCluster instead of creating it beforehand should make this possible. Thanks!
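
For anyone landing here later, I expect the deployment side to look roughly like this, assuming a recent Prefect 2.x that has flow.serve() (the deployment name and parameter model are placeholders):

from prefect import flow
from prefect_dask import DaskTaskRunner
from pydantic import BaseModel


class FlowParams(BaseModel):
    name: str


runner = DaskTaskRunner(
    cluster_class="dask_kubernetes.operator.KubeCluster",
    cluster_kwargs={
        "name": "my-dask-cluster",
        "image": "custom_image",
        "n_workers": 10,
    },
)


@flow(task_runner=runner)
def flow_hello(params: FlowParams):
    ...


if __name__ == "__main__":
    # Create and serve a deployment from this process; every flow run then
    # spins up its own KubeCluster via the task runner and shuts it down
    # when the run finishes.
    flow_hello.serve(name="flow-hello")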
