Running numerous tasks on a Dask cluster with large data inputs/outputs causes timeout errors

This topic is a TL;DR of this older one: https://discourse.prefect.io/t/long-running-tasks-on-prefect-dask-clusters/1352

I was running a large number of tasks (hundreds to thousands) that retrieve large volumes of data and pass them to other tasks on a Dask cluster.
The execution seemed to freeze at that point, and then I got multiple errors on the Dask cluster like the following (in the worker logs):

2022-08-09 13:53:29,482 - distributed.worker - WARNING - Compute Failed Key: prepare_documents_elasticsearch-510511f2-12-9610f08ad2a0431bb43483d69a5c79f4-1 Function: begin_task_run args: () kwargs: {'task': <prefect.tasks.Task object at 0x7f2db245d160>, 'task_run': TaskRun(id=UUID('9610f08a-d2a0-431b-b434-83d69a5c79f4'), name='prepare_documents_elasticsearch-510511f2-12', flow_run_id=UUID('8f1324db-9633-4561-91d6-9bdc65feac08'), task_key='youtube_scraping.tasks.transform.prepare_documents_elasticsearch', dynamic_key='12', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('d8f82a6a-40fe-4f1a-ad34-eed29f73c70f'), task_inputs={'docs': [TaskRunResult(input_type='task_run', id=UUID('17d6892a-d1c4-4966-a598-0265315dc228'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 8, 9, 11, 50, 45, 242705, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=5251), s Exception: "RuntimeError('The connection pool was closed while 1 HTTP requests/responses were still in-flight.')"

2022-08-09 13:57:59,852 - distributed.worker - WARNING - Compute Failed Key: get_comment_threads-9d100415-51-cf2e1fecd2794a7aa6a9b49c2ac4b31b-1 Function: begin_task_run args: () kwargs: {'task': <prefect.tasks.Task object at 0x7f2df5701280>, 'task_run': TaskRun(id=UUID('cf2e1fec-d279-4a7a-a6a9-b49c2ac4b31b'), name='get_comment_threads-9d100415-51', flow_run_id=UUID('8f1324db-9633-4561-91d6-9bdc65feac08'), task_key='youtube_scraping.tasks.youtube.get_comment_threads', dynamic_key='51', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=5, retry_delay_seconds=60.0), tags=[], state_id=UUID('d751e333-3daa-44a4-9248-37fb7e62f8b7'), task_inputs={'video_id': [], 'wait_for': [], 'max_results': []}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 8, 9, 11, 50, 47, 434863, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=5641), state=Pending(message=None, type=PENDING, result=None)), 'parameters': { Exception: 'RuntimeError("Cannot orchestrate task run \'cf2e1fec-d279-4a7a-a6a9-b49c2ac4b31b\'. Failed to connect to API at http://127.0.0.1:4200/api/.")'

I had overlooked important warnings in the Dask worker logs saying that the workers were spending a lot of compute time managing memory (exchanging data between workers, in my case).

Refactoring my code to completely eliminate those large data inputs/outputs did the trick for me, and now everything runs smoothly.
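
For anyone hitting the same problem, here is a minimal sketch of the kind of refactor I mean, assuming Prefect 2 with the prefect-dask collection and some shared storage reachable by every worker. The task names are borrowed from my logs above, but the bodies, bucket paths, and helpers are illustrative only, not my actual code: each task writes its large result to storage and passes only a small reference (a path) to the downstream task.

```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner
import pandas as pd

@task(retries=5, retry_delay_seconds=60)
def get_comment_threads(video_id: str) -> str:
    # Fetch a large payload, persist it to shared storage, and return only its path.
    df = pd.DataFrame({"video_id": [video_id], "comment": ["..."]})  # stand-in for a big scrape result
    path = f"s3://my-bucket/comments/{video_id}.parquet"  # illustrative shared location (needs s3fs)
    df.to_parquet(path)
    return path  # a short string travels between workers, not the data itself

@task
def prepare_documents_elasticsearch(path: str) -> str:
    # Reload the data inside the downstream task instead of receiving it as an input.
    df = pd.read_parquet(path)
    out_path = path.replace("/comments/", "/documents/")
    df.to_parquet(out_path)
    return out_path

@flow(task_runner=DaskTaskRunner())
def scrape_youtube(video_ids: list[str]):
    for video_id in video_ids:
        path = get_comment_threads.submit(video_id)
        prepare_documents_elasticsearch.submit(path)
```

With this pattern, only the path strings move through the Dask scheduler and the Prefect API; the workers no longer spend their time shipping large payloads to each other.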

In short: don’t pass large volumes of data between tasks running on a Dask cluster (at least for now)!

(Sorry for responding to my own topic, but I couldn’t mark it as solved without a response)


Thanks so much for the update! It’s absolutely fine to respond to your own topics if you have found a solution. This is helpful for others who may stumble across the same problem.

I also want to share the Dask Best Practices docs with the community! They’re concise and well written: Dask Best Practices — Dask documentation
