A common use case: users want to loop over some input and trigger tens or even hundreds of flow runs on AWS ECS Fargate.
Problem
You can’t register many new ECS task definition revisions for the same task family concurrently. If you try, you’ll get an error such as:
```
02:31:48.532 | ERROR | prefect.agent - Failed to submit flow run 'fcb99032-a6da-4f26-81fb-16a653fd0753' to infrastructure.
Traceback (most recent call last):
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect/agent.py", line 265, in _submit_run_and_capture_errors
    result = await infrastructure.run(task_status=task_status)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 507, in run
    ) = await run_sync_in_worker_thread(
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 586, in _create_task_and_wait_for_start
    task_definition_arn = self._register_task_definition(
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 980, in _register_task_definition
    response = ecs_client.register_task_definition(**task_definition_request)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/botocore/client.py", line 507, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/botocore/client.py", line 943, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
```
Generally, registering new ECS task definitions is subject to throttling by the AWS ECS API.
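When you call the ECS API yourself (for example, when registering a task definition from CI/CD), a common way to cope with throttling is to retry with exponential backoff and jitter. This is a general pattern, not a prefect-aws feature. It can't wrap the registration call the agent makes internally. Below is a minimal sketch. `call_with_backoff` and `is_concurrent_revision_error` are hypothetical helpers, and matching on the error message text is an assumption based on the traceback above.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    is_throttled: Callable[[Exception], bool],
    max_attempts: int = 5,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying with exponential backoff plus jitter while it is throttled."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception as exc:
            # Re-raise non-throttling errors, or throttling once attempts are exhausted
            if not is_throttled(exc) or attempt == max_attempts:
                raise
            # Wait base_delay * 2^(attempt - 1) seconds plus up to 1 s of random jitter
            sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 1))
    raise AssertionError("unreachable")


def is_concurrent_revision_error(exc: Exception) -> bool:
    """Heuristic (assumption): match the ECS message seen in the traceback."""
    return "Too many concurrent attempts" in str(exc)
```

In practice `fn` could be something like `lambda: ecs_client.register_task_definition(**request)` with a boto3 ECS client. Injecting `sleep` keeps the helper testable without actually waiting.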
Possible solutions
Add a delay between iterations to avoid ECS concurrency throttling
Space out the flow runs, either by increasing the scheduling interval or by adding `time.sleep(30)` between the API calls that trigger flow runs from that deployment. This way, you can keep running hundreds of containerized flow runs and registering hundreds of task definition revisions without hitting the throttling that ECS applies when these operations happen concurrently.
Flow:
```python
import platform
import prefect
from prefect import task, flow, get_run_logger
import sys


@task
def log_platform_info():
    logger = get_run_logger()
    logger.info("Host's network name = %s", platform.node())
    logger.info("Python version = %s", platform.python_version())
    logger.info("Platform information (instance type) = %s ", platform.platform())
    logger.info("OS/Arch = %s/%s", sys.platform, platform.machine())
    logger.info("Prefect Version = %s 🚀", prefect.__version__)


@flow
def healthcheck():
    log_platform_info()


if __name__ == "__main__":
    healthcheck()
```
Create a deployment (assuming a setup similar to healthcheck.py in the anna-geller/dataflow-ops repository on GitHub):
```bash
prefect deployment build flows/healthcheck.py:healthcheck -n prod -q prod -a -sb github/default -ib ecs-task/default
```
Start an agent:
```bash
prefect agent start -q prod
```
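Trigger 100 runs:
```python
import time
from prefect.deployments import run_deployment
# Trigger 100 flow runs, pausing 30 s between them so the ECS task
# definition registrations they cause don't happen concurrently
for _ in range(100):
    # timeout=0 returns right away instead of waiting for the run to finish
    run_deployment(name="healthcheck/prod", timeout=0)
    time.sleep(30)
```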
Register a task definition once and reuse it
Alternatively, you can register a task definition once (e.g., from your CI/CD environment) and provide its ARN explicitly to your ECSTask infrastructure block. This way, concurrency is not an issue, because no new task definition revisions are registered.
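A minimal sketch of that one-time registration step, assuming you use boto3 from CI/CD and a single-container Fargate task. The family name, image, role ARN, CPU/memory sizes and the container name `prefect` are placeholders and assumptions; check which container name your prefect-aws version expects. `build_task_definition_request` and `register_once` are hypothetical helpers, not part of prefect-aws.

```python
from typing import Any, Dict


def build_task_definition_request(
    family: str,
    image: str,
    execution_role_arn: str,
    cpu: str = "256",
    memory: str = "512",
) -> Dict[str, Any]:
    """Build keyword arguments for ECS RegisterTaskDefinition (one Fargate container)."""
    return {
        "family": family,
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",  # Fargate tasks require awsvpc networking
        "cpu": cpu,
        "memory": memory,
        "executionRoleArn": execution_role_arn,
        "containerDefinitions": [
            # Assumption: the container is named "prefect"; verify for your prefect-aws version
            {"name": "prefect", "image": image, "essential": True},
        ],
    }


def register_once(ecs_client: Any, request: Dict[str, Any]) -> str:
    """Register a single revision and return its ARN for the ECSTask block."""
    response = ecs_client.register_task_definition(**request)
    return response["taskDefinition"]["taskDefinitionArn"]
```

In CI/CD you would pass `boto3.client("ecs")` as `ecs_client` and copy the returned ARN into the `task_definition_arn` field of your `ECSTask` block. Running this once per image change, rather than once per flow run, means revisions are never registered concurrently.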
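```python
from prefect_aws import AwsCredentials
from prefect_aws.ecs import ECSTask
DEFAULT_BLOCK = "default"  # assumption: matches "-ib ecs-task/default" above
aws_creds = AwsCredentials.load("default")  # assumption: an AwsCredentials block named "default" exists
ecs = ECSTask(
    aws_credentials=aws_creds,
    cluster="prefect",
    # placeholder ARN of a task definition registered ahead of time (e.g. from CI/CD)
    task_definition_arn="arn:aws:ecs:us-east-1:111111111111:task-definition/prefect",
)
ecs.save(DEFAULT_BLOCK, overwrite=True)
```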
Extra notes
We improved this here, but you still need to be careful when running serverless containers at scale. Following the advice above and using prefect-aws >= 0.1.8 should help.
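To confirm an environment meets the prefect-aws >= 0.1.8 recommendation, you could check the installed version with the standard library. This is a minimal sketch with hypothetical helper names; the parser deliberately ignores pre-release and local suffixes (a simplification; `packaging.version.Version` is more thorough if it's installed).

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Tuple


def release_tuple(v: str) -> Tuple[int, ...]:
    """Numeric release part of a version string, e.g. "0.1.10" -> (0, 1, 10).

    Simplification: suffixes are ignored, so "0.1.8rc1" is treated as 0.1.8.
    """
    match = re.match(r"\d+(?:\.\d+)*", v)
    if match is None:
        raise ValueError(f"unparseable version: {v!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def meets_minimum(installed: str, minimum: str = "0.1.8") -> bool:
    """True if installed >= minimum, comparing release numbers as integers."""
    a, b = release_tuple(installed), release_tuple(minimum)
    width = max(len(a), len(b))
    # Pad with zeros so "0.2" compares as "0.2.0"
    return a + (0,) * (width - len(a)) >= b + (0,) * (width - len(b))


def check_prefect_aws(minimum: str = "0.1.8") -> bool:
    """Check the installed prefect-aws distribution; False if it isn't installed."""
    try:
        return meets_minimum(version("prefect-aws"), minimum)
    except PackageNotFoundError:
        return False
```

Versions are compared as integer tuples rather than strings because, as strings, "0.1.10" sorts before "0.1.8".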