Prefect 2.0 UI doesn't seem to show all task runs running on Dask in real-time

I have several pending tasks which could be executed. The dask dashboard shows 8 workers. However according to Orion UI only 2 to 4 tasks are running at one time. Why is it not picking up the pending tasks?


Could you share more details so that I can reproduce this? If you can share an example flow, the DeploymentSpec you used, and the output of prefect version, then I will check and open an issue if this is a bug. Thanks in advance!

Test script below. I see the issue. Here we have 20 tasks: two workers get 7 each, and the other 6 workers get 1 each. Hence, as soon as those 6 process their tasks, there is no work left and only two workers have to do all the remaining work.

from time import sleep

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
def test1():
    sleep(240)


@flow(
    task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=8)),
)
def testflow():
    for x in range(20):
        test1()


if __name__ == "__main__":
    testflow()

On looking again, that is not it. This does run 8 tasks at once and seems to reallocate the tasks later. I will try to put together a more complex chained example that replicates my actual application, but simpler.


OK, here is an example where the tasks are chained. There are 50 test1 tasks available from the start, so there should be continuous work for 8 workers except at the very end. In practice it seems to run only 4 tasks simultaneously.


from time import sleep

from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

delay = 2


@task
def test1(x):
    sleep(delay)
    return x + 1


@task
def test2(x):
    sleep(delay)
    return x + 2


@task
def test3(x):
    sleep(delay)
    return x + 3


@flow(
    task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=8)),
)
def testflow():
    res = []
    for y in range(50):
        x1 = test1(y)
        x2 = test2(x1)
        x3 = test3(x2)
        res.append(x3)
    return res


if __name__ == "__main__":
    testflow()

Possibly this is a UI problem, because Dask seems to be processing all the tasks and using all the workers. The Dask console sometimes shows each worker “processing” 6 tasks, though I am not sure whether that means they are actually running or just queued on the worker.
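One rough way to tell running from queued is to ask the scheduler directly. A minimal sketch, assuming you can connect a distributed.Client to the existing scheduler (the address below is just a placeholder):

# Minimal sketch: inspect what each worker has been assigned.
# The scheduler address is a placeholder; use the one reported by your cluster.
from distributed import Client

client = Client("tcp://127.0.0.1:8786")

# Client.processing() maps each worker to the task keys assigned to it,
# which includes tasks queued on the worker as well as the one executing.
for worker, keys in client.processing().items():
    print(worker, len(keys), "tasks assigned")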

I ran this flow and:

  1. I definitely see all worker slots being used and tasks being executed in parallel
  2. Not sure if you really have to specify anything in cluster_kwargs, since the defaults are usually pretty good.

Yes, I get a similar result, so it is actually the Orion UI that is incorrect. It only shows a maximum of 4 tasks running at once.

Could you elaborate on this? :point_down: Do you mean the Dask UI?

When I don’t specify any kwargs:

from time import sleep

from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner

delay = 2


@task
def test1(x):
    sleep(delay)
    logger = get_run_logger()
    logger.info(x + 1)
    return x + 1


@task
def test2(x):
    sleep(delay)
    logger = get_run_logger()
    logger.info(x + 2)
    return x + 2


@task
def test3(x):
    sleep(delay)
    logger = get_run_logger()
    logger.info(x + 3)
    return x + 3


@flow(task_runner=DaskTaskRunner())
def allocate_tasks_to_dask_workers():
    res = []
    for y in range(50):
        x1 = test1(y)
        x2 = test2(x1)
        x3 = test3(x2)
        res.append(x3)
    logger = get_run_logger()
    logger.info(res)
    return res


if __name__ == "__main__":
    allocate_tasks_to_dask_workers()

Which seems to make sense.

Let me know if you believe this is some sort of UI bug and I could create a GitHub issue for this. Alternatively, you could open the issue yourself, since you could likely explain it better. So far, it looks correct to me. By default, there are 4 tasks within this for loop, and 4 task runs are running simultaneously in each iteration, in both the Dask and Orion UIs:

If you want to create an issue:

The default number of workers = number of physical CPUs = 4 here. However, I have 8 virtual CPUs, and using 8 is slightly faster on the above example and probably will be in general. Note that when you ran the example you only used 4 CPUs, and in that case the Dask and Orion UIs agree. When you have 8 CPUs, Dask shows 8 tasks in parallel whereas Orion shows 4.

I will raise an issue on GitHub.
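If it helps to confirm the physical vs. virtual counts, here is a quick check sketch (psutil is an extra third-party dependency, not something Prefect or Dask requires):

# Quick check of physical vs. logical (virtual) CPU counts on this machine.
import multiprocessing

import psutil  # third-party package, used only for this check

print("physical cores:", psutil.cpu_count(logical=False))           # e.g. 4
print("logical cores: ", psutil.cpu_count(logical=True))            # e.g. 8
print("multiprocessing.cpu_count():", multiprocessing.cpu_count())  # logical count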

I could be wrong about this, but I believe that even if you specify 8 workers, a single for loop iteration in this flow executes 4 task runs. That's why you will never see more than 4 task runs executed in parallel, regardless of the number of workers. Does that make sense?

maybe just the Dask UI is confusing? :smile:

A single for loop iteration submits 4 tasks, but 50*4 tasks are submitted to the queue overall. So there should be 200 tasks, and 50 that could run in parallel.

I see it as 50 for loop iterations, each executing 4 task runs in parallel

There is no “wait” anywhere, so it submits 50 sets of 4 sequential tasks. In theory that could be 50 chains running in parallel, though with 8 workers it is 8 in parallel. Here is the Dask dashboard below.

I see what you mean. Mapping is on the roadmap and probably with mapping, this will be easier to configure.

Could be. I have it working in Prefect 1 using mapping, and I recall it showed 8 tasks.

I think it is submitting the tasks correctly here, but the Orion UI is not functioning properly.
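For reference, the Prefect 1 mapping pattern I mean looks roughly like this. A sketch from memory, assuming prefect.executors.DaskExecutor and its cluster_kwargs argument, with the same test1/test2/test3 task bodies as above:

from time import sleep

from prefect import Flow, task
from prefect.executors import DaskExecutor

delay = 2


@task
def test1(x):
    sleep(delay)
    return x + 1


@task
def test2(x):
    sleep(delay)
    return x + 2


@task
def test3(x):
    sleep(delay)
    return x + 3


# Mapping lets the executor resolve each upstream result on the cluster,
# so the 50 chains progress independently across all workers.
with Flow("testflow") as flow:
    x1 = test1.map(list(range(50)))
    x2 = test2.map(x1)
    x3 = test3.map(x2)


if __name__ == "__main__":
    flow.run(executor=DaskExecutor(cluster_kwargs=dict(n_workers=8)))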


I asked the engineer who developed DaskTaskRunner about this and here’s the response:

  • test3 and test2 cannot run until the previous iteration finishes each time
  • task run inputs are resolved before submission to dask right now, that’s why you’ll only see ~4 running at a time
  • in Prefect 1.0, task run inputs are resolved by Dask.

This is on the roadmap.
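To illustrate what “resolved by Dask” means, here is a plain dask.distributed sketch (not Prefect code; the function name is made up). Because futures are passed straight back into client.submit, Dask resolves the inputs on the workers and all 50 chains can progress independently:

# Plain-Dask sketch of 50 independent 3-task chains; futures are resolved cluster-side.
from time import sleep

from distributed import Client, LocalCluster


def add_one(x):
    sleep(2)
    return x + 1


if __name__ == "__main__":
    cluster = LocalCluster(n_workers=8)
    client = Client(cluster)

    results = []
    for y in range(50):
        f1 = client.submit(add_one, y)
        f2 = client.submit(add_one, f1)  # f1 is a Future; Dask resolves it on the worker
        f3 = client.submit(add_one, f2)
        results.append(f3)

    print(client.gather(results))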

Maybe the latest version already does this, as there are clearly 8 tasks running at a time in the Dask dashboard, as you can see in the image. It is also faster with 8 workers than with 4.