DaskTaskRunner Task Submission

Hello,

Hoping someone here is able to point me to an answer that I can’t seem to find in the docs and save me from having to comb through the codebase…

The issue I’m running into is that when using DaskTaskRunner, the workers in my cluster sit idle because tasks are submitted to the server so slowly. I can watch the number of queued tasks climb ever so slowly on the Dask dashboard while the majority of my workers do nothing.

I’m using the task.map interface, for around 20,000 tasks.

The piece of information I can’t find and would appreciate some help on is what execution environment is used in this setting for creation and submission of the tasks? Is it the execution environment from which my flow is called? Is it the scheduler or perhaps the workers?

As a follow up, it would be so convenient to be able to use capacity on the cluster itself to create and submit these tasks…if that isn’t already how it is implemented by default, is there a way for me to do so myself?

Cheers,

John

Hi @secrettoad -

By default, the task runner creates and executes tasks in the execution environment of the flow. When .submit() is called on a task, it’s submitted to the task runner - if you’re using Dask, it is then submitted to the Dask scheduler.

Which Dask Scheduler is being used?
If nothing is specified, then it’s a temporary, local Dask cluster.

You can control the number of threads or processes, and the workload per worker.
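
For example, a rough sketch (cluster_kwargs are passed through to the temporary local cluster, so adjust the names to whatever your Dask version accepts):

from prefect import flow
from prefect_dask import DaskTaskRunner

# Temporary local Dask cluster for this flow run; cluster_kwargs are handed
# to distributed.LocalCluster, so processes vs. threads can be chosen here.
@flow(task_runner=DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2, "processes": True}
))
def my_flow():
    ...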

Alternatively, you can run / host your own Dask cluster and submit work to it by providing its scheduler address: Task Runners - Prefect 2 - Coordinating the world's dataflows
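
Something along these lines (a sketch; the address shown is just a placeholder for your scheduler’s endpoint):

from prefect import flow
from prefect_dask import DaskTaskRunner

# Connect to an already-running Dask cluster instead of spinning up a temporary one
@flow(task_runner=DaskTaskRunner(address="tcp://my-dask-scheduler:8786"))
def my_flow():
    ...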

Hopefully this helps?

@Christopher_Boyd

Thanks for the reply.

I’m using a cluster of EC2 machines via dask_cloudprovider.

I’m not quite sure I understand. Do you mean to say that the actual creation and submission of mapped tasks takes place within the Dask scheduler? Or in the execution context from which the flow was called (during this debugging session, my laptop)?

I’d very much like to have that logic run either on the scheduler or distributed across the cluster, as the 20,000 task creations and submissions are actually the choke point when computing my flow. If you know of an already-implemented pattern around this, that would be very helpful.

Thanks so much for your help.

Cheers,

John

A bit more info that may be helpful:

I essentially want to wrap a call to a task, from within a flow, with dask.delayed, so that the actual communication with the prefect server in task creation and submission is executed on the cluster in a distributed way.

When I try to just wrap the task in delayed, I get: RuntimeError: Tasks cannot be run outside of a flow. To call the underlying task function outside of a flow use task.fn().

A minimal example…

from dask.delayed import delayed
from functools import partial
from prefect import flow, task
from prefect_dask import get_dask_client

@task
def my_decorated_task(num: int, destination: str):
    return True

@flow(name='my_flow', persist_result=True)
def my_decorated_flow(destination: str):
    with get_dask_client() as client:
        days = [1, 2, 3]
        # inner map: wrap each task call in dask.delayed and compute it on the cluster;
        # outer map: resolve whatever futures come back
        futures = map(
            lambda x: x.result(),
            map(lambda x: delayed(partial(my_decorated_task, destination=destination))(x).compute(), days),
        )
        return destination

The inner map yields an iterable of plain Dask futures whose computation should produce PrefectFuture objects, and the outer map iterates over their results.

This gets it onto the scheduler, but once there I’m still not sure the creation and submission logic isn’t GIL-bound / using all of the processing power available… Any help on parallelizing the creation/submission itself is very much appreciated.

from prefect import flow, task, unmapped
from prefect_dask import DaskTaskRunner, get_dask_client
from dask_cloudprovider.aws import EC2Cluster
from dask.distributed import Client

@task
def my_decorated_task(num: int, destination: str):
    return True

@flow(name='my_flow', persist_result=True)
def my_decorated_flow(destination: str):
    with get_dask_client() as client:
        days = [1, 2, 3]
        futures = map(lambda x: x.result(), my_decorated_task.map(days, unmapped(destination)))
        return destination

with EC2Cluster(100, scheduler_instance_type='t2.2xlarge', worker_instance_type='t2.small', docker_image='my_image') as cluster:
    with Client(cluster) as client:
        # run the flow itself on the scheduler so that task creation and
        # submission happen there rather than in the local process
        result = client.run_on_scheduler(
            my_decorated_flow.with_options(task_runner=DaskTaskRunner(client.scheduler.address)),
            destination='my-path',
        )

Where does the flow run execute (what is the infrastructure specified on the deployment)?
That is the environment from which tasks are submitted to the Dask scheduler.
The submission logic is part of the Dask task runner, so if you want it to run somewhere else, the infrastructure for the deployment (i.e. for the executing flow run) needs to be in that location.
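
As a rough sketch of what that could look like (Prefect 2.x; the module, queue and flow names here are placeholders):

from prefect.deployments import Deployment
from prefect.infrastructure import Process
from my_project.flows import my_decorated_flow  # placeholder import for your flow

# The flow run executes wherever an agent picks up this deployment (e.g. a
# box inside the Dask cluster's network), so task creation and submission to
# the Dask scheduler happen from there rather than from a laptop.
deployment = Deployment.build_from_flow(
    flow=my_decorated_flow,
    name="submit-near-cluster",
    work_queue_name="dask-adjacent",  # placeholder work queue served by that agent
    infrastructure=Process(),
)
deployment.apply()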

The dask scheduling page has more options to increase the overall throughput of the Dask Cluster (assuming workers are idle, and there is sufficient memory).
https://docs.dask.org/en/stable/scheduling.html

Additionally, you can expand the number of workers and threads per worker (or, if you are concerned about the GIL, use processes instead), for example:

DaskTaskRunner(
    cluster_kwargs={"n_workers": 16, "threads_per_worker": 8}
)

More kwargs available for settings can be found here:
https://docs.dask.org/en/stable/deploying-python.html#reference

I’ve tested on my local machine, on the Dask cluster’s scheduler, and on another large, separate EC2 instance. In all cases, just the creation and submission of around 20k tasks takes nearly 10 minutes, while my cluster is capable of actually processing/executing them in far less time than that. It is a bottleneck that prevents true scale unless I can find a solution.

In all cases, I’m triggering a local run (as opposed to a deployment) and passing the scheduler address of an existing Dask cluster to the Dask task runner.

You mention expanding the cluster to address the GIL, but if the creation and submission of the tasks is happening in the local environment, that shouldn’t impact it. I need a way to distribute the actual creation and submission of tasks from the local/executing environment.

I can fiddle with the scheduler or whatever environment I’m executing from, but if the code itself is not written in a distributed/multiprocessing fashion then the time complexity of creation/submission is still going to be linear. Is there a way to create/submit tasks to the cluster with sub-linear time complexity?

Dug into the call stack a bit, and the bottleneck seems to be the iterative calls to get_task_call_return_value.

I tried adding a process pool to benchmark the performance difference, but some of the context arguments aren’t picklable (or dillable), namely the SSLContext that I assume is part of the flow_run_context.

Does anyone know if this linear complexity is by design? And whether or not reducing it is at all on the roadmap? I appreciate any help.

The .submit for the DaskTaskRunner should be concurrent - that is, when a task is submitted, a future is returned immediately and the submission to the task runner happens in the background. Based on my understanding and on discussion with engineering, this should be capable of scaling to thousands of concurrent submissions, so perhaps this has more to do with the nested maps in your flow / task?
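
For example (placeholder names), inside a flow something like this should hand back futures immediately, with the actual runs happening on the Dask workers in the background:

from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def do_work(n: int) -> int:
    return n * 2

@flow(task_runner=DaskTaskRunner())
def concurrent_submit_flow():
    # Each .submit() returns a PrefectFuture right away; execution happens on
    # the Dask workers, and .result() blocks until that task has finished.
    futures = [do_work.submit(n) for n in range(100)]
    return [f.result() for f in futures]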

What about a simpler example where you either map or for-loop over a list of objects (which should return a list of futures)?

from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client
from dask.distributed import Client, LocalCluster

@task
def my_decorated_task(num: int):
    return True

@flow(name='my_flow', persist_result=True)
def my_decorated_flow():
    with get_dask_client() as client:
        numbers = list(range(20000))
        return my_decorated_task.map(numbers)


if __name__ == '__main__':
    with LocalCluster() as cluster:
        with Client(cluster) as client:
            result = my_decorated_flow.with_options(task_runner=DaskTaskRunner(client.scheduler.address))()

Here is something minimal. Using a local cluster, it takes 10 minutes to submit a few thousand tasks; in total, all 20k tasks were not present in the Prefect DB until 18 minutes later.

I ran it from an EC2 machine to rule out networking and local resources, and drastically increased the size of the database server and the API server to rule out those pieces of hardware.

The only reason the task queue isn’t the bottleneck in this case is that it’s a local cluster with only 16 threads available for execution.

In an even more minimal example, task creation and submission seem to happen at the same rate without the Dask task runner: at 3 minutes in, only around 3k tasks have made it to the DB.

from prefect import flow, task

@task
def my_decorated_task(num: int):
    return True


@flow(name='my_flow', persist_result=True)
def my_decorated_flow():
    numbers = list(range(20000))
    return my_decorated_task.map(numbers)


if __name__ == '__main__':
    result = my_decorated_flow()

In the second use case, I can see how this would be fine and would rarely be a bottleneck, but in the first use case (distributed computing), it very quickly becomes the bottleneck for how many tasks can actually run concurrently. I know tasks are currently being created asynchronously, but it seems to me they will need to be created in a distributed manner to keep from being a bottleneck in a use case like this.

It seems like my question at this point has morphed into: “Is there any established way to submit tasks to a flow run from another process, outside of the flow context?”

By the way, thanks so much for the support on this.

Unfortunately I don’t have a well-educated answer on this, so I’ll have to do some research and check in with the team to share some more insight.

To answer your question directly - I don’t believe there is a way to submit tasks in a truly distributed fashion like this, or outside of the flow context, unless the underlying function is called with .fn() on the task.
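
For what it’s worth, a rough sketch of the .fn() route (reusing your placeholder names), with the big caveat that .fn() bypasses Prefect orchestration entirely, so these calls won’t show up as task runs:

from dask.delayed import delayed
from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_dask_client

@task
def my_decorated_task(num: int, destination: str) -> bool:
    return True

@flow(task_runner=DaskTaskRunner())
def my_decorated_flow(destination: str):
    with get_dask_client() as client:
        days = [1, 2, 3]
        # .fn is the plain underlying function, so it can be wrapped in
        # dask.delayed and fanned out across the cluster directly, but the
        # Prefect backend never sees these calls as task runs.
        delayed_calls = [
            delayed(my_decorated_task.fn)(d, destination=destination) for d in days
        ]
        return client.gather(client.compute(delayed_calls))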
