A common use case we hear about is looping over some input to generate tens, if not hundreds, of flow runs on AWS ECS Fargate.
You can't concurrently register an unlimited number of ECS task definition revisions for the same ECS task family. If you try, you'll get an error such as:
    02:31:48.532 | ERROR | prefect.agent - Failed to submit flow run 'fcb99032-a6da-4f26-81fb-16a653fd0753' to infrastructure.
    Traceback (most recent call last):
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect/agent.py", line 265, in _submit_run_and_capture_errors
        result = await infrastructure.run(task_status=task_status)
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 507, in run
        ) = await run_sync_in_worker_thread(
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
        return await anyio.to_thread.run_sync(call, cancellable=True)
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
        return await get_asynclib().run_sync_in_worker_thread(
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
        return await future
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
        result = context.run(func, *args)
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 586, in _create_task_and_wait_for_start
        task_definition_arn = self._register_task_definition(
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/prefect_aws/ecs.py", line 980, in _register_task_definition
        response = ecs_client.register_task_definition(**task_definition_request)
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/botocore/client.py", line 507, in _api_call
        return self._make_api_call(operation_name, kwargs)
      File "/opt/miniconda3/envs/dataplatform/lib/python3.10/site-packages/botocore/client.py", line 943, in _make_api_call
        raise error_class(parsed_response, operation_name)
    botocore.errorfactory.ClientException: An error occurred (ClientException) when calling the RegisterTaskDefinition operation: Too many concurrent attempts to create a new revision of the specified family.
Generally, registering new ECS task definitions is subject to throttling by the AWS ECS API.
To avoid this, add some time between runs, either by increasing the scheduling interval or by adding time.sleep(30) between the API calls that trigger flow runs from that deployment. This way, you can keep running hundreds of containerized flow runs and registering hundreds of task definition revisions without hitting the throttling that applies when such operations are performed concurrently.
Here is an example flow:
    import platform
    import sys

    import prefect
    from prefect import task, flow, get_run_logger


    @task
    def log_platform_info():
        logger = get_run_logger()
        logger.info("Host's network name = %s", platform.node())
        logger.info("Python version = %s", platform.python_version())
        logger.info("Platform information (instance type) = %s ", platform.platform())
        logger.info("OS/Arch = %s/%s", sys.platform, platform.machine())
        logger.info("Prefect Version = %s 🚀", prefect.__version__)


    @flow
    def healthcheck():
        log_platform_info()


    if __name__ == "__main__":
        healthcheck()
Create a deployment (assuming a setup similar to healthcheck.py on the main branch of the anna-geller/dataflow-ops repository on GitHub):
prefect deployment build flows/healthcheck.py:healthcheck -n sss -q sss -a -sb github/default -ib ecs-task/default
Start an agent:
prefect agent start -q sss
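Trigger 100 runs:
    import time

    from prefect.deployments import run_deployment

    # range(100) yields exactly 100 runs (range(1, 100) would only yield 99)
    for _ in range(100):
        # timeout=0 returns immediately instead of waiting for the run to finish
        run_deployment(name="healthcheck/sss", timeout=0)
        # pause so that task definition revisions are not registered concurrently
        time.sleep(30)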
(Here, sss stands for "serverless scalable something".)
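If you trigger runs from several scripts, the pacing logic above could be wrapped in a small helper. The following is only a sketch: `trigger_staggered` and its parameters are hypothetical names, not part of Prefect, and the 30-second default simply mirrors the value used above.

```python
import time
from typing import Callable, List


def trigger_staggered(
    trigger: Callable[[], object],
    count: int,
    delay_seconds: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[object]:
    """Call `trigger` `count` times, pausing between consecutive calls.

    Hypothetical helper: `trigger` is any zero-argument callable, and `sleep`
    is injectable so the pacing can be checked without actually waiting.
    """
    results = []
    for i in range(count):
        results.append(trigger())
        # No pause is needed after the final call
        if i < count - 1:
            sleep(delay_seconds)
    return results


# Possible usage with the deployment from this post (requires Prefect):
# from prefect.deployments import run_deployment
# trigger_staggered(
#     lambda: run_deployment(name="healthcheck/sss", timeout=0), count=100
# )
```

Compared with the plain loop, this version skips the trailing sleep after the last run; otherwise the behavior is the same.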
Alternatively, you can register a task definition yourself, e.g. from your CI/CD environment, and provide an explicit task definition ARN to your ECSTask infrastructure block. This way, concurrency is not an issue because no new task definition revisions are registered when flow runs are submitted.
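    from prefect_aws import AwsCredentials
    from prefect_aws.ecs import ECSTask

    DEFAULT_BLOCK = "default"  # block name, matching ecs-task/default used above
    # Assumption: an AwsCredentials block was already saved under this name
    aws_creds = AwsCredentials.load(DEFAULT_BLOCK)

    ecs = ECSTask(
        aws_credentials=aws_creds,
        cluster="prefect",
        # placeholder ARN; replace with the ARN of your registered task definition
        task_definition_arn="arn:aws:ecs:us-east-1:111111111111111:task-definition/prefect",
    )
    ecs.save(DEFAULT_BLOCK, overwrite=True)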
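The one-time registration step itself might look roughly like the sketch below, run once from CI/CD rather than once per flow run. The helper names, the CPU/memory sizes, and the container name "prefect" are assumptions for illustration; `register_task_definition` is the same boto3 ECS client call that appears in the traceback above.

```python
from typing import Any, Dict


def build_task_definition_request(
    family: str,
    image: str,
    execution_role_arn: str,
    cpu: str = "512",
    memory: str = "1024",
) -> Dict[str, Any]:
    """Build the keyword arguments for a Fargate task definition.

    Hypothetical helper; the sizes and the container name are assumptions.
    """
    return {
        "family": family,
        "requiresCompatibilities": ["FARGATE"],
        "networkMode": "awsvpc",  # required for Fargate tasks
        "cpu": cpu,
        "memory": memory,
        "executionRoleArn": execution_role_arn,
        "containerDefinitions": [{"name": "prefect", "image": image}],
    }


def register_once(ecs_client: Any, request: Dict[str, Any]) -> str:
    """Register a single revision and return its ARN."""
    response = ecs_client.register_task_definition(**request)
    return response["taskDefinition"]["taskDefinitionArn"]


# Possible usage from a CI/CD job (requires boto3 and AWS credentials;
# the image and role ARN are placeholders you must supply):
# import boto3
# arn = register_once(
#     boto3.client("ecs"),
#     build_task_definition_request("prefect", "<your-image>", "<your-execution-role-arn>"),
# )
# print(arn)  # pass this value as task_definition_arn to the ECSTask block
```

Because the revision is created once per release instead of once per flow run, the agent no longer calls RegisterTaskDefinition concurrently for the same family.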
We have improved this in prefect-aws, but you still need to be careful when running serverless containers at scale. Following the advice above and using prefect-aws >= 0.1.8 should help.