fix(aws-ecs): drain hook lambda allows tasks to stop gracefully (#13559)
fixes #13506

### Description 

After the container instance is set to draining, the tasks running on it transition from RUNNING > DEACTIVATING > STOPPING > DEPROVISIONING > STOPPED.

The current way of counting running tasks via `instance['runningTasksCount'] + instance['pendingTasksCount']` does not include tasks in those transitional states, leading to the EC2 instance being terminated prematurely.
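
To illustrate the gap, here is a minimal sketch using hand-written sample data. The dictionaries only mimic the shape of the `describe_container_instances` and `describe_tasks` responses; the values and the helper names `old_count` and `new_count` are assumptions made up for this example and are not part of the lambda.

```python
# Hypothetical sample data: the instance counters have already dropped to 0,
# but the task itself is still shutting down.
instance = {'runningTasksCount': 0, 'pendingTasksCount': 0}
tasks = [{'lastStatus': 'DEACTIVATING'}]


def old_count(instance):
    """Old logic: trust the counters on the container instance."""
    return instance['runningTasksCount'] + instance['pendingTasksCount']


def new_count(instance, tasks):
    """New logic: treat every task that is not yet STOPPED as still present."""
    not_stopped = sum(task['lastStatus'] != 'STOPPED' for task in tasks)
    return not_stopped + instance['pendingTasksCount']


print('OLD:', old_count(instance))         # OLD: 0
print('NEW:', new_count(instance, tasks))  # NEW: 1
```

With the old count the wait loop would exit immediately, while the new count keeps waiting until the task reports `STOPPED`.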


### Verification 

I have verified the change by manually updating the automatically created drain hook lambda and then running an ASG refresh.

I ran the test with additional debug output to compare the old logic of `runningTasksCount + pendingTasksCount` and the new logic that fetches the status of the tasks.

I interleaved the logs from the ECS events, the application running in the task, and the drain hook lambda:

```
2021-03-11T15:56:52.608-08:00	Instance i-1234567890abcdefg has container instance ARN arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv
2021-03-11T15:56:52.649-08:00	Instance ARN arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has task ARNs arn:aws:ecs:us-west-2:123456789012:task/fooservice/1234567890abcdefghijklmnopqrstuv
2021-03-11T15:57:03.018-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:03.051-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:13.215-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:13.280-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:15.280-08:00 	service fooservice has stopped 1 running tasks: task 1234567890abcdefghijklmnopqrstuv.
2021-03-11T15:57:23.438-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:57:23.490-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:33.632-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:57:33.690-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:43.853-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:57:43.890-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:46.000-08:00 	service fooservice has started 1 tasks: task 1234567890abcdefghijklmnopqrstuv.
2021-03-11T15:57:46.000-08:00 	(service fooservice, taskSet ecs-svc/1234567890abcdefghi) has begun draining connections on 2 tasks.
2021-03-11T15:57:46.000-08:00 	service fooservice deregistered 1 targets in target-group fooservice-vpce-target
2021-03-11T15:57:46.000-08:00 	service fooservice deregistered 1 targets in target-group fooservice-target
2021-03-11T15:57:54.032-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:57:54.090-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:57:58.000-08:00 	service fooservice registered 1 targets in target-group fooservice-vpce-target
2021-03-11T15:57:58.000-08:00 	service fooservice registered 1 targets in target-group fooservice-target
2021-03-11T15:58:04.242-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:58:04.270-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:58:14.430-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:58:14.470-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:58:24.611-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:58:24.650-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:58:34.796-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:58:34.850-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:58:44.999-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:58:45.030-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 1 tasks
2021-03-11T15:58:49.000-08:00 	app received SIGTERM
2021-03-11T15:58:54.000-08:00 	service fooservice has reached a steady state.
2021-03-11T15:58:55.170-08:00	OLD: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:58:55.210-08:00	NEW: Instance arn:aws:ecs:us-west-2:123456789012:container-instance/fooservice/1234567890abcdefghijklmnopqrstuv has 0 tasks
2021-03-11T15:58:55.210-08:00	Terminating instance i-1234567890abcdefg
```

The logs show that the new approach allows ECS to drain connections, deregister the targets, and respect the `deregistrationDelay` (set to 1 minute in this case).

The old approach would have terminated the EC2 instance 23 seconds prior to ECS even deregistering the target, leading to 502 errors.
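
The timing can be checked directly from the log timestamps. The sketch below is only an arithmetic aid: the three timestamps are copied from the log above, and the variable names are chosen for this example.

```python
from datetime import datetime

FMT = '%Y-%m-%dT%H:%M:%S.%f%z'


def parse(ts):
    """Parse a log timestamp such as 2021-03-11T15:57:23.438-08:00."""
    return datetime.strptime(ts, FMT)


old_zero = parse('2021-03-11T15:57:23.438-08:00')    # first "OLD: ... has 0 tasks"
deregister = parse('2021-03-11T15:57:46.000-08:00')  # ECS deregisters the targets
terminated = parse('2021-03-11T15:58:55.210-08:00')  # "Terminating instance"

# How much earlier the old logic would have released the instance.
early = (deregister - old_zero).total_seconds()
# How long the new logic waited after deregistration began.
wait = (terminated - deregister).total_seconds()

print('old logic early by %.1f s' % early)          # old logic early by 22.6 s
print('new logic waited %.1f s after deregistration' % wait)  # new logic waited 69.2 s after deregistration
```

The first figure is the roughly 23 seconds mentioned above; the second is a little over the 1 minute `deregistrationDelay`, which is consistent with the delay being respected.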

### Pull Request Checklist
- [x]  Testing 
    I was not able to find any tests validating the functionality of the lambda. However, I have updated `expected.json` files to expect the new lambda function code.
- [ ] Docs - *Not Applicable*
    No previously documented behavior has changed
- [x] Title and Description 
- [ ] Sensitive Modules (requires 2 PR approvers) - *Not Applicable*
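
Since no existing tests cover the lambda, one possible way to exercise the new counting logic is with a stub client. The following is a sketch under assumptions, not part of this change: `StubEcs` is a hypothetical stand-in for the boto3 ECS client, `has_tasks` is a copy of the new logic with the client passed in as a parameter (the real function uses a module-level client), and the `ACTIVE` branch is left out for brevity.

```python
class StubEcs:
    """Hypothetical stand-in for the boto3 ECS client with canned responses."""

    def __init__(self, task_statuses):
        # One task status per poll, consumed in order.
        self.task_statuses = list(task_statuses)

    def describe_container_instances(self, cluster, containerInstances):
        # Counters are already 0, as observed in the logs while draining.
        return {'containerInstances': [{
            'status': 'DRAINING',
            'runningTasksCount': 0,
            'pendingTasksCount': 0,
        }]}

    def describe_tasks(self, cluster, tasks):
        status = self.task_statuses.pop(0)
        return {'tasks': [{'taskArn': arn, 'lastStatus': status} for arn in tasks]}


def has_tasks(ecs, cluster, instance_arn, task_arns):
    """Counting logic from this change, with the client injected for testing."""
    instances = ecs.describe_container_instances(
        cluster=cluster, containerInstances=[instance_arn])['containerInstances']
    if not instances:
        return False
    instance = instances[0]

    task_count = None
    if task_arns:
        tasks = ecs.describe_tasks(cluster=cluster, tasks=task_arns)['tasks']
        if tasks:
            # Any task that is not STOPPED still counts as present.
            task_count = (sum(t['lastStatus'] != 'STOPPED' for t in tasks)
                          + instance['pendingTasksCount'])
    if not task_count:
        # Fall back to the instance counters.
        task_count = instance['runningTasksCount'] + instance['pendingTasksCount']
    return task_count > 0


# Walk one task through the lifecycle described above, one poll per state.
stub = StubEcs(['RUNNING', 'DEACTIVATING', 'STOPPING', 'DEPROVISIONING', 'STOPPED'])
results = [has_tasks(stub, 'cluster', 'instance-arn', ['task-arn']) for _ in range(5)]
print(results)  # [True, True, True, True, False]
```

The loop only reports no tasks once the task reaches `STOPPED`, which matches the behaviour seen in the interleaved logs.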

### Impact
End users running ECS on EC2 with capacity provided by an ASG will see an increase in instance termination time. However, the process is now much safer: it respects the ALB's `deregistrationDelay` and will reduce connection errors.

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
Bio2hazard authored Mar 20, 2021
1 parent 62a91b7 commit 3e1148e
Showing 17 changed files with 44 additions and 24 deletions.
@@ -717,7 +717,7 @@
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "import boto3, json, os, time\n\necs = boto3.client('ecs')\nautoscaling = boto3.client('autoscaling')\n\n\ndef lambda_handler(event, context):\n print(json.dumps(event))\n cluster = os.environ['CLUSTER']\n snsTopicArn = event['Records'][0]['Sns']['TopicArn']\n lifecycle_event = json.loads(event['Records'][0]['Sns']['Message'])\n instance_id = lifecycle_event.get('EC2InstanceId')\n if not instance_id:\n print('Got event without EC2InstanceId: %s', json.dumps(event))\n return\n\n instance_arn = container_instance_arn(cluster, instance_id)\n print('Instance %s has container instance ARN %s' % (lifecycle_event['EC2InstanceId'], instance_arn))\n\n if not instance_arn:\n return\n\n while has_tasks(cluster, instance_arn):\n time.sleep(10)\n\n try:\n print('Terminating instance %s' % instance_id)\n autoscaling.complete_lifecycle_action(\n LifecycleActionResult='CONTINUE',\n **pick(lifecycle_event, 'LifecycleHookName', 'LifecycleActionToken', 'AutoScalingGroupName'))\n except Exception as e:\n # Lifecycle action may have already completed.\n print(str(e))\n\n\ndef container_instance_arn(cluster, instance_id):\n \"\"\"Turn an instance ID into a container instance ARN.\"\"\"\n arns = ecs.list_container_instances(cluster=cluster, filter='ec2InstanceId==' + instance_id)['containerInstanceArns']\n if not arns:\n return None\n return arns[0]\n\n\ndef has_tasks(cluster, instance_arn):\n \"\"\"Return True if the instance is running tasks for the given cluster.\"\"\"\n instances = ecs.describe_container_instances(cluster=cluster, containerInstances=[instance_arn])['containerInstances']\n if not instances:\n return False\n instance = instances[0]\n\n if instance['status'] == 'ACTIVE':\n # Start draining, then try again later\n set_container_instance_to_draining(cluster, instance_arn)\n return True\n\n tasks = instance['runningTasksCount'] + instance['pendingTasksCount']\n print('Instance %s has %s tasks' % (instance_arn, tasks))\n\n return tasks > 0\n\n\ndef 
set_container_instance_to_draining(cluster, instance_arn):\n ecs.update_container_instances_state(\n cluster=cluster,\n containerInstances=[instance_arn], status='DRAINING')\n\n\ndef pick(dct, *keys):\n \"\"\"Pick a subset of a dict.\"\"\"\n return {k: v for k, v in dct.items() if k in keys}\n"
"ZipFile": "import boto3, json, os, time\n\necs = boto3.client('ecs')\nautoscaling = boto3.client('autoscaling')\n\n\ndef lambda_handler(event, context):\n print(json.dumps(event))\n cluster = os.environ['CLUSTER']\n snsTopicArn = event['Records'][0]['Sns']['TopicArn']\n lifecycle_event = json.loads(event['Records'][0]['Sns']['Message'])\n instance_id = lifecycle_event.get('EC2InstanceId')\n if not instance_id:\n print('Got event without EC2InstanceId: %s', json.dumps(event))\n return\n\n instance_arn = container_instance_arn(cluster, instance_id)\n print('Instance %s has container instance ARN %s' % (lifecycle_event['EC2InstanceId'], instance_arn))\n\n if not instance_arn:\n return\n\n task_arns = container_instance_task_arns(cluster, instance_arn)\n \n if task_arns:\n print('Instance ARN %s has task ARNs %s' % (instance_arn, ', '.join(task_arns)))\n\n while has_tasks(cluster, instance_arn, task_arns):\n time.sleep(10)\n\n try:\n print('Terminating instance %s' % instance_id)\n autoscaling.complete_lifecycle_action(\n LifecycleActionResult='CONTINUE',\n **pick(lifecycle_event, 'LifecycleHookName', 'LifecycleActionToken', 'AutoScalingGroupName'))\n except Exception as e:\n # Lifecycle action may have already completed.\n print(str(e))\n\n\ndef container_instance_arn(cluster, instance_id):\n \"\"\"Turn an instance ID into a container instance ARN.\"\"\"\n arns = ecs.list_container_instances(cluster=cluster, filter='ec2InstanceId==' + instance_id)['containerInstanceArns']\n if not arns:\n return None\n return arns[0]\n\ndef container_instance_task_arns(cluster, instance_arn):\n \"\"\"Fetch tasks for a container instance ARN.\"\"\"\n arns = ecs.list_tasks(cluster=cluster, containerInstance=instance_arn)['taskArns']\n return arns\n\ndef has_tasks(cluster, instance_arn, task_arns):\n \"\"\"Return True if the instance is running tasks for the given cluster.\"\"\"\n instances = ecs.describe_container_instances(cluster=cluster, 
containerInstances=[instance_arn])['containerInstances']\n if not instances:\n return False\n instance = instances[0]\n\n if instance['status'] == 'ACTIVE':\n # Start draining, then try again later\n set_container_instance_to_draining(cluster, instance_arn)\n return True\n\n task_count = None\n\n if task_arns:\n # Fetch details for tasks running on the container instance\n tasks = ecs.describe_tasks(cluster=cluster, tasks=task_arns)['tasks']\n if tasks:\n # Consider any non-stopped tasks as running\n task_count = sum(task['lastStatus'] != 'STOPPED' for task in tasks) + instance['pendingTasksCount']\n \n if not task_count:\n # Fallback to instance task counts if detailed task information is unavailable\n task_count = instance['runningTasksCount'] + instance['pendingTasksCount']\n \n print('Instance %s has %s tasks' % (instance_arn, task_count))\n\n return task_count > 0\n\ndef set_container_instance_to_draining(cluster, instance_arn):\n ecs.update_container_instances_state(\n cluster=cluster,\n containerInstances=[instance_arn], status='DRAINING')\n\n\ndef pick(dct, *keys):\n \"\"\"Pick a subset of a dict.\"\"\"\n return {k: v for k, v in dct.items() if k in keys}\n"
},
"Role": {
"Fn::GetAtt": [
@@ -534,7 +534,7 @@
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "import boto3, json, os, time\n\necs = boto3.client('ecs')\nautoscaling = boto3.client('autoscaling')\n\n\ndef lambda_handler(event, context):\n print(json.dumps(event))\n cluster = os.environ['CLUSTER']\n snsTopicArn = event['Records'][0]['Sns']['TopicArn']\n lifecycle_event = json.loads(event['Records'][0]['Sns']['Message'])\n instance_id = lifecycle_event.get('EC2InstanceId')\n if not instance_id:\n print('Got event without EC2InstanceId: %s', json.dumps(event))\n return\n\n instance_arn = container_instance_arn(cluster, instance_id)\n print('Instance %s has container instance ARN %s' % (lifecycle_event['EC2InstanceId'], instance_arn))\n\n if not instance_arn:\n return\n\n while has_tasks(cluster, instance_arn):\n time.sleep(10)\n\n try:\n print('Terminating instance %s' % instance_id)\n autoscaling.complete_lifecycle_action(\n LifecycleActionResult='CONTINUE',\n **pick(lifecycle_event, 'LifecycleHookName', 'LifecycleActionToken', 'AutoScalingGroupName'))\n except Exception as e:\n # Lifecycle action may have already completed.\n print(str(e))\n\n\ndef container_instance_arn(cluster, instance_id):\n \"\"\"Turn an instance ID into a container instance ARN.\"\"\"\n arns = ecs.list_container_instances(cluster=cluster, filter='ec2InstanceId==' + instance_id)['containerInstanceArns']\n if not arns:\n return None\n return arns[0]\n\n\ndef has_tasks(cluster, instance_arn):\n \"\"\"Return True if the instance is running tasks for the given cluster.\"\"\"\n instances = ecs.describe_container_instances(cluster=cluster, containerInstances=[instance_arn])['containerInstances']\n if not instances:\n return False\n instance = instances[0]\n\n if instance['status'] == 'ACTIVE':\n # Start draining, then try again later\n set_container_instance_to_draining(cluster, instance_arn)\n return True\n\n tasks = instance['runningTasksCount'] + instance['pendingTasksCount']\n print('Instance %s has %s tasks' % (instance_arn, tasks))\n\n return tasks > 0\n\n\ndef 
set_container_instance_to_draining(cluster, instance_arn):\n ecs.update_container_instances_state(\n cluster=cluster,\n containerInstances=[instance_arn], status='DRAINING')\n\n\ndef pick(dct, *keys):\n \"\"\"Pick a subset of a dict.\"\"\"\n return {k: v for k, v in dct.items() if k in keys}\n"
"ZipFile": "import boto3, json, os, time\n\necs = boto3.client('ecs')\nautoscaling = boto3.client('autoscaling')\n\n\ndef lambda_handler(event, context):\n print(json.dumps(event))\n cluster = os.environ['CLUSTER']\n snsTopicArn = event['Records'][0]['Sns']['TopicArn']\n lifecycle_event = json.loads(event['Records'][0]['Sns']['Message'])\n instance_id = lifecycle_event.get('EC2InstanceId')\n if not instance_id:\n print('Got event without EC2InstanceId: %s', json.dumps(event))\n return\n\n instance_arn = container_instance_arn(cluster, instance_id)\n print('Instance %s has container instance ARN %s' % (lifecycle_event['EC2InstanceId'], instance_arn))\n\n if not instance_arn:\n return\n\n task_arns = container_instance_task_arns(cluster, instance_arn)\n \n if task_arns:\n print('Instance ARN %s has task ARNs %s' % (instance_arn, ', '.join(task_arns)))\n\n while has_tasks(cluster, instance_arn, task_arns):\n time.sleep(10)\n\n try:\n print('Terminating instance %s' % instance_id)\n autoscaling.complete_lifecycle_action(\n LifecycleActionResult='CONTINUE',\n **pick(lifecycle_event, 'LifecycleHookName', 'LifecycleActionToken', 'AutoScalingGroupName'))\n except Exception as e:\n # Lifecycle action may have already completed.\n print(str(e))\n\n\ndef container_instance_arn(cluster, instance_id):\n \"\"\"Turn an instance ID into a container instance ARN.\"\"\"\n arns = ecs.list_container_instances(cluster=cluster, filter='ec2InstanceId==' + instance_id)['containerInstanceArns']\n if not arns:\n return None\n return arns[0]\n\ndef container_instance_task_arns(cluster, instance_arn):\n \"\"\"Fetch tasks for a container instance ARN.\"\"\"\n arns = ecs.list_tasks(cluster=cluster, containerInstance=instance_arn)['taskArns']\n return arns\n\ndef has_tasks(cluster, instance_arn, task_arns):\n \"\"\"Return True if the instance is running tasks for the given cluster.\"\"\"\n instances = ecs.describe_container_instances(cluster=cluster, 
containerInstances=[instance_arn])['containerInstances']\n if not instances:\n return False\n instance = instances[0]\n\n if instance['status'] == 'ACTIVE':\n # Start draining, then try again later\n set_container_instance_to_draining(cluster, instance_arn)\n return True\n\n task_count = None\n\n if task_arns:\n # Fetch details for tasks running on the container instance\n tasks = ecs.describe_tasks(cluster=cluster, tasks=task_arns)['tasks']\n if tasks:\n # Consider any non-stopped tasks as running\n task_count = sum(task['lastStatus'] != 'STOPPED' for task in tasks) + instance['pendingTasksCount']\n \n if not task_count:\n # Fallback to instance task counts if detailed task information is unavailable\n task_count = instance['runningTasksCount'] + instance['pendingTasksCount']\n \n print('Instance %s has %s tasks' % (instance_arn, task_count))\n\n return task_count > 0\n\ndef set_container_instance_to_draining(cluster, instance_arn):\n ecs.update_container_instances_state(\n cluster=cluster,\n containerInstances=[instance_arn], status='DRAINING')\n\n\ndef pick(dct, *keys):\n \"\"\"Pick a subset of a dict.\"\"\"\n return {k: v for k, v in dct.items() if k in keys}\n"
},
"Role": {
"Fn::GetAtt": [
34 changes: 27 additions & 7 deletions packages/@aws-cdk/aws-ecs/lib/drain-hook/lambda-source/index.py
@@ -20,7 +20,12 @@ def lambda_handler(event, context):
     if not instance_arn:
         return

-    while has_tasks(cluster, instance_arn):
+    task_arns = container_instance_task_arns(cluster, instance_arn)
+
+    if task_arns:
+        print('Instance ARN %s has task ARNs %s' % (instance_arn, ', '.join(task_arns)))
+
+    while has_tasks(cluster, instance_arn, task_arns):
         time.sleep(10)

     try:
@@ -40,8 +45,12 @@ def container_instance_arn(cluster, instance_id):
         return None
     return arns[0]

+def container_instance_task_arns(cluster, instance_arn):
+    """Fetch tasks for a container instance ARN."""
+    arns = ecs.list_tasks(cluster=cluster, containerInstance=instance_arn)['taskArns']
+    return arns

-def has_tasks(cluster, instance_arn):
+def has_tasks(cluster, instance_arn, task_arns):
     """Return True if the instance is running tasks for the given cluster."""
     instances = ecs.describe_container_instances(cluster=cluster, containerInstances=[instance_arn])['containerInstances']
     if not instances:
@@ -53,11 +62,22 @@ def has_tasks(cluster, instance_arn):
         set_container_instance_to_draining(cluster, instance_arn)
         return True

-    tasks = instance['runningTasksCount'] + instance['pendingTasksCount']
-    print('Instance %s has %s tasks' % (instance_arn, tasks))
-
-    return tasks > 0
-
+    task_count = None
+
+    if task_arns:
+        # Fetch details for tasks running on the container instance
+        tasks = ecs.describe_tasks(cluster=cluster, tasks=task_arns)['tasks']
+        if tasks:
+            # Consider any non-stopped tasks as running
+            task_count = sum(task['lastStatus'] != 'STOPPED' for task in tasks) + instance['pendingTasksCount']
+
+    if not task_count:
+        # Fallback to instance task counts if detailed task information is unavailable
+        task_count = instance['runningTasksCount'] + instance['pendingTasksCount']
+
+    print('Instance %s has %s tasks' % (instance_arn, task_count))
+
+    return task_count > 0

 def set_container_instance_to_draining(cluster, instance_arn):
     ecs.update_container_instances_state(
@@ -696,7 +696,7 @@
"Type": "AWS::Lambda::Function",
"Properties": {
"Code": {
"ZipFile": "import boto3, json, os, time\n\necs = boto3.client('ecs')\nautoscaling = boto3.client('autoscaling')\n\n\ndef lambda_handler(event, context):\n print(json.dumps(event))\n cluster = os.environ['CLUSTER']\n snsTopicArn = event['Records'][0]['Sns']['TopicArn']\n lifecycle_event = json.loads(event['Records'][0]['Sns']['Message'])\n instance_id = lifecycle_event.get('EC2InstanceId')\n if not instance_id:\n print('Got event without EC2InstanceId: %s', json.dumps(event))\n return\n\n instance_arn = container_instance_arn(cluster, instance_id)\n print('Instance %s has container instance ARN %s' % (lifecycle_event['EC2InstanceId'], instance_arn))\n\n if not instance_arn:\n return\n\n while has_tasks(cluster, instance_arn):\n time.sleep(10)\n\n try:\n print('Terminating instance %s' % instance_id)\n autoscaling.complete_lifecycle_action(\n LifecycleActionResult='CONTINUE',\n **pick(lifecycle_event, 'LifecycleHookName', 'LifecycleActionToken', 'AutoScalingGroupName'))\n except Exception as e:\n # Lifecycle action may have already completed.\n print(str(e))\n\n\ndef container_instance_arn(cluster, instance_id):\n \"\"\"Turn an instance ID into a container instance ARN.\"\"\"\n arns = ecs.list_container_instances(cluster=cluster, filter='ec2InstanceId==' + instance_id)['containerInstanceArns']\n if not arns:\n return None\n return arns[0]\n\n\ndef has_tasks(cluster, instance_arn):\n \"\"\"Return True if the instance is running tasks for the given cluster.\"\"\"\n instances = ecs.describe_container_instances(cluster=cluster, containerInstances=[instance_arn])['containerInstances']\n if not instances:\n return False\n instance = instances[0]\n\n if instance['status'] == 'ACTIVE':\n # Start draining, then try again later\n set_container_instance_to_draining(cluster, instance_arn)\n return True\n\n tasks = instance['runningTasksCount'] + instance['pendingTasksCount']\n print('Instance %s has %s tasks' % (instance_arn, tasks))\n\n return tasks > 0\n\n\ndef 
set_container_instance_to_draining(cluster, instance_arn):\n ecs.update_container_instances_state(\n cluster=cluster,\n containerInstances=[instance_arn], status='DRAINING')\n\n\ndef pick(dct, *keys):\n \"\"\"Pick a subset of a dict.\"\"\"\n return {k: v for k, v in dct.items() if k in keys}\n"
"ZipFile": "import boto3, json, os, time\n\necs = boto3.client('ecs')\nautoscaling = boto3.client('autoscaling')\n\n\ndef lambda_handler(event, context):\n print(json.dumps(event))\n cluster = os.environ['CLUSTER']\n snsTopicArn = event['Records'][0]['Sns']['TopicArn']\n lifecycle_event = json.loads(event['Records'][0]['Sns']['Message'])\n instance_id = lifecycle_event.get('EC2InstanceId')\n if not instance_id:\n print('Got event without EC2InstanceId: %s', json.dumps(event))\n return\n\n instance_arn = container_instance_arn(cluster, instance_id)\n print('Instance %s has container instance ARN %s' % (lifecycle_event['EC2InstanceId'], instance_arn))\n\n if not instance_arn:\n return\n\n task_arns = container_instance_task_arns(cluster, instance_arn)\n \n if task_arns:\n print('Instance ARN %s has task ARNs %s' % (instance_arn, ', '.join(task_arns)))\n\n while has_tasks(cluster, instance_arn, task_arns):\n time.sleep(10)\n\n try:\n print('Terminating instance %s' % instance_id)\n autoscaling.complete_lifecycle_action(\n LifecycleActionResult='CONTINUE',\n **pick(lifecycle_event, 'LifecycleHookName', 'LifecycleActionToken', 'AutoScalingGroupName'))\n except Exception as e:\n # Lifecycle action may have already completed.\n print(str(e))\n\n\ndef container_instance_arn(cluster, instance_id):\n \"\"\"Turn an instance ID into a container instance ARN.\"\"\"\n arns = ecs.list_container_instances(cluster=cluster, filter='ec2InstanceId==' + instance_id)['containerInstanceArns']\n if not arns:\n return None\n return arns[0]\n\ndef container_instance_task_arns(cluster, instance_arn):\n \"\"\"Fetch tasks for a container instance ARN.\"\"\"\n arns = ecs.list_tasks(cluster=cluster, containerInstance=instance_arn)['taskArns']\n return arns\n\ndef has_tasks(cluster, instance_arn, task_arns):\n \"\"\"Return True if the instance is running tasks for the given cluster.\"\"\"\n instances = ecs.describe_container_instances(cluster=cluster, 
containerInstances=[instance_arn])['containerInstances']\n if not instances:\n return False\n instance = instances[0]\n\n if instance['status'] == 'ACTIVE':\n # Start draining, then try again later\n set_container_instance_to_draining(cluster, instance_arn)\n return True\n\n task_count = None\n\n if task_arns:\n # Fetch details for tasks running on the container instance\n tasks = ecs.describe_tasks(cluster=cluster, tasks=task_arns)['tasks']\n if tasks:\n # Consider any non-stopped tasks as running\n task_count = sum(task['lastStatus'] != 'STOPPED' for task in tasks) + instance['pendingTasksCount']\n \n if not task_count:\n # Fallback to instance task counts if detailed task information is unavailable\n task_count = instance['runningTasksCount'] + instance['pendingTasksCount']\n \n print('Instance %s has %s tasks' % (instance_arn, task_count))\n\n return task_count > 0\n\ndef set_container_instance_to_draining(cluster, instance_arn):\n ecs.update_container_instances_state(\n cluster=cluster,\n containerInstances=[instance_arn], status='DRAINING')\n\n\ndef pick(dct, *keys):\n \"\"\"Pick a subset of a dict.\"\"\"\n return {k: v for k, v in dct.items() if k in keys}\n"
},
"Role": {
"Fn::GetAtt": [