How can two agents take shell jobs at the same time, with each agent taking only one job at a time?

I have a workflow in which I want to run a very slow shell command as 5 jobs (1, 2, 3, 4, 5, for example). I have 2 agents (A and B) running on two AWS EC2 instances.
I want both agents to start working at the same time, with each agent finishing one job before taking another. So if the x-axis is time, it would look something like this:

A: 1 -> 3 -> 5
B: 2 -> 4
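The desired schedule is essentially a shared work queue with two workers, each of which pulls one job, finishes it, and only then pulls the next. Below is a minimal plain-asyncio sketch of that behavior (a model of the goal, not Prefect code), assuming a hypothetical `run_job` that just sleeps in place of `my_own_slow_command`:

```python
import asyncio


async def run_job(job: int) -> None:
    """Hypothetical stand-in for `my_own_slow_command`: just sleeps."""
    await asyncio.sleep(0.05)


async def worker(name: str, queue: asyncio.Queue, log: list[tuple[str, int]]) -> None:
    """Take one job at a time from the shared queue until it is empty."""
    while True:
        try:
            job = queue.get_nowait()
        except asyncio.QueueEmpty:
            return  # no jobs left for this worker
        log.append((name, job))  # record which worker picked up which job
        await run_job(job)  # finish this job before taking the next one


async def schedule(jobs: list[int], workers: list[str]) -> list[tuple[str, int]]:
    """Run all jobs with one worker per name; return the (worker, job) pickup order."""
    queue: asyncio.Queue = asyncio.Queue()
    for job in jobs:
        queue.put_nowait(job)
    log: list[tuple[str, int]] = []
    await asyncio.gather(*(worker(name, queue, log) for name in workers))
    return log


if __name__ == "__main__":
    for name, job in asyncio.run(schedule([1, 2, 3, 4, 5], ["A", "B"])):
        print(f"{name} took job {job}")
```

With equal job durations this should reproduce the A: 1 -> 3 -> 5, B: 2 -> 4 timeline; with uneven durations, whichever worker frees up first simply takes the next job.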

Experiment 1

However, this code starts all 5 tasks at the same time in one agent:

import asyncio
import json
from pathlib import Path

from prefect import flow
from prefect_shell import shell_run_command
from pydantic import BaseModel


class DataSource(BaseModel):
    source: str


@flow
async def main(data_sources: list[DataSource]) -> None:
    tasks = []
    for data_source in data_sources:
        source = data_source.source
        task = asyncio.create_task(
            shell_run_command(command=f"my_own_slow_command {source}")
        )
        tasks.append(task)
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    params = json.loads(Path("params.json").read_text())
    asyncio.run(
        main(
            data_sources=params["data_sources"],
        )
    )
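Why all 5 start at once: asyncio.gather schedules every coroutine immediately, and nothing inside a single flow run throttles them. As an in-process alternative, here is a sketch using a plain asyncio.Semaphore (a swapped-in asyncio technique, not a Prefect feature), with a hypothetical `slow_command` standing in for the shell call, to show the difference a cap makes:

```python
import asyncio
from typing import Optional


async def slow_command(source: str, state: dict[str, int]) -> str:
    """Hypothetical stand-in for `my_own_slow_command {source}`."""
    state["running"] += 1
    state["peak"] = max(state["peak"], state["running"])
    await asyncio.sleep(0.05)
    state["running"] -= 1
    return source


async def run_all(sources: list[str], max_concurrent: Optional[int] = None) -> int:
    """Start one command per source; return the peak number running at once."""
    state = {"running": 0, "peak": 0}
    # Create the semaphore inside the running event loop
    limit = asyncio.Semaphore(max_concurrent) if max_concurrent else None

    async def guarded(source: str) -> str:
        if limit is None:
            return await slow_command(source, state)
        async with limit:  # at most max_concurrent commands get past this line
            return await slow_command(source, state)

    # Like the flow above, gather schedules every coroutine right away
    await asyncio.gather(*(guarded(s) for s in sources))
    return state["peak"]


if __name__ == "__main__":
    sources = ["1", "2", "3", "4", "5"]
    print(asyncio.run(run_all(sources)))                    # no cap
    print(asyncio.run(run_all(sources, max_concurrent=2)))  # capped at 2
```

This only limits concurrency within one process; the tag-based task concurrency limit discussed later in this thread is the Prefect-native way to do the same thing.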

Experiment 2

With the same code, I also tried setting this work queue’s `Flow Run Concurrency` to 2, but it still did not help.

Any guidance would be appreciated, thanks! :smiley:

Hi @Hongbo-Miao :wave: Great question… there are a few levels at which you could achieve this. First of all, agents run flows, not tasks. A single agent can run many flows (each of which can run many tasks), but in your case you want each agent to run one shell command at a time, since you’re using multiple instances. This means you’ll want one flow run per shell command. One way to achieve this would be to:

  1. Rewrite your flow to accept a single DataSource
  2. Create a deployment of your flow
  3. Invoke your new deployment with run_deployment from a parent flow, once per DataSource

You’ll still want to set a flow run concurrency limit on the work queue; just keep in mind that the parent flow will also consume a concurrency slot unless you put it on a different work queue. Here is a good example of what this pattern looks like in practice.
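The caveat about the parent flow consuming a slot can be seen in a toy model where a plain asyncio.Semaphore stands in for the work queue's flow run concurrency limit. This is a simulation under stated assumptions, not how Prefect implements concurrency: `parent_flow` and `child_flow` are hypothetical, and a real agent would simply not pick up a flow run until a slot frees, rather than blocking on a semaphore.

```python
import asyncio


async def child_flow(slots: asyncio.Semaphore, state: dict[str, int]) -> None:
    """Hypothetical child flow run: needs a slot on its work queue to start."""
    async with slots:
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.05)  # stands in for the slow shell command
        state["running"] -= 1


async def parent_flow(n_children: int, limit: int, same_queue: bool) -> int:
    """Hypothetical parent flow run; returns the peak number of children running."""
    child_queue = asyncio.Semaphore(limit)  # slots on the children's work queue
    # The parent either shares that queue or gets its own, separate queue
    parent_queue = child_queue if same_queue else asyncio.Semaphore(limit)
    state = {"running": 0, "peak": 0}
    async with parent_queue:  # the parent holds its slot while the children run
        await asyncio.gather(
            *(child_flow(child_queue, state) for _ in range(n_children))
        )
    return state["peak"]


if __name__ == "__main__":
    print(asyncio.run(parent_flow(5, limit=2, same_queue=True)))   # parent shares the queue
    print(asyncio.run(parent_flow(5, limit=2, same_queue=False)))  # parent on its own queue
```

With a limit of 2 on a shared queue, the parent holds one slot and only one child runs at a time; with a limit of 1 the model deadlocks outright, which is why putting the parent on a separate work queue is the safer setup.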


Thanks @EmilRex! If I understand correctly, the work queue’s `Flow Run Concurrency` of 2 that I set in Experiment 2 is a flow-level concurrency limit. Am I right?

Experiment 3

Now I want to change direction a little: I want to set a task-level concurrency limit using only one Prefect agent running on one EC2 instance.

I created a concurrency limit for a tag with:

prefect concurrency-limit create small_instance 2

Then I deployed with this tag:

poetry run poe build ... --tag=small_instance

However, with the same code, I still see 5 tasks start at the same time. I expected the shell_run_command task to run only 2 at a time because of my tag, but it turns out it does not. Any idea? Thanks! :smiley:

Hmm… did you add the tag to the shell task? Also, you might want to use .submit(). For example:

@flow
async def main(data_source: DataSource) -> None:
    source = data_source.source
    # In an async flow, .submit() on an async task returns a coroutine, so await it
    await shell_run_command.with_options(tags=["small_instance"]).submit(
        command=f"my_own_slow_command {source}"
    )

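Or, without .submit(), call the tagged task directly inside the flow:

await shell_run_command.with_options(tags=["small_instance"])(command=f"my_own_slow_command {source}")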

Using .submit() hands the task run to the flow’s task runner, and Prefect handles the execution. I don’t think it’s strictly necessary, but it cleans up the code a bit. The important bit is passing the tag to the task itself, since task-level concurrency limits are checked against the tags on each task run.
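To see why tagging only the deployment did not throttle anything, here is a toy model in plain asyncio (not Prefect's implementation). It assumes, consistent with what was observed in Experiment 3, that only tags on the task run itself count toward a limit and flow-run tags are never consulted; `LIMITS` mirrors `prefect concurrency-limit create small_instance 2`, and `flow_run` and `task_run` are hypothetical names.

```python
import asyncio
from contextlib import AsyncExitStack

# Toy model, not Prefect's implementation. Mirrors
# `prefect concurrency-limit create small_instance 2`.
LIMITS = {"small_instance": 2}


async def task_run(
    tags: list[str], slots: dict[str, asyncio.Semaphore], state: dict[str, int]
) -> None:
    """Take one slot for every limited tag on this task run, then do the work."""
    async with AsyncExitStack() as stack:
        for tag in sorted(set(tags)):  # fixed order avoids lock-ordering deadlocks
            if tag in slots:
                await stack.enter_async_context(slots[tag])
        state["running"] += 1
        state["peak"] = max(state["peak"], state["running"])
        await asyncio.sleep(0.05)  # stands in for the slow shell command
        state["running"] -= 1


async def flow_run(flow_tags: list[str], task_tags: list[str], n_tasks: int = 5) -> int:
    """Run n_tasks task runs at once; return the peak number running together.

    flow_tags is accepted but never consulted: in this model only task-run
    tags count toward a limit.
    """
    slots = {tag: asyncio.Semaphore(n) for tag, n in LIMITS.items()}
    state = {"running": 0, "peak": 0}
    await asyncio.gather(*(task_run(task_tags, slots, state) for _ in range(n_tasks)))
    return state["peak"]


if __name__ == "__main__":
    # Experiment 3: tag on the deployment (flow run) only
    print(asyncio.run(flow_run(flow_tags=["small_instance"], task_tags=[])))
    # The fix: tag on the task itself
    print(asyncio.run(flow_run(flow_tags=[], task_tags=["small_instance"])))
```

In the model, the flow-tagged run peaks at all 5 tasks while the task-tagged run peaks at 2, matching the behavior reported before and after the fix in this thread.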


Thanks a lot @EmilRex! :smiley:

I can confirm this works. Here is my final solution for setting a task-level concurrency limit using only one Prefect agent on one EC2 instance, running two tasks at a time! :tada:

import asyncio
import json
from pathlib import Path

from prefect import flow
from prefect_shell import shell_run_command
from pydantic import BaseModel


class DataSource(BaseModel):
    source: str


@flow
async def main(data_sources: list[DataSource]) -> None:
    # Tag every task run so the "small_instance" concurrency limit applies to it
    run = shell_run_command.with_options(tags=["small_instance"])
    tasks = []
    for data_source in data_sources:
        source = data_source.source
        task = asyncio.create_task(
            run(command=f"my_own_slow_command {source}")
        )
        tasks.append(task)
    # All task runs are created at once; only 2 run while the rest wait for a slot
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    params = json.loads(Path("params.json").read_text())
    asyncio.run(
        main(
            data_sources=params["data_sources"],
        )
    )

After that, I just create the concurrency limit and use the original build command:

prefect concurrency-limit create small_instance 2

# Note: when building, there is no need to add `--tag=small_instance` anymore.
poetry run poe build ...