Skip to content
This repository has been archived by the owner on Jul 9, 2024. It is now read-only.

Check spot request for insufficient capacity #37

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions cloud_provider/aws/aws_minion_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ def get_asgs_with_tags(cluster_name, ac_client):
break
return bunchify(response)

@staticmethod
@retry(wait_exponential_multiplier=1000, stop_max_attempt_number=3)
def describe_spot_request_with_retries(ec2_client, request_ids):
    """ Describe the given spot instance requests and return the bunchified response.

    ec2_client: client object exposing describe_spot_instance_requests().
    request_ids: value forwarded as the SpotInstanceRequestIds argument.
    """
    return bunchify(
        ec2_client.describe_spot_instance_requests(SpotInstanceRequestIds=request_ids))

def discover_asgs(self):
""" Query AWS and get metadata about all required ASGs. """
response = AWSMinionManager.get_asgs_with_tags(self._cluster_name, self._ac_client)
Expand Down Expand Up @@ -703,6 +710,8 @@ def check_insufficient_capacity(self, scaling_group):
INSUFFICIENT_CAPACITY_MESSAGE = ['We currently do not have sufficient',
'capacity in the Availability Zone you requested']

WAITING_SPOT_INSTANCE_MESSAGE = ['Placed Spot instance request:', 'Waiting for instance(s)']

asg_info = scaling_group.get_asg_info()
response = AWSMinionManager.describe_asg_activities_with_retries(
self._ac_client, asg_info.AutoScalingGroupName)
Expand All @@ -714,6 +723,27 @@ def check_insufficient_capacity(self, scaling_group):
if 'StatusMessage' in activity and len([message for message in INSUFFICIENT_CAPACITY_MESSAGE if message in activity.StatusMessage]) == len(INSUFFICIENT_CAPACITY_MESSAGE):
return True

# Check spot request status code
if 'StatusMessage' in activity and len([message for message in WAITING_SPOT_INSTANCE_MESSAGE if message in activity.StatusMessage]) == len(WAITING_SPOT_INSTANCE_MESSAGE):
spot_req_regex = re.compile('Placed Spot instance request: (?P<spot_req_id>sir-[a-zA-Z0-9]+)\. Waiting for instance\(s\)')
spot_req_re_result = spot_req_regex.search(activity.StatusMessage)
if spot_req_re_result is not None and \
self.check_spot_request_insufficient_capacity(spot_req_re_result.group('spot_req_id')):
return True

return False

def check_spot_request_insufficient_capacity(self, spot_request):
    """ Tell whether a spot request is currently blocked on capacity.

    Describes the spot request identified by `spot_request` and returns True
    when any returned request has a status code of 'capacity-oversubscribed'
    or 'capacity-not-available'. Returns False otherwise, including when a
    request carries no 'Status' or no 'Code'.
    """
    capacity_codes = ('capacity-oversubscribed', 'capacity-not-available')

    described = AWSMinionManager.describe_spot_request_with_retries(self._ec2_client, [spot_request])
    return any(
        'Status' in spot_req
        and 'Code' in spot_req.Status
        and spot_req.Status.Code in capacity_codes
        for spot_req in described.SpotInstanceRequests
    )

def get_asg_metas(self):
Expand Down
40 changes: 40 additions & 0 deletions cloud_provider/aws/aws_minion_manager_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class AWSMinionManagerTest(unittest.TestCase):
asg_name = cluster_name_id + "-asg"
lc_name = cluster_name_id + "-lc"
insufficient_resource_message = "We currently do not have sufficient p2.xlarge capacity in the Availability Zone you requested (us-west-2b). Our system will be working on provisioning additional capacity. You can currently get p2.xlarge capacity by not specifying an Availability Zone in your request or choosing us-west-2c, us-west-2a."
asg_waiting_for_spot_instance = 'Placed Spot instance request: sir-3j8r1t2p. Waiting for instance(s)'

session = boto3.Session(region_name="us-west-2")
autoscaling = session.client("autoscaling")
Expand Down Expand Up @@ -442,3 +443,42 @@ def test_asg_activity_with_insufficient_resource(self, mock_get_name_for_instanc
awsmm = self.basic_setup_and_test()
asg_meta = awsmm.get_asg_metas()[0]
assert awsmm.check_insufficient_capacity(asg_meta)

@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_spot_request_with_retries')
@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries')
@mock_autoscaling
@mock_ec2
@mock_sts
def test_spot_request_capacity_oversubscribed(self, mock_get_name_for_instance, mock_spot_request):
    """ A 'capacity-oversubscribed' spot request is reported as insufficient capacity. """
    # mock.patch decorators inject bottom-up: the first mock replaces
    # describe_asg_activities_with_retries, the second replaces
    # describe_spot_request_with_retries.
    waiting_activity = {'StatusMessage': self.asg_waiting_for_spot_instance, 'Progress': 20}
    mock_get_name_for_instance.return_value = bunchify({'Activities': [waiting_activity]})

    spot_status = {'Code': 'capacity-oversubscribed'}
    mock_spot_request.return_value = bunchify({'SpotInstanceRequests': [{'Status': spot_status}]})

    manager = self.basic_setup_and_test()
    first_asg = manager.get_asg_metas()[0]
    assert manager.check_insufficient_capacity(first_asg)

@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_spot_request_with_retries')
@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries')
@mock_autoscaling
@mock_ec2
@mock_sts
def test_spot_request_capacity_not_available(self, mock_get_name_for_instance, mock_spot_request):
    """ A 'capacity-not-available' spot request is reported as insufficient capacity. """
    # mock.patch decorators inject bottom-up: the first mock replaces
    # describe_asg_activities_with_retries, the second replaces
    # describe_spot_request_with_retries.
    waiting_activity = {'StatusMessage': self.asg_waiting_for_spot_instance, 'Progress': 20}
    mock_get_name_for_instance.return_value = bunchify({'Activities': [waiting_activity]})

    spot_status = {'Code': 'capacity-not-available'}
    mock_spot_request.return_value = bunchify({'SpotInstanceRequests': [{'Status': spot_status}]})

    manager = self.basic_setup_and_test()
    first_asg = manager.get_asg_metas()[0]
    assert manager.check_insufficient_capacity(first_asg)

@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_spot_request_with_retries')
@mock.patch('cloud_provider.aws.aws_minion_manager.AWSMinionManager.describe_asg_activities_with_retries')
@mock_autoscaling
@mock_ec2
@mock_sts
def test_spot_request_other_message(self, mock_get_name_for_instance, mock_spot_request):
    """ A spot request with an unrelated status code is not reported as insufficient capacity. """
    # mock.patch decorators inject bottom-up: the first mock replaces
    # describe_asg_activities_with_retries, the second replaces
    # describe_spot_request_with_retries.
    waiting_activity = {'StatusMessage': self.asg_waiting_for_spot_instance, 'Progress': 20}
    mock_get_name_for_instance.return_value = bunchify({'Activities': [waiting_activity]})

    spot_status = {'Code': 'other-message'}
    mock_spot_request.return_value = bunchify({'SpotInstanceRequests': [{'Status': spot_status}]})

    manager = self.basic_setup_and_test()
    first_asg = manager.get_asg_metas()[0]
    assert not manager.check_insufficient_capacity(first_asg)