Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Terminate orphaned instances from CreateFleet requests #194

Merged
merged 2 commits into from
Oct 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/deployment/aws/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ Escalator requires the following IAM policy to be able to properly integrate wit
"autoscaling:SetDesiredCapacity",
"autoscaling:TerminateInstanceInAutoScalingGroup",
"ec2:CreateFleet",
"ec2:CreateTags",
"ec2:DescribeInstances",
"ec2:DescribeInstanceStatus",
"ec2:DescribeInstances"
"ec2:RunInstances",
"ec2:TerminateInstances",
"iam:PassRole"
],
"Resource": "*"
}
Expand Down
82 changes: 70 additions & 12 deletions pkg/cloudprovider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (
tagKey = "k8s.io/atlassian-escalator/enabled"
// tagValue is the value for the tag applied to ASGs and Fleet requests
tagValue = "true"
// maxTerminateInstancesTries is the maximum number of times terminateOrphanedInstances can be called consecutively
maxTerminateInstancesTries = 3
// The TerminateInstances API only supports terminating 1000 instances at a time
terminateBatchSize = 1000
)

func instanceToProviderID(instance *autoscaling.Instance) string {
Expand Down Expand Up @@ -175,16 +179,19 @@ type NodeGroup struct {

provider *CloudProvider
config *cloudprovider.NodeGroupConfig

terminateInstancesTries int
}

// NewNodeGroup creates a new nodegroup from the aws group backing
func NewNodeGroup(config *cloudprovider.NodeGroupConfig, asg *autoscaling.Group, provider *CloudProvider) *NodeGroup {
return &NodeGroup{
id: config.GroupID,
name: config.Name,
asg: asg,
provider: provider,
config: config,
id: config.GroupID,
name: config.Name,
asg: asg,
provider: provider,
config: config,
terminateInstancesTries: 0,
}
}

Expand Down Expand Up @@ -385,6 +392,11 @@ func (n *NodeGroup) setASGDesiredSizeOneShot(addCount int64) error {
instances = append(instances, i.InstanceIds...)
}

return n.attachInstancesToASG(instances, terminateOrphanedInstances)
}

// attachInstancesToASG takes a list of instances and attaches them onto the node group's ASG
func (n *NodeGroup) attachInstancesToASG(instances []*string, terminate func(*NodeGroup, []*string)) error {
ticker := time.NewTicker(1 * time.Second)
deadline := time.NewTimer(n.config.AWSConfig.FleetInstanceReadyTimeout)
defer ticker.Stop()
Expand All @@ -403,6 +415,8 @@ InstanceReadyLoop:
break InstanceReadyLoop
}
case <-deadline.C:
log.Info("Reached instance ready deadline but not all instances are ready")
terminate(n, instances)
return errors.New("Not all instances could be started")
}
}
Expand All @@ -411,40 +425,44 @@ InstanceReadyLoop:
for batchSize < len(instances) {
instances, batch = instances[batchSize:], instances[0:batchSize:batchSize]

_, err = n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{
_, err := n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{
AutoScalingGroupName: awsapi.String(n.id),
InstanceIds: batch,
})
if err != nil {
log.Error("Failed AttachInstances call.")
terminate(n, append(instances, batch...))
return err
}
}

// Attach the remainder for instance sets that are not evenly divisible by
// batchSize
_, err = n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{
_, err := n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{
AutoScalingGroupName: awsapi.String(n.id),
InstanceIds: instances,
})

log.WithField("asg", n.id).Debugf("CurrentSize: %v", n.Size())
log.WithField("asg", n.id).Debugf("CurrentTargetSize: %v", n.TargetSize())
if err != nil {
log.Error("Failed AttachInstances call.")
terminate(n, instances)
return err
}
log.WithField("asg", n.id).Debugf("CurrentSize: %v", n.Size())
log.WithField("asg", n.id).Debugf("CurrentTargetSize: %v", n.TargetSize())

n.terminateInstancesTries = 0
return nil
}

func (n *NodeGroup) allInstancesReady(ids []*string) bool {
ready := false

n.provider.ec2Service.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{
input := &ec2.DescribeInstanceStatusInput{
InstanceIds: ids,
IncludeAllInstances: awsapi.Bool(true),
}, func(r *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
}

n.provider.ec2Service.DescribeInstanceStatusPages(input, func(r *ec2.DescribeInstanceStatusOutput, lastPage bool) bool {
for _, i := range r.InstanceStatuses {
if *i.InstanceState.Name != "running" {
return false
Expand Down Expand Up @@ -601,3 +619,43 @@ func addASGTags(config *cloudprovider.NodeGroupConfig, asg *autoscaling.Group, p
log.Errorf("failed to create auto scaling tag for ASG %v", id)
}
}

// terminateOrphanedInstances will attempt to terminate a list of instances
func terminateOrphanedInstances(n *NodeGroup, instances []*string) {
Jacobious52 marked this conversation as resolved.
Show resolved Hide resolved
numInstances := len(instances)
if numInstances == 0 {
return
}
log.WithField("asg", n.id).Infof("terminating %v instance(s) that could not be attached to the ASG", numInstances)

var instanceIds []string
for i := 0; i < numInstances; i += terminateBatchSize {
batch := instances[i:minInt(i+terminateBatchSize, numInstances)]

for _, id := range batch {
instanceIds = append(instanceIds, *id)
}

_, err := n.provider.ec2Service.TerminateInstances(&ec2.TerminateInstancesInput{
InstanceIds: awsapi.StringSlice(instanceIds),
})
if err != nil {
log.Warnf("failed to terminate instances %v", err)
}
}

// prevent an endless cycle of provisioning and terminating instances
n.terminateInstancesTries++
if n.terminateInstancesTries >= maxTerminateInstancesTries {
log.Fatalf("reached maximum number of consecutive failures (%v) for provisioning nodes with CreateFleet",
Jacobious52 marked this conversation as resolved.
Show resolved Hide resolved
maxTerminateInstancesTries)
}
}

// minInt returns the minimum of two ints
func minInt(x, y int) int {
if x < y {
return x
}
return y
}
Loading