diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 4667e285b153..a8d3fc48c295 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -308,6 +308,54 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { } } + var isRecentScalingActivitySuccess = false + var err error + + placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances) + // Check if there are any placeholder instances in the list. + if placeHolderInstancesCount > 0 { + // Log the check for placeholders in the ASG. + klog.V(4).Infof("Detected %d placeholder instance(s), checking recent scaling activity for ASG %s", + placeHolderInstancesCount, commonAsg.Name) + + // Retrieve the most recent scaling activity to determine its success state. + isRecentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg) + + // Handle errors from retrieving scaling activity. + if err != nil { + // Log the error if the scaling activity check fails and return the error. + klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err) + return err // Return error to prevent further processing with uncertain state information. + } + + if !isRecentScalingActivitySuccess { + asgDetail, err := m.getDescribeAutoScalingGroupResults(commonAsg) + + if err != nil { + klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err) + return err + } + + activeInstancesInAsg := len(asgDetail.Instances) + desiredCapacityInAsg := int(*asgDetail.DesiredCapacity) + klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d ", + commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg) + + // If the difference between the active instances and the desired capacity is greater than 1, + // it means that the ASG is under-provisioned and the desired capacity is not being reached. + // In this case, we would reduce the size of ASG by the count of unprovisioned instances + // which is equal to the total count of active instances in ASG + + err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg) + + if err != nil { + klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err) + return err + } + return nil + } + } + for _, instance := range instances { // check if the instance is a placeholder - a requested instance that was never created by the node group // if it is, just decrease the size of the node group, as there's no specific instance we can remove @@ -352,6 +400,33 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { return nil } +func (m *asgCache) getDescribeAutoScalingGroupResults(commonAsg *asg) (*autoscaling.Group, error) { + asgs := make([]*autoscaling.Group, 0) + commonAsgNames := []string{commonAsg.Name} + input := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice(commonAsgNames), + MaxRecords: aws.Int64(100), + } + + err := m.awsService.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool { + asgs = append(asgs, output.AutoScalingGroups...) + // We return true while we want to be called with the next page of + // results, if any. + return false + }) + + if err != nil { + klog.Errorf("Failed while performing DescribeAutoScalingGroupsPages: %v", err) + return nil, err + } + + if len(asgs) == 0 { + return nil, fmt.Errorf("no ASGs found for %s", commonAsgNames) + } + + return asgs[0], nil +} + // isPlaceholderInstance checks if the given instance is only a placeholder func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool { return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) @@ -624,3 +699,55 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn func (m *asgCache) Cleanup() { close(m.interrupt) } + +func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) { + input := &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String(asg.Name), + MaxRecords: aws.Int64(1), + } + + var response *autoscaling.DescribeScalingActivitiesOutput + var err error + attempts := 3 + + for i := 0; i < attempts; i++ { + response, err = m.awsService.DescribeScalingActivities(input) + if err == nil { + break + } + klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err) + time.Sleep(time.Second * 2) + } + + if err != nil { + klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err) + return false, err + } + + if len(response.Activities) == 0 { + klog.Info("No scaling activities found for ASG:", asg.Name) + return false, nil + } + + lastActivity := response.Activities[0] + if *lastActivity.StatusCode == "Successful" { + klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name) + return true, nil + } else { + klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage) + return false, nil + } +} + +// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache +func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int { + + placeholderInstancesCount := 0 + for _, instance := range instances { + if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) { + placeholderInstancesCount++ + + } + } + return placeholderInstancesCount +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 0033d27c68ea..e2e425b068e3 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -18,6 +18,7 @@ package aws import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -568,6 +569,22 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { HonorCooldown: aws.Bool(false), }).Return(&autoscaling.SetDesiredCapacityOutput{}) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }, + ).Return( + &autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Successful"), + StatusMessage: aws.String("Successful"), + StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), + }, + }, + }, nil) + // Look up the current number of instances... var expectedInstancesCount int64 = 2 a.On("DescribeAutoScalingGroupsPages", @@ -739,3 +756,84 @@ func TestHasInstance(t *testing.T) { assert.NoError(t, err) assert.False(t, present) } + +// write unit test for DeleteInstances function +func TestDeleteInstances_scalingActivityFailure(t *testing.T) { + + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) + + asgs := provider.NodeGroups() + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + AutoScalingGroupName: aws.String(asgs[0].Id()), + DesiredCapacity: aws.Int64(1), + HonorCooldown: aws.Bool(false), + }).Return(&autoscaling.SetDesiredCapacityOutput{}) + var expectedInstancesCount int64 = 5 + a.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), + MaxRecords: aws.Int64(100), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0c257f8f05fd1c64b", "i-0c257f8f05fd1c64c", "i-0c257f8f05fd1c64d"), false) + // we expect the instance count to be 1 after the call to DeleteNodes + //expectedInstancesCount = + }).Return(nil) + + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }, + ).Return( + &autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StatusMessage: aws.String("Launching a new EC2 instance. Status Reason: We currently do not have sufficient p5.48xlarge capacity in zones with support for 'gp2' volumes. Our system will be working on provisioning additional capacity. Launching EC2 instance failed.\t"), + StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), + }, + }, + }, nil) + + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + AutoScalingGroupName: aws.String(asgs[0].Id()), + DesiredCapacity: aws.Int64(3), + HonorCooldown: aws.Bool(false), + }).Return(&autoscaling.SetDesiredCapacityOutput{}) + + provider.Refresh() + + initialSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 5, initialSize) + + nodes := []*apiv1.Node{} + asgToInstances := provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] + for _, instance := range asgToInstances { + nodes = append(nodes, &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: instance.ProviderID, + }, + }) + } + + err = asgs[0].DeleteNodes(nodes) + assert.NoError(t, err) + a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) + + newSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 3, newSize) + +}