Managing priority within a queue


Hi, I have a question regarding queue priorities. I have a requirement that flow runs marked as "high-priority" must jump ahead of all other items in the deployment queue - basically switching from FIFO to LIFO.

I saw the documentation on the topic, but I don't think it would let me support such a use case. My understanding is that I would need two queues, "low-priority" and "high-priority", but if there is any item in the high-priority queue, no "low-priority" flow run should start, and I don't think that can be done easily.

Are priorities within the same queue something that might be supported in the future, or are there other ways to work around this? Thanks!


Here is how you could approach it:

prefect work-queue create prio1 --limit 50
prefect work-queue create prio2 --limit 5
prefect agent start -q prio1 -q prio2

In this example, the agent polls for work from both of those queues simultaneously, but given that work queue prio1 is considered more important, it has a higher concurrency limit than the prio2 work queue.

Additionally, to "force" the agent to pick up runs from the prio1 queue, you could temporarily pause the prio2 queue using:

prefect work-queue pause prio2

You could also pause/resume this work queue from the UI.

Once those prio1 runs are processed, you can resume the prio2 queue with prefect work-queue resume prio2.

That raises the question: what happens to flows posted in queue QueueE when a worker has been started with prefect agent start -q QueueA -q QueueB -q QueueC -q QueueD -q QueueE and, by some bad streak of luck, queues QueueA-QueueD "always" end up having items pending? Will the worker ever execute any flows from QueueE?

Also, pausing the prio2 queue raises the question of how and when to unpause it. Since both prio1 and prio2 are queues meant for the same flow, this pausing/unpausing does not take into consideration the concurrency limits that might exist on these queues (and therefore flows), since there is no way to define a concurrency limit across two queues… :frowning:

Is there another “trick” available…? :slight_smile:

There is no worker; there is just a single agent polling for work from all of these queues.

Yes, QueueE would also be processed because you defined it that way.

Up to you, whenever you feel this is appropriate based on your workload

why not? could you explain?

why do you feel this would be needed? the same queue can poll for work from multiple deployments

Those are not tricks; it's just that queues are very composable, and you can set them up in a super flexible way based on your needs. They are not meant as a replacement for Kubernetes, though: they are not resource-aware, so they don't check whether there's enough memory to spin up a flow run. They are simply a way of delegating work to the agent processes that should pick it up, respecting your decision about where you want your work to be executed (on-prem server, EC2 instance, serverless container, K8s job, etc.).

Hi Anna, thank you for your reply and the solution you are proposing. In my use case, prio1 and prio2 would serve deployments associated with the same flow. The requirement is that only one flow run should be running at a time, and also that if there are flow runs in prio1, they will always be picked up before any in prio2.

I understand that this could be achieved by pausing/unpausing prio2 or prio1, but I need to automate this process based on requests coming from users towards an API I'm building, so I may have to implement some pause/unpause logic that goes along these lines (a rough sketch follows the list):

  • when a user submits a high-priority run, prio2 is paused (if it wasn’t already), and the flow run is queued in prio1
  • when a user submits a low-priority run and prio1 is empty, prio1 is paused (if it wasn’t already) and the flow run is queued in prio2
  • every minute or so, a process checks if the last submitted flow run has been completed.
    If prio1 has any item, prio1 is unpaused, else prio2 is unpaused
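To make this concrete, here is a minimal sketch of what I have in mind. It assumes the prio1/prio2 queue names from above, placeholder deployment names (my-flow/high, my-flow/low), pausing/resuming done by shelling out to the Prefect CLI, and the caller keeping track of what is pending in prio1; it is a sketch, not a tested implementation:

import subprocess

from prefect.client import get_client


def set_paused(queue_name: str, paused: bool) -> None:
    # Pause/resume via the Prefect CLI, exactly as suggested earlier in the thread.
    action = "pause" if paused else "resume"
    subprocess.run(["prefect", "work-queue", action, queue_name], check=True)


async def submit(deployment_name: str):
    # Create a flow run for the given deployment; it lands in whichever
    # work queue that deployment points at.
    async with get_client() as client:
        deployment = await client.read_deployment_by_name(deployment_name)
        return await client.create_flow_run_from_deployment(deployment.id)


async def submit_high_priority():
    set_paused("prio2", True)                # block low-priority pickups
    return await submit("my-flow/high")      # queued in prio1


async def submit_low_priority(prio1_is_empty: bool):
    if prio1_is_empty:
        set_paused("prio1", True)
    return await submit("my-flow/low")       # queued in prio2


async def reconcile(last_flow_run_id, prio1_has_items: bool):
    # Runs every minute or so: once the last submitted run has finished,
    # resume whichever queue should be served next.
    async with get_client() as client:
        last_run = await client.read_flow_run(last_flow_run_id)
    if last_run.state is not None and last_run.state.is_final():
        set_paused("prio1" if prio1_has_items else "prio2", False)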

I think this should work, although I had hoped there was an easier way to just let a flow run "jump" the queue. Thank you again for your suggestion; I hope this is useful for somebody running into the same requirement :slight_smile:

Thanks for adding this extra information. I'm curious: could you provide more background on where you run those things? I'd like to understand why such prioritization of runs is necessary. Is everything running on a single VM, and are you trying to optimize the usage of that single VM this way? Perhaps switching to a serverless container, e.g. with the ECSTask or CloudRun infrastructure blocks, could solve the root problem without adding this work-queue complexity?

Flow runs are executed on a VM, using Process infrastructure and a SequentialTaskRunner.
Certain tasks need to connect to a Dask.distributed cluster to run some heavy scientific computations, and we do that with a simple TCP connection to the Dask scheduler. We do not use the DaskTaskRunner because, among other reasons, we wanted to avoid spawning tasks from other tasks; we ran into some nasty deadlocks in the past, even when using worker_client().
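To illustrate the pattern (with a placeholder scheduler address and made-up flow/task names, not our actual code), a task essentially does this:

from dask.distributed import Client
from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

DASK_SCHEDULER = "tcp://dask-scheduler:8786"  # placeholder address


@task
def heavy_computation(n: int) -> int:
    # Open a plain TCP client connection to the existing Dask cluster and
    # submit the expensive work there; the Prefect task just waits for the result.
    with Client(DASK_SCHEDULER) as client:
        future = client.submit(sum, range(n))
        return future.result()


@flow(task_runner=SequentialTaskRunner())
def analysis_flow():
    return heavy_computation(1_000_000)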

The Dask cluster is "fixed", for the definition of fixed provided here.
This falls into your first guess: we are trying to control usage of the Dask cluster by keeping flow runs running one after the other, and hopefully re-ordering the queued ones based on the priority scheme we are discussing.
Unfortunately, I work in a private cloud and cannot take advantage of any AWS/GCP/Azure facilities.
Thanks again for your help!
