MissingResult: State data is missing. (DaskTaskRunner)

Hello, I’m blocked with the problem from topic.
When I try to run multiple flows (~30) in the same moment, some flows are failed with this error.

It should not be the problem of Dask cluster, because on the same cluster I can run ~ 50 flows with prefect1 run without any problem!

The problem is, that it’s a random error the flow which had this problem previous run, won’t have it on the next one.

I add a stack trace:

Stack trace
Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/app/flows_2/sub_flows/features_execution.py", line 263, in features_execution_flow
    features_results = [state.result() for state in states_features]
  File "/app/flows_2/sub_flows/features_execution.py", line 263, in <listcomp>
    features_results = [state.result() for state in states_features]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/schemas.py", line 107, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 73, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 88, in _get_state_result
    raise await get_state_exception(state)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect_dask/task_runners.py", line 269, in wait
    return await future.result(timeout=timeout)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/distributed/client.py", line 289, in _result
    raise exc.with_traceback(tb)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1307, in begin_task_run
    state = await orchestrate_task_run(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1382, in orchestrate_task_run
    resolved_parameters = await resolve_inputs(parameters)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1672, in resolve_inputs
    return await run_sync_in_worker_thread(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 285, in visit_collection
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 285, in <listcomp>
    items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 280, in visit_collection
    items = [visit_nested(o) for o in expr]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 280, in <listcomp>
    items = [visit_nested(o) for o in expr]
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 251, in visit_nested
    return visit_collection(
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/collections.py", line 259, in visit_collection
    result = visit_fn(expr)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/engine.py", line 1670, in resolve_input
    return state.result(raise_on_failure=False, fetch=True) if return_data else None
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/client/schemas.py", line 107, in result
    return get_state_result(self, raise_on_failure=raise_on_failure, fetch=fetch)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 73, in get_state_result
    return _get_state_result(state, raise_on_failure=raise_on_failure)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 226, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/opt/pysetup/.venv/lib/python3.8/site-packages/prefect/states.py", line 100, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

I see on GitHub that problem exists is more than one year: MissingResult State data is missing. · Issue #67 · PrefectHQ/prefect-dask · GitHub

Any suggestions?

1 Like

I’ve seen this same issue in our environment, and it seems to be related to Prefect and not my flows. It happens randomly and on different flows at different times. I’d put in a support ticket.

Hello @Seth_Coussens , you can share link to issue here!

Did you find any workaround to bypass this issue?

Same problem… Happens randomly on any kind of flow. Am on WSL if it helps

@Material-Scientist
Temporary solution, which we use:
k8s jobs infrastructure and local RayTaskRunner.
I don’t know why, but RayTaskRunner, don’t crash and works stable.

It seems the problem is related to the Prefect API being unable to deliver the proper state at the time of the request (needs to be “running”, but was “pending”, possibly due to synchronization issues - i.e. the enclosing flow already started, but this state was not updated on communication via API).

Haven’t explored the potential fix of enabling result persistence.