diff --git a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go index ef40afccdfc2..7a7b5b42e308 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go @@ -383,7 +383,7 @@ func BuildGCE(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover defer config.Close() } - manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime) + manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.BulkMigInstancesListingEnabled, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime) if err != nil { klog.Fatalf("Failed to create GCE Manager: %v", err) } diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager.go b/cluster-autoscaler/cloudprovider/gce/gce_manager.go index 5b183a812c5d..60c16bf0c2f7 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager.go @@ -128,7 +128,7 @@ type gceManagerImpl struct { // CreateGceManager constructs GceManager object. func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions, localSSDDiskSizeProvider localssdsize.LocalSSDSizeProvider, - regional bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) { + regional, bulkGceMigInstancesListingEnabled bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) { // Create Google Compute Engine token. 
var err error tokenSource := google.ComputeTokenSource("") @@ -188,7 +188,7 @@ func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGr cache: cache, GceService: gceService, migLister: migLister, - migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime), + migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime, bulkGceMigInstancesListingEnabled), location: location, regional: regional, projectId: projectId, diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go index 275fcf7a2278..dbbe5864d4d6 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go @@ -354,7 +354,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa manager := &gceManagerImpl{ cache: cache, migLister: migLister, - migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second), + migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second, false), GceService: gceService, projectId: projectId, regional: regional, diff --git a/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go b/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go index 277bc2dd64a5..29659f19e436 100644 --- a/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/mig_info_provider.go @@ -62,15 +62,16 @@ type timeProvider interface { } type cachingMigInfoProvider struct { - migInfoMutex sync.Mutex - cache *GceCache - migLister MigLister - gceClient AutoscalingGceClient - projectId string - concurrentGceRefreshes int - migInstanceMutex sync.Mutex - migInstancesMinRefreshWaitTime time.Duration - timeProvider timeProvider + 
migInfoMutex sync.Mutex + cache *GceCache + migLister MigLister + gceClient AutoscalingGceClient + projectId string + concurrentGceRefreshes int + migInstanceMutex sync.Mutex + migInstancesMinRefreshWaitTime time.Duration + timeProvider timeProvider + bulkGceMigInstancesListingEnabled bool } type realTime struct{} @@ -80,15 +81,16 @@ func (r *realTime) Now() time.Time { } // NewCachingMigInfoProvider creates an instance of caching MigInfoProvider -func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration) MigInfoProvider { +func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration, bulkGceMigInstancesListingEnabled bool) MigInfoProvider { return &cachingMigInfoProvider{ - cache: cache, - migLister: migLister, - gceClient: gceClient, - projectId: projectId, - concurrentGceRefreshes: concurrentGceRefreshes, - migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime, - timeProvider: &realTime{}, + cache: cache, + migLister: migLister, + gceClient: gceClient, + projectId: projectId, + concurrentGceRefreshes: concurrentGceRefreshes, + migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime, + timeProvider: &realTime{}, + bulkGceMigInstancesListingEnabled: bulkGceMigInstancesListingEnabled, } } @@ -151,6 +153,23 @@ func (c *cachingMigInfoProvider) getCachedMigForInstance(instanceRef GceRef) (Mi func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error { c.cache.InvalidateAllMigInstances() c.cache.InvalidateAllInstancesToMig() + c.cache.InvalidateMigInstancesState() + + if c.bulkGceMigInstancesListingEnabled { + err := c.fillMigInfoCache() + if err != nil { + return err + } + instances, listErr := c.listInstancesInAllZonesWithMigs() + migToInstances := groupInstancesToMigs(instances) + 
updateErr := c.updateMigInstancesCache(migToInstances) + + if listErr != nil { + return listErr + } + return updateErr + } + migs := c.migLister.GetMigs() errors := make([]error, len(migs)) workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(migs), func(piece int) { @@ -165,6 +184,98 @@ func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error { return nil } +func (c *cachingMigInfoProvider) listInstancesInAllZonesWithMigs() ([]GceInstance, error) { + var zones []string + for zone := range c.listAllZonesWithMigs() { + zones = append(zones, zone) + } + var allInstances []GceInstance + errors := make([]error, len(zones)) + zoneInstances := make([][]GceInstance, len(zones)) + workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(zones), func(piece int) { + zoneInstances[piece], errors[piece] = c.gceClient.FetchAllInstances(c.projectId, zones[piece], "") + }, workqueue.WithChunkSize(c.concurrentGceRefreshes)) + + for _, instances := range zoneInstances { + allInstances = append(allInstances, instances...) 
+ } + for _, err := range errors { + if err != nil { + return allInstances, err + } + } + return allInstances, nil +} + +func groupInstancesToMigs(instances []GceInstance) map[GceRef][]GceInstance { + migToInstances := map[GceRef][]GceInstance{} + for _, instance := range instances { + migToInstances[instance.Igm] = append(migToInstances[instance.Igm], instance) + } + return migToInstances +} + +func (c *cachingMigInfoProvider) isMigInstancesConsistent(mig Mig, migToInstances map[GceRef][]GceInstance) bool { + migRef := mig.GceRef() + instances, found := migToInstances[migRef] + if !found { + return false + } + + state, found := c.cache.GetMigInstancesState(migRef) + if !found { + return false + } + + instanceCount := state[cloudprovider.InstanceRunning] + state[cloudprovider.InstanceCreating] + state[cloudprovider.InstanceDeleting] + return instanceCount == int64(len(instances)) +} + +func (c *cachingMigInfoProvider) isMigCreatingOrDeletingInstances(mig Mig) bool { + migRef := mig.GceRef() + state, found := c.cache.GetMigInstancesState(migRef) + if !found { + return false + } + return state[cloudprovider.InstanceCreating] > 0 || state[cloudprovider.InstanceDeleting] > 0 +} + +// updateMigInstancesCache updates the mig instances for each mig +func (c *cachingMigInfoProvider) updateMigInstancesCache(migToInstances map[GceRef][]GceInstance) error { + var errors []error + for _, mig := range c.migLister.GetMigs() { + migRef := mig.GceRef() + // mig instances are re-fetched if there is an inconsistency between number + // of instances according to the mig state and the instances found from instance.List api + if !c.isMigInstancesConsistent(mig, migToInstances) { + if err := c.fillMigInstances(migRef); err != nil { + errors = append(errors, err) + } + continue + } + + // mig instances are re-fetched along with instance.Status.ErrorInfo for migs with + // instances in creating or deleting state + if c.isMigCreatingOrDeletingInstances(mig) { + if err := 
c.fillMigInstances(migRef); err != nil { + errors = append(errors, err) + } + continue + } + + // for all other cases, mig instances cache is updated with the instances obtained from instance.List api + instances := migToInstances[migRef] + err := c.cache.SetMigInstances(migRef, instances, c.timeProvider.Now()) + if err != nil { + errors = append(errors, err) + } + } + if len(errors) > 0 { + return errors[0] + } + return nil +} + func (c *cachingMigInfoProvider) findMigWithMatchingBasename(instanceRef GceRef) Mig { for _, mig := range c.migLister.GetMigs() { migRef := mig.GceRef() diff --git a/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go b/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go index 70a18654a9bc..3eceaa92fc9c 100644 --- a/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go +++ b/cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go @@ -46,6 +46,58 @@ var ( Name: "mig", }, } + mig1 = &gceMig{ + gceRef: GceRef{ + Project: "myprojid", + Zone: "myzone1", + Name: "mig1", + }, + } + mig2 = &gceMig{ + gceRef: GceRef{ + Project: "myprojid", + Zone: "myzone2", + Name: "mig2", + }, + } + + instance1 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone1/test-instance-1", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: mig1.GceRef(), + } + instance2 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone1/test-instance-2", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating}, + }, + Igm: mig1.GceRef(), + } + instance3 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone2/test-instance-3", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: mig2.GceRef(), + } + + instance4 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone2/test-instance-4", + Status: &cloudprovider.InstanceStatus{State: 
cloudprovider.InstanceRunning}, + }, + Igm: GceRef{}, + } + + instance5 = GceInstance{ + Instance: cloudprovider.Instance{ + Id: "gce://myprojid/myzone2/test-instance-5", + Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceRunning}, + }, + Igm: GceRef{}, + } ) type mockAutoscalingGceClient struct { @@ -204,7 +256,7 @@ func TestFillMigInstances(t *testing.T) { fetchMigInstances: fetchMigInstancesWithCounter(newInstances, callCounter), } - provider, ok := NewCachingMigInfoProvider(tc.cache, NewMigLister(tc.cache), client, mig.GceRef().Project, 1, time.Hour).(*cachingMigInfoProvider) + provider, ok := NewCachingMigInfoProvider(tc.cache, NewMigLister(tc.cache), client, mig.GceRef().Project, 1, time.Hour, false).(*cachingMigInfoProvider) assert.True(t, ok) provider.timeProvider = &fakeTime{now: timeNow} @@ -349,7 +401,7 @@ func TestMigInfoProviderGetMigForInstance(t *testing.T) { fetchMigs: fetchMigsConst(nil), } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) mig, err := provider.GetMigForInstance(instanceRef) @@ -432,7 +484,7 @@ func TestGetMigInstances(t *testing.T) { fetchMigInstances: tc.fetchMigInstances, } migLister := NewMigLister(tc.cache) - provider, ok := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second).(*cachingMigInfoProvider) + provider, ok := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false).(*cachingMigInfoProvider) assert.True(t, ok) provider.timeProvider = &fakeTime{now: newRefreshTime} @@ -465,30 +517,44 @@ func TestRegenerateMigInstancesCache(t *testing.T) { {Instance: cloudprovider.Instance{Id: "gce://project/us-test1/base-instance-name-abcd"}, NumericId: 1}, {Instance: cloudprovider.Instance{Id: 
"gce://project/us-test1/base-instance-name-efgh"}, NumericId: 2}, } + mig1Instances := []GceInstance{instance1, instance2} + mig2Instances := []GceInstance{instance3} otherInstances := []GceInstance{ {Instance: cloudprovider.Instance{Id: "gce://project/us-test1/other-base-instance-name-abcd"}}, {Instance: cloudprovider.Instance{Id: "gce://project/us-test1/other-base-instance-name-efgh"}}, } - var instancesRefs, otherInstancesRefs []GceRef - for _, instance := range instances { - instanceRef, err := GceRefFromProviderId(instance.Id) - assert.Nil(t, err) - instancesRefs = append(instancesRefs, instanceRef) + mig1Igm := &gce.InstanceGroupManager{ + Zone: mig1.GceRef().Zone, + Name: mig1.GceRef().Name, + TargetSize: 2, + CurrentActions: &gce.InstanceGroupManagerActionsSummary{ + Creating: 1, + }, } - for _, instance := range otherInstances { - instanceRef, err := GceRefFromProviderId(instance.Id) - assert.Nil(t, err) - otherInstancesRefs = append(otherInstancesRefs, instanceRef) + mig2Igm := &gce.InstanceGroupManager{ + Zone: mig2.GceRef().Zone, + Name: mig2.GceRef().Name, + TargetSize: 1, + CurrentActions: &gce.InstanceGroupManagerActionsSummary{}, } + instancesRefs := toInstancesRefs(t, instances) + mig1InstancesRefs := toInstancesRefs(t, mig1Instances) + mig2InstancesRefs := toInstancesRefs(t, []GceInstance{instance3}) + otherInstancesRefs := toInstancesRefs(t, otherInstances) + testCases := []struct { - name string - cache *GceCache - fetchMigInstances func(GceRef) ([]GceInstance, error) - expectedErr error - expectedMigInstances map[GceRef][]GceInstance - expectedInstancesToMig map[GceRef]GceRef + name string + cache *GceCache + fetchMigInstances func(GceRef) ([]GceInstance, error) + fetchMigs func(string) ([]*gce.InstanceGroupManager, error) + fetchAllInstances func(string, string, string) ([]GceInstance, error) + bulkGceMigInstancesListingEnabled bool + projectId string + expectedErr error + expectedMigInstances map[GceRef][]GceInstance + expectedInstancesToMig 
map[GceRef]GceRef }{ { name: "fill empty cache for one mig", @@ -498,6 +564,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) { instancesToMig: map[GceRef]GceRef{}, }, fetchMigInstances: fetchMigInstancesConst(instances), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): instances, }, @@ -520,6 +587,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) { mig.GceRef(): instances, otherMig.GceRef(): otherInstances, }), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): instances, otherMig.GceRef(): otherInstances, @@ -552,6 +620,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) { mig.GceRef(): instances, otherMig.GceRef(): otherInstances, }), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): instances, }, @@ -575,6 +644,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) { }, }, fetchMigInstances: fetchMigInstancesConst(otherInstances), + projectId: mig.GceRef().Project, expectedMigInstances: map[GceRef][]GceInstance{ mig.GceRef(): otherInstances, }, @@ -593,17 +663,93 @@ func TestRegenerateMigInstancesCache(t *testing.T) { instancesToMig: map[GceRef]GceRef{}, }, fetchMigInstances: fetchMigInstancesFail, + projectId: mig.GceRef().Project, expectedErr: errFetchMigInstances, }, + { + name: "bulkGceMigInstancesListingEnabled - fill empty cache for one mig - instances in creating/deleting state", + cache: &GceCache{ + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + instances: map[GceRef][]GceInstance{}, + instancesToMig: map[GceRef]GceRef{}, + migTargetSizeCache: map[GceRef]int64{}, + migBaseNameCache: map[GceRef]string{}, + listManagedInstancesResultsCache: map[GceRef]string{}, + instanceTemplateNameCache: map[GceRef]InstanceTemplateName{}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{}, + }, + fetchMigInstances: fetchMigInstancesConst(mig1Instances), + fetchMigs: 
fetchMigsConst([]*gce.InstanceGroupManager{mig1Igm}), + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}, "myzone2": {instance3}}), + bulkGceMigInstancesListingEnabled: true, + projectId: mig1.GceRef().Project, + expectedMigInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): mig1Instances, + }, + expectedInstancesToMig: map[GceRef]GceRef{ + mig1InstancesRefs[0]: mig1.GceRef(), + mig1InstancesRefs[1]: mig1.GceRef(), + }, + }, + { + name: "bulkGceMigInstancesListingEnabled - fill empty cache for one mig - number of instances are inconsistent in bulk listing result", + cache: &GceCache{ + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + instances: map[GceRef][]GceInstance{}, + instancesToMig: map[GceRef]GceRef{}, + migTargetSizeCache: map[GceRef]int64{}, + migBaseNameCache: map[GceRef]string{}, + listManagedInstancesResultsCache: map[GceRef]string{}, + instanceTemplateNameCache: map[GceRef]InstanceTemplateName{}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{}, + }, + fetchMigInstances: fetchMigInstancesConst(mig1Instances), + fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig1Igm}), + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1}, "myzone2": {instance3}}), + bulkGceMigInstancesListingEnabled: true, + projectId: mig1.GceRef().Project, + expectedMigInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): mig1Instances, + }, + expectedInstancesToMig: map[GceRef]GceRef{ + mig1InstancesRefs[0]: mig1.GceRef(), + mig1InstancesRefs[1]: mig1.GceRef(), + }, + }, + { + name: "bulkGceMigInstancesListingEnabled - fill empty cache for one mig - all instances in running state", + cache: &GceCache{ + migs: map[GceRef]Mig{mig2.GceRef(): mig2}, + instances: map[GceRef][]GceInstance{}, + instancesToMig: map[GceRef]GceRef{}, + migTargetSizeCache: map[GceRef]int64{}, + migBaseNameCache: map[GceRef]string{}, + listManagedInstancesResultsCache: 
map[GceRef]string{}, + instanceTemplateNameCache: map[GceRef]InstanceTemplateName{}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{}, + }, + fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig2Igm}), + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}, "myzone2": {instance3}}), + bulkGceMigInstancesListingEnabled: true, + projectId: mig2.GceRef().Project, + expectedMigInstances: map[GceRef][]GceInstance{ + mig2.GceRef(): mig2Instances, + }, + expectedInstancesToMig: map[GceRef]GceRef{ + mig2InstancesRefs[0]: mig2.GceRef(), + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { client := &mockAutoscalingGceClient{ fetchMigInstances: tc.fetchMigInstances, + fetchMigs: tc.fetchMigs, + fetchAllInstances: tc.fetchAllInstances, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, tc.projectId, 1, 0*time.Second, tc.bulkGceMigInstancesListingEnabled) err := provider.RegenerateMigInstancesCache() assert.Equal(t, tc.expectedErr, err) @@ -615,6 +761,16 @@ func TestRegenerateMigInstancesCache(t *testing.T) { } } +func toInstancesRefs(t *testing.T, instances []GceInstance) []GceRef { + var refs []GceRef + for _, instance := range instances { + instanceRef, err := GceRefFromProviderId(instance.Id) + assert.Nil(t, err) + refs = append(refs, instanceRef) + } + return refs +} + func TestGetMigTargetSize(t *testing.T) { targetSize := int64(42) instanceGroupManager := &gce.InstanceGroupManager{ @@ -682,7 +838,7 @@ func TestGetMigTargetSize(t *testing.T) { fetchMigTargetSize: tc.fetchMigTargetSize, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, 
mig.GceRef().Project, 1, 0*time.Second, false) targetSize, err := provider.GetMigTargetSize(mig.GceRef()) cachedTargetSize, found := tc.cache.GetMigTargetSize(mig.GceRef()) @@ -764,7 +920,7 @@ func TestGetMigBasename(t *testing.T) { fetchMigBasename: tc.fetchMigBasename, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) basename, err := provider.GetMigBasename(mig.GceRef()) cachedBasename, found := tc.cache.GetMigBasename(mig.GceRef()) @@ -845,7 +1001,7 @@ func TestGetListManagedInstancesResults(t *testing.T) { fetchListManagedInstancesResults: tc.fetchResults, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) results, err := provider.GetListManagedInstancesResults(mig.GceRef()) cachedResults, found := tc.cache.GetListManagedInstancesResults(mig.GceRef()) @@ -940,7 +1096,7 @@ func TestGetMigInstanceTemplateName(t *testing.T) { fetchMigTemplateName: tc.fetchMigTemplateName, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) instanceTemplateName, err := provider.GetMigInstanceTemplateName(mig.GceRef()) cachedInstanceTemplateName, found := tc.cache.GetMigInstanceTemplateName(mig.GceRef()) @@ -1046,7 +1202,7 @@ func TestGetMigInstanceTemplate(t *testing.T) { fetchMigTemplate: tc.fetchMigTemplate, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + 
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) template, err := provider.GetMigInstanceTemplate(mig.GceRef()) cachedTemplate, found := tc.cache.GetMigInstanceTemplate(mig.GceRef()) @@ -1252,7 +1408,7 @@ func TestGetMigInstanceKubeEnv(t *testing.T) { fetchMigTemplate: tc.fetchMigTemplate, } migLister := NewMigLister(tc.cache) - provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) kubeEnv, err := provider.GetMigKubeEnv(mig.GceRef()) cachedKubeEnv, found := tc.cache.GetMigKubeEnv(mig.GceRef()) @@ -1347,7 +1503,7 @@ func TestGetMigMachineType(t *testing.T) { fetchMachineType: tc.fetchMachineType, } migLister := NewMigLister(cache) - provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second) + provider := NewCachingMigInfoProvider(cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false) machine, err := provider.GetMigMachineType(mig.GceRef()) if tc.expectError { assert.Error(t, err) @@ -1436,6 +1592,310 @@ func TestMultipleGetMigInstanceCallsLimited(t *testing.T) { } } +func TestListInstancesInAllZonesWithMigs(t *testing.T) { + testCases := []struct { + name string + migs map[GceRef]Mig + fetchAllInstances func(string, string, string) ([]GceInstance, error) + wantInstances []GceInstance + wantErr bool + }{ + { + name: "instance fetching failed", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{}), + wantErr: true, + }, + { + name: "Successfully list mig instances in a single zone", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}, "myzone2": {instance3}}), + wantInstances: []GceInstance{instance1, instance2}, + }, 
+ { + name: "Successfully list mig instances in multiple zones", + migs: map[GceRef]Mig{mig1.GceRef(): mig1, mig2.GceRef(): mig2}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}, "myzone2": {instance3}}), + wantInstances: []GceInstance{instance1, instance2, instance3}, + }, + { + name: "Successfully list mig instances in one zones and got errors in another", + migs: map[GceRef]Mig{mig1.GceRef(): mig1, mig2.GceRef(): mig2}, + fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone1": {instance1, instance2}}), + wantInstances: []GceInstance{instance1, instance2}, + wantErr: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + migs: tc.migs, + } + client := &mockAutoscalingGceClient{ + fetchAllInstances: tc.fetchAllInstances, + } + migLister := NewMigLister(&cache) + provider := &cachingMigInfoProvider{ + cache: &cache, + migLister: migLister, + gceClient: client, + concurrentGceRefreshes: 1, + } + instances, err := provider.listInstancesInAllZonesWithMigs() + + if tc.wantErr { + assert.NotNil(t, err) + } else { + assert.NoError(t, err) + } + assert.ElementsMatch(t, tc.wantInstances, instances) + }) + } +} + +func TestGroupInstancesToMigs(t *testing.T) { + testCases := []struct { + name string + instances []GceInstance + want map[GceRef][]GceInstance + }{ + { + name: "no instances", + want: map[GceRef][]GceInstance{}, + }, + { + name: "instances from multiple migs including unknown migs", + instances: []GceInstance{instance1, instance2, instance3, instance4, instance5}, + want: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + mig2.GceRef(): {instance3}, + GceRef{}: {instance4, instance5}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + groupedInstances := groupInstancesToMigs(tc.instances) + assert.Equal(t, tc.want, groupedInstances) + }) + } +} + +func 
TestIsMigInstancesInconsistent(t *testing.T) { + testCases := []struct { + name string + mig Mig + migToInstances map[GceRef][]GceInstance + migInstancesStateCache map[GceRef]map[cloudprovider.InstanceState]int64 + want bool + }{ + { + name: "instance not found", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{}, + want: false, + }, + { + name: "instanceState not found", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + want: false, + }, + { + name: "inconsistent number of instances", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 2, + cloudprovider.InstanceDeleting: 3, + cloudprovider.InstanceRunning: 4, + }, + }, + want: false, + }, + { + name: "consistent number of instances", + mig: mig1, + migToInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 1, + cloudprovider.InstanceDeleting: 0, + cloudprovider.InstanceRunning: 1, + }, + }, + want: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + migInstancesStateCache: tc.migInstancesStateCache, + } + provider := &cachingMigInfoProvider{ + cache: &cache, + } + got := provider.isMigInstancesConsistent(tc.mig, tc.migToInstances) + assert.Equal(t, tc.want, got) + }) + } +} + +func TestIsMigInCreatingOrDeletingInstanceState(t *testing.T) { + testCases := []struct { + name string + mig Mig + migInstancesStateCache map[GceRef]map[cloudprovider.InstanceState]int64 + want bool + }{ + { + name: "instanceState not found", + mig: mig1, + want: false, + }, + { + name: "in creating state", + mig: mig1, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): 
{ + cloudprovider.InstanceCreating: 2, + cloudprovider.InstanceDeleting: 0, + cloudprovider.InstanceRunning: 1, + }, + }, + want: true, + }, + { + name: "in deleting state", + mig: mig1, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 0, + cloudprovider.InstanceDeleting: 1, + cloudprovider.InstanceRunning: 0, + }, + }, + want: true, + }, + { + name: "not in creating or deleting states", + mig: mig1, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): { + cloudprovider.InstanceCreating: 0, + cloudprovider.InstanceDeleting: 0, + cloudprovider.InstanceRunning: 1, + }, + }, + want: false, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + migInstancesStateCache: tc.migInstancesStateCache, + } + provider := &cachingMigInfoProvider{ + cache: &cache, + } + got := provider.isMigCreatingOrDeletingInstances(tc.mig) + assert.Equal(t, tc.want, got) + }) + } +} + +func TestUpdateMigInstancesCache(t *testing.T) { + testCases := []struct { + name string + migs map[GceRef]Mig + migToInstances map[GceRef][]GceInstance + fetchMigInstances []GceInstance + wantInstances map[GceRef][]GceInstance + migInstancesStateCache map[GceRef]map[cloudprovider.InstanceState]int64 + }{ + { + name: "inconsistent mig instance state", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + migToInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1}, + }, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 2, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + }, + fetchMigInstances: []GceInstance{instance1, instance2}, + wantInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + }, + { + name: "mig with instance in creating or deleting state", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + migInstancesStateCache: 
map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 0, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 2}, + }, + fetchMigInstances: []GceInstance{instance1, instance2}, + wantInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + }, + { + name: "consistent mig instance state", + migs: map[GceRef]Mig{mig1.GceRef(): mig1}, + migToInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + }, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 2, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + }, + wantInstances: map[GceRef][]GceInstance{mig1.GceRef(): {instance1, instance2}}, + }, + { + name: "mix of consistent and inconsistent states", + migs: map[GceRef]Mig{mig1.GceRef(): mig1, mig2.GceRef(): mig2}, + migToInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + }, + migInstancesStateCache: map[GceRef]map[cloudprovider.InstanceState]int64{ + mig1.GceRef(): {cloudprovider.InstanceRunning: 2, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + mig2.GceRef(): {cloudprovider.InstanceRunning: 1, cloudprovider.InstanceDeleting: 0, cloudprovider.InstanceCreating: 0}, + }, + fetchMigInstances: []GceInstance{instance3}, + wantInstances: map[GceRef][]GceInstance{ + mig1.GceRef(): {instance1, instance2}, + mig2.GceRef(): {instance3}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cache := GceCache{ + migs: tc.migs, + instances: make(map[GceRef][]GceInstance), + instancesUpdateTime: make(map[GceRef]time.Time), + migBaseNameCache: make(map[GceRef]string), + migInstancesStateCache: tc.migInstancesStateCache, + instancesToMig: make(map[GceRef]GceRef), + } + migLister := NewMigLister(&cache) + client := &mockAutoscalingGceClient{ + fetchMigInstances: 
fetchMigInstancesConst(tc.fetchMigInstances), + } + provider := &cachingMigInfoProvider{ + cache: &cache, + migLister: migLister, + gceClient: client, + timeProvider: &realTime{}, + } + err := provider.updateMigInstancesCache(tc.migToInstances) + assert.NoError(t, err) + for migRef, want := range tc.wantInstances { + instances, found := cache.GetMigInstances(migRef) + assert.True(t, found) + assert.Equal(t, want, instances) + } + }) + } +} + type fakeTime struct { now time.Time } @@ -1555,3 +2015,13 @@ func fetchMachineTypeConst(name string, cpu int64, mem int64) func(string, strin }, nil } } + +func fetchAllInstancesInZone(allInstances map[string][]GceInstance) func(string, string, string) ([]GceInstance, error) { + return func(project, zone, filter string) ([]GceInstance, error) { + instances, found := allInstances[zone] + if !found { + return nil, errors.New("") + } + return instances, nil + } +} diff --git a/cluster-autoscaler/config/autoscaling_options.go b/cluster-autoscaler/config/autoscaling_options.go index 12ced472c683..1fbced6e0eb7 100644 --- a/cluster-autoscaler/config/autoscaling_options.go +++ b/cluster-autoscaler/config/autoscaling_options.go @@ -58,7 +58,7 @@ type NodeGroupAutoscalingOptions struct { // GCEOptions contain autoscaling options specific to GCE cloud provider. type GCEOptions struct { - // ConcurrentRefreshes is the maximum number of concurrently refreshed instance groups or instance templates. + // ConcurrentRefreshes is the maximum number of concurrently refreshed instance groups or instance templates or zones with mig instances ConcurrentRefreshes int // MigInstancesMinRefreshWaitTime is the minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed. 
MigInstancesMinRefreshWaitTime time.Duration @@ -66,6 +66,9 @@ type GCEOptions struct { DomainUrl string // LocalSSDDiskSizeProvider provides local ssd disk size based on machine type LocalSSDDiskSizeProvider gce_localssdsize.LocalSSDSizeProvider + // BulkMigInstancesListingEnabled means that MIG instances should be listed using a single API call and sorted into MIGs, instead of using a separate API call per MIG. + // Instances of MIGs that have instances in creating or deleting state are re-fetched using a MIG-specific API call. Inconsistencies are handled by re-fetching using MIG-specific API calls. + BulkMigInstancesListingEnabled bool } const ( diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index e756bd8b73e9..de916049b144 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -218,10 +218,10 @@ var ( awsUseStaticInstanceList = flag.Bool("aws-use-static-instance-list", false, "Should CA fetch instance types in runtime or use a static list. AWS only") // GCE specific flags - concurrentGceRefreshes = flag.Int("gce-concurrent-refreshes", 1, "Maximum number of concurrent refreshes per cloud object type.") - gceMigInstancesMinRefreshWaitTime = flag.Duration("gce-mig-instances-min-refresh-wait-time", 5*time.Second, "The minimum time which needs to pass before GCE MIG instances from a given MIG can be refreshed.") - _ = flag.Bool("gce-expander-ephemeral-storage-support", true, "Whether scale-up takes ephemeral 
storage resources into account for GCE cloud provider (Deprecated, to be removed in 1.30+)") + bulkGceMigInstancesListingEnabled = flag.Bool("bulk-mig-instances-listing-enabled", false, "Bulk GCE mig instances listing fetches GCE mig instances for all migs using one API call and sorts them into migs. Instances of migs having instances in creating or deleting state are re-fetched using a mig specific api call. Inconsistencies are handled by re-fetching using mig specific api calls") enableProfiling = flag.Bool("profiling", false, "Is debug/pprof endpoint enabled") clusterAPICloudConfigAuthoritative = flag.Bool("clusterapi-cloud-config-authoritative", false, "Treat the cloud-config flag authoritatively (do not fallback to using kubeconfig flag). ClusterAPI only") cordonNodeBeforeTerminate = flag.Bool("cordon-node-before-terminating", false, "Should CA cordon nodes before terminating during downscale process") @@ -406,6 +406,7 @@ func createAutoscalingOptions() config.AutoscalingOptions { ConcurrentRefreshes: *concurrentGceRefreshes, MigInstancesMinRefreshWaitTime: *gceMigInstancesMinRefreshWaitTime, LocalSSDDiskSizeProvider: localssdsize.NewSimpleLocalSSDProvider(), + BulkMigInstancesListingEnabled: *bulkGceMigInstancesListingEnabled, }, ClusterAPICloudConfigAuthoritative: *clusterAPICloudConfigAuthoritative, CordonNodeBeforeTerminate: *cordonNodeBeforeTerminate,