I have several pending tasks that could be executed. The Dask dashboard shows 8 workers. However, according to the Orion UI, only 2 to 4 tasks are running at a time. Why is it not picking up the pending tasks?
Could you share more details so that I can reproduce this? If you can share an example flow, the DeploymentSpec you used, and the output of prefect version,
then I will check and open an issue if this is a bug. Thanks in advance!
Test script below. I see the issue. Here we have 20 tasks: two workers have 7 each and the other 6 workers have 1 each, so as soon as those 6 process their tasks there is no work left and only two workers have to do all the work.
from time import sleep
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

@task
def test1():
    # Simulate a long-running piece of work (4 minutes).
    sleep(240)

@flow(
    task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=8)),
)
def testflow():
    # Submit 20 identical tasks to an 8-worker Dask cluster.
    for x in range(20):
        test1()

if __name__ == "__main__":
    testflow()
On looking again, that is not it. This does run 8 tasks at once and seems to reallocate the tasks later. I will try to put together a more complex chained example that replicates my actual application, but simpler.
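That reallocation is Dask's work stealing, which should be on by default; a minimal way to check, assuming a standard distributed install:

import dask
import distributed  # importing distributed registers its config defaults

# When True, the scheduler rebalances the initial per-worker skew at runtime.
print(dask.config.get("distributed.scheduler.work-stealing"))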
OK, here is an example where the tasks are chained. There are 50 test1 tasks available from the start, so there should be continuous work for 8 workers except at the very end. In practice it seems to run only 4 tasks simultaneously.
from time import sleep
from prefect import flow, task
from prefect.task_runners import DaskTaskRunner

delay = 2

@task
def test1(x):
    sleep(delay)
    return x + 1

@task
def test2(x):
    sleep(delay)
    return x + 2

@task
def test3(x):
    sleep(delay)
    return x + 3

@flow(
    task_runner=DaskTaskRunner(cluster_kwargs=dict(n_workers=8)),
)
def testflow():
    res = []
    # Each iteration builds an independent chain test1 -> test2 -> test3;
    # 50 chains are submitted up front, so there should be continuous
    # work for 8 workers until the tail end of the run.
    for y in range(50):
        x1 = test1(y)
        x2 = test2(x1)
        x3 = test3(x2)
        res.append(x3)
    return res

if __name__ == "__main__":
    testflow()
Possibly this is a UI problem, because Dask seems to be processing all the tasks and using all the workers. The Dask console sometimes shows each worker "processing" 6 tasks, though I am not sure whether that means running or just sitting in that worker's task queue.
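As far as I know, "processing" in Dask counts tasks assigned to a worker whether they are executing or still queued on it. A minimal sketch of inspecting this outside Prefect, assuming a standalone distributed.Client:

from time import sleep
from distributed import Client

def work(i):
    sleep(240)
    return i

if __name__ == "__main__":
    client = Client(n_workers=8)
    futures = client.map(work, range(20))  # 20 long tasks on 8 workers
    # Maps each worker address to the task keys assigned to it; some of
    # these may be queued on the worker rather than actually executing.
    print(client.processing())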
I ran this flow and:
- I definitely see all worker slots being used and tasks being executed in parallel
- Not sure if you really have to specify anything on the cluster_kwargs side, since the defaults are usually pretty good (see the sketch after this list).
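For comparison, the two configurations discussed in this thread look like this; a sketch, with the n_workers value taken from the example above rather than as a recommendation:

from prefect.task_runners import DaskTaskRunner

# Defaults: a temporary local cluster, with Dask choosing the worker count.
runner_default = DaskTaskRunner()

# Explicit sizing via cluster_kwargs, as in the flows above.
runner_sized = DaskTaskRunner(cluster_kwargs=dict(n_workers=8))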
Yes, I get a similar result, so it is actually the Orion UI that is incorrect. It only shows a maximum of 4 tasks running at once.
Could you elaborate on this? Do you mean the Dask UI?
When I don’t specify any kwargs:
from time import sleep
from prefect import flow, task, get_run_logger
from prefect.task_runners import DaskTaskRunner

delay = 2

@task
def test1(x):
    sleep(delay)
    logger = get_run_logger()
    logger.info(x + 1)
    return x + 1

@task
def test2(x):
    sleep(delay)
    logger = get_run_logger()
    logger.info(x + 2)
    return x + 2

@task
def test3(x):
    sleep(delay)
    logger = get_run_logger()
    logger.info(x + 3)
    return x + 3

# No cluster_kwargs: DaskTaskRunner creates a temporary local cluster
# and lets Dask choose the number of workers.
@flow(task_runner=DaskTaskRunner())
def allocate_tasks_to_dask_workers():
    res = []
    for y in range(50):
        x1 = test1(y)
        x2 = test2(x1)
        x3 = test3(x2)
        res.append(x3)
    logger = get_run_logger()
    logger.info(res)
    return res

if __name__ == "__main__":
    allocate_tasks_to_dask_workers()
Which seems to make sense.
Let me know if you believe this is some sort of UI bug and I could create a GitHub issue for this - alternatively, you could open the issue yourself, since you could likely explain it better. So far, it looks correct to me: by default there are 4 tasks within this for loop, and 4 task runs are running simultaneously in each iteration, both in the Dask and Orion UIs:
If you want to create an issue:
The default number of workers = the number of physical CPUs = 4 here; however, I have 8 virtual CPUs. Using 8 workers is slightly faster on the above example and probably will be in general. Note that when you ran the example you only used 4 CPUs, and in that case Dask and Orion agree. When you have 8 workers, Dask shows 8 tasks in parallel whereas Orion shows 4.
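A quick way to check both counts; psutil is already a Dask dependency, and on the machine described above this would print 4 and 8:

import psutil

print(psutil.cpu_count(logical=False))  # physical cores, e.g. 4 here
print(psutil.cpu_count(logical=True))   # logical/virtual CPUs, e.g. 8 here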
I will raise an issue on github.
I could be wrong about this, but I believe that even if you specify 8 workers, a single for loop iteration in this flow executes 4 task runs - that's why you will never see more than 4 task runs executed in parallel regardless of the number of workers. Does that make sense?
Maybe just the Dask UI is confusing?
A single for loop iteration submits 4 tasks, but 50 * 4 tasks are submitted to the queue overall. So there should be 200 tasks, of which up to 50 could run in parallel.
I see it as 50 for loop iterations, each executing 4 task runs in parallel
There is no "wait" anywhere, so it submits 50 sets of 4 sequential tasks. In theory that could be 50 parallel chains, though with 8 workers it is 8 in parallel. Here is the Dask dashboard below.
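For illustration, here is a pure-Dask sketch of that kind of dependency structure - independent chains submitted up front, so at any moment up to min(number of chains, n_workers) tasks are eligible to execute. This is only an analogy, not how Prefect submits work:

from time import sleep
from distributed import Client

def step(x, inc):
    sleep(2)
    return x + inc

if __name__ == "__main__":
    client = Client(n_workers=8)
    res = []
    for y in range(50):
        f1 = client.submit(step, y, 1)
        f2 = client.submit(step, f1, 2)  # depends only on f1
        f3 = client.submit(step, f2, 3)  # depends only on f2
        res.append(f3)
    print(client.gather(res))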
I see what you mean. Mapping is on the roadmap, and with mapping this will probably be easier to configure.
Could be. I have it working in Prefect 1 using mapping, and I recall it showed 8 tasks.
I think the task submission here is correct but the Orion UI is not functioning properly.
I asked the engineer who developed DaskTaskRunner about this, and here's the response:
- test2 and test3 cannot run until the preceding task in their chain finishes each time
- task run inputs are resolved before submission to Dask right now; that's why you'll only see ~4 running at a time
- in Prefect 1.0, task run inputs are resolved by Dask; this is on the roadmap
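To illustrate that distinction in plain Dask (this is only a sketch of the general pattern, not Prefect internals): blocking on a result before submitting the next step serializes the chain on the client, while passing the future lets the Dask scheduler handle the dependency:

from distributed import Client

def step(x, inc):
    return x + inc

if __name__ == "__main__":
    client = Client(n_workers=8)

    # Inputs resolved before submission: .result() blocks the client, so
    # the next step cannot even be submitted until this one finishes.
    f1 = client.submit(step, 0, 1)
    x1 = f1.result()
    f2 = client.submit(step, x1, 2)

    # Inputs resolved by Dask: pass the future itself and the scheduler
    # tracks the dependency, leaving the client free to keep submitting.
    g1 = client.submit(step, 10, 1)
    g2 = client.submit(step, g1, 2)
    print(f2.result(), g2.result())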
Maybe the latest version already does this, as there are clearly 8 tasks running at a time in the Dask dashboard, as you can see in the image. It is also faster with 8 workers versus 4.