Hello, I’m blocked with the problem from topic.
When I try to run multiple flows (~30) in the same moment, some flows are failed with this error.
It should not be the problem of Dask cluster, because on the same cluster I can run ~ 50 flows with prefect1 run without any problem!
The problem is, that it’s a random error the flow which had this problem previous run, won’t have it on the next one.
I add a stack trace:
Stack trace
Encountered exception during execution:
Traceback (most recent call last):
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/app/flows_2/sub_flows/features_execution.py", line 263, in features_execution_flow
features_results = [state.result() for state in states_features]
File "/app/flows_2/sub_flows/features_execution.py", line 263, in <listcomp>
features_results = [state.result() for state in states_features]
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/schemas.py", line 107, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 73, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
return run_async_from_worker_thread(async_fn, *args, **kwargs)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 88, in _get_state_result
raise await get_state_exception(state)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect_dask/task_runners.py", line 269, in wait
return await future.result(timeout=timeout)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/distributed/client.py", line 289, in _result
raise exc.with_traceback(tb)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1307, in begin_task_run
state = await orchestrate_task_run(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1382, in orchestrate_task_run
resolved_parameters = await resolve_inputs(parameters)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1672, in resolve_inputs
return await run_sync_in_worker_thread(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 285, in visit_collection
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 285, in <listcomp>
items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
return visit_collection(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
items = [visit_nested(o) for o in expr]
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
items = [visit_nested(o) for o in expr]
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
return visit_collection(
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 259, in visit_collection
result = visit_fn(expr)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1670, in resolve_input
return state.result(raise_on_failure=False, fetch=True) if return_data else None
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/schemas.py", line 107, in result
return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 73, in get_state_result
return _get_state_result(state, raise_on_failure=raise_on_failure)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
return run_async_from_worker_thread(async_fn, *args, **kwargs)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
return self.__get_result()
File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 100, in _get_state_result
raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
I see on GitHub that problem exists is more than one year: MissingResult State data is missing. · Issue #67 · PrefectHQ/prefect-dask · GitHub
Any suggestions?