diff --git a/docs/deployment/aws/README.md b/docs/deployment/aws/README.md index 3d306ac9..73d7ecfe 100644 --- a/docs/deployment/aws/README.md +++ b/docs/deployment/aws/README.md @@ -24,8 +24,12 @@ Escalator requires the following IAM policy to be able to properly integrate wit "autoscaling:SetDesiredCapacity", "autoscaling:TerminateInstanceInAutoScalingGroup", "ec2:CreateFleet", + "ec2:CreateTags", + "ec2:DescribeInstances", "ec2:DescribeInstanceStatus", - "ec2:DescribeInstances" + "ec2:RunInstances", + "ec2:TerminateInstances", + "iam:PassRole" ], "Resource": "*" } diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index adba2a04..b0195f86 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -30,6 +30,10 @@ const ( tagKey = "k8s.io/atlassian-escalator/enabled" // tagValue is the value for the tag applied to ASGs and Fleet requests tagValue = "true" + // maxTerminateInstancesTries is the maximum number of times terminateOrphanedInstances can be called consecutively + maxTerminateInstancesTries = 3 + // The TerminateInstances API only supports terminating 1000 instances at a time + terminateBatchSize = 1000 ) func instanceToProviderID(instance *autoscaling.Instance) string { @@ -175,16 +179,19 @@ type NodeGroup struct { provider *CloudProvider config *cloudprovider.NodeGroupConfig + + terminateInstancesTries int } // NewNodeGroup creates a new nodegroup from the aws group backing func NewNodeGroup(config *cloudprovider.NodeGroupConfig, asg *autoscaling.Group, provider *CloudProvider) *NodeGroup { return &NodeGroup{ - id: config.GroupID, - name: config.Name, - asg: asg, - provider: provider, - config: config, + id: config.GroupID, + name: config.Name, + asg: asg, + provider: provider, + config: config, + terminateInstancesTries: 0, } } @@ -385,6 +392,11 @@ func (n *NodeGroup) setASGDesiredSizeOneShot(addCount int64) error { instances = append(instances, i.InstanceIds...) } + return n.attachInstancesToASG(instances, terminateOrphanedInstances) +} + +// attachInstancesToASG takes a list of instances and attaches them onto the node group's ASG +func (n *NodeGroup) attachInstancesToASG(instances []*string, terminate func(*NodeGroup, []*string)) error { ticker := time.NewTicker(1 * time.Second) deadline := time.NewTimer(n.config.AWSConfig.FleetInstanceReadyTimeout) defer ticker.Stop() @@ -403,6 +415,8 @@ InstanceReadyLoop: break InstanceReadyLoop } case <-deadline.C: + log.Info("Reached instance ready deadline but not all instances are ready") + terminate(n, instances) return errors.New("Not all instances could be started") } } @@ -411,40 +425,44 @@ InstanceReadyLoop: for batchSize < len(instances) { instances, batch = instances[batchSize:], instances[0:batchSize:batchSize] - _, err = n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{ + _, err := n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{ AutoScalingGroupName: awsapi.String(n.id), InstanceIds: batch, }) if err != nil { log.Error("Failed AttachInstances call.") + terminate(n, append(instances, batch...)) return err } } // Attach the remainder for instance sets that are not evenly divisible by // batchSize - _, err = n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{ + _, err := n.provider.service.AttachInstances(&autoscaling.AttachInstancesInput{ AutoScalingGroupName: awsapi.String(n.id), InstanceIds: instances, }) - - log.WithField("asg", n.id).Debugf("CurrentSize: %v", n.Size()) - log.WithField("asg", n.id).Debugf("CurrentTargetSize: %v", n.TargetSize()) if err != nil { log.Error("Failed AttachInstances call.") + terminate(n, instances) return err } + log.WithField("asg", n.id).Debugf("CurrentSize: %v", n.Size()) + log.WithField("asg", n.id).Debugf("CurrentTargetSize: %v", n.TargetSize()) + n.terminateInstancesTries = 0 return nil } func (n *NodeGroup) allInstancesReady(ids []*string) bool { ready := false - n.provider.ec2Service.DescribeInstanceStatusPages(&ec2.DescribeInstanceStatusInput{ + input := &ec2.DescribeInstanceStatusInput{ InstanceIds: ids, IncludeAllInstances: awsapi.Bool(true), - }, func(r *ec2.DescribeInstanceStatusOutput, lastPage bool) bool { + } + + n.provider.ec2Service.DescribeInstanceStatusPages(input, func(r *ec2.DescribeInstanceStatusOutput, lastPage bool) bool { for _, i := range r.InstanceStatuses { if *i.InstanceState.Name != "running" { return false @@ -601,3 +619,43 @@ func addASGTags(config *cloudprovider.NodeGroupConfig, asg *autoscaling.Group, p log.Errorf("failed to create auto scaling tag for ASG %v", id) } } + +// terminateOrphanedInstances will attempt to terminate a list of instances +func terminateOrphanedInstances(n *NodeGroup, instances []*string) { + numInstances := len(instances) + if numInstances == 0 { + return + } + log.WithField("asg", n.id).Infof("terminating %v instance(s) that could not be attached to the ASG", numInstances) + + var instanceIds []string + for i := 0; i < numInstances; i += terminateBatchSize { + batch := instances[i:minInt(i+terminateBatchSize, numInstances)] + + for _, id := range batch { + instanceIds = append(instanceIds, *id) + } + + _, err := n.provider.ec2Service.TerminateInstances(&ec2.TerminateInstancesInput{ + InstanceIds: awsapi.StringSlice(instanceIds), + }) + if err != nil { + log.Warnf("failed to terminate instances %v", err) + } + } + + // prevent an endless cycle of provisioning and terminating instances + n.terminateInstancesTries++ + if n.terminateInstancesTries >= maxTerminateInstancesTries { + log.Fatalf("reached maximum number of consecutive failures (%v) for provisioning nodes with CreateFleet", + maxTerminateInstancesTries) + } +} + +// minInt returns the minimum of two ints +func minInt(x, y int) int { + if x < y { + return x + } + return y +} diff --git a/pkg/cloudprovider/aws/aws_test.go b/pkg/cloudprovider/aws/aws_test.go index a6fa4d89..b9860017 100644 --- a/pkg/cloudprovider/aws/aws_test.go +++ b/pkg/cloudprovider/aws/aws_test.go @@ -2,13 +2,19 @@ package aws import ( "errors" + "fmt" "testing" "time" + "github.com/aws/aws-sdk-go/aws/client" + "github.com/aws/aws-sdk-go/service/autoscaling/autoscalingiface" + "github.com/aws/aws-sdk-go/service/ec2/ec2iface" + "github.com/atlassian/escalator/pkg/cloudprovider" "github.com/atlassian/escalator/pkg/test" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/autoscaling" + "github.com/aws/aws-sdk-go/service/ec2" "github.com/stretchr/testify/assert" ) @@ -44,6 +50,7 @@ func setupAWSMocks() { id: "id", name: "name", config: &mockNodeGroupConfig, + asg: &mockASG, } mockNodeGroupConfig = cloudprovider.NodeGroupConfig{ @@ -88,7 +95,7 @@ func newMockCloudProvider(nodeGroups []string, service *test.MockAutoscalingServ } // Similar to newMockCloudProvider but node groups are injected instead of created within this function -func newMockCloudProviderUsingInjection(nodeGroups map[string]*NodeGroup, service *test.MockAutoscalingService, ec2Service *test.MockEc2Service) (*CloudProvider, error) { +func newMockCloudProviderUsingInjection(nodeGroups map[string]*NodeGroup, service autoscalingiface.AutoScalingAPI, ec2Service ec2iface.EC2API) (*CloudProvider, error) { var err error cloudProvider := &CloudProvider{ @@ -322,3 +329,258 @@ func TestAddASGTags_WithErrorResponse(t *testing.T) { ) addASGTags(&mockNodeGroupConfig, &mockASG, awsCloudProvider) } + +// local mock of ASG service with custom AttachInstances method +type mockAutoscalingService struct { + autoscalingiface.AutoScalingAPI + *client.Client + + numCalls int + numMaxCalls int +} + +// fail the AttachInstances call after numMaxCalls have happened +func (m *mockAutoscalingService) AttachInstances(*autoscaling.AttachInstancesInput) (*autoscaling.AttachInstancesOutput, error) { + if m.numCalls < m.numMaxCalls { + m.numCalls++ + return &autoscaling.AttachInstancesOutput{}, nil + } + return &autoscaling.AttachInstancesOutput{}, fmt.Errorf("failed the AttachInstances call") +} + +func TestAttachInstancesToASG_WithInstanceReadyTimeout_ExpectFailure(t *testing.T) { + setupAWSMocks() + instanceID := "instanceID" + + numInstances := 123 + var instanceIDs []*string + for i := 0; i < numInstances; i++ { + instanceIDs = append(instanceIDs, &instanceID) + } + + // Mock service call and error + awsCloudProvider, _ := newMockCloudProviderUsingInjection( + nil, + &mockAutoscalingService{ + AutoScalingAPI: nil, + Client: nil, + numCalls: 0, + numMaxCalls: 2, + }, + &test.MockEc2Service{ + TerminateInstancesOutput: &ec2.TerminateInstancesOutput{}, + TerminateInstancesErr: nil, + AllInstancesReady: false, + }, + ) + + mockNodeGroup.provider = awsCloudProvider + + // assert the terminate function is called with the correct number of instances + mockTerminateFunc := func(n *NodeGroup, i []*string) { + assert.Equal(t, numInstances, len(i)) + } + + err := mockNodeGroup.attachInstancesToASG(instanceIDs, mockTerminateFunc) + assert.Error(t, err) +} + +func TestAttachInstancesToASG_NoSuccessfulBatches_ExpectFailure(t *testing.T) { + setupAWSMocks() + instanceID := "instanceID" + + terminateSize := 50 + numInstances := batchSize + terminateSize + var instanceIDs []*string + for i := 0; i < numInstances; i++ { + instanceIDs = append(instanceIDs, &instanceID) + } + + // Mock service call and error + awsCloudProvider, _ := newMockCloudProviderUsingInjection( + nil, + &mockAutoscalingService{ + AutoScalingAPI: nil, + Client: nil, + numCalls: 0, + numMaxCalls: 0, + }, + &test.MockEc2Service{ + TerminateInstancesOutput: &ec2.TerminateInstancesOutput{}, + TerminateInstancesErr: nil, + AllInstancesReady: true, + }, + ) + + mockNodeGroup.provider = awsCloudProvider + + // assert the terminate function is called with the correct number of instances + mockTerminateFunc := func(n *NodeGroup, i []*string) { + assert.Equal(t, numInstances, len(i), "Expected all instances to be terminated") + } + + err := mockNodeGroup.attachInstancesToASG(instanceIDs, mockTerminateFunc) + assert.Error(t, err) +} + +func TestAttachInstancesToASG_OneSuccessfulBatch_ExpectFailure(t *testing.T) { + setupAWSMocks() + instanceID := "instanceID" + + terminateSize := 50 + numInstances := batchSize + terminateSize + var instanceIDs []*string + for i := 0; i < numInstances; i++ { + instanceIDs = append(instanceIDs, &instanceID) + } + + // Mock service call and error + awsCloudProvider, _ := newMockCloudProviderUsingInjection( + nil, + &mockAutoscalingService{ + AutoScalingAPI: nil, + Client: nil, + numCalls: 0, + numMaxCalls: 1, + }, + &test.MockEc2Service{ + TerminateInstancesOutput: &ec2.TerminateInstancesOutput{}, + TerminateInstancesErr: nil, + AllInstancesReady: true, + }, + ) + + mockNodeGroup.provider = awsCloudProvider + + // assert the terminate function is called with the correct number of instances + mockTerminateFunc := func(n *NodeGroup, i []*string) { + assert.Equal(t, terminateSize, len(i), "Expected all instances except the first batch to be terminated") + } + + err := mockNodeGroup.attachInstancesToASG(instanceIDs, mockTerminateFunc) + assert.Error(t, err) +} + +func TestAttachInstancesToASG_NoBatches_ExpectFailure(t *testing.T) { + setupAWSMocks() + instanceID := "instanceID" + + terminateSize := batchSize - 1 + numInstances := terminateSize + var instanceIDs []*string + for i := 0; i < numInstances; i++ { + instanceIDs = append(instanceIDs, &instanceID) + } + + // Mock service call and error + awsCloudProvider, _ := newMockCloudProviderUsingInjection( + nil, + &mockAutoscalingService{ + AutoScalingAPI: nil, + Client: nil, + numCalls: 0, + numMaxCalls: 0, + }, + &test.MockEc2Service{ + TerminateInstancesOutput: &ec2.TerminateInstancesOutput{}, + TerminateInstancesErr: nil, + AllInstancesReady: true, + }, + ) + + mockNodeGroup.provider = awsCloudProvider + + // assert the terminate function is called with the correct number of instances + mockTerminateFunc := func(n *NodeGroup, i []*string) { + assert.Equal(t, terminateSize, len(i)) + } + + err := mockNodeGroup.attachInstancesToASG(instanceIDs, mockTerminateFunc) + assert.Error(t, err) +} + +func TestAttachInstancesToASG_ExpectSuccess(t *testing.T) { + setupAWSMocks() + instanceID := "instanceID" + + terminateSize := batchSize - 1 + numInstances := terminateSize + var instanceIDs []*string + for i := 0; i < numInstances; i++ { + instanceIDs = append(instanceIDs, &instanceID) + } + + // Mock service call and error + awsCloudProvider, _ := newMockCloudProviderUsingInjection( + map[string]*NodeGroup{mockNodeGroup.id: &mockNodeGroup}, + &mockAutoscalingService{ + AutoScalingAPI: nil, + Client: nil, + numCalls: 0, + numMaxCalls: 1, + }, + &test.MockEc2Service{ + TerminateInstancesOutput: &ec2.TerminateInstancesOutput{}, + TerminateInstancesErr: nil, + AllInstancesReady: true, + }, + ) + + mockNodeGroup.provider = awsCloudProvider + + // the terminate function shouldn't get called - fail the test if it does + mockTerminateFunc := func(n *NodeGroup, i []*string) { + assert.Fail(t, "No instances should have been terminated") + } + + err := mockNodeGroup.attachInstancesToASG(instanceIDs, mockTerminateFunc) + assert.NoError(t, err) +} + +func TestTerminateInstances_Success(t *testing.T) { + setupAWSMocks() + + // Mock service call and error + awsCloudProvider, _ := newMockCloudProviderUsingInjection( + nil, + &test.MockAutoscalingService{}, + &test.MockEc2Service{ + TerminateInstancesOutput: &ec2.TerminateInstancesOutput{}, + TerminateInstancesErr: nil, + }, + ) + + instance1 := "i-123456" + instances := []*string{&instance1} + mockNodeGroup.provider = awsCloudProvider + + terminateOrphanedInstances(&mockNodeGroup, instances) +} + +func TestTerminateInstances_WithErrorResponse(t *testing.T) { + setupAWSMocks() + + // Mock service call and error + awsCloudProvider, _ := newMockCloudProviderUsingInjection( + nil, + &test.MockAutoscalingService{}, + &test.MockEc2Service{ + TerminateInstancesOutput: &ec2.TerminateInstancesOutput{}, + TerminateInstancesErr: errors.New("unauthorized"), + }, + ) + + instance1 := "i-123456" + instances := []*string{&instance1} + mockNodeGroup.provider = awsCloudProvider + + terminateOrphanedInstances(&mockNodeGroup, instances) +} + +func TestMinInt(t *testing.T) { + x := 1 + y := 2 + assert.Equal(t, x, minInt(x, y)) + assert.Equal(t, x, minInt(y, x)) + assert.Equal(t, x, minInt(x, x)) +} diff --git a/pkg/cloudprovider/aws/node_group_test.go b/pkg/cloudprovider/aws/node_group_test.go index 53c2534e..38ee2155 100644 --- a/pkg/cloudprovider/aws/node_group_test.go +++ b/pkg/cloudprovider/aws/node_group_test.go @@ -294,6 +294,7 @@ func TestNodeGroup_IncreaseSize_CreateFleet(t *testing.T) { &test.MockEc2Service{ CreateFleetOutput: tt.createFleetOutput, DescribeInstancesOutput: &ec2.DescribeInstancesOutput{}, + AllInstancesReady: true, }, ) diff --git a/pkg/test/aws.go b/pkg/test/aws.go index a6817d5f..47881fe7 100644 --- a/pkg/test/aws.go +++ b/pkg/test/aws.go @@ -67,6 +67,10 @@ type MockEc2Service struct { DescribeInstanceStatusOutput *ec2.DescribeInstanceStatusOutput DescribeInstanceStatusErr error + AllInstancesReady bool + + TerminateInstancesOutput *ec2.TerminateInstancesOutput + TerminateInstancesErr error } // DescribeInstances mock implementation for MockEc2Service @@ -81,7 +85,12 @@ func (m MockEc2Service) CreateFleet(*ec2.CreateFleetInput) (*ec2.CreateFleetOutp // DescribeInstanceStatusPages mock implementation for MockEc2Service func (m MockEc2Service) DescribeInstanceStatusPages(statusInput *ec2.DescribeInstanceStatusInput, allInstancesReadyHelper func(*ec2.DescribeInstanceStatusOutput, bool) bool) error { - // Mocks successful execution of the anonymous function within cloudprovider/aws/aws.go:allInstancesReady - allInstancesReadyHelper(&ec2.DescribeInstanceStatusOutput{}, true) + // Mocks execution of the anonymous function within cloudprovider/aws/aws.go:allInstancesReady + allInstancesReadyHelper(&ec2.DescribeInstanceStatusOutput{}, m.AllInstancesReady) return nil } + +// TerminateInstances mock implementation for MockEc2Service +func (m MockEc2Service) TerminateInstances(*ec2.TerminateInstancesInput) (*ec2.TerminateInstancesOutput, error) { + return m.TerminateInstancesOutput, m.TerminateInstancesErr +}