diff --git a/README.md b/README.md index e532df0..4005529 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ Then, `kubectl apply -f deploy/mm.yaml`. * "use-spot": This will make the minion-manager intelligently use spot instances in the ASG * "no-spot": This will make the minion-manager always use on-demand instances in the ASG. This is useful when someone wants to temporarily switch to on-demand instances and at a later point switch to "use-spot" * Note that after changing the tag value, it may take upto 5 minutes for the minion-manager pod to see the changes and make them take effect. +* The "k8s-minion-manager/not-terminate" tag can control ASG instance terminate by the minion-manager. If you want to control when to terminate ASG instances. You can set this tag to `true`. If not set or other value will disable this feature. **What happens when:** diff --git a/cloud_provider/aws/asg_mm.py b/cloud_provider/aws/asg_mm.py index a5afa9b..e9fb74e 100644 --- a/cloud_provider/aws/asg_mm.py +++ b/cloud_provider/aws/asg_mm.py @@ -1,4 +1,6 @@ """ Metadata for each Autoscaling group in AWS. """ +MINION_MANAGER_LABEL = 'k8s-minion-manager' +NOT_TERMINATE_LABEL = MINION_MANAGER_LABEL+'/not-terminate' class AWSAutoscalinGroupMM(object): @@ -30,9 +32,14 @@ def set_asg_info(self, asg_info): assert asg_info is not None, "Can't set ASG info to None!" self.asg_info = asg_info for tag in self.asg_info['Tags']: - if tag.get('Key', None) == 'k8s-minion-manager': + if tag.get('Key', None) == MINION_MANAGER_LABEL: if tag['Value'] not in ("use-spot", "no-spot"): tag['Value'] = 'no-spot' + elif tag.get('Key', None) == NOT_TERMINATE_LABEL: + if tag['Value'] in ('true', 'True', 'TRUE'): + tag['Value'] = True + else: + tag['Value'] = False def set_lc_info(self, lc_info): """ Sets the lc_info. """ @@ -83,7 +90,7 @@ def get_instances(self): def get_mm_tag(self): for tag in self.asg_info['Tags']: - if tag.get('Key', None) == 'k8s-minion-manager': + if tag.get('Key', None) == MINION_MANAGER_LABEL: return tag['Value'].lower() return "no-spot" @@ -108,3 +115,10 @@ def is_instance_running(self, instance): return False return True + + def not_terminate_instance(self): + """ Retures is the ASG set not terminate instance """ + for tag in self.asg_info['Tags']: + if tag.get('Key', None) == NOT_TERMINATE_LABEL: + return tag['Value'] + return False diff --git a/cloud_provider/aws/aws_minion_manager.py b/cloud_provider/aws/aws_minion_manager.py index f94fab6..0ea95e5 100644 --- a/cloud_provider/aws/aws_minion_manager.py +++ b/cloud_provider/aws/aws_minion_manager.py @@ -18,7 +18,7 @@ from cloud_provider.aws.price_info_reporter import AWSPriceReporter from kubernetes import client, config from ..base import MinionManagerBase -from .asg_mm import AWSAutoscalinGroupMM +from .asg_mm import AWSAutoscalinGroupMM, MINION_MANAGER_LABEL logger = logging.getLogger("aws_minion_manager") logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s " + @@ -111,7 +111,7 @@ def get_asgs_with_tags(cluster_name, ac_client): if not is_candidate: continue for tag in r['Tags']: - if tag['Key'] == 'k8s-minion-manager': + if tag['Key'] == MINION_MANAGER_LABEL: response["AutoScalingGroups"].append(r) break return bunchify(response) @@ -123,7 +123,8 @@ def discover_asgs(self): asg_mm = AWSAutoscalinGroupMM() asg_mm.set_asg_info(asg) self._asg_metas.append(asg_mm) - logger.info("Adding asg %s (%s)", asg_mm.get_name(), asg_mm.get_mm_tag()) + logger.info("Adding asg %s (%s). Can manager terminate instance: %s", asg_mm.get_name(), + asg_mm.get_mm_tag(), "no " if asg_mm.not_terminate_instance() else "yes") def populate_current_config(self): """ @@ -484,6 +485,12 @@ def schedule_instance_termination(self, asg_meta): instances = asg_meta.get_instances() if len(instances) == 0: return + + """ + Check is ASG set not to terminate instance + """ + if asg_meta.not_terminate_instance(): + return # If the ASG is configured to use "no-spot" or the required tag does not exist, # do not schedule any instance termination. diff --git a/cloud_provider/aws/aws_minion_manager_test.py b/cloud_provider/aws/aws_minion_manager_test.py index d775a61..8089fb9 100644 --- a/cloud_provider/aws/aws_minion_manager_test.py +++ b/cloud_provider/aws/aws_minion_manager_test.py @@ -53,7 +53,7 @@ def get_ami(self): @mock_autoscaling @mock_sts - def create_mock_asgs(self, minion_manager_tag="use-spot"): + def create_mock_asgs(self, minion_manager_tag="use-spot", not_terminate=False): """ Creates mocked AWS resources. """ @@ -67,30 +67,36 @@ def create_mock_asgs(self, minion_manager_tag="use-spot"): KeyName='kubernetes-some-key') resp = bunchify(response) assert resp.ResponseMetadata.HTTPStatusCode == 200 + + asg_tags = [{'ResourceId': self.cluster_name_id, + 'Key': 'KubernetesCluster', 'Value': self.cluster_name_id}, + {'ResourceId': self.cluster_name_id, + 'Key': 'k8s-minion-manager', 'Value': minion_manager_tag}, + {'ResourceId': self.cluster_name_id, + 'PropagateAtLaunch': True, + 'Key': 'Name', 'Value': "my-instance-name"}, + ] + + if not_terminate: + asg_tags.append({'ResourceId': self.cluster_name_id, + 'Key': 'k8s-minion-manager/not-terminate', 'Value': 'True'}) response = self.autoscaling.create_auto_scaling_group( AutoScalingGroupName=self.asg_name, LaunchConfigurationName=self.lc_name, MinSize=3, MaxSize=3, DesiredCapacity=3, AvailabilityZones=['us-west-2a'], - Tags=[{'ResourceId': self.cluster_name_id, - 'Key': 'KubernetesCluster', 'Value': self.cluster_name_id}, - {'ResourceId': self.cluster_name_id, - 'Key': 'k8s-minion-manager', 'Value': minion_manager_tag}, - {'ResourceId': self.cluster_name_id, - 'PropagateAtLaunch': True, - 'Key': 'Name', 'Value': "my-instance-name"}, - ] + Tags=asg_tags ) resp = bunchify(response) assert resp.ResponseMetadata.HTTPStatusCode == 200 - def basic_setup_and_test(self, minion_manager_tag="use-spot"): + def basic_setup_and_test(self, minion_manager_tag="use-spot", not_terminate=False): """ Creates the mock setup for tests, creates the aws_mm object and does some basic sanity tests before returning it. """ - self.create_mock_asgs(minion_manager_tag) + self.create_mock_asgs(minion_manager_tag, not_terminate) aws_mm = AWSMinionManager(self.cluster_name_id, "us-west-2", refresh_interval_seconds=50) assert len(aws_mm.get_asg_metas()) == 0, \ "ASG Metadata already populated?" @@ -298,6 +304,37 @@ def _instance_termination_test_helper(minion_manager_tag, expected_kill_threads) _instance_termination_test_helper("use-spot", 3) _instance_termination_test_helper("no-spot", 0) _instance_termination_test_helper("abcd", 0) + + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_instance_not_termination(self): + """ + Tests that the AWSMinionManager won't terminate instance with not-terminate tag. + """ + def _instance_termination_test_helper(minion_manager_tag, expected_kill_threads): + awsmm = self.basic_setup_and_test(minion_manager_tag, True) + # Inject `k8s-minion-manager/not-terminate` to awsmm + + assert len(awsmm.on_demand_kill_threads) == 0 + asg_meta = awsmm.get_asg_metas()[0] + # Set instanceType since moto's instances don't have it. + instance_type = "m3.medium" + zone = "us-west-2b" + awsmm.bid_advisor.on_demand_price_dict[instance_type] = "100" + awsmm.bid_advisor.spot_price_list = [{'InstanceType': instance_type, + 'SpotPrice': '80', + 'AvailabilityZone': zone}] + for instance in asg_meta.get_instances(): + instance.InstanceType = instance_type + awsmm.populate_instances(asg_meta) + awsmm.schedule_instance_termination(asg_meta) + assert len(awsmm.on_demand_kill_threads) == expected_kill_threads + + time.sleep(15) + assert len(awsmm.on_demand_kill_threads) == 0 + + _instance_termination_test_helper("use-spot", 0) # PriceReporter tests @mock_autoscaling