diff --git a/cloud_provider/aws/aws_minion_manager.py b/cloud_provider/aws/aws_minion_manager.py index 936fbf5..549e90a 100644 --- a/cloud_provider/aws/aws_minion_manager.py +++ b/cloud_provider/aws/aws_minion_manager.py @@ -72,6 +72,16 @@ def describe_asg_with_retries(ac_client, asgs=[]): AutoScalingGroupNames=asgs) return bunchify(response) + @staticmethod + @retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3) + def describe_asg_activities_with_retries(ac_client, asg): + """ + AWS describe_scaling_activities with retries. + """ + response = ac_client.describe_scaling_activities( + AutoScalingGroupName=asg) + return bunchify(response) + @staticmethod @retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3) def get_instances_with_retries(ec2_client, instance_ids): @@ -578,9 +588,7 @@ def minion_manager_work(self): # 3. bid_info = asg_meta.get_bid_info() if asg_meta.get_mm_tag() == "no-spot" and bid_info["type"] == "spot": - new_bid_info = {} - new_bid_info["type"] = "on-demand" - new_bid_info["price"] = "" + new_bid_info = self.create_on_demand_bid_info() logger.info("ASG %s configured with no-spot but currently using spot. Updating...", asg_meta.get_name()) self.update_scaling_group(asg_meta, new_bid_info) continue @@ -588,6 +596,13 @@ def minion_manager_work(self): new_bid_info = self.bid_advisor.get_new_bid( zones=asg_meta.asg_info.AvailabilityZones, instance_type=asg_meta.lc_info.InstanceType) + + # Change ASG to on-demand if insufficient capacity + if self.check_insufficient_capacity(asg_meta): + new_bid_info = self.create_on_demand_bid_info() + logger.info("ASG %s spot instance have not sufficient resource. Updating to on-demand...", asg_meta.get_name()) + self.update_scaling_group(asg_meta, new_bid_info) + continue # Update ASGs iff new bid is different from current bid. 
if self.are_bids_equal(asg_meta.bid_info, new_bid_info): @@ -612,6 +627,12 @@ def minion_manager_work(self): except Exception as ex: raise Exception("Failed to discover/populate current ASG info: " + str(ex)) + def create_on_demand_bid_info(self): + new_bid_info = {} + new_bid_info["type"] = "on-demand" + new_bid_info["price"] = "" + return new_bid_info + def run(self): """Entrypoint for the AWS specific minion-manager.""" logger.info("Running AWS Minion Manager") @@ -668,7 +689,28 @@ def check_scaling_group_instances(self, scaling_group): # Wait for sometime before checking again. time.sleep(60) return False - + + def check_insufficient_capacity(self, scaling_group): + """ + Checks whether any ASG scaling activity that has not completed (Progress != 100) reports the insufficient capacity error message. + """ + # This error message is taken from https://docs.aws.amazon.com/autoscaling/ec2/userguide/ts-as-capacity.html#ts-as-capacity-1 + INSUFFICIENT_CAPACITY_MESSAGE = ['We currently do not have sufficient', + 'capacity in the Availability Zone you requested'] + + asg_info = scaling_group.get_asg_info() + response = AWSMinionManager.describe_asg_activities_with_retries( + self._ac_client, asg_info.AutoScalingGroupName) + activities = response.Activities + + for activity in activities: + if activity.Progress == 100: + continue + if 'StatusMessage' in activity and len([message for message in INSUFFICIENT_CAPACITY_MESSAGE if message in activity.StatusMessage]) == len(INSUFFICIENT_CAPACITY_MESSAGE): + return True + + return False + def get_asg_metas(self): """ Return all asg_meta """ return self._asg_metas diff --git a/cloud_provider/aws/aws_minion_manager_test.py b/cloud_provider/aws/aws_minion_manager_test.py index f84a3ab..daf13b5 100644 --- a/cloud_provider/aws/aws_minion_manager_test.py +++ b/cloud_provider/aws/aws_minion_manager_test.py @@ -22,6 +22,7 @@ class AWSMinionManagerTest(unittest.TestCase): cluster_name_id = cluster_name + "-" + cluster_id asg_name = cluster_name_id + "-asg" lc_name = cluster_name_id 
+ "-lc" + insufficient_resource_message = "We currently do not have sufficient p2.xlarge capacity in the Availability Zone you requested (us-west-2b). Our system will be working on provisioning additional capacity. You can currently get p2.xlarge capacity by not specifying an Availability Zone in your request or choosing us-west-2c, us-west-2a." session = boto3.Session(region_name="us-west-2") autoscaling = session.client("autoscaling") @@ -358,4 +359,49 @@ def test_cordon(self, mock_get_name_for_instance, mock_check_call): mock_check_call.side_effect = [Exception("Test"), True] awsmm.cordon_node("ip-of-fake-node") - mock_check_call.assert_called_with(['kubectl', 'uncordon', 'ip-of-fake-node-name']) \ No newline at end of file + mock_check_call.assert_called_with(['kubectl', 'uncordon', 'ip-of-fake-node-name']) + + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries') + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_asg_activities_all_done(self, mock_get_name_for_instance): + mock_get_name_for_instance.return_value = bunchify({'Activities': [{'StatusMessage': 'dummy ok message', 'Progress': 100}, {'StatusMessage': 'dummy ok message2', 'Progress': 100}]}) + + awsmm = self.basic_setup_and_test() + asg_meta = awsmm.get_asg_metas()[0] + assert not awsmm.check_insufficient_capacity(asg_meta) + + + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries') + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_asg_activity_without_statusMessage(self, mock_get_name_for_instance): + mock_get_name_for_instance.return_value = bunchify({'Activities': [{'Progress': 20}, {'StatusMessage': 'dummy ok message2', 'Progress': 100}]}) + + awsmm = self.basic_setup_and_test() + asg_meta = awsmm.get_asg_metas()[0] + assert not awsmm.check_insufficient_capacity(asg_meta) + + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries') + 
@mock_autoscaling + @mock_ec2 + @mock_sts + def test_asg_done_activity_with_insufficient_resource(self, mock_get_name_for_instance): + mock_get_name_for_instance.return_value = bunchify({'Activities': [{'StatusMessage': self.insufficient_resource_message, 'Progress': 100}]}) + + awsmm = self.basic_setup_and_test() + asg_meta = awsmm.get_asg_metas()[0] + assert not awsmm.check_insufficient_capacity(asg_meta) + + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries') + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_asg_activity_with_insufficient_resource(self, mock_get_name_for_instance): + mock_get_name_for_instance.return_value = bunchify({'Activities': [{'StatusMessage': self.insufficient_resource_message, 'Progress': 20}]}) + + awsmm = self.basic_setup_and_test() + asg_meta = awsmm.get_asg_metas()[0] + assert awsmm.check_insufficient_capacity(asg_meta)