fix: correctly handle lack of capacity of AWS spot ASGs #2008

Closed
72 changes: 61 additions & 11 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
@@ -19,6 +19,7 @@ package aws
import (
"fmt"
"reflect"
"regexp"
"strings"
"sync"

@@ -30,7 +31,10 @@ import (
"k8s.io/klog"
)

const scaleToZeroSupported = true
const (
scaleToZeroSupported = true
placeholderInstanceNamePrefix = "i-placeholder-"
)

type asgCache struct {
registeredAsgs []*asg
@@ -195,6 +199,10 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error {
m.mutex.Lock()
defer m.mutex.Unlock()

return m.setAsgSizeNoLock(asg, size)
}

func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
params := &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asg.Name),
DesiredCapacity: aws.Int64(int64(size)),
@@ -212,6 +220,10 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error {
return nil
}

func (m *asgCache) decreaseAsgSizeByOneNoLock(asg *asg) error {
return m.setAsgSizeNoLock(asg, asg.curSize-1)
}

// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
m.mutex.Lock()
@@ -239,24 +251,36 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}

for _, instance := range instances {
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
resp, err := m.service.TerminateInstanceInAutoScalingGroup(params)
if err != nil {
return err
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
resp, err := m.service.TerminateInstanceInAutoScalingGroup(params)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
}

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--

klog.V(4).Infof(*resp.Activity.Description)
}

return nil
}

// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
matched, _ := regexp.MatchString(fmt.Sprintf("^%s\\d+$", placeholderInstanceNamePrefix), instance.Name)
return matched
}

// Fetch automatically discovered ASGs. These ASGs should be unregistered if
// they no longer exist in AWS.
func (m *asgCache) fetchAutoAsgNames() ([]string, error) {
@@ -323,6 +347,11 @@ func (m *asgCache) regenerate() error {
return err
}

// If any ASG currently has a higher desired capacity than running instances, introduce placeholders
// for the instances that are yet to come up. This is required to track desired instances that
// will never come up, e.g. a Spot request that cannot be fulfilled.
Contributor comment: We do see several issues that lead to desired > current. Sometimes on-demand nodes can also run out of capacity, for example when hitting an account limit, or when nodes have strict constraints such as a single placement group.

groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)

// Register or update ASGs
exists := make(map[AwsRef]bool)
for _, group := range groups {
@@ -355,6 +384,27 @@ func (m *asgCache) regenerate() error {
return nil
}

func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
for _, g := range groups {
desired := *g.DesiredCapacity
real := int64(len(g.Instances))
if desired <= real {
continue
}

for i := real; i < desired; i++ {
id := fmt.Sprintf("%s%d", placeholderInstanceNamePrefix, i)
klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
"Creating placeholder instance with ID %s.", *g.AutoScalingGroupName, real, desired, id)
g.Instances = append(g.Instances, &autoscaling.Instance{
InstanceId: &id,
AvailabilityZone: g.AvailabilityZones[0],
})
}
}
return groups
}

func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
spec := dynamic.NodeGroupSpec{
Name: aws.StringValue(g.AutoScalingGroupName),
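
For reference, here is a small, self-contained sketch (not part of the PR) that mirrors the two helpers added in this file: placeholder creation for desired-but-missing instances and placeholder detection by name. It assumes the aws-sdk-go autoscaling types already used by this provider; apart from placeholderInstanceNamePrefix, all names in the sketch are illustrative.

package main

import (
	"fmt"
	"regexp"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/autoscaling"
)

const placeholderInstanceNamePrefix = "i-placeholder-"

// fillPlaceholders mirrors createPlaceholdersForDesiredNonStartedInstances above:
// for every ASG whose DesiredCapacity exceeds the instances actually reported,
// append synthetic placeholder instances so the missing capacity can be tracked.
func fillPlaceholders(groups []*autoscaling.Group) []*autoscaling.Group {
	for _, g := range groups {
		desired := *g.DesiredCapacity
		real := int64(len(g.Instances))
		for i := real; i < desired; i++ {
			id := fmt.Sprintf("%s%d", placeholderInstanceNamePrefix, i)
			g.Instances = append(g.Instances, &autoscaling.Instance{
				InstanceId:       aws.String(id),
				AvailabilityZone: g.AvailabilityZones[0],
			})
		}
	}
	return groups
}

// isPlaceholder mirrors isPlaceholderInstance above: a placeholder name is the
// prefix followed only by digits, which never collides with a real EC2 instance ID.
func isPlaceholder(name string) bool {
	matched, _ := regexp.MatchString(fmt.Sprintf("^%s\\d+$", placeholderInstanceNamePrefix), name)
	return matched
}

func main() {
	g := &autoscaling.Group{
		AutoScalingGroupName: aws.String("spot-asg"),
		AvailabilityZones:    []*string{aws.String("test-1a")},
		DesiredCapacity:      aws.Int64(3),
		Instances: []*autoscaling.Instance{
			{InstanceId: aws.String("i-0123456789abcdef0"), AvailabilityZone: aws.String("test-1a")},
		},
	}
	fillPlaceholders([]*autoscaling.Group{g})
	for _, inst := range g.Instances {
		// Prints the real instance ID followed by i-placeholder-1 and i-placeholder-2.
		fmt.Printf("%s placeholder=%v\n", *inst.InstanceId, isPlaceholder(*inst.InstanceId))
	}
}
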
5 changes: 4 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_manager.go
@@ -277,7 +277,10 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error {

// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error {
return m.asgCache.DeleteInstances(instances)
if err := m.asgCache.DeleteInstances(instances); err != nil {
return err
}
return m.forceRefresh()
Member comment: It feels like we should either add a log message here stating that a refresh is being forced, or update the message in forceRefresh; as it stands, the existing log line in forceRefresh ("Refreshed ASG list, next refresh after %v", m.lastRefresh.Add(refreshInterval)) could be confusing for users. (One possible variant is sketched after this file's diff.)

}

// GetAsgNodes returns Asg nodes.
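
To illustrate the reviewer's suggestion above, one possible variant (a sketch only, not what this PR merges) would log explicitly before forcing the refresh, assuming klog is already imported in aws_manager.go:

// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error {
	if err := m.asgCache.DeleteInstances(instances); err != nil {
		return err
	}
	// Make the forced refresh visible in the logs so the generic message emitted
	// by forceRefresh ("Refreshed ASG list, next refresh after ...") is not confusing.
	klog.V(2).Infof("DeleteInstances succeeded, forcing an ASG cache refresh")
	return m.forceRefresh()
}
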
12 changes: 11 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
@@ -207,9 +207,16 @@ func TestFetchExplicitAsgs(t *testing.T) {
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
zone := "test-1a"
fn(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []*autoscaling.Group{
{AutoScalingGroupName: aws.String(groupname)},
{
AvailabilityZones: []*string{&zone},
AutoScalingGroupName: aws.String(groupname),
MinSize: aws.Int64(int64(min)),
MaxSize: aws.Int64(int64(max)),
DesiredCapacity: aws.Int64(int64(min)),
},
}}, false)
}).Return(nil)

@@ -381,11 +388,14 @@ func TestFetchAutoAsgs(t *testing.T) {
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
zone := "test-1a"
fn(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []*autoscaling.Group{{
AvailabilityZones: []*string{&zone},
AutoScalingGroupName: aws.String(groupname),
MinSize: aws.Int64(int64(min)),
MaxSize: aws.Int64(int64(max)),
DesiredCapacity: aws.Int64(int64(min)),
}}}, false)
}).Return(nil).Twice()

8 changes: 8 additions & 0 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -274,6 +274,14 @@ func (csr *ClusterStateRegistry) updateScaleRequests(currentTime time.Time) {
csr.scaleDownRequests = newScaleDownRequests
}

// BackoffNodeGroup is used to force the specified nodeGroup to go into backoff mode, which
// means it won't be used for scaling out temporarily
func (csr *ClusterStateRegistry) BackoffNodeGroup(nodeGroup cloudprovider.NodeGroup, currentTime time.Time) {
Contributor comment: Please revert.

csr.Lock()
defer csr.Unlock()
csr.backoffNodeGroup(nodeGroup, cloudprovider.OtherErrorClass, "cloudProviderError", currentTime)
}

// To be executed under a lock.
func (csr *ClusterStateRegistry) backoffNodeGroup(nodeGroup cloudprovider.NodeGroup, errorClass cloudprovider.InstanceErrorClass, errorCode string, currentTime time.Time) {
nodeGroupInfo := csr.nodeInfosForGroups[nodeGroup.Id()]
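
As an illustration only (and note that the reviewer above asks for this addition to be reverted), a hypothetical caller inside the autoscaler could use the new BackoffNodeGroup helper as sketched below; markNodeGroupUnavailable is an invented name and not part of the PR.

package example

import (
	"time"

	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
)

// markNodeGroupUnavailable is a hypothetical helper: after the cloud provider
// reports that a node group cannot currently deliver capacity (e.g. an
// unfulfillable spot request), put it into backoff so it is skipped for
// scale-up until the backoff expires.
func markNodeGroupUnavailable(csr *clusterstate.ClusterStateRegistry, ng cloudprovider.NodeGroup) {
	csr.BackoffNodeGroup(ng, time.Now())
}
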
5 changes: 3 additions & 2 deletions cluster-autoscaler/clusterstate/utils/node_instances_cache.go
@@ -102,8 +102,9 @@ func (cache *CloudProviderNodeInstancesCache) GetCloudProviderNodeInstances() (m
wg.Add(1)
go func() {
defer wg.Done()
_, err := cache.fetchCloudProviderNodeInstancesForNodeGroup(nodeGroup)
klog.Errorf("Failed to fetch cloud provider node instances for %v, error %v", nodeGroup.Id(), err)
if _, err := cache.fetchCloudProviderNodeInstancesForNodeGroup(nodeGroup); err != nil {
klog.Errorf("Failed to fetch cloud provider node instances for %v, error %v", nodeGroup.Id(), err)
}
}()
}
}
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
@@ -254,7 +254,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
if len(unregisteredNodes) > 0 {
klog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes))
removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext, currentTime, autoscalingContext.LogRecorder)
removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext, a.clusterStateRegistry,
currentTime, autoscalingContext.LogRecorder)
Member comment: Just realized that this will need reverting as well while playing around with backporting this into a 1.3 version.
// There was a problem with removing unregistered nodes. Retry in the next loop.
if err != nil {
klog.Warningf("Failed to remove unregistered nodes: %v", err)
}
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/utils.go
@@ -484,7 +484,7 @@ func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string, ignoredTaints tain

// Removes unregistered nodes if needed. Returns true if anything was removed and error if such occurred.
func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNode, context *context.AutoscalingContext,
currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {
clusterStateRegistry *clusterstate.ClusterStateRegistry, currentTime time.Time, logRecorder *utils.LogEventRecorder) (bool, error) {
Contributor comment: Please revert.

removedAny := false
for _, unregisteredNode := range unregisteredNodes {
if unregisteredNode.UnregisteredSince.Add(context.MaxNodeProvisionTime).Before(currentTime) {
@@ -514,6 +514,7 @@ func removeOldUnregisteredNodes(unregisteredNodes []clusterstate.UnregisteredNod
"Failed to remove node %s: %v", unregisteredNode.Node.Name, err)
return removedAny, err
}

Contributor comment: Please revert.

logRecorder.Eventf(apiv1.EventTypeNormal, "DeleteUnregistered",
"Removed unregistered node %v", unregisteredNode.Node.Name)
removedAny = true
4 changes: 2 additions & 2 deletions cluster-autoscaler/core/utils_test.go
@@ -451,12 +451,12 @@ func TestRemoveOldUnregisteredNodes(t *testing.T) {
assert.Equal(t, 1, len(unregisteredNodes))

// Nothing should be removed. The unregistered node is not old enough.
removed, err := removeOldUnregisteredNodes(unregisteredNodes, context, now.Add(-50*time.Minute), fakeLogRecorder)
removed, err := removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now.Add(-50*time.Minute), fakeLogRecorder)
Contributor comment: Please revert.

assert.NoError(t, err)
assert.False(t, removed)

// ng1_2 should be removed.
removed, err = removeOldUnregisteredNodes(unregisteredNodes, context, now, fakeLogRecorder)
removed, err = removeOldUnregisteredNodes(unregisteredNodes, context, clusterState, now, fakeLogRecorder)
Contributor comment: Please revert.

assert.NoError(t, err)
assert.True(t, removed)
deletedNode := getStringFromChan(deletedNodes)