Work queue concurrency limit and deleted flow runs

Hi,

First some background. Our Prefect setup consists of Prefect Cloud and one agent that runs on Kubernetes. The agent pulls runs from a queue called agent_1. The queue has a concurrency limit of 4. Without the concurrency limit, the agent pulls all the work there is (we have quite a lot of runs), which leads to out-of-memory issues.

Sometimes the agent process is restarted (e.g. when Kubernetes updates the Pod). If a flow run was running on the agent when it was restarted, it ends up in a corrupted state. Orion thinks it is running on the agent, and the restarted agent knows nothing about that run, so it never updates the state, and the flow run remains Running forever. The same also happens with the Pending state.

As soon as we have 4 such “ghost” runs, the concurrency limit is reached and the agent no longer picks up new runs. To deal with that, we implemented a scheduled job that checks for flow runs that have remained in the Running or Pending state for too long and deletes them.
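To make the cleanup job concrete, here is a minimal sketch of its selection logic, not our actual code. `RunRecord`, `find_stale_runs`, `active_ids` and the two-hour threshold are all hypothetical names and values made up for illustration; fetching the runs from Prefect Cloud and deleting them is left out. The `active_ids` guard is the check our job should be doing to avoid touching runs the agent is still executing.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class RunRecord:
    # Hypothetical, simplified view of a flow run; not a Prefect type.
    run_id: str
    state: str                  # e.g. "Pending", "Running", "Completed"
    state_changed_at: datetime  # when the run entered its current state


def find_stale_runs(runs, active_ids, now, max_age=timedelta(hours=2)):
    """Return runs stuck in Running/Pending for longer than max_age,
    skipping any run known to be still executing (active_ids)."""
    stale = []
    for run in runs:
        if run.state not in ("Running", "Pending"):
            continue  # finished or scheduled runs are not ghosts
        if run.run_id in active_ids:
            continue  # still executing on the agent, leave it alone
        if now - run.state_changed_at > max_age:
            stale.append(run)
    return stale


now = datetime(2023, 1, 1, 12, 0, tzinfo=timezone.utc)
runs = [
    RunRecord("ghost", "Running", now - timedelta(hours=5)),
    RunRecord("long-but-alive", "Running", now - timedelta(hours=5)),
    RunRecord("fresh", "Pending", now - timedelta(minutes=3)),
    RunRecord("done", "Completed", now - timedelta(hours=9)),
]
stale_ids = [r.run_id for r in find_stale_runs(runs, {"long-but-alive"}, now)]
```

How `active_ids` would be obtained is an open question for us; one option might be to look at which Kubernetes jobs are still alive.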

The real problem is here:
Due to a bug on our side, we sometimes delete runs that the agent is still executing, which leads to this sequence:

  1. Agent pulls a run, state=Running
  2. Scheduled job deletes the run
  3. Agent finishes the execution and tries to set the state=Completed. But because the run has been deleted, it fails with an ObjectNotFound error.

After this happens 4 times, the agent stops receiving new runs (presumably because of the queue concurrency limit). Restarting the agent does not help. The only thing that helps is increasing the concurrency limit from 4 to 8. And the next time it happens, we have to increase it again. It looks like the concurrency limit state is cached somewhere in Prefect Cloud.
This is just my guess. Could you please give your opinion on whether this is a plausible explanation and what could be done to fix it?
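To spell out the guess, here is a toy model of what I imagine might be going on. It is not Prefect Cloud's actual implementation, which I have not seen. `ToyQueue` and all of its methods are made up, and the key assumption is that a slot is released only when a run reports a terminal state, not when the run is deleted.

```python
class ToyQueue:
    """Toy model of a queue with a concurrency limit.
    Purely illustrative; NOT how Prefect Cloud is known to work."""

    def __init__(self, limit):
        self.limit = limit
        self.slots = set()  # run ids currently holding a concurrency slot
        self.runs = set()   # run ids that still exist

    def pull(self, run_id):
        # Refuse to hand out work once all slots are taken.
        if len(self.slots) >= self.limit:
            return False
        self.runs.add(run_id)
        self.slots.add(run_id)
        return True

    def delete(self, run_id):
        # Assumed behaviour: deleting a run does not release its slot.
        self.runs.discard(run_id)

    def complete(self, run_id):
        # Mirrors the ObjectNotFound failure: the run is gone,
        # so the state update fails and the slot is never released.
        if run_id not in self.runs:
            raise LookupError(f"run {run_id} not found")
        self.slots.discard(run_id)


queue = ToyQueue(limit=4)
for i in range(4):
    run_id = f"run-{i}"
    queue.pull(run_id)          # 1. agent pulls the run
    queue.delete(run_id)        # 2. scheduled job deletes it
    try:
        queue.complete(run_id)  # 3. agent tries to set Completed
    except LookupError:
        pass

blocked = not queue.pull("run-4")  # all 4 slots are leaked
queue.limit = 8                    # our workaround: raise the limit
unblocked = queue.pull("run-4")    # new runs flow again, until 8 leak
```

In this model, restarting the agent changes nothing because the leaked slots live on the queue side, which matches what we observe.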
Thank you!