I have a workflow. I am hoping to run a very slow shell with 5 jobs (1, 2, 3, 4, 5) for example. I have 2 agents (A, B) running on two AWS EC2 instances.
I want both agents start to work at same time, and each agent finish one job first, and then take another job. So if x-axis is time, then it would be something like this:
A: 1 -> 3 -> 5
B: 2 -> 4
Experiment 1
However, this code starts 5 tasks at same time in one agent:
import asyncio
import json
from pathlib import Path
from prefect import flow
from prefect_shell import shell_run_command
from pydantic import BaseModel
class DataSource(BaseModel):
source: str
@flow
async def main(data_sources: list[DataSource]) -> None:
tasks = []
for data_source in data_sources:
source = data_source.source
task = asyncio.create_task(
shell_run_command(command=f"my_own_slow_command {source}")
)
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == "__main__":
params = json.loads(Path("params.json").read_text())
asyncio.run(
main(
data_sources=params["data_sources"],
)
)
Experiment 2
With same code, I also tried to set on this work queue’s `Flow Run Concurrency" to 2, but still does not help.
Any guide would be appreciate, thanks! 
Hi @Hongbo-Miao
Great question… there are a few levels at which you could achieve this. First of all, agents run flows, not tasks. A single agent can run many flows (which can then run many tasks), but in your case you want one agent to run one shell command since you’re using multiple instances. This means you’ll want multiple flows. One way to achieve this would be to:
- Rewrite your flow to accept a single
DataSource
- Create a deployment of your flow
- Invoke your new deployment with run_deployment from a parent flow
You’ll still want to set a concurrency limit, just keep in mind that the parent flow will also consume a concurrency slot, unless you put it on a different work queue. Here is a good example of what this pattern looks like in practice.
1 Like
Thanks @EmilRex ! If I understand correctly, in experiment 2, I set work queue’s `Flow Run Concurrency" to 2, I guess that is for flow-level concurrency limit. Am I right?
Experiment 3
Now I want to change my direction a little bit, I hope to set task-level concurrency limit by only using one Prefect agent running at one EC2 instance.
I tried to create a tag by
prefect concurrency-limit create small_instance 2
Then I deployed with this tag by
poetry run poe build ... --tag=small_instance
However, with same code, I still see 5 tasks started at same time. I was expecting task shell_run_command
only run 2 at a time because of my tag, but turns out it does not. Any idea? Thanks! 
Hmm… did you add the tag to the shell task? Also, you might want to use .submit()
. For example:
async def main(data_source: DataSource) -> None:
source = data_source.source
shell_run_command.with_options(tags=["small_instance"]).submit(
command=f"my_own_slow_command {source}"
)
shell_run_command.with_options(tags=["small_instance"])()
Using .submit()
submits the task to the task runner as an async function. Prefect handles the execution. I don’t think it’s strictly necessary, but it cleans up the code some. The important bit is passing the tag to the task.
1 Like
Thanks a lot @EmilRex ! 
Confirm this works, this is my final solution for setting task-level concurrency limit by only using one Prefect agent running at one EC2 instance, and two tasks at one time! 
import asyncio
import json
from pathlib import Path
from prefect import flow
from prefect_shell import shell_run_command
from pydantic import BaseModel
class DataSource(BaseModel):
source: str
@flow
async def main(data_sources: list[DataSource]) -> None:
tasks = []
for data_source in data_sources:
source = data_source.source
run = shell_run_command.with_options(tags=["small_instance"])
task = asyncio.create_task(
run(command=f"my_own_slow_command {source}")
)
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == "__main__":
params = json.loads(Path("params.json").read_text())
asyncio.run(
main(
data_sources=params["data_sources"],
)
)
Then later I just need use original build command:
prefect concurrency-limit create small_instance 2
# Note when build no need add ` --tag=small_instance` any more.
poetry run poe build ...
1 Like