ClusterNotFoundException when using ECSAgent

I’m trying to set up Prefect using ECS and my first flow is giving a ClusterNotFoundException. I set up the ECSAgent and in the UI it looked green and ready to accept flows. On the AWS logs side of things I see this for the agent, but it doesn’t give me any clues as to what went wrong.

4/11/2022, 10:01:56 AM	Traceback (most recent call last):	prefect
4/11/2022, 10:01:56 AM	File "/usr/local/lib/python3.8/site-packages/prefect/agent/agent.py", line 388, in _deploy_flow_run	prefect
4/11/2022, 10:01:56 AM	deployment_info = self.deploy_flow(flow_run)	prefect
4/11/2022, 10:01:56 AM	File "/usr/local/lib/python3.8/site-packages/prefect/agent/ecs/agent.py", line 336, in deploy_flow	prefect
4/11/2022, 10:01:56 AM	resp = self.ecs_client.run_task(taskDefinition=taskdef_arn, **kwargs)	prefect
4/11/2022, 10:01:56 AM	File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 401, in _api_call	prefect
4/11/2022, 10:01:56 AM	return self._make_api_call(operation_name, kwargs)	prefect
4/11/2022, 10:01:56 AM	File "/usr/local/lib/python3.8/site-packages/botocore/client.py", line 731, in _make_api_call	prefect
4/11/2022, 10:01:56 AM	raise error_class(parsed_response, operation_name)	prefect
4/11/2022, 10:01:56 AM	botocore.errorfactory.ClusterNotFoundException: An error occurred (ClusterNotFoundException) when calling the RunTask operation: Cluster not found.	prefect

The code for my test flow is

import prefect
from prefect import task, Flow
from prefect.run_configs import ECSRun

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")

with Flow("hello-flow", run_config=ECSRun(labels=['test'])) as flow:
    hello_task()

Has anyone else encountered this or know what I might have done wrong?

1 Like

Hi @Connor_Skennerton, welcome to Prefect Discourse! Glad to have you here! :wave:

I think the problem might be that your ECSRun doesn’t know to which ECS cluster it’s supposed to deploy your flow. Can you try setting the ECS cluster name explicitly like so: run_task_kwargs=dict(cluster="yourECSClusterName")?

Here is usage in a Flow:

import prefect
from prefect import task, Flow
from prefect.run_configs import ECSRun

@task
def hello_task():
    logger = prefect.context.get("logger")
    logger.info("Hello world!")

with Flow("hello-flow", run_config=ECSRun(labels=['test'], 
run_task_kwargs=dict(cluster="yourECSClusterName"),
)) as flow:
    hello_task()

Thank you! Yes that worked. Is there a way to specify that globally rather than on the flow itself?

1 Like

Yes, there is. You would need to attach it to your agent via --run-task-kwargs. Example startup command:

prefect agent ecs start \
--name ECS-Local \
--cluster arn:aws:ecs:us-east-1:338306982838:cluster/PrefectECS \
--task-role-arn arn:aws:iam::338306982838:role/prefectTaskRole \
--execution-role-arn arn:aws:iam::338306982838:role/prefectECSAgentTaskExecutionRole \
--log-level DEBUG \
--label ecs-agent-local \
--run-task-kwargs agent_specs.yaml

And here is an example YAML file - agent_specs.yaml:

cluster: prefectEcsCluster
capacityProviderStrategy:
  - capacityProvider: FARGATE_SPOT
    weight: 1
    base: 0
networkConfiguration:
  awsvpcConfiguration:
    subnets: [subnet-5c44cb03,subnet-0f81d342,subnet-664c9a57]
    securityGroups: []
    assignPublicIp: ENABLED

You only need the first line in this YAML file, but I included more info as an example of configuration you can pass here.