How to run deployments with the ECSTask infrastructure block at scale (running hundreds of serverless ECS-containerized flow runs concurrently)?

We often hear about a use case where users want to loop over some input and generate tens, if not hundreds, of flow runs on AWS ECS Fargate.

Problem

You can't concurrently register an arbitrary number of new revisions for the same ECS task definition family. When too many registrations happen at once, you'll get an error such as:

02:31:48.532 | ERROR   | prefect.agent - Failed to submit flow run 'fcb99032-a6da-4f26-81fb-16a653fd0753' to infrastructure.
Traceback (most recent call last):
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect/agent.py", line 265, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 507, in run
    ) = await run_sync_in_worker_thread(
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 586, in _create_task_and_wait_for_start
    task_definition_arn = self._register_task_definition(
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 980, in _register_task_definition
    response = ecs_client.register_task_definition(**task_definition_request)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/botocore/client.py", line 507, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/botocore/client.py", line 943, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.

Generally, registering new ECS task definitions is subject to throttling by the AWS ECS API.

Possible solutions

Add latency between iterations to avoid concurrency throttling from ECS

Add a delay between runs, either by increasing the scheduling interval or by adding time.sleep(30) between the API calls that trigger flow runs from the deployment. This way, you can run hundreds of containerized flow runs and register hundreds of task definition revisions without hitting the throttling that otherwise applies when you perform such operations concurrently.
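For example, if your runs are triggered on a schedule, you could space them out when building the deployment (assuming a Prefect 2 CLI version that supports the --interval flag, which takes seconds):

prefect deployment build flows/healthcheck.py:healthcheck -n prod -q prod --interval 120 -a -sb github/default -ib ecs-task/default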

Flow:

import platform
import prefect
from prefect import task, flow, get_run_logger
import sys


@task
def log_platform_info():
    logger = get_run_logger()
    logger.info("Host's network name = %s", platform.node())
    logger.info("Python version = %s", platform.python_version())
    logger.info("Platform information (instance type) = %s ", platform.platform())
    logger.info("OS/Arch = %s/%s", sys.platform, platform.machine())
    logger.info("Prefect Version = %s 🚀", prefect.__version__)


@flow
def healthcheck():
    log_platform_info()


if __name__ == "__main__":
    healthcheck()

Create a deployment (assuming a setup similar to the healthcheck.py example in the anna-geller/dataflow-ops repository on GitHub):

prefect deployment build flows/healthcheck.py:healthcheck -n prod -q prod -a -sb github/default -ib ecs-task/default

Start an agent:

prefect agent start -q prod

Trigger 100 runs (timeout=0 makes run_deployment return immediately after submitting the run instead of waiting for it to finish):

from prefect.deployments import run_deployment
import time


for _ in range(100):
    run_deployment(name="healthcheck/prod", timeout=0)
    time.sleep(30)

Register the task definition once and reuse it

Alternatively, you can register a task definition once, e.g. from your CI/CD environment, and provide an explicit task definition ARN to your ECSTask infrastructure block. This way, concurrency is not an issue because no new task definition revisions are registered at flow run time.

from prefect_aws.credentials import AwsCredentials
from prefect_aws.ecs import ECSTask

DEFAULT_BLOCK = "default"  # block name referenced by the deployment, e.g. ecs-task/default

# load a previously saved AwsCredentials block
aws_creds = AwsCredentials.load(DEFAULT_BLOCK)

ecs = ECSTask(
    aws_credentials=aws_creds,
    cluster="prefect",
    task_definition_arn="arn:aws:ecs:us-east-1:111111111111:task-definition/prefect",
)
ecs.save(DEFAULT_BLOCK, overwrite=True)
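For the one-time registration step, e.g. in CI/CD, a minimal boto3 sketch could look like the following. The family name, image, execution role ARN, and CPU/memory values are placeholders to adapt to your setup; note that prefect-aws looks for a container named "prefect" by default:

import boto3

ecs_client = boto3.client("ecs")

# register a single task definition revision that all flow runs will reuse
response = ecs_client.register_task_definition(
    family="prefect",
    requiresCompatibilities=["FARGATE"],
    networkMode="awsvpc",
    cpu="512",
    memory="1024",
    # placeholder ARN - replace with your task execution role
    executionRoleArn="arn:aws:iam::111111111111:role/prefect-task-execution-role",
    containerDefinitions=[
        {
            # prefect-aws expects the flow run container to be named "prefect"
            "name": "prefect",
            "image": "prefecthq/prefect:2-python3.10",
        }
    ],
)
print(response["taskDefinition"]["taskDefinitionArn"])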

Extra notes

We improved this behavior in prefect-aws, but you still need to be careful when running serverless containers at scale; following the advice above and using prefect-aws >= 0.1.8 should help.
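
For example, to upgrade:

pip install "prefect-aws>=0.1.8"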