Error trying to run Dask job on AKS

Hello all,

I’m running Prefect 2.8.3 on AKS and am hitting an issue when trying to run flows with Dask on the AKS cluster. I have the Dask Operator installed, and it spins up the scheduler and worker pods when the flow kicks off, but I receive the following error in Prefect:

Stack trace:

Encountered exception during execution:
Traceback (most recent call last):
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 654, in orchestrate_flow_run
result = await run_sync(flow_call)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 154, in run_sync_in_interruptible_worker_thread
async with anyio.create_task_group() as tg:
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 662, in __aexit__
raise exceptions[0]
File “/usr/local/lib/python3.10/site-packages/anyio/to_thread.py”, line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 937, in run_sync_in_worker_thread
return await future
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 867, in run
result = context.run(func, *args)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File “/opt/prefect/flows/adl_to_pg.py”, line 565, in fl_adl_to_pg
process_table.map(tables_ddl_info, unmapped(adl_file_system_client),
File “/usr/local/lib/python3.10/site-packages/prefect/tasks.py”, line 836, in map
return enter_task_run_engine(
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 954, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File “/usr/local/lib/python3.10/site-packages/anyio/from_thread.py”, line 49, in run
return asynclib.run_async_from_thread(func, *args)
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 970, in run_async_from_thread
return f.result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 458, in result
return self.__get_result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 403, in __get_result
raise self._exception
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 980, in begin_task_map
parameters = await resolve_inputs(parameters, max_depth=1)
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 1781, in resolve_inputs
return await run_sync_in_worker_thread(
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File “/usr/local/lib/python3.10/site-packages/anyio/to_thread.py”, line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 937, in run_sync_in_worker_thread
return await future
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 867, in run
result = context.run(func, *args)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 331, in visit_collection
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 331, in <listcomp>
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 273, in visit_nested
return visit_collection(
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 291, in visit_collection
result = visit_expression(expr)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 285, in visit_expression
return visit_fn(expr, context)
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 1779, in resolve_input
return state.result(raise_on_failure=False, fetch=True) if return_data else None
File “/usr/local/lib/python3.10/site-packages/prefect/client/schemas.py”, line 107, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
File “/usr/local/lib/python3.10/site-packages/prefect/states.py”, line 76, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 226, in coroutine_wrapper
return run_async_from_worker_thread(async_fn, *args, **kwargs)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File “/usr/local/lib/python3.10/site-packages/anyio/from_thread.py”, line 49, in run
return asynclib.run_async_from_thread(func, *args)
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 970, in run_async_from_thread
return f.result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 458, in result
return self.__get_result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 403, in __get_result
raise self._exception
File “/usr/local/lib/python3.10/site-packages/prefect/states.py”, line 98, in _get_state_result
result = await state.data.get()
File “/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py”, line 47, in with_injected_client
return await fn(*args, **kwargs)
File “/usr/local/lib/python3.10/site-packages/prefect/results.py”, line 421, in get
blob = await self._read_blob(client=client)
File “/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py”, line 47, in with_injected_client
return await fn(*args, **kwargs)
File “/usr/local/lib/python3.10/site-packages/prefect/results.py”, line 433, in _read_blob
content = await storage_block.read_path(self.storage_key)
File “/usr/local/lib/python3.10/site-packages/prefect/filesystems.py”, line 202, in read_path
raise ValueError(f"Path {path} does not exist.")
ValueError: Path /root/.prefect/storage/703345b7aad44bd2a06d5df2067ceb74 does not exist.

Any help is greatly appreciated. Please let me know if I need to provide additional details for context. Thanks!

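For anyone else running into a similar issue: the error appears to be caused by results being stored on local storage (the default path under `/root/.prefect/storage/`, as seen in the error message). Each pod has its own local filesystem, so a result written by one pod is not accessible to the other pods that later try to read it. The fix is to define a remote result storage location that all pods can access (for example, a remote storage block such as Azure Blob Storage, passed via the flow’s `result_storage` option).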