Long running tasks on prefect-dask clusters

Hi, I recently migrated my scraping flows from prefect 1.0 to prefect 2.0.
They consist of thousands of tasks, ranging from scraping requests (sometimes recursive, depending on the number of resources to scrape) to database-related tasks.
For the infrastructure, I am using a local prefect orion server with a postgresql database and a local dask cluster of 3 workers (I went with postgresql because the sqlite database couldn’t handle the workers’ concurrent queries). For this test, everything runs on the same machine.
It works fine until the tasks become longer to run (a few minutes). Then some succeed while others start failing in increasing numbers (according to the dask dashboard; prefect doesn’t report anything), and the failures have nothing to do with the task code itself. Eventually the entire flow crashes with the following error (which is when the prefect orion logs finally update):

14:04:58.455 | ERROR   | Flow run 'peach-trout' - Finished in state Failed('93/211 states failed.')
14:04:58.597 | ERROR   | Flow run 'elastic-caracara' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/smartan117/community-team/youtube/youtube-scraping/youtube_scraping/flows/scrape_channels_comments.py", line 160, in scrape_channels_comments
    scrape_videos_threads(videos=batch, index=thread_index)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 482, in create_and_begin_subflow_run
    return terminal_state.result()
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect_dask/task_runners.py", line 236, in wait
    return await future.result(timeout=timeout)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/distributed/client.py", line 292, in _result
    raise exc.with_traceback(tb)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 960, in begin_task_run
    return task_run.state
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 1956, in __aexit__
    return await self._exit_stack.__aexit__(*exc_info)
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_client.py", line 1997, in __aexit__
    await self._transport.__aexit__(exc_type, exc_value, traceback)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_transports/default.py", line 332, in __aexit__
    await self._pool.__aexit__(exc_type, exc_value, traceback)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
    await self.aclose()
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
    raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.
14:04:58.622 | ERROR   | Flow run 'elastic-caracara' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/home/smartan117/community-team/youtube/youtube-scraping/test/test_scraping.py", line 6, in <module>
    scrape_channels_comments(["UCUQo7nzH1sXVpzL92VesANw"])
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
    return state.result()
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/home/smartan117/community-team/youtube/youtube-scraping/youtube_scraping/flows/scrape_channels_comments.py", line 160, in scrape_channels_comments
    scrape_videos_threads(videos=batch, index=thread_index)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
    return run_async_from_worker_thread(begin_run)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 445, in result
    return self.__get_result()
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
    raise self._exception
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 482, in create_and_begin_subflow_run
    return terminal_state.result()
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
    state.result()
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect_dask/task_runners.py", line 236, in wait
    return await future.result(timeout=timeout)
  File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/distributed/client.py", line 292, in _result
    raise exc.with_traceback(tb)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 960, in begin_task_run
    return task_run.state
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 1956, in __aexit__
    return await self._exit_stack.__aexit__(*exc_info)
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
    raise exc_details[1]
  File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
    cb_suppress = await cb(*exc_details)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_client.py", line 1997, in __aexit__
    await self._transport.__aexit__(exc_type, exc_value, traceback)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_transports/default.py", line 332, in __aexit__
    await self._pool.__aexit__(exc_type, exc_value, traceback)
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
    await self.aclose()
  File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
    raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.

Here is a sample of errors from the logs on the dask workers (the second one is rather rare but the first one appeared everywhere):

2022-08-09 13:53:29,482 - distributed.worker - WARNING - Compute Failed Key: prepare_documents_elasticsearch-510511f2-12-9610f08ad2a0431bb43483d69a5c79f4-1 Function: begin_task_run args: () kwargs: {'task': <prefect.tasks.Task object at 0x7f2db245d160>, 'task_run': TaskRun(id=UUID('9610f08a-d2a0-431b-b434-83d69a5c79f4'), name='prepare_documents_elasticsearch-510511f2-12', flow_run_id=UUID('8f1324db-9633-4561-91d6-9bdc65feac08'), task_key='youtube_scraping.tasks.transform.prepare_documents_elasticsearch', dynamic_key='12', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('d8f82a6a-40fe-4f1a-ad34-eed29f73c70f'), task_inputs={'docs': [TaskRunResult(input_type='task_run', id=UUID('17d6892a-d1c4-4966-a598-0265315dc228'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 8, 9, 11, 50, 45, 242705, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=5251), s Exception: "RuntimeError('The connection pool was closed while 1 HTTP requests/responses were still in-flight.')"

2022-08-09 13:57:59,852 - distributed.worker - WARNING - Compute Failed Key: get_comment_threads-9d100415-51-cf2e1fecd2794a7aa6a9b49c2ac4b31b-1 Function: begin_task_run args: () kwargs: {'task': <prefect.tasks.Task object at 0x7f2df5701280>, 'task_run': TaskRun(id=UUID('cf2e1fec-d279-4a7a-a6a9-b49c2ac4b31b'), name='get_comment_threads-9d100415-51', flow_run_id=UUID('8f1324db-9633-4561-91d6-9bdc65feac08'), task_key='youtube_scraping.tasks.youtube.get_comment_threads', dynamic_key='51', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=5, retry_delay_seconds=60.0), tags=[], state_id=UUID('d751e333-3daa-44a4-9248-37fb7e62f8b7'), task_inputs={'video_id': [], 'wait_for': [], 'max_results': []}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 8, 9, 11, 50, 47, 434863, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=5641), state=Pending(message=None, type=PENDING, result=None)), 'parameters': { Exception: 'RuntimeError("Cannot orchestrate task run \'cf2e1fec-d279-4a7a-a6a9-b49c2ac4b31b\'. Failed to connect to API at http://127.0.0.1:4200/api/.")'

Here are some samples of the prefect orion logs (it is always these two types of errors, everywhere in the logs):

Crash detected! Execution was interrupted by an unexpected exception.
02:04:50 PM
prepare_documents_elasticsearch-510511f2-12

Crash detected! Request to http://127.0.0.1:4200/api/task_runs/1cd57908-0184-436c-b440-d4f205a23632/set_state failed.
02:04:50 PM
get_comment_threads-9d100415-20

As it happens only when tasks become longer to run, I suspect it has something to do with this already reported issue: “Timeout error for long-running tasks running Prefect 2.0 in WSL”.

The flow was working perfectly in prefect 1.0, even when tasks waited for a day for quota resets (which didn’t happen on prefect 2.0 as it failed before reaching quota).

Are you running it all on WSL? This could be the issue indeed.

You could increase the timeout for the Postgres queries as shown here:

This issue may provide some additional pointers:

I am not running it on WSL; everything is local.

I tested running long dummy processing tasks and they worked without any fuss, so the problem is not long-running tasks.

I may have found a clue: the dask workers issue a lot of warnings about spending a lot of CPU time on memory transfer/optimization. I figured that the problem might be the large volume of data exchanged between tasks (which increases sharply at this step).
So I rewrote all my tasks to make them self-contained (no output), which means bundling the scraping http request, the data transformation and the save to my database into one “big” task. This did the trick.
Now my flow is way faster and easily gets past this step and beyond.
The cherry on top: it also solves my problem of task results filling up the hard drive, and it freed a lot of RAM on the dask workers.
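
For readers following along, here is a minimal sketch of the bundling pattern described above, written in plain Python so it runs without a cluster. It is not the actual flow code: `fetch_comments`, `to_documents` and `scrape_transform_save` are hypothetical names, the fetch step is a stub instead of a real HTTP request, and SQLite stands in for whatever database is really used. In a prefect flow, `scrape_transform_save` is the function that would carry the `@task` decorator.

```python
import sqlite3
from contextlib import closing
from typing import Callable, Dict, List


def fetch_comments(video_id: str) -> List[Dict[str, str]]:
    """Hypothetical stand-in for the scraping HTTP request."""
    return [{"video_id": video_id, "text": f"  Comment {i} "} for i in range(3)]


def to_documents(comments: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Hypothetical transformation step: normalise the comment text."""
    return [
        {"video_id": c["video_id"], "text": c["text"].strip().lower()}
        for c in comments
    ]


def scrape_transform_save(
    video_id: str,
    db_path: str,
    fetch: Callable[[str], List[Dict[str, str]]] = fetch_comments,
) -> None:
    """Fetch, transform and persist in a single unit of work.

    The function returns None, so no payload is handed back to the
    scheduler for other tasks to consume.
    """
    docs = to_documents(fetch(video_id))
    # SQLite is an assumption here; swap in the real database client.
    with closing(sqlite3.connect(db_path)) as conn:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "CREATE TABLE IF NOT EXISTS comments (video_id TEXT, text TEXT)"
            )
            conn.executemany(
                "INSERT INTO comments (video_id, text) VALUES (?, ?)",
                [(d["video_id"], d["text"]) for d in docs],
            )
```

The point of the design is that the only thing crossing task boundaries is a small input such as a video id; the scraped data goes straight to the database and is never serialized as a task result.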

Wow, nice work, really good progress!

Do you want me to rename the topic or change the tags? Or should I create a new topic with the TL;DR of this one?
After all, the problem was the large volume of data exchanged between tasks on the dask cluster, not the task duration.

Go for it!