Hi, I recently migrated my scraping flows from Prefect 1.0 to Prefect 2.0.
The flows consist of thousands of tasks, ranging from scraping requests (sometimes recursive, depending on the number of resources to scrape) to database-related tasks.
For the infrastructure, I am using a local Prefect Orion server backed by a PostgreSQL database, plus a local Dask cluster with 3 workers. I went with PostgreSQL because SQLite couldn't handle the concurrent queries coming from the workers. For this test, everything runs on the same machine.
Everything works fine until the tasks start taking longer to run (a few minutes each). At that point some tasks still succeed, but others start failing in increasing numbers (I can see this in the Dask dashboard; Prefect doesn't report anything), and the failures have nothing to do with the task code itself. Eventually the entire flow crashes with the following error (only then do the Prefect Orion logs get updated):
14:04:58.455 | ERROR | Flow run 'peach-trout' - Finished in state Failed('93/211 states failed.')
14:04:58.597 | ERROR | Flow run 'elastic-caracara' - Encountered exception during execution:
Traceback (most recent call last):
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/smartan117/community-team/youtube/youtube-scraping/youtube_scraping/flows/scrape_channels_comments.py", line 160, in scrape_channels_comments
scrape_videos_threads(videos=batch, index=thread_index)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
return run_async_from_worker_thread(begin_run)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 445, in result
return self.__get_result()
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 482, in create_and_begin_subflow_run
return terminal_state.result()
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
state.result()
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect_dask/task_runners.py", line 236, in wait
return await future.result(timeout=timeout)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/distributed/client.py", line 292, in _result
raise exc.with_traceback(tb)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 960, in begin_task_run
return task_run.state
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 1956, in __aexit__
return await self._exit_stack.__aexit__(*exc_info)
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_client.py", line 1997, in __aexit__
await self._transport.__aexit__(exc_type, exc_value, traceback)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_transports/default.py", line 332, in __aexit__
await self._pool.__aexit__(exc_type, exc_value, traceback)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
await self.aclose()
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.
14:04:58.622 | ERROR | Flow run 'elastic-caracara' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
File "/home/smartan117/community-team/youtube/youtube-scraping/test/test_scraping.py", line 6, in <module>
scrape_channels_comments(["UCUQo7nzH1sXVpzL92VesANw"])
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
return state.result()
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/home/smartan117/community-team/youtube/youtube-scraping/youtube_scraping/flows/scrape_channels_comments.py", line 160, in scrape_channels_comments
scrape_videos_threads(videos=batch, index=thread_index)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 156, in enter_flow_run_engine_from_flow_call
return run_async_from_worker_thread(begin_run)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 445, in result
return self.__get_result()
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/concurrent/futures/_base.py", line 390, in __get_result
raise self._exception
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 482, in create_and_begin_subflow_run
return terminal_state.result()
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 159, in result
state.result()
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/prefect_dask/task_runners.py", line 236, in wait
return await future.result(timeout=timeout)
File "/home/smartan117/.pyenv/versions/youtube-v2-test/lib/python3.9/site-packages/distributed/client.py", line 292, in _result
raise exc.with_traceback(tb)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/engine.py", line 960, in begin_task_run
return task_run.state
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/prefect/client.py", line 1956, in __aexit__
return await self._exit_stack.__aexit__(*exc_info)
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 670, in __aexit__
raise exc_details[1]
File "/home/smartan117/.pyenv/versions/3.9.7/lib/python3.9/contextlib.py", line 653, in __aexit__
cb_suppress = await cb(*exc_details)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_client.py", line 1997, in __aexit__
await self._transport.__aexit__(exc_type, exc_value, traceback)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpx/_transports/default.py", line 332, in __aexit__
await self._pool.__aexit__(exc_type, exc_value, traceback)
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
await self.aclose()
File "/home/smartan117/.pyenv/versions/3.9.7/envs/youtube-v2-test/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.
Here is a sample of errors from the Dask worker logs (the second one is rather rare, but the first one appears everywhere):
2022-08-09 13:53:29,482 - distributed.worker - WARNING - Compute Failed Key: prepare_documents_elasticsearch-510511f2-12-9610f08ad2a0431bb43483d69a5c79f4-1 Function: begin_task_run args: () kwargs: {'task': <prefect.tasks.Task object at 0x7f2db245d160>, 'task_run': TaskRun(id=UUID('9610f08a-d2a0-431b-b434-83d69a5c79f4'), name='prepare_documents_elasticsearch-510511f2-12', flow_run_id=UUID('8f1324db-9633-4561-91d6-9bdc65feac08'), task_key='youtube_scraping.tasks.transform.prepare_documents_elasticsearch', dynamic_key='12', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0), tags=[], state_id=UUID('d8f82a6a-40fe-4f1a-ad34-eed29f73c70f'), task_inputs={'docs': [TaskRunResult(input_type='task_run', id=UUID('17d6892a-d1c4-4966-a598-0265315dc228'))]}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 8, 9, 11, 50, 45, 242705, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=5251), s Exception: "RuntimeError('The connection pool was closed while 1 HTTP requests/responses were still in-flight.')"
2022-08-09 13:57:59,852 - distributed.worker - WARNING - Compute Failed Key: get_comment_threads-9d100415-51-cf2e1fecd2794a7aa6a9b49c2ac4b31b-1 Function: begin_task_run args: () kwargs: {'task': <prefect.tasks.Task object at 0x7f2df5701280>, 'task_run': TaskRun(id=UUID('cf2e1fec-d279-4a7a-a6a9-b49c2ac4b31b'), name='get_comment_threads-9d100415-51', flow_run_id=UUID('8f1324db-9633-4561-91d6-9bdc65feac08'), task_key='youtube_scraping.tasks.youtube.get_comment_threads', dynamic_key='51', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=5, retry_delay_seconds=60.0), tags=[], state_id=UUID('d751e333-3daa-44a4-9248-37fb7e62f8b7'), task_inputs={'video_id': [], 'wait_for': [], 'max_results': []}, state_type=StateType.PENDING, state_name='Pending', run_count=0, expected_start_time=DateTime(2022, 8, 9, 11, 50, 47, 434863, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=None, end_time=None, total_run_time=datetime.timedelta(0), estimated_run_time=datetime.timedelta(0), estimated_start_time_delta=datetime.timedelta(microseconds=5641), state=Pending(message=None, type=PENDING, result=None)), 'parameters': { Exception: 'RuntimeError("Cannot orchestrate task run \'cf2e1fec-d279-4a7a-a6a9-b49c2ac4b31b\'. Failed to connect to API at http://127.0.0.1:4200/api/.")'
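To check the "failing in increasing numbers" impression against the worker logs instead of the dashboard, a small stdlib script can tally the `Compute Failed` warnings per minute and per task name. This is only a sketch: it assumes every warning line has the same shape as the two samples above (timestamp, then `distributed.worker - WARNING - Compute Failed Key: <key>`), and `tally_failures` is a hypothetical helper, not part of Prefect or Dask.

```python
import re
import sys
from collections import Counter

# Matches "Compute Failed" warnings from distributed.worker, e.g.
# "2022-08-09 13:53:29,482 - distributed.worker - WARNING - Compute Failed Key: <key> Function: ..."
# Assumption: the log format is exactly as in the samples above.
FAILED_RE = re.compile(
    r"^(?P<minute>\d{4}-\d{2}-\d{2} \d{2}:\d{2}):\d{2},\d+ "
    r"- distributed\.worker - WARNING - Compute Failed Key: (?P<key>\S+)"
)

# Keys look like "<task name>-<8 hex chars>-<index>-<task run id>-<n>",
# so everything before the first "-<8 hex chars>-" is taken as the task name.
TASK_NAME_SPLIT = re.compile(r"-[0-9a-f]{8}-")


def tally_failures(lines):
    """Return (failures per minute, failures per task name) for the given log lines."""
    by_minute, by_task = Counter(), Counter()
    for line in lines:
        match = FAILED_RE.match(line)
        if match is None:
            continue  # not a "Compute Failed" warning
        by_minute[match.group("minute")] += 1
        task_name = TASK_NAME_SPLIT.split(match.group("key"), maxsplit=1)[0]
        by_task[task_name] += 1
    return by_minute, by_task


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage (hypothetical file name): python tally_failures.py dask-worker.log
    with open(sys.argv[1], encoding="utf-8", errors="replace") as log:
        by_minute, by_task = tally_failures(log)
    for minute, count in sorted(by_minute.items()):
        print(minute, count)
    for task, count in by_task.most_common():
        print(task, count)
```

If the per-minute counts climb steadily once tasks start running for minutes, that would support the idea that the failures are tied to task duration rather than to specific tasks.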
Here are some samples from the Prefect Orion logs (these two types of errors appear everywhere in the logs):
02:04:50 PM | prepare_documents_elasticsearch-510511f2-12 | Crash detected! Execution was interrupted by an unexpected exception.
02:04:50 PM | get_comment_threads-9d100415-20 | Crash detected! Request to http://127.0.0.1:4200/api/task_runs/1cd57908-0184-436c-b440-d4f205a23632/set_state failed.
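Both the worker error (`Failed to connect to API at http://127.0.0.1:4200/api/`) and the Orion crash messages point at requests to the local API failing. One way to see whether the API itself becomes unreachable or slow while the long tasks run is to poll it from a separate process. This is a stdlib sketch that assumes the Orion server answers `GET /api/health` with HTTP 200; if that route is not available in your version, any cheap GET endpoint would serve the same purpose. `probe` and `watch` are hypothetical helper names.

```python
import http.client
import time
import urllib.request

# Assumption: the Orion server exposes a health route under the API prefix.
HEALTH_URL = "http://127.0.0.1:4200/api/health"


def probe(url=HEALTH_URL, timeout=5.0):
    """Send one GET and return (ok, elapsed_seconds); ok is False on any error."""
    start = time.monotonic()
    try:
        with urllib.request.urlopen(url, timeout=timeout) as resp:
            ok = resp.status == 200
    except (OSError, http.client.HTTPException):
        # URLError, HTTPError, refused/reset connections and timeouts are all OSError subclasses.
        ok = False
    return ok, time.monotonic() - start


def watch(url=HEALTH_URL, interval=1.0, slow_after=2.0, timeout=10.0, max_checks=None):
    """Poll the API and print only failed or slow checks, with a wall-clock timestamp."""
    checks = 0
    while max_checks is None or checks < max_checks:
        ok, elapsed = probe(url, timeout=timeout)
        if not ok or elapsed > slow_after:
            print(f"{time.strftime('%H:%M:%S')} ok={ok} elapsed={elapsed:.2f}s", flush=True)
        checks += 1
        time.sleep(interval)
```

Running `watch()` in a second terminal while the flow executes would show whether the API stalls or refuses connections at the same times as the crashes above, which would help separate a server-side problem from a client-side one in the workers.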
Since this only happens once tasks take longer to run, I suspect it is related to this already-reported issue: Timeout error for long-running tasks running Prefect 2.0 in WSL
The flow worked perfectly in Prefect 1.0, even when tasks had to wait a full day for quota resets (a situation never reached in Prefect 2.0, because the flow failed before hitting the quota).