diff --git a/cloud_provider/aws/aws_minion_manager.py b/cloud_provider/aws/aws_minion_manager.py index 0ea95e5..c4c12b5 100644 --- a/cloud_provider/aws/aws_minion_manager.py +++ b/cloud_provider/aws/aws_minion_manager.py @@ -116,6 +116,13 @@ def get_asgs_with_tags(cluster_name, ac_client): break return bunchify(response) + @staticmethod + @retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3) + def describe_spot_request_with_retries(ec2_client, request_ids): + response = ec2_client.describe_spot_instance_requests( + SpotInstanceRequestIds=request_ids) + return bunchify(response) + def discover_asgs(self): """ Query AWS and get metadata about all required ASGs. """ response = AWSMinionManager.get_asgs_with_tags(self._cluster_name, self._ac_client) @@ -703,6 +710,8 @@ def check_insufficient_capacity(self, scaling_group): INSUFFICIENT_CAPACITY_MESSAGE = ['We currently do not have sufficient', 'capacity in the Availability Zone you requested'] + WAITING_SPOT_INSTANCE_MESSAGE = ['Placed Spot instance request:', 'Waiting for instance(s)'] + asg_info = scaling_group.get_asg_info() response = AWSMinionManager.describe_asg_activities_with_retries( self._ac_client, asg_info.AutoScalingGroupName) @@ -714,6 +723,27 @@ def check_insufficient_capacity(self, scaling_group): if 'StatusMessage' in activity and len([message for message in INSUFFICIENT_CAPACITY_MESSAGE if message in activity.StatusMessage]) == len(INSUFFICIENT_CAPACITY_MESSAGE): return True + # Check spot request status code + if 'StatusMessage' in activity and len([message for message in WAITING_SPOT_INSTANCE_MESSAGE if message in activity.StatusMessage]) == len(WAITING_SPOT_INSTANCE_MESSAGE): + spot_req_regex = re.compile('Placed Spot instance request: (?Psir-[a-zA-Z0-9]+)\. Waiting for instance\(s\)') + spot_req_re_result = spot_req_regex.search(activity.StatusMessage) + if spot_req_re_result is not None and \ + self.check_spot_request_insufficient_capacity(spot_req_re_result.group('spot_req_id')): + return True + + return False + + def check_spot_request_insufficient_capacity(self, spot_request): + OVERSUBSCRIBED_MESSAGE = 'capacity-oversubscribed' + CAPACITY_NOT_AVAILABLE = 'capacity-not-available' + + response = AWSMinionManager.describe_spot_request_with_retries(self._ec2_client, [spot_request]) + requests = response.SpotInstanceRequests + for request in requests: + if 'Status' in request and 'Code' in request.Status: + if OVERSUBSCRIBED_MESSAGE == request.Status.Code or CAPACITY_NOT_AVAILABLE == request.Status.Code: + return True + return False def get_asg_metas(self): diff --git a/cloud_provider/aws/aws_minion_manager_test.py b/cloud_provider/aws/aws_minion_manager_test.py index 8089fb9..e1e0d11 100644 --- a/cloud_provider/aws/aws_minion_manager_test.py +++ b/cloud_provider/aws/aws_minion_manager_test.py @@ -23,6 +23,7 @@ class AWSMinionManagerTest(unittest.TestCase): asg_name = cluster_name_id + "-asg" lc_name = cluster_name_id + "-lc" insufficient_resource_message = "We currently do not have sufficient p2.xlarge capacity in the Availability Zone you requested (us-west-2b). Our system will be working on provisioning additional capacity. You can currently get p2.xlarge capacity by not specifying an Availability Zone in your request or choosing us-west-2c, us-west-2a." + asg_waiting_for_spot_instance = 'Placed Spot instance request: sir-3j8r1t2p. Waiting for instance(s)' session = boto3.Session(region_name="us-west-2") autoscaling = session.client("autoscaling") @@ -442,3 +443,42 @@ def test_asg_activity_with_insufficient_resource(self, mock_get_name_for_instanc awsmm = self.basic_setup_and_test() asg_meta = awsmm.get_asg_metas()[0] assert awsmm.check_insufficient_capacity(asg_meta) + + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_spot_request_with_retries') + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries') + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_spot_request_capacity_oversubscribed(self, mock_get_name_for_instance, mock_spot_request): + mock_get_name_for_instance.return_value = bunchify({'Activities': [{'StatusMessage': self.asg_waiting_for_spot_instance, 'Progress': 20}]}) + mock_spot_request.return_value = bunchify({'SpotInstanceRequests': [{'Status': {'Code': 'capacity-oversubscribed'}}]}) + + awsmm = self.basic_setup_and_test() + asg_meta = awsmm.get_asg_metas()[0] + assert awsmm.check_insufficient_capacity(asg_meta) + + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_spot_request_with_retries') + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries') + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_spot_request_capacity_not_available(self, mock_get_name_for_instance, mock_spot_request): + mock_get_name_for_instance.return_value = bunchify({'Activities': [{'StatusMessage': self.asg_waiting_for_spot_instance, 'Progress': 20}]}) + mock_spot_request.return_value = bunchify({'SpotInstanceRequests': [{'Status': {'Code': 'capacity-not-available'}}]}) + + awsmm = self.basic_setup_and_test() + asg_meta = awsmm.get_asg_metas()[0] + assert awsmm.check_insufficient_capacity(asg_meta) + + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_spot_request_with_retries') + @mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries') + @mock_autoscaling + @mock_ec2 + @mock_sts + def test_spot_request_other_message(self, mock_get_name_for_instance, mock_spot_request): + mock_get_name_for_instance.return_value = bunchify({'Activities': [{'StatusMessage': self.asg_waiting_for_spot_instance, 'Progress': 20}]}) + mock_spot_request.return_value = bunchify({'SpotInstanceRequests': [{'Status': {'Code': 'other-message'}}]}) + + awsmm = self.basic_setup_and_test() + asg_meta = awsmm.get_asg_metas()[0] + assert not awsmm.check_insufficient_capacity(asg_meta)