From 8971a2917758c91b7a36626e2ca794e51ad8570f Mon Sep 17 00:00:00 2001
From: Damika Gamlath
Date: Fri, 21 Jun 2024 15:23:32 +0000
Subject: [PATCH] refactor gce.RegenerateMigInstancesCache() to use Instance.List API for listing MIG instances

---
 .../cloudprovider/gce/gce_cloud_provider.go   |   2 +-
 .../cloudprovider/gce/gce_manager.go          |   4 +-
 .../cloudprovider/gce/gce_manager_test.go     |   2 +-
 .../cloudprovider/gce/mig_info_provider.go    | 151 ++++-
 .../gce/mig_info_provider_test.go             | 532 +++++++++++++++++-
 .../config/autoscaling_options.go             |   5 +-
 cluster-autoscaler/main.go                    |   9 +-
 7 files changed, 653 insertions(+), 52 deletions(-)

diff --git a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
index ef40afccdfc2..7a7b5b42e308 100644
--- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
+++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
@@ -383,7 +383,7 @@ func BuildGCE(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
 		defer config.Close()
 	}
 
-	manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
+	manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.BulkMigInstancesListingEnabled, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
 	if err != nil {
 		klog.Fatalf("Failed to create GCE Manager: %v", err)
 	}
diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager.go b/cluster-autoscaler/cloudprovider/gce/gce_manager.go
index 0db8fa76c1ce..534d3e15b220 100644
--- a/cluster-autoscaler/cloudprovider/gce/gce_manager.go
+++ b/cluster-autoscaler/cloudprovider/gce/gce_manager.go
@@ -128,7 +128,7 @@ type gceManagerImpl struct {
 
 // CreateGceManager constructs GceManager object.
 func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, localSSDDiskSizeProvider localssdsize.LocalSSDSizeProvider,
-	regional bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
+	regional, bulkGceMigInstancesListingEnabled bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
 	// Create Google Compute Engine token.
var err error tokenSource := google.ComputeTokenSource("") @@ -188,7 +188,7 @@ func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGr cache: cache, GceService: gceService, migLister: migLister, - migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime), + migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime, bulkGceMigInstancesListingEnabled), location: location, regional: regional, projectId: projectId, diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go index 275fcf7a2278..dbbe5864d4d6 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go @@ -354,7 +354,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa manager := &gceManagerImpl{ cache: cache, migLister: migLister, - migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second), + migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second, false), GceService: gceService, projectId: projectId, regional: regional, diff --git a/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go b/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go index 277bc2dd64a5..87c7a7564fb0 100644 --- a/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go @@ -62,15 +62,16 @@ type timeProvider interface { } type cachingMigInfoProvider struct { - migInfoMutex sync.Mutex - cache *GceCache - migLister MigLister - gceClient AutoscalingGceClient - projectId string - concurrentGceRefreshes int - migInstanceMutex sync.Mutex - migInstancesMinRefreshWaitTime time.Duration - timeProvider timeProvider + migInfoMutex sync.Mutex + cache *GceCache + migLister MigLister + gceClient AutoscalingGceClient + projectId string + concurrentGceRefreshes int + migInstanceMutex sync.Mutex + migInstancesMinRefreshWaitTime time.Duration + timeProvider timeProvider + bulkGceMigInstancesListingEnabled bool } type realTime struct{} @@ -80,15 +81,16 @@ func (r *realTime) Now() time.Time { } // NewCachingMigInfoProvider creates an instance of caching MigInfoProvider -func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration) MigInfoProvider { +func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration, bulkGceMigInstancesListingEnabled bool) MigInfoProvider { return &cachingMigInfoProvider{ - cache: cache, - migLister: migLister, - gceClient: gceClient, - projectId: projectId, - concurrentGceRefreshes: concurrentGceRefreshes, - migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime, - timeProvider: &realTime{}, + cache: cache, + migLister: migLister, + gceClient: gceClient, + projectId: projectId, + concurrentGceRefreshes: concurrentGceRefreshes, + migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime, + timeProvider: &realTime{}, + bulkGceMigInstancesListingEnabled: bulkGceMigInstancesListingEnabled, } } @@ -151,6 +153,11 @@ func (c *cachingMigInfoProvider) 
getCachedMigForInstance(instanceRef GceRef) (Mi
 func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error {
 	c.cache.InvalidateAllMigInstances()
 	c.cache.InvalidateAllInstancesToMig()
+
+	if c.bulkGceMigInstancesListingEnabled {
+		return c.bulkListMigInstances()
+	}
+
 	migs := c.migLister.GetMigs()
 	errors := make([]error, len(migs))
 	workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(migs), func(piece int) {
@@ -165,6 +172,116 @@ func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error {
 	return nil
 }
 
+func (c *cachingMigInfoProvider) bulkListMigInstances() error {
+	c.cache.InvalidateMigInstancesState()
+	err := c.fillMigInfoCache()
+	if err != nil {
+		return err
+	}
+	instances, listErr := c.listInstancesInAllZonesWithMigs()
+	migToInstances := groupInstancesToMigs(instances)
+	updateErr := c.updateMigInstancesCache(migToInstances)
+
+	if listErr != nil {
+		return listErr
+	}
+	return updateErr
+}
+
+func (c *cachingMigInfoProvider) listInstancesInAllZonesWithMigs() ([]GceInstance, error) {
+	var zones []string
+	for zone := range c.listAllZonesWithMigs() {
+		zones = append(zones, zone)
+	}
+	var allInstances []GceInstance
+	errors := make([]error, len(zones))
+	zoneInstances := make([][]GceInstance, len(zones))
+	workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(zones), func(piece int) {
+		zoneInstances[piece], errors[piece] = c.gceClient.FetchAllInstances(c.projectId, zones[piece], "")
+	}, workqueue.WithChunkSize(c.concurrentGceRefreshes))
+
+	for _, instances := range zoneInstances {
+		allInstances = append(allInstances, instances...)
+	}
+	for _, err := range errors {
+		if err != nil {
+			return allInstances, err
+		}
+	}
+	return allInstances, nil
+}
+
+func groupInstancesToMigs(instances []GceInstance) map[GceRef][]GceInstance {
+	migToInstances := map[GceRef][]GceInstance{}
+	for _, instance := range instances {
+		migToInstances[instance.Igm] = append(migToInstances[instance.Igm], instance)
+	}
+	return migToInstances
+}
+
+func (c *cachingMigInfoProvider) isMigInstancesConsistent(mig Mig, migToInstances map[GceRef][]GceInstance) bool {
+	migRef := mig.GceRef()
+	state, found := c.cache.GetMigInstancesState(migRef)
+	if !found {
+		return false
+	}
+	instanceCount := state[cloudprovider.InstanceRunning] + state[cloudprovider.InstanceCreating] + state[cloudprovider.InstanceDeleting]
+
+	instances, found := migToInstances[migRef]
+	if !found && instanceCount > 0 {
+		return false
+	}
+	return instanceCount == int64(len(instances))
+}
+
+func (c *cachingMigInfoProvider) isMigCreatingOrDeletingInstances(mig Mig) bool {
+	migRef := mig.GceRef()
+	state, found := c.cache.GetMigInstancesState(migRef)
+	if !found {
+		return false
+	}
+	return state[cloudprovider.InstanceCreating] > 0 || state[cloudprovider.InstanceDeleting] > 0
+}
+
+// updateMigInstancesCache updates the mig instances for each mig
+func (c *cachingMigInfoProvider) updateMigInstancesCache(migToInstances map[GceRef][]GceInstance) error {
+	var errors []error
+	for _, mig := range c.migLister.GetMigs() {
+		migRef := mig.GceRef()
+		// If there is an inconsistency between number of instances according to instances.List
+		// and number of instances according to migInstancesStateCount for the given mig, which can be due to
+		// - abandoned instance
+		// - missing/malformed "created-by" reference
+		// we use an igm.ListInstances call as the authoritative source of instance information
+		if !c.isMigInstancesConsistent(mig, migToInstances) {
+			if err :=
c.fillMigInstances(migRef); err != nil { + errors = append(errors, err) + } + continue + } + + // mig instances are re-fetched along with instance.Status.ErrorInfo for migs with + // instances in creating or deleting state + if c.isMigCreatingOrDeletingInstances(mig) { + if err := c.fillMigInstances(migRef); err != nil { + errors = append(errors, err) + } + continue + } + + // for all other cases, mig instances cache is updated with the instances obtained from instance.List api + instances := migToInstances[migRef] + err := c.cache.SetMigInstances(migRef, instances, c.timeProvider.Now()) + if err != nil { + errors = append(errors, err) + } + } + if len(errors) > 0 { + return errors[0] + } + return nil +} + func (c *cachingMigInfoProvider) findMigWithMatchingBasename(instanceRef GceRef) Mig { for _, mig := range c.migLister.GetMigs() { migRef := mig.GceRef() diff --git a/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go b/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go index 70a18654a9bc..53331e964b20 100644 --- a/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go +++ b/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go @@ -46,6 +46,66 @@ var ( Name: "mig", }, } + mig1 = &gceMig{ + gceRef: GceRef{ + Project: "myprojid", + Zone: "myzone1", + Name: "mig1", + }, + } + mig2 = &gceMig{ + gceRef: GceRef{ + Project: "myprojid", + Zone: "myzone2", + Name: "mig2", + }, + } + + instance1 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone1/test-instance-1", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: mig1.GceRef(), + } + instance2 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone1/test-instance-2", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating}, + }, + Igm: mig1.GceRef(), + } + instance3 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone2/test-instance-3", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: mig2.GceRef(), + } + + instance4 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone2/test-instance-4", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: GceRef{}, + } + + instance5 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone2/test-instance-5", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: GceRef{}, + } + + instance6 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone2/test-instance-6", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: mig2.GceRef(), + } ) type mockAutoscalingGceClient struct { @@ -204,7 +264,7 @@ func TestFillMigInstances(t *testing.T) { fetchMigInstances: fetchMigInstancesWithCounter(newInstances, callCounter), } - provider, ok := NewCachingMigInfoProvider(tc.cache, NewMigLister(tc.cache), client, mig.GceRef().Project, 1, time.Hour).(*cachingMigInfoProvider) + provider, ok := NewCachingMigInfoProvider(tc.cache, NewMigLister(tc.cache), client, mig.GceRef().Project, 1, time.Hour, false).(*cachingMigInfoProvider) assert.True(t, ok) provider.timeProvider = &fakeTime{now: timeNow} @@ -349,7 +409,7 @@ func TestMigInfoProviderGetMigForInstance(t *testing.T) { fetchMigs: fetchMigsConst(nil), } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 
1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) mig, err := provider.GetMigForInstance(instanceRef) @@ -432,7 +492,7 @@ func TestGetMigInstances(t *testing.T) { fetchMigInstances: tc.fetchMigInstances, } migLister := NewMigLister(tc.cache) - provider, ok := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second).(*cachingMigInfoProvider) + provider, ok := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false).(*cachingMigInfoProvider) assert.True(t, ok) provider.timeProvider = &fakeTime{now: newRefreshTime} @@ -465,30 +525,44 @@ func TestRegenerateMigInstancesCache(t *testing.T) { {Instance: cloudprovider.Instance{Id: "gce://project/us-test1/base-instance-name-abcd"}, NumericId: 1}, {Instance: cloudprovider.Instance{Id: "gce://project/us-test1/base-instance-name-efgh"}, NumericId: 2}, } + mig1Instances := []GceInstance{instance1, instance2} + mig2Instances := []GceInstance{instance3, instance6} otherInstances := []GceInstance{ {Instance: cloudprovider.Instance{Id: "gce://project/us-test1/other-base-instance-name-abcd"}}, {Instance: cloudprovider.Instance{Id: "gce://project/us-test1/other-base-instance-name-efgh"}}, } - var instancesRefs, otherInstancesRefs []GceRef - for _, instance := range instances { - instanceRef, err := GceRefFromProviderId(instance.Id) - assert.Nil(t, err) - instancesRefs = append(instancesRefs, instanceRef) + mig1Igm := &gce.InstanceGroupManager{ + Zone: mig1.GceRef().Zone, + Name: mig1.GceRef().Name, + TargetSize: 2, + CurrentActions: &gce.InstanceGroupManagerActionsSummary{ + Creating: 1, + }, } - for _, instance := range otherInstances { - instanceRef, err := GceRefFromProviderId(instance.Id) - assert.Nil(t, err) - otherInstancesRefs = append(otherInstancesRefs, instanceRef) + mig2Igm := &gce.InstanceGroupManager{ + Zone: mig2.GceRef().Zone, + Name: mig2.GceRef().Name, + TargetSize: 2, + CurrentActions: &gce.InstanceGroupManagerActionsSummary{}, } + instancesRefs := toInstancesRefs(t, instances) + mig1InstancesRefs := toInstancesRefs(t, mig1Instances) + mig2InstancesRefs := toInstancesRefs(t, mig2Instances) + otherInstancesRefs := toInstancesRefs(t, otherInstances) + testCases := []struct { - name string - cache *GceCache - fetchMigInstances func(GceRef) ([]GceInstance, error) - expectedErr error - expectedMigInstances map[GceRef][]GceInstance - expectedInstancesToMig map[GceRef]GceRef + name string + cache *GceCache + fetchMigInstances func(GceRef) ([]GceInstance, error) + fetchMigs func(string) ([]*gce.InstanceGroupManager, error) + fetchAllInstances func(string, string, string) ([]GceInstance, error) + bulkGceMigInstancesListingEnabled bool + projectId string + expectedErr error + expectedMigInstances map[GceRef][]GceInstance + expectedInstancesToMig map[GceRef]GceRef }{ { name: "fill empty cache for one mig", @@ -498,6 +572,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) { instancesToMig: map[GceRef]GceRef{}, }, fetchMigInstances: fetchMigInstancesConst(instances), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): instances, }, @@ -520,6 +595,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) { mig.GceRef(): instances, otherMig.GceRef(): otherInstances, }), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): instances, otherMig.GceRef(): otherInstances, @@ -552,6 +628,7 @@ func 
TestRegenerateMigInstancesCache(t *testing.T) { mig.GceRef(): instances, otherMig.GceRef(): otherInstances, }), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): instances, }, @@ -575,6 +652,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) { }, }, fetchMigInstances: fetchMigInstancesConst(otherInstances), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): otherInstances, }, @@ -593,17 +671,95 @@ func TestRegenerateMigInstancesCache(t *testing.T) { instancesToMig: map[GceRef]GceRef{}, }, fetchMigInstances: fetchMigInstancesFail, + projectId: mig.GceRef().Project, expectedErr: errFetchMigInstances, }, + { + name: "bulkGceMigInstancesListingEnabled - fill empty cache for one mig - instances in creating/deleting state", + cache: &GceCache{ + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + instances: map[GceRef][]GceInstance{}, + instancesToMig: map[GceRef]GceRef{}, + migTargetSizeCache: map[GceRef]int64{}, + migBaseNameCache: map[GceRef]string{}, + listManagedInstancesResultsCache: map[GceRef]string{}, + instanceTemplateNameCache: map[GceRef]InstanceTemplateName{}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{}, + }, + fetchMigInstances: fetchMigInstancesConst(mig1Instances), + fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig1Igm}), + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}}), + bulkGceMigInstancesListingEnabled: true, + projectId: mig1.GceRef().Project, + expectedMigInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): mig1Instances, + }, + expectedInstancesToMig: map[GceRef]GceRef{ + mig1InstancesRefs[0]: mig1.GceRef(), + mig1InstancesRefs[1]: mig1.GceRef(), + }, + }, + { + name: "bulkGceMigInstancesListingEnabled - fill empty cache for one mig - number of instances are inconsistent in bulk listing result", + cache: &GceCache{ + migs: map[GceRef]Mig{mig2.GceRef(): mig2}, + instances: map[GceRef][]GceInstance{}, + instancesToMig: map[GceRef]GceRef{}, + migTargetSizeCache: map[GceRef]int64{}, + migBaseNameCache: map[GceRef]string{}, + listManagedInstancesResultsCache: map[GceRef]string{}, + instanceTemplateNameCache: map[GceRef]InstanceTemplateName{}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{}, + }, + fetchMigInstances: fetchMigInstancesConst(mig2Instances), + fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig2Igm}), + // one instance is missing from the instances of igm2 in myzone2 + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone2": {instance3}}), + bulkGceMigInstancesListingEnabled: true, + projectId: mig2.GceRef().Project, + expectedMigInstances: map[GceRef][]GceInstance{ + mig2.GceRef(): mig2Instances, + }, + expectedInstancesToMig: map[GceRef]GceRef{ + mig2InstancesRefs[0]: mig2.GceRef(), + mig2InstancesRefs[1]: mig2.GceRef(), + }, + }, + { + name: "bulkGceMigInstancesListingEnabled - fill empty cache for one mig - all instances in running state", + cache: &GceCache{ + migs: map[GceRef]Mig{mig2.GceRef(): mig2}, + instances: map[GceRef][]GceInstance{}, + instancesToMig: map[GceRef]GceRef{}, + migTargetSizeCache: map[GceRef]int64{}, + migBaseNameCache: map[GceRef]string{}, + listManagedInstancesResultsCache: map[GceRef]string{}, + instanceTemplateNameCache: map[GceRef]InstanceTemplateName{}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{}, + }, + fetchMigs: 
fetchMigsConst([]*gce.InstanceGroupManager{mig2Igm}), + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone2": {instance3, instance6}}), + bulkGceMigInstancesListingEnabled: true, + projectId: mig2.GceRef().Project, + expectedMigInstances: map[GceRef][]GceInstance{ + mig2.GceRef(): mig2Instances, + }, + expectedInstancesToMig: map[GceRef]GceRef{ + mig2InstancesRefs[0]: mig2.GceRef(), + mig2InstancesRefs[1]: mig2.GceRef(), + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { client := &mockAutoscalingGceClient{ fetchMigInstances: tc.fetchMigInstances, + fetchMigs: tc.fetchMigs, + fetchAllInstances: tc.fetchAllInstances, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, tc.projectId, 1, 0*time.Second, tc.bulkGceMigInstancesListingEnabled) err := provider.RegenerateMigInstancesCache() assert.Equal(t, tc.expectedErr, err) @@ -615,6 +771,16 @@ func TestRegenerateMigInstancesCache(t *testing.T) { } } +func toInstancesRefs(t *testing.T, instances []GceInstance) []GceRef { + var refs []GceRef + for _, instance := range instances { + instanceRef, err := GceRefFromProviderId(instance.Id) + assert.Nil(t, err) + refs = append(refs, instanceRef) + } + return refs +} + func TestGetMigTargetSize(t *testing.T) { targetSize := int64(42) instanceGroupManager := &gce.InstanceGroupManager{ @@ -682,7 +848,7 @@ func TestGetMigTargetSize(t *testing.T) { fetchMigTargetSize: tc.fetchMigTargetSize, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) targetSize, err := provider.GetMigTargetSize(mig.GceRef()) cachedTargetSize, found := tc.cache.GetMigTargetSize(mig.GceRef()) @@ -764,7 +930,7 @@ func TestGetMigBasename(t *testing.T) { fetchMigBasename: tc.fetchMigBasename, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) basename, err := provider.GetMigBasename(mig.GceRef()) cachedBasename, found := tc.cache.GetMigBasename(mig.GceRef()) @@ -845,7 +1011,7 @@ func TestGetListManagedInstancesResults(t *testing.T) { fetchListManagedInstancesResults: tc.fetchResults, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) results, err := provider.GetListManagedInstancesResults(mig.GceRef()) cachedResults, found := tc.cache.GetListManagedInstancesResults(mig.GceRef()) @@ -940,7 +1106,7 @@ func TestGetMigInstanceTemplateName(t *testing.T) { fetchMigTemplateName: tc.fetchMigTemplateName, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) instanceTemplateName, err := provider.GetMigInstanceTemplateName(mig.GceRef()) cachedInstanceTemplateName, found := 
tc.cache.GetMigInstanceTemplateName(mig.GceRef()) @@ -1046,7 +1212,7 @@ func TestGetMigInstanceTemplate(t *testing.T) { fetchMigTemplate: tc.fetchMigTemplate, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) template, err := provider.GetMigInstanceTemplate(mig.GceRef()) cachedTemplate, found := tc.cache.GetMigInstanceTemplate(mig.GceRef()) @@ -1252,7 +1418,7 @@ func TestGetMigInstanceKubeEnv(t *testing.T) { fetchMigTemplate: tc.fetchMigTemplate, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) kubeEnv, err := provider.GetMigKubeEnv(mig.GceRef()) cachedKubeEnv, found := tc.cache.GetMigKubeEnv(mig.GceRef()) @@ -1347,7 +1513,7 @@ func TestGetMigMachineType(t *testing.T) { fetchMachineType: tc.fetchMachineType, } migLister := NewMigLister(cache) - provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) machine, err := provider.GetMigMachineType(mig.GceRef()) if tc.expectError { assert.Error(t, err) @@ -1436,6 +1602,310 @@ func TestMultipleGetMigInstanceCallsLimited(t *testing.T) { } } +func TestListInstancesInAllZonesWithMigs(t *testing.T) { + testCases := []struct { + name string + migs map[GceRef]Mig + fetchAllInstances func(string, string, string) ([]GceInstance, error) + wantInstances []GceInstance + wantErr bool + }{ + { + name: "instance fetching failed", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{}), + wantErr: true, + }, + { + name: "Successfully list mig instances in a single zone", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}, "myzone2": {instance3}}), + wantInstances: []GceInstance{instance1, instance2}, + }, + { + name: "Successfully list mig instances in multiple zones", + migs: map[GceRef]Mig{mig1.GceRef(): mig1, mig2.GceRef(): mig2}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}, "myzone2": {instance3}}), + wantInstances: []GceInstance{instance1, instance2, instance3}, + }, + { + name: "Successfully list mig instances in one zones and got errors in another", + migs: map[GceRef]Mig{mig1.GceRef(): mig1, mig2.GceRef(): mig2}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}}), + wantInstances: []GceInstance{instance1, instance2}, + wantErr: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + migs: tc.migs, + } + client := &mockAutoscalingGceClient{ + fetchAllInstances: tc.fetchAllInstances, + } + migLister := NewMigLister(&cache) + provider := &cachingMigInfoProvider{ + cache: &cache, + migLister: migLister, + gceClient: client, + concurrentGceRefreshes: 1, + } + instances, err := provider.listInstancesInAllZonesWithMigs() + + if tc.wantErr { + assert.NotNil(t, err) + } else { + assert.NoError(t, err) + } + assert.ElementsMatch(t, tc.wantInstances, instances) + }) 
+ } +} + +func TestGroupInstancesToMigs(t *testing.T) { + testCases := []struct { + name string + instances []GceInstance + want map[GceRef][]GceInstance + }{ + { + name: "no instances", + want: map[GceRef][]GceInstance{}, + }, + { + name: "instances from multiple migs including unknown migs", + instances: []GceInstance{instance1, instance2, instance3, instance4, instance5}, + want: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + mig2.GceRef(): {instance3}, + {}: {instance4, instance5}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + groupedInstances := groupInstancesToMigs(tc.instances) + assert.Equal(t, tc.want, groupedInstances) + }) + } +} + +func TestIsMigInstancesConsistent(t *testing.T) { + testCases := []struct { + name string + mig Mig + migToInstances map[GceRef][]GceInstance + migInstancesStateCache map[GceRef]map[cloudprovider.InstanceState]int64 + want bool + }{ + { + name: "instance not found", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{}, + want: false, + }, + { + name: "instanceState not found", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + want: false, + }, + { + name: "inconsistent number of instances", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 2, + cloudprovider.InstanceDeleting: 3, + cloudprovider.InstanceRunning: 4, + }, + }, + want: false, + }, + { + name: "consistent number of instances", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 1, + cloudprovider.InstanceDeleting: 0, + cloudprovider.InstanceRunning: 1, + }, + }, + want: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + migInstancesStateCache: tc.migInstancesStateCache, + } + provider := &cachingMigInfoProvider{ + cache: &cache, + } + got := provider.isMigInstancesConsistent(tc.mig, tc.migToInstances) + assert.Equal(t, tc.want, got) + }) + } +} + +func TestIsMigInCreatingOrDeletingInstanceState(t *testing.T) { + testCases := []struct { + name string + mig Mig + migInstancesStateCache map[GceRef]map[cloudprovider.InstanceState]int64 + want bool + }{ + { + name: "instanceState not found", + mig: mig1, + want: false, + }, + { + name: "in creating state", + mig: mig1, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 2, + cloudprovider.InstanceDeleting: 0, + cloudprovider.InstanceRunning: 1, + }, + }, + want: true, + }, + { + name: "in deleting state", + mig: mig1, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 0, + cloudprovider.InstanceDeleting: 1, + cloudprovider.InstanceRunning: 0, + }, + }, + want: true, + }, + { + name: "not in creating or deleting states", + mig: mig1, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 0, + cloudprovider.InstanceDeleting: 0, + cloudprovider.InstanceRunning: 1, + }, + }, + want: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + 
migInstancesStateCache: tc.migInstancesStateCache, + } + provider := &cachingMigInfoProvider{ + cache: &cache, + } + got := provider.isMigCreatingOrDeletingInstances(tc.mig) + assert.Equal(t, tc.want, got) + }) + } +} + +func TestUpdateMigInstancesCache(t *testing.T) { + testCases := []struct { + name string + migs map[GceRef]Mig + migToInstances map[GceRef][]GceInstance + fetchMigInstances []GceInstance + wantInstances map[GceRef][]GceInstance + migInstancesStateCache map[GceRef]map[cloudprovider.InstanceState]int64 + }{ + { + name: "inconsistent mig instance state", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + migToInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1}, + }, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 2, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + }, + fetchMigInstances: []GceInstance{instance1, instance2}, + wantInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + }, + { + name: "mig with instance in creating or deleting state", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 0, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 2}, + }, + fetchMigInstances: []GceInstance{instance1, instance2}, + wantInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + }, + { + name: "consistent mig instance state", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + migToInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + }, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 2, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + }, + wantInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + }, + { + name: "mix of consistent and inconsistent states", + migs: map[GceRef]Mig{mig1.GceRef(): mig1, mig2.GceRef(): mig2}, + migToInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + }, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 2, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + mig2.GceRef(): {cloudprovider.InstanceRunning: 1, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + }, + fetchMigInstances: []GceInstance{instance3}, + wantInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + mig2.GceRef(): {instance3}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + migs: tc.migs, + instances: make(map[GceRef][]GceInstance), + instancesUpdateTime: make(map[GceRef]time.Time), + migBaseNameCache: make(map[GceRef]string), + migInstancesStateCache: tc.migInstancesStateCache, + instancesToMig: make(map[GceRef]GceRef), + } + migLister := NewMigLister(&cache) + client := &mockAutoscalingGceClient{ + fetchMigInstances: fetchMigInstancesConst(tc.fetchMigInstances), + } + provider := &cachingMigInfoProvider{ + cache: &cache, + migLister: migLister, + gceClient: client, + timeProvider: &realTime{}, + } + err := provider.updateMigInstancesCache(tc.migToInstances) + assert.NoError(t, err) + for migRef, want := range tc.wantInstances { + instances, found := cache.GetMigInstances(migRef) + assert.True(t, found) + assert.Equal(t, 
want, instances)
+			}
+		})
+	}
+}
+
 type fakeTime struct {
 	now time.Time
 }
@@ -1555,3 +2025,13 @@ func fetchMachineTypeConst(name string, cpu int64, mem int64) func(string, strin
 		}, nil
 	}
 }
+
+func fetchAllInstancesInZone(allInstances map[string][]GceInstance) func(string, string, string) ([]GceInstance, error) {
+	return func(project, zone, filter string) ([]GceInstance, error) {
+		instances, found := allInstances[zone]
+		if !found {
+			return nil, errors.New("")
+		}
+		return instances, nil
+	}
+}
diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go
index 12ced472c683..35f7e1005204 100644
--- a/cluster-autoscaler/config/autoscaling_options.go
+++ b/cluster-autoscaler/config/autoscaling_options.go
@@ -58,7 +58,7 @@ type NodeGroupAutoscalingOptions struct {
 
 // GCEOptions contain autoscaling options specific to GCE cloud provider.
 type GCEOptions struct {
-	// ConcurrentRefreshes is the maximum number of concurrently refreshed instance groups or instance templates.
+	// ConcurrentRefreshes is the maximum number of instance groups, instance templates, or zones with MIG instances refreshed concurrently.
 	ConcurrentRefreshes int
 	// MigInstancesMinRefreshWaitTime is the minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed.
 	MigInstancesMinRefreshWaitTime time.Duration
@@ -66,6 +66,9 @@ type GCEOptions struct {
 	DomainUrl string
 	// LocalSSDDiskSizeProvider provides local ssd disk size based on machine type
 	LocalSSDDiskSizeProvider gce_localssdsize.LocalSSDSizeProvider
+	// BulkMigInstancesListingEnabled means that cluster instances are listed in bulk per zone instead of per MIG.
+	// MIGs with instances in creating or deleting state, or whose listed instances are inconsistent with the cached instance-state counts, are re-fetched with igm.ListInstances.
+	BulkMigInstancesListingEnabled bool
 }
 
 const (
diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go
index da462f5a94f0..7084e901de5a 100644
--- a/cluster-autoscaler/main.go
+++ b/cluster-autoscaler/main.go
@@ -219,10 +219,10 @@ var (
 	awsUseStaticInstanceList           = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only")
 
 	// GCE specific flags
-	concurrentGceRefreshes            = flag.Int("gce-concurrent-refreshes", 1, "Maximum number of concurrent refreshes per cloud object type.")
-	gceMigInstancesMinRefreshWaitTime = flag.Duration("gce-mig-instances-min-refresh-wait-time", 5*time.Second, "The minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed.")
-	_                                 = flag.Bool("gce-expander-ephemeral-storage-support", true, "Whether scale-up takes ephemeral storage resources into account for GCE cloud provider (Deprecated, to be removed in 1.30+)")
-
+	concurrentGceRefreshes            = flag.Int("gce-concurrent-refreshes", 1, "Maximum number of concurrent refreshes per cloud object type.")
+	gceMigInstancesMinRefreshWaitTime = flag.Duration("gce-mig-instances-min-refresh-wait-time", 5*time.Second, "The minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed.")
+	_                                 = flag.Bool("gce-expander-ephemeral-storage-support", true, "Whether scale-up takes ephemeral storage resources into account for GCE cloud provider (Deprecated, to be removed in 1.30+)")
+	bulkGceMigInstancesListingEnabled = flag.Bool("bulk-mig-instances-listing-enabled", false, "Fetch GCE MIG instances in bulk per zone instead of per MIG")
 	enableProfiling                    = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled")
 	clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only")
 	cordonNodeBeforeTerminate          = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process")
@@ -407,6 +407,7 @@ func createAutoscalingOptions() config.AutoscalingOptions {
 			ConcurrentRefreshes:            *concurrentGceRefreshes,
 			MigInstancesMinRefreshWaitTime: *gceMigInstancesMinRefreshWaitTime,
 			LocalSSDDiskSizeProvider:       localssdsize.NewSimpleLocalSSDProvider(),
+			BulkMigInstancesListingEnabled: *bulkGceMigInstancesListingEnabled,
 		},
 		ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative,
 		CordonNodeBeforeTerminate:          *cordonNodeBeforeTerminate,
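
Note (illustrative, not part of the diff): the new behaviour is opt-in. Below is a minimal Go sketch of the wiring, assuming cache, migLister, gceService and projectId have already been constructed as in CreateGceManager; the numeric values are placeholders, not recommendations.

	// With the final argument set to true, RegenerateMigInstancesCache() lists
	// instances per zone through FetchAllInstances and falls back to per-MIG
	// FetchMigInstances calls only for MIGs whose bulk-listed instances look
	// inconsistent or that have instances in creating/deleting state.
	migInfoProvider := NewCachingMigInfoProvider(
		cache,         // *GceCache shared with the manager
		migLister,     // MigLister enumerating the configured MIGs
		gceService,    // AutoscalingGceClient
		projectId,     // GCP project whose zones are listed
		4,             // concurrentGceRefreshes: also bounds concurrent per-zone listing
		5*time.Second, // migInstancesMinRefreshWaitTime
		true,          // bulkGceMigInstancesListingEnabled (defaults to false)
	)

In a running cluster-autoscaler this corresponds to passing --bulk-mig-instances-listing-enabled=true; --gce-concurrent-refreshes then caps both per-MIG refreshes and the number of zones whose instances are listed concurrently.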