How can I impose concurrency limits for flow and task runs?

Prefect 2.0

If you want to limit the number of flow runs executing simultaneously, you can leverage concurrency limits.

Global task run concurrency limits

Global task run concurrency limits are based on tags that you can set within your flow.

You can set concurrency limits on as few or as many tags as you wish. The easiest way to create, list, and remove concurrency limits is by using Prefect CLI concurrency-limit commands.

prefect concurrency-limit [command] [arguments]
Command Description
create Create a concurrency limit by specifying a tag and limit.
delete Delete the concurrency limit set on the specified tag.
ls View all defined concurrency limits.
read View details about a concurrency limit. active_slots shows a list of IDs for task runs that are currently using a concurrency slot.

For example, to set a concurrency limit of 10 on the ‘small_instance’ tag:

prefect concurrency-limit create small_instance 10

To delete the concurrency limit on the ‘small_instance’ tag:

prefect concurrency-limit delete small_instance

More on setting task run concurrency limits on this docs page.

For more on tags:

Details on the initial concurrency-limit release:

Work-queue flow run concurrency limits

Flow run concurrency limits can be set on a work queue. To set a limit of max. 5 flow runs for a specific work queue, use the following command:

prefect work-queue set-concurrency-limit QUEUE_ID 5

Related threads

Prefect 1.0

Prefect Cloud allows setting flow run limits based on flow run labels. You can assign a concurrency limit to a specific label — more on that, in the documentation.

To set task run limits, you can leverage task tags:

from prefect import task

@task(tags=["database", "aws"])
def my_task():