Hello all,
I’m running Prefect 2.8.3 on AKS and am hitting an issue when trying to run flows using Dask on the AKS cluster. I have the Dask Operator running, and it spins up the scheduler and worker nodes when the flow kicks off, but I am receiving the following error in Prefect:
StackTrace:
Encountered exception during execution:
Traceback (most recent call last):
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 654, in orchestrate_flow_run
result = await run_sync(flow_call)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 154, in run_sync_in_interruptible_worker_thread
async with anyio.create_task_group() as tg:
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 662, in __aexit__
raise exceptions[0]
File “/usr/local/lib/python3.10/site-packages/anyio/to_thread.py”, line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 937, in run_sync_in_worker_thread
return await future
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 867, in run
result = context.run(func, *args)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 135, in capture_worker_thread_and_result
result = __fn(*args, **kwargs)
File “/opt/prefect/flows/adl_to_pg.py”, line 565, in fl_adl_to_pg
process_table.map(tables_ddl_info, unmapped(adl_file_system_client),
File “/usr/local/lib/python3.10/site-packages/prefect/tasks.py”, line 836, in map
return enter_task_run_engine(
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 954, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File “/usr/local/lib/python3.10/site-packages/anyio/from_thread.py”, line 49, in run
return asynclib.run_async_from_thread(func, *args)
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 970, in run_async_from_thread
return f.result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 458, in result
return self.__get_result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 403, in __get_result
raise self._exception
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 980, in begin_task_map
parameters = await resolve_inputs(parameters, max_depth=1)
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 1781, in resolve_inputs
return await run_sync_in_worker_thread(
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File “/usr/local/lib/python3.10/site-packages/anyio/to_thread.py”, line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 937, in run_sync_in_worker_thread
return await future
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 867, in run
result = context.run(func, *args)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 331, in visit_collection
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 331, in <listcomp>
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 273, in visit_nested
return visit_collection(
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 291, in visit_collection
result = visit_expression(expr)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/collections.py”, line 285, in visit_expression
return visit_fn(expr, context)
File “/usr/local/lib/python3.10/site-packages/prefect/engine.py”, line 1779, in resolve_input
return state.result(raise_on_failure=False, fetch=True) if return_data else None
File “/usr/local/lib/python3.10/site-packages/prefect/client/schemas.py”, line 107, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
File “/usr/local/lib/python3.10/site-packages/prefect/states.py”, line 76, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 226, in coroutine_wrapper
return run_async_from_worker_thread(async_fn, *args, **kwargs)
File “/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py”, line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File “/usr/local/lib/python3.10/site-packages/anyio/from_thread.py”, line 49, in run
return asynclib.run_async_from_thread(func, *args)
File “/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py”, line 970, in run_async_from_thread
return f.result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 458, in result
return self.__get_result()
File “/usr/local/lib/python3.10/concurrent/futures/_base.py”, line 403, in __get_result
raise self._exception
File “/usr/local/lib/python3.10/site-packages/prefect/states.py”, line 98, in _get_state_result
result = await state.data.get()
File “/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py”, line 47, in with_injected_client
return await fn(*args, **kwargs)
File “/usr/local/lib/python3.10/site-packages/prefect/results.py”, line 421, in get
blob = await self._read_blob(client=client)
File “/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py”, line 47, in with_injected_client
return await fn(*args, **kwargs)
File “/usr/local/lib/python3.10/site-packages/prefect/results.py”, line 433, in _read_blob
content = await storage_block.read_path(self.storage_key)
File “/usr/local/lib/python3.10/site-packages/prefect/filesystems.py”, line 202, in read_path
raise ValueError(f"Path {path} does not exist.")
ValueError: Path /root/.prefect/storage/703345b7aad44bd2a06d5df2067ceb74 does not exist.
Any help is greatly appreciated. Please let me know if I need to provide additional details for context. Thanks!