Skip to content

Commit

Permalink
refactor gce.RegenerateMigInstancesCache() to use Instance.List API f…
Browse files Browse the repository at this point in the history
…or listing MIG instances
  • Loading branch information
damikag committed Jun 21, 2024
1 parent 3cffaf5 commit 95c4ee6
Show file tree
Hide file tree
Showing 2 changed files with 452 additions and 6 deletions.
100 changes: 94 additions & 6 deletions cluster-autoscaler/cloudprovider/gce/mig_info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,101 @@ func (c *cachingMigInfoProvider) getCachedMigForInstance(instanceRef GceRef) (Mi
func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error {
c.cache.InvalidateAllMigInstances()
c.cache.InvalidateAllInstancesToMig()
migs := c.migLister.GetMigs()
errors := make([]error, len(migs))
workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(migs), func(piece int) {
errors[piece] = c.fillMigInstances(migs[piece].GceRef())
}, workqueue.WithChunkSize(c.concurrentGceRefreshes))
c.cache.InvalidateMigInstancesState()
err := c.fillMigInfoCache()
if err != nil {
return err
}

instances, err := c.listInstancesInAllZonesWithMigs()
if err != nil {
return err
}
migToInstances := groupInstancesToMigs(instances)
err = c.updateMigInstancesCache(migToInstances)
if err != nil {
return err
}

return nil
}

func (c *cachingMigInfoProvider) listInstancesInAllZonesWithMigs() ([]GceInstance, error) {
zones := c.listAllZonesWithMigs()
var instances []GceInstance
for zone := range zones {
instancesOfZone, err := c.gceClient.FetchAllInstances(c.projectId, zone, "")
if err != nil {
return instances, err
}
instances = append(instances, instancesOfZone...)
}
return instances, nil
}

func groupInstancesToMigs(instances []GceInstance) map[GceRef][]GceInstance {
migToInstances := map[GceRef][]GceInstance{}
for _, instance := range instances {
if _, found := migToInstances[instance.Igm]; found {
migToInstances[instance.Igm] = append(migToInstances[instance.Igm], instance)
} else {
migToInstances[instance.Igm] = []GceInstance{instance}
}
}
return migToInstances
}

func (c *cachingMigInfoProvider) isMigInstancesInconsistent(mig Mig, migToInstances map[GceRef][]GceInstance) bool {
migRef := mig.GceRef()
instances, found := migToInstances[migRef]
if !found {
return true
}

state, found := c.cache.GetMigInstancesState(migRef)
if !found {
return true
}

instanceCount := state[cloudprovider.InstanceRunning] + state[cloudprovider.InstanceCreating] + state[cloudprovider.InstanceDeleting]
if instanceCount != int64(len(instances)) {
return true
}

return false
}

for _, err := range errors {
func (c *cachingMigInfoProvider) isMigInCreatingOrDeletingInstanceState(mig Mig) bool {
migRef := mig.GceRef()
state, found := c.cache.GetMigInstancesState(migRef)
if !found {
return false
}
runningOrCreatingInstancesCount := state[cloudprovider.InstanceCreating] + state[cloudprovider.InstanceDeleting]
if runningOrCreatingInstancesCount == 0 {
return false
}
return true
}

func (c *cachingMigInfoProvider) updateMigInstancesCache(migToInstances map[GceRef][]GceInstance) error {
migs := c.migLister.GetMigs()
for _, mig := range migs {
migRef := mig.GceRef()
if c.isMigInstancesInconsistent(mig, migToInstances) {
if err := c.fillMigInstances(migRef); err != nil {
return err
}
continue
}
if c.isMigInCreatingOrDeletingInstanceState(mig) {
if err := c.fillMigInstances(migRef); err != nil {
return err
}
continue
}
instances := migToInstances[migRef]
err := c.cache.SetMigInstances(migRef, instances, c.timeProvider.Now())
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 95c4ee6

Please sign in to comment.