diff --git a/go.mod b/go.mod index fee35fd575..7b971d76cd 100644 --- a/go.mod +++ b/go.mod @@ -92,7 +92,6 @@ require ( go.opentelemetry.io/proto/otlp v0.7.0 // indirect golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect golang.org/x/sys v0.2.0 // indirect golang.org/x/term v0.2.0 // indirect golang.org/x/text v0.4.0 // indirect @@ -145,5 +144,5 @@ replace ( k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.24.0-alpha.4 k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.24.0-alpha.4 k8s.io/sample-controller => k8s.io/sample-controller v0.24.0-alpha.4 - sigs.k8s.io/cloud-provider-azure => sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221124105803-800265e9e780 + sigs.k8s.io/cloud-provider-azure => sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221127220127-3a7fec959fb3 ) diff --git a/go.sum b/go.sum index f3b1ced091..ebe272390c 100644 --- a/go.sum +++ b/go.sum @@ -809,7 +809,6 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -1208,8 +1207,8 @@ rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.30/go.mod h1:fEO7lRTdivWO2qYVCVG7dEADOMo/MLDCVr8So2g88Uw= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33 h1:LYqFq+6Cj2D0gFfrJvL7iElD4ET6ir3VDdhDdTK7rgc= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.33/go.mod h1:soWkSNf2tZC7aMibXEqVhCd73GOY5fJikn8qbdzemB0= -sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221124105803-800265e9e780 h1:nkUvliQH2IKw6Eq3B3LE8H3INHhJtcxFjR+NbzuVMf8= -sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221124105803-800265e9e780/go.mod h1:x4Ym3BtGAPfacIW8llyJz9DcHGMGo5Z9v3TEJoe4ZU0= +sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221127220127-3a7fec959fb3 h1:6VDdLir8+NVwtfYiCmt9DOrjSOz1Zyc2E/Y/R7yY6Yo= +sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221127220127-3a7fec959fb3/go.mod h1:My3CvQ0U/1jzQ9r0Z/yyREkW+xckcdXQY09m3NK9exo= sigs.k8s.io/json v0.0.0-20211208200746-9f7c6b3444d2/go.mod h1:B+TnT182UBxE84DiCz4CVE26eOSDAeYCpfDnC2kdKMY= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 h1:iXTIw73aPyC+oRdyqqvVJuloN1p0AC/kzH07hu3NE+k= sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= diff --git a/vendor/golang.org/x/sync/LICENSE b/vendor/golang.org/x/sync/LICENSE deleted file mode 100644 index 6a66aea5ea..0000000000 --- a/vendor/golang.org/x/sync/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2009 The Go Authors. All rights reserved. 
- -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are -met: - - * Redistributions of source code must retain the above copyright -notice, this list of conditions and the following disclaimer. - * Redistributions in binary form must reproduce the above -copyright notice, this list of conditions and the following disclaimer -in the documentation and/or other materials provided with the -distribution. - * Neither the name of Google Inc. nor the names of its -contributors may be used to endorse or promote products derived from -this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS -"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT -LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR -A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT -OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT -LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, -DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/vendor/golang.org/x/sync/PATENTS b/vendor/golang.org/x/sync/PATENTS deleted file mode 100644 index 733099041f..0000000000 --- a/vendor/golang.org/x/sync/PATENTS +++ /dev/null @@ -1,22 +0,0 @@ -Additional IP Rights Grant (Patents) - -"This implementation" means the copyrightable works distributed by -Google as part of the Go project. - -Google hereby grants to You a perpetual, worldwide, non-exclusive, -no-charge, royalty-free, irrevocable (except as stated in this section) -patent license to make, have made, use, offer to sell, sell, import, -transfer and otherwise run, modify and propagate the contents of this -implementation of Go, where such license applies only to those patent -claims, both currently owned or controlled by Google and acquired in -the future, licensable by Google that are necessarily infringed by this -implementation of Go. This grant does not include claims that would be -infringed only as a consequence of further modification of this -implementation. If you or your agent or exclusive licensee institute or -order or agree to the institution of patent litigation against any -entity (including a cross-claim or counterclaim in a lawsuit) alleging -that this implementation of Go or any code incorporated within this -implementation of Go constitutes direct or contributory patent -infringement, or inducement of patent infringement, then any patent -rights granted to you under this License for this implementation of Go -shall terminate as of the date such litigation is filed. diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go deleted file mode 100644 index 690eb85013..0000000000 --- a/vendor/golang.org/x/sync/singleflight/singleflight.go +++ /dev/null @@ -1,212 +0,0 @@ -// Copyright 2013 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package singleflight provides a duplicate function call suppression -// mechanism. 
-package singleflight // import "golang.org/x/sync/singleflight" - -import ( - "bytes" - "errors" - "fmt" - "runtime" - "runtime/debug" - "sync" -) - -// errGoexit indicates the runtime.Goexit was called in -// the user given function. -var errGoexit = errors.New("runtime.Goexit was called") - -// A panicError is an arbitrary value recovered from a panic -// with the stack trace during the execution of given function. -type panicError struct { - value interface{} - stack []byte -} - -// Error implements error interface. -func (p *panicError) Error() string { - return fmt.Sprintf("%v\n\n%s", p.value, p.stack) -} - -func newPanicError(v interface{}) error { - stack := debug.Stack() - - // The first line of the stack trace is of the form "goroutine N [status]:" - // but by the time the panic reaches Do the goroutine may no longer exist - // and its status will have changed. Trim out the misleading line. - if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { - stack = stack[line+1:] - } - return &panicError{value: v, stack: stack} -} - -// call is an in-flight or completed singleflight.Do call -type call struct { - wg sync.WaitGroup - - // These fields are written once before the WaitGroup is done - // and are only read after the WaitGroup is done. - val interface{} - err error - - // forgotten indicates whether Forget was called with this call's key - // while the call was still in flight. - forgotten bool - - // These fields are read and written with the singleflight - // mutex held before the WaitGroup is done, and are read but - // not written after the WaitGroup is done. - dups int - chans []chan<- Result -} - -// Group represents a class of work and forms a namespace in -// which units of work can be executed with duplicate suppression. -type Group struct { - mu sync.Mutex // protects m - m map[string]*call // lazily initialized -} - -// Result holds the results of Do, so they can be passed -// on a channel. -type Result struct { - Val interface{} - Err error - Shared bool -} - -// Do executes and returns the results of the given function, making -// sure that only one execution is in-flight for a given key at a -// time. If a duplicate comes in, the duplicate caller waits for the -// original to complete and receives the same results. -// The return value shared indicates whether v was given to multiple callers. -func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - g.mu.Unlock() - c.wg.Wait() - - if e, ok := c.err.(*panicError); ok { - panic(e) - } else if c.err == errGoexit { - runtime.Goexit() - } - return c.val, c.err, true - } - c := new(call) - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - g.doCall(c, key, fn) - return c.val, c.err, c.dups > 0 -} - -// DoChan is like Do but returns a channel that will receive the -// results when they are ready. -// -// The returned channel will not be closed. -func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { - ch := make(chan Result, 1) - g.mu.Lock() - if g.m == nil { - g.m = make(map[string]*call) - } - if c, ok := g.m[key]; ok { - c.dups++ - c.chans = append(c.chans, ch) - g.mu.Unlock() - return ch - } - c := &call{chans: []chan<- Result{ch}} - c.wg.Add(1) - g.m[key] = c - g.mu.Unlock() - - go g.doCall(c, key, fn) - - return ch -} - -// doCall handles the single call for a key. 
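For context on the vendored package being removed above: golang.org/x/sync/singleflight suppresses duplicate concurrent calls that share a key, which is how ScaleSet previously deduplicated per-VMSS cache construction (see the removed ss.group.Do further down). A minimal, self-contained sketch of the Do pattern (illustrative only, not part of this diff; the key string is made up):

package main

import (
	"fmt"
	"sync"

	"golang.org/x/sync/singleflight"
)

func main() {
	var g singleflight.Group
	var wg sync.WaitGroup

	// Ten goroutines ask for the same key; concurrent callers with the same
	// key share a single execution of fn and receive the same result.
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			v, err, shared := g.Do("rg1/vmss-1", func() (interface{}, error) {
				return "expensive result", nil // e.g. building a per-VMSS VM cache
			})
			fmt.Println(v, err, shared)
		}()
	}
	wg.Wait()
}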
-func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { - normalReturn := false - recovered := false - - // use double-defer to distinguish panic from runtime.Goexit, - // more details see https://golang.org/cl/134395 - defer func() { - // the given function invoked runtime.Goexit - if !normalReturn && !recovered { - c.err = errGoexit - } - - c.wg.Done() - g.mu.Lock() - defer g.mu.Unlock() - if !c.forgotten { - delete(g.m, key) - } - - if e, ok := c.err.(*panicError); ok { - // In order to prevent the waiting channels from being blocked forever, - // needs to ensure that this panic cannot be recovered. - if len(c.chans) > 0 { - go panic(e) - select {} // Keep this goroutine around so that it will appear in the crash dump. - } else { - panic(e) - } - } else if c.err == errGoexit { - // Already in the process of goexit, no need to call again - } else { - // Normal return - for _, ch := range c.chans { - ch <- Result{c.val, c.err, c.dups > 0} - } - } - }() - - func() { - defer func() { - if !normalReturn { - // Ideally, we would wait to take a stack trace until we've determined - // whether this is a panic or a runtime.Goexit. - // - // Unfortunately, the only way we can distinguish the two is to see - // whether the recover stopped the goroutine from terminating, and by - // the time we know that, the part of the stack trace relevant to the - // panic has been discarded. - if r := recover(); r != nil { - c.err = newPanicError(r) - } - } - }() - - c.val, c.err = fn() - normalReturn = true - }() - - if !normalReturn { - recovered = true - } -} - -// Forget tells the singleflight to forget about a key. Future calls -// to Do for this key will call the function rather than waiting for -// an earlier call to complete. -func (g *Group) Forget(key string) { - g.mu.Lock() - if c, ok := g.m[key]; ok { - c.forgotten = true - } - delete(g.m, key) - g.mu.Unlock() -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 1bf4b1ade1..20f0a188f6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -389,9 +389,6 @@ golang.org/x/net/trace ## explicit; go 1.11 golang.org/x/oauth2 golang.org/x/oauth2/internal -# golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 -## explicit -golang.org/x/sync/singleflight # golang.org/x/sys v0.2.0 ## explicit; go 1.17 golang.org/x/sys/cpu @@ -1062,7 +1059,7 @@ k8s.io/utils/trace ## explicit; go 1.17 sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client -# sigs.k8s.io/cloud-provider-azure v0.7.4 => sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221124105803-800265e9e780 +# sigs.k8s.io/cloud-provider-azure v0.7.4 => sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221127220127-3a7fec959fb3 ## explicit; go 1.18 sigs.k8s.io/cloud-provider-azure/pkg/auth sigs.k8s.io/cloud-provider-azure/pkg/azureclients @@ -1158,4 +1155,4 @@ sigs.k8s.io/yaml # k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.24.0-alpha.4 # k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.24.0-alpha.4 # k8s.io/sample-controller => k8s.io/sample-controller v0.24.0-alpha.4 -# sigs.k8s.io/cloud-provider-azure => sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221124105803-800265e9e780 +# sigs.k8s.io/cloud-provider-azure => sigs.k8s.io/cloud-provider-azure v0.7.4-0.20221127220127-3a7fec959fb3 diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/armclient/azure_armclient.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/armclient/azure_armclient.go index 
7788895a8e..929bcfe32e 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/armclient/azure_armclient.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/azureclients/armclient/azure_armclient.go @@ -93,6 +93,20 @@ func sender() autorest.Sender { } j, _ := cookiejar.New(nil) defaultSenders.sender = &http.Client{Jar: j, Transport: roundTripper} + + // In go-autorest SDK https://github.com/Azure/go-autorest/blob/master/autorest/sender.go#L258-L287, + // if ARM returns http.StatusTooManyRequests, the sender doesn't increase the retry attempt count, + // hence the Azure clients will keep retrying forever until it get a status code other than 429. + // So we explicitly removes http.StatusTooManyRequests from autorest.StatusCodesForRetry. + // Refer https://github.com/Azure/go-autorest/issues/398. + // TODO(feiskyer): Use autorest.SendDecorator to customize the retry policy when new Azure SDK is available. + statusCodesForRetry := make([]int, 0) + for _, code := range autorest.StatusCodesForRetry { + if code != http.StatusTooManyRequests { + statusCodesForRetry = append(statusCodesForRetry, code) + } + } + autorest.StatusCodesForRetry = statusCodesForRetry }) return defaultSenders.sender } diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure.go index 11942b69f1..adc623eebf 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure.go @@ -374,22 +374,6 @@ type Cloud struct { *controllerCommon } -func init() { - // In go-autorest SDK https://github.com/Azure/go-autorest/blob/master/autorest/sender.go#L258-L287, - // if ARM returns http.StatusTooManyRequests, the sender doesn't increase the retry attempt count, - // hence the Azure clients will keep retrying forever until it get a status code other than 429. - // So we explicitly removes http.StatusTooManyRequests from autorest.StatusCodesForRetry. - // Refer https://github.com/Azure/go-autorest/issues/398. - // TODO(feiskyer): Use autorest.SendDecorator to customize the retry policy when new Azure SDK is available. 
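The 429 handling that this hunk deletes from init() has moved into the sender() initializer shown above in azure_armclient.go. A standalone sketch of the same filtering, using the exported autorest.StatusCodesForRetry variable (illustrative only, not part of this diff):

package main

import (
	"fmt"
	"net/http"

	"github.com/Azure/go-autorest/autorest"
)

func main() {
	// go-autorest does not increment the retry attempt count on 429 (see the
	// linked issue above), so leaving it in StatusCodesForRetry can retry a
	// throttled request indefinitely; filtering it out makes 429 surface to
	// the caller instead.
	filtered := make([]int, 0, len(autorest.StatusCodesForRetry))
	for _, code := range autorest.StatusCodesForRetry {
		if code != http.StatusTooManyRequests {
			filtered = append(filtered, code)
		}
	}
	autorest.StatusCodesForRetry = filtered
	fmt.Println(autorest.StatusCodesForRetry)
}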
- statusCodesForRetry := make([]int, 0) - for _, code := range autorest.StatusCodesForRetry { - if code != http.StatusTooManyRequests { - statusCodesForRetry = append(statusCodesForRetry, code) - } - } - autorest.StatusCodesForRetry = statusCodesForRetry -} - // NewCloud returns a Cloud with initialized clients func NewCloud(configReader io.Reader, callFromCCM bool) (cloudprovider.Interface, error) { az, err := NewCloudWithoutFeatureGates(configReader, callFromCCM) diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer.go index 78bc3ce7bd..6ac852af06 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer.go @@ -525,7 +525,7 @@ func (az *Cloud) cleanOrphanedLoadBalancer(lb *network.LoadBalancer, existingLBs // safeDeleteLoadBalancer deletes the load balancer after decoupling it from the vmSet func (az *Cloud) safeDeleteLoadBalancer(lb network.LoadBalancer, clusterName, vmSetName string, service *v1.Service) *retry.Error { lbBackendPoolID := az.getBackendPoolID(to.String(lb.Name), az.getLoadBalancerResourceGroup(), getBackendPoolName(clusterName, service)) - err := az.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools, true) + _, err := az.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools, true) if err != nil { return retry.NewError(false, fmt.Errorf("safeDeleteLoadBalancer: failed to EnsureBackendPoolDeleted: %w", err)) } @@ -1650,7 +1650,7 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, if !exist { return nil, fmt.Errorf("load balancer %q not found", lbName) } - lb = &newLB + lb = newLB } } @@ -2497,7 +2497,7 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service, if !exist { return nil, fmt.Errorf("unable to get lb %s", to.String(lbName)) } - backendPrivateIPv4s, backendPrivateIPv6s := az.LoadBalancerBackendPool.GetBackendPrivateIPs(clusterName, service, &lb) + backendPrivateIPv4s, backendPrivateIPv6s := az.LoadBalancerBackendPool.GetBackendPrivateIPs(clusterName, service, lb) backendIPAddresses = backendPrivateIPv4s if utilnet.IsIPv6String(*lbIP) { backendIPAddresses = backendPrivateIPv6s @@ -3016,11 +3016,10 @@ func (az *Cloud) reconcilePublicIP(clusterName string, service *v1.Service, lbNa } if lbName != "" { - loadBalancer, _, err := az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault) + lb, _, err = az.getAzureLoadBalancer(lbName, azcache.CacheReadTypeDefault) if err != nil { return nil, err } - lb = &loadBalancer } discoveredDesiredPublicIP, pipsToBeDeleted, deletedDesiredPublicIP, pipsToBeUpdated, err := az.getPublicIPUpdates( diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer_backendpool.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer_backendpool.go index 10b8d62733..cb65850358 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer_backendpool.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_loadbalancer_backendpool.go @@ -32,6 +32,7 @@ import ( "k8s.io/klog/v2" utilnet "k8s.io/utils/net" + "sigs.k8s.io/cloud-provider-azure/pkg/cache" "sigs.k8s.io/cloud-provider-azure/pkg/consts" ) @@ -118,15 +119,16 @@ func (bc *backendPoolTypeNodeIPConfig) CleanupVMSetFromBackendPoolByCondition(sl }, } // decouple the 
backendPool from the node - err := bc.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, backendpoolToBeDeleted, true) + shouldRefreshLB, err := bc.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, backendpoolToBeDeleted, true) if err != nil { return nil, err } - slb.BackendAddressPools = &newBackendPools - // Proactively disable the etag to prevent etag mismatch error when putting lb later. - // This could happen because when we remove the hosts from the lb, the nrp - // would put the lb to remove the backend references as well. - slb.Etag = nil + if shouldRefreshLB { + slb, _, err = bc.getAzureLoadBalancer(to.String(slb.Name), cache.CacheReadTypeForceRefresh) + if err != nil { + return nil, fmt.Errorf("bc.CleanupVMSetFromBackendPoolByCondition: failed to get load balancer %s, err: %w", to.String(slb.Name), err) + } + } } return slb, nil @@ -141,6 +143,7 @@ func (bc *backendPoolTypeNodeIPConfig) ReconcileBackendPools(clusterName string, foundBackendPool := false changed := false + shouldRefreshLB := false lbName := *lb.Name serviceName := getServiceName(service) @@ -167,14 +170,13 @@ func (bc *backendPoolTypeNodeIPConfig) ReconcileBackendPools(clusterName string, bp.LoadBalancerBackendAddresses != nil && len(*bp.LoadBalancerBackendAddresses) > 0 { if removeNodeIPAddressesFromBackendPool(bp, []string{}, true) { - bp.Etag = nil if err := bc.CreateOrUpdateLBBackendPool(lbName, bp); err != nil { klog.Errorf("bc.ReconcileBackendPools for service (%s): failed to cleanup IP based backend pool %s: %s", serviceName, lbBackendPoolName, err.Error()) return false, false, fmt.Errorf("bc.ReconcileBackendPools for service (%s): failed to cleanup IP based backend pool %s: %w", serviceName, lbBackendPoolName, err) } newBackendPools[i] = bp lb.BackendAddressPools = &newBackendPools - lb.Etag = nil + shouldRefreshLB = true } } @@ -213,10 +215,13 @@ func (bc *backendPoolTypeNodeIPConfig) ReconcileBackendPools(clusterName string, }, } // decouple the backendPool from the node - err = bc.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, backendpoolToBeDeleted, false) + updated, err := bc.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, backendpoolToBeDeleted, false) if err != nil { return false, false, err } + if updated { + shouldRefreshLB = true + } } break } else { @@ -224,6 +229,13 @@ func (bc *backendPoolTypeNodeIPConfig) ReconcileBackendPools(clusterName string, } } + if shouldRefreshLB { + lb, _, err = bc.getAzureLoadBalancer(lbName, cache.CacheReadTypeForceRefresh) + if err != nil { + return false, false, fmt.Errorf("bc.ReconcileBackendPools for service (%s): failed to get loadbalancer %s: %w", serviceName, lbName, err) + } + } + if !foundBackendPool { isBackendPoolPreConfigured = newBackendPool(lb, isBackendPoolPreConfigured, bc.PreConfiguredBackendPoolLoadBalancerTypes, getServiceName(service), getBackendPoolName(clusterName, service)) changed = true @@ -434,6 +446,7 @@ func (bi *backendPoolTypeNodeIP) ReconcileBackendPools(clusterName string, servi foundBackendPool := false changed := false + shouldRefreshLB := false lbName := *lb.Name serviceName := getServiceName(service) lbBackendPoolName := getBackendPoolName(clusterName, service) @@ -441,6 +454,7 @@ func (bi *backendPoolTypeNodeIP) ReconcileBackendPools(clusterName string, servi lbBackendPoolID := bi.getBackendPoolID(to.String(lb.Name), bi.getLoadBalancerResourceGroup(), getBackendPoolName(clusterName, service)) isBackendPoolPreConfigured := 
bi.isBackendPoolPreConfigured(service) + var err error for i := len(newBackendPools) - 1; i >= 0; i-- { bp := newBackendPools[i] if strings.EqualFold(*bp.Name, lbBackendPoolName) { @@ -455,19 +469,11 @@ func (bi *backendPoolTypeNodeIP) ReconcileBackendPools(clusterName string, servi // If the LB backend pool type is configured from nodeIPConfiguration // to nodeIP, we need to decouple the VM NICs from the LB // before attaching nodeIPs/podIPs to the LB backend pool. - if bp.BackendAddressPoolPropertiesFormat != nil && - bp.BackendIPConfigurations != nil && - len(*bp.BackendIPConfigurations) > 0 { - klog.V(2).Infof("bi.ReconcileBackendPools for service (%s): ensuring the LB is decoupled from the VMSet", serviceName) - if err := bi.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools, true); err != nil { - klog.Errorf("bi.ReconcileBackendPools for service (%s): failed to EnsureBackendPoolDeleted: %s", serviceName, err.Error()) - return false, false, err - } - newBackendPools[i].BackendAddressPoolPropertiesFormat.LoadBalancerBackendAddresses = &[]network.LoadBalancerBackendAddress{} - newBackendPools[i].BackendAddressPoolPropertiesFormat.BackendIPConfigurations = &[]network.InterfaceIPConfiguration{} - newBackendPools[i].Etag = nil - lb.Etag = nil - break + klog.V(2).Infof("bi.ReconcileBackendPools for service (%s) and vmSet (%s): ensuring the LB is decoupled from the VMSet", serviceName, vmSetName) + shouldRefreshLB, err = bi.VMSet.EnsureBackendPoolDeleted(service, lbBackendPoolID, vmSetName, lb.BackendAddressPools, true) + if err != nil { + klog.Errorf("bi.ReconcileBackendPools for service (%s): failed to EnsureBackendPoolDeleted: %s", serviceName, err.Error()) + return false, false, err } var nodeIPAddressesToBeDeleted []string @@ -484,6 +490,7 @@ func (bi *backendPoolTypeNodeIP) ReconcileBackendPools(clusterName string, servi if err := bi.CreateOrUpdateLBBackendPool(lbName, bp); err != nil { return false, false, fmt.Errorf("bi.ReconcileBackendPools for service (%s): lb backendpool - failed to update backend pool %s for load balancer %s: %w", serviceName, lbBackendPoolName, lbName, err) } + shouldRefreshLB = true } } break @@ -492,6 +499,13 @@ func (bi *backendPoolTypeNodeIP) ReconcileBackendPools(clusterName string, servi } } + if shouldRefreshLB { + lb, _, err = bi.getAzureLoadBalancer(lbName, cache.CacheReadTypeForceRefresh) + if err != nil { + return false, false, fmt.Errorf("bi.ReconcileBackendPools for service (%s): failed to get load balancer %s: %w", serviceName, lbName, err) + } + } + if !foundBackendPool { isBackendPoolPreConfigured = newBackendPool(lb, isBackendPoolPreConfigured, bi.PreConfiguredBackendPoolLoadBalancerTypes, getServiceName(service), getBackendPoolName(clusterName, service)) changed = true diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_mock_vmsets.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_mock_vmsets.go index 07a8662252..fa0722214c 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_mock_vmsets.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_mock_vmsets.go @@ -97,11 +97,12 @@ func (mr *MockVMSetMockRecorder) DetachDisk(ctx, nodeName, diskMap interface{}) } // EnsureBackendPoolDeleted mocks base method. 
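A recurring pattern in the backend-pool changes above: EnsureBackendPoolDeleted now also reports whether it actually updated any NIC or VMSS VM, and callers use that flag to force-refresh the cached load balancer rather than blanking etags before the next PUT. A simplified, self-contained sketch of that caller-side pattern (vmSet, lbGetter and fakeVMSet are stand-ins, not the real provider types):

package main

import "fmt"

// vmSet is a stand-in for the provider's VMSet interface after this change:
// the bool reports whether any NIC or VMSS VM was actually updated.
type vmSet interface {
	EnsureBackendPoolDeleted(backendPoolID, vmSetName string) (bool, error)
}

// lbGetter is a stand-in for the load balancer cache lookup, with an
// optional force-refresh mode.
type lbGetter func(name string, forceRefresh bool) (string, error)

func reconcile(vs vmSet, getLB lbGetter, lbName, poolID, vmSetName string) (string, error) {
	lb := lbName + " (cached)"
	updated, err := vs.EnsureBackendPoolDeleted(poolID, vmSetName)
	if err != nil {
		return "", err
	}
	if updated {
		// NRP may have modified the LB while detaching hosts, so re-read it
		// instead of clearing the etag and risking a conflict on the next PUT.
		lb, err = getLB(lbName, true)
		if err != nil {
			return "", fmt.Errorf("failed to refresh load balancer %s: %w", lbName, err)
		}
	}
	return lb, nil
}

type fakeVMSet struct{ touchedNIC bool }

func (f fakeVMSet) EnsureBackendPoolDeleted(backendPoolID, vmSetName string) (bool, error) {
	return f.touchedNIC, nil
}

func main() {
	getLB := func(name string, forceRefresh bool) (string, error) {
		return name + " (refreshed)", nil
	}
	fmt.Println(reconcile(fakeVMSet{touchedNIC: true}, getLB, "kubernetes", "pool-id", "agentpool"))
}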
-func (m *MockVMSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) error { +func (m *MockVMSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) (bool, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EnsureBackendPoolDeleted", service, backendPoolID, vmSetName, backendAddressPools, deleteFromVMSet) - ret0, _ := ret[0].(error) - return ret0 + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 } // EnsureBackendPoolDeleted indicates an expected call of EnsureBackendPoolDeleted. diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_standard.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_standard.go index ffdfef624d..328758e631 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_standard.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_standard.go @@ -1048,10 +1048,10 @@ func (as *availabilitySet) EnsureHostsInPool(service *v1.Service, nodes []*v1.No } // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. -func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) error { +func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) (bool, error) { // Returns nil if backend address pools already deleted. if backendAddressPools == nil { - return nil + return false, nil } mc := metrics.NewMetricContext("services", "vmas_ensure_backend_pool_deleted", as.ResourceGroup, as.SubscriptionID, getServiceName(service)) @@ -1076,6 +1076,7 @@ func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backend } nicUpdaters := make([]func() error, 0) allErrs := make([]error, 0) + var nicUpdated bool for i := range ipConfigurationIDs { ipConfigurationID := ipConfigurationIDs[i] nodeName, _, err := as.GetNodeNameByIPConfigurationID(ipConfigurationID) @@ -1093,15 +1094,15 @@ func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backend if err != nil { if errors.Is(err, errNotInVMSet) { klog.V(3).Infof("EnsureBackendPoolDeleted skips node %s because it is not in the vmSet %s", nodeName, vmSetName) - return nil + return false, nil } klog.Errorf("error: az.EnsureBackendPoolDeleted(%s), az.VMSet.GetPrimaryInterface.Get(%s, %s), err=%v", nodeName, vmName, vmSetName, err) - return err + return false, err } vmasName, err := getAvailabilitySetNameByID(vmasID) if err != nil { - return fmt.Errorf("EnsureBackendPoolDeleted: failed to parse the VMAS ID %s: %w", vmasID, err) + return false, fmt.Errorf("EnsureBackendPoolDeleted: failed to parse the VMAS ID %s: %w", vmasID, err) } // Only remove nodes belonging to specified vmSet to basic LB backends. 
if !strings.EqualFold(vmasName, vmSetName) { @@ -1111,7 +1112,7 @@ func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backend if nic.ProvisioningState == consts.NicFailedState { klog.Warningf("EnsureBackendPoolDeleted skips node %s because its primary nic %s is in Failed state", nodeName, *nic.Name) - return nil + return false, nil } if nic.InterfacePropertiesFormat != nil && nic.InterfacePropertiesFormat.IPConfigurations != nil { @@ -1143,21 +1144,22 @@ func (as *availabilitySet) EnsureBackendPoolDeleted(service *v1.Service, backend klog.Errorf("EnsureBackendPoolDeleted CreateOrUpdate for NIC(%s, %s) failed with error %v", as.resourceGroup, to.String(nic.Name), rerr.Error()) return rerr.Error() } + nicUpdated = true return nil }) } } errs := utilerrors.AggregateGoroutines(nicUpdaters...) if errs != nil { - return utilerrors.Flatten(errs) + return nicUpdated, utilerrors.Flatten(errs) } // Fail if there are other errors. if len(allErrs) > 0 { - return utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) + return nicUpdated, utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) } isOperationSucceeded = true - return nil + return nicUpdated, nil } func getAvailabilitySetNameByID(asID string) (string, error) { diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_utils.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_utils.go index 910abb56fa..fe353f7084 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_utils.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_utils.go @@ -298,36 +298,40 @@ func removeDuplicatedSecurityRules(rules []network.SecurityRule) []network.Secur return rules } +func getVMSSVMCacheKey(resourceGroup, vmssName string) string { + cacheKey := strings.ToLower(fmt.Sprintf("%s/%s", resourceGroup, vmssName)) + return cacheKey +} + // isNodeInVMSSVMCache check whether nodeName is in vmssVMCache -func isNodeInVMSSVMCache(nodeName string, vmssVMCache *sync.Map) bool { +func isNodeInVMSSVMCache(nodeName string, vmssVMCache *azcache.TimedCache) bool { if vmssVMCache == nil { return false } + var isInCache bool - vmssVMCache.Range(func(_, value interface{}) bool { - if value != nil && value.(*azcache.TimedCache).Store != nil { - for _, v := range value.(*azcache.TimedCache).Store.List() { - if v != nil { - data := v.(*azcache.AzureCacheEntry).Data - if data != nil { - data.(*sync.Map).Range(func(vmName, _ interface{}) bool { - if vmName != nil && vmName.(string) == nodeName { - isInCache = true - return false - } - return true - }) + + vmssVMCache.Lock.Lock() + defer vmssVMCache.Lock.Unlock() + + for _, entry := range vmssVMCache.Store.List() { + if entry != nil { + data := entry.(*azcache.AzureCacheEntry).Data + if data != nil { + data.(*sync.Map).Range(func(vmName, _ interface{}) bool { + if vmName != nil && vmName.(string) == nodeName { + isInCache = true + return false } - } - if isInCache { - return false - } + return true + }) } } + if isInCache { - return false + break } - return true - }) + } + return isInCache } diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go index 42bce2794a..d829587960 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmsets.go @@ -71,7 +71,7 @@ type VMSet interface { // participating in the specified LoadBalancer Backend Pool. 
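The rewritten isNodeInVMSSVMCache above walks the entries of a single timed cache, each of which holds a sync.Map of VM names; returning false from sync.Map.Range is what ends the scan early once the node is found. A minimal sketch of that lookup shape using only the standard library (no provider types):

package main

import (
	"fmt"
	"strings"
	"sync"
)

// cache key layout used above: lowercase "resourceGroup/vmssName".
func vmssVMCacheKey(resourceGroup, vmssName string) string {
	return strings.ToLower(fmt.Sprintf("%s/%s", resourceGroup, vmssName))
}

// isNodeInCache scans every per-VMSS entry; Range stops as soon as the
// callback returns false, which is how the early exit works.
func isNodeInCache(nodeName string, entries map[string]*sync.Map) bool {
	found := false
	for _, vms := range entries {
		vms.Range(func(vmName, _ interface{}) bool {
			if vmName.(string) == nodeName {
				found = true
				return false // stop ranging this entry
			}
			return true
		})
		if found {
			break
		}
	}
	return found
}

func main() {
	vms := &sync.Map{}
	vms.Store("vmss-1000000", struct{}{})
	entries := map[string]*sync.Map{vmssVMCacheKey("rg1", "vmss-1"): vms}
	fmt.Println(isNodeInCache("vmss-1000000", entries))
}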
EnsureHostInPool(service *v1.Service, nodeName types.NodeName, backendPoolID string, vmSetName string) (string, string, string, *compute.VirtualMachineScaleSetVM, error) // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. - EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) error + EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) (bool, error) //EnsureBackendPoolDeletedFromVMSets ensures the loadBalancer backendAddressPools deleted from the specified VMSS/VMAS EnsureBackendPoolDeletedFromVMSets(vmSetNamesMap map[string]bool, backendPoolID string) error diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss.go index 28c6dbfc32..43ce1bf4f7 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss.go @@ -35,7 +35,6 @@ import ( "k8s.io/klog/v2" utilnet "k8s.io/utils/net" - "golang.org/x/sync/singleflight" azcache "sigs.k8s.io/cloud-provider-azure/pkg/cache" "sigs.k8s.io/cloud-provider-azure/pkg/consts" "sigs.k8s.io/cloud-provider-azure/pkg/metrics" @@ -80,8 +79,12 @@ type ScaleSet struct { // the same cluster. flexScaleSet VMSet - vmssCache *azcache.TimedCache - vmssVMCache *sync.Map // [resourcegroup/vmssname]*azcache.TimedCache + vmssCache *azcache.TimedCache + + // vmssVMCache is timed cache where the Store in the cache is a map of + // Key: [resourcegroup/vmssName] + // Value: sync.Map of [vmName]*VMSSVirtualMachinesEntry + vmssVMCache *azcache.TimedCache // nonVmssUniformNodesCache is used to store node names from non uniform vm. // Currently, the nodes can from avset or vmss flex or individual vm. @@ -92,8 +95,6 @@ type ScaleSet struct { // lockMap in cache refresh lockMap *lockMap - // group represents a class of work and units of work can be executed with duplicate suppression - group singleflight.Group } // newScaleSet creates a new ScaleSet. @@ -116,7 +117,6 @@ func newScaleSet(az *Cloud) (VMSet, error) { Cloud: az, availabilitySet: as, flexScaleSet: fs, - vmssVMCache: &sync.Map{}, lockMap: newLockMap(), } @@ -132,6 +132,11 @@ func newScaleSet(az *Cloud) (VMSet, error) { return nil, err } + ss.vmssVMCache, err = ss.newVMSSVirtualMachinesCache() + if err != nil { + return nil, err + } + return ss, nil } @@ -174,22 +179,23 @@ func (ss *ScaleSet) getVMSS(vmssName string, crt azcache.AzureCacheReadType) (*c // getVmssVMByNodeIdentity find virtualMachineScaleSetVM by nodeIdentity, using node's parent VMSS cache. // Returns cloudprovider.InstanceNotFound if the node does not belong to the scale set named in nodeIdentity. func (ss *ScaleSet) getVmssVMByNodeIdentity(node *nodeIdentity, crt azcache.AzureCacheReadType) (*virtualmachine.VirtualMachine, error) { - cacheKey, cache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName) + // FIXME(ccc): check only if vmss is uniform. 
+ _, err := getScaleSetVMInstanceID(node.nodeName) if err != nil { return nil, err } - getter := func(nodeName string, crt azcache.AzureCacheReadType) (*virtualmachine.VirtualMachine, bool, error) { + getter := func(crt azcache.AzureCacheReadType) (*virtualmachine.VirtualMachine, bool, error) { var found bool - cached, err := cache.Get(cacheKey, crt) + virtualMachines, err := ss.getVMSSVMsFromCache(node.resourceGroup, node.vmssName, crt) if err != nil { return nil, found, err } - virtualMachines := cached.(*sync.Map) - if entry, ok := virtualMachines.Load(nodeName); ok { + + if entry, ok := virtualMachines.Load(node.nodeName); ok { result := entry.(*VMSSVirtualMachinesEntry) if result.VirtualMachine == nil { - klog.Warningf("VM is nil on Node %q, VM is in deleting state", nodeName) + klog.Warningf("VM is nil on Node %q, VM is in deleting state", node.nodeName) return nil, true, nil } found = true @@ -199,29 +205,24 @@ func (ss *ScaleSet) getVmssVMByNodeIdentity(node *nodeIdentity, crt azcache.Azur return nil, found, nil } - // FIXME(ccc): check only if vmss is uniform. - _, err = getScaleSetVMInstanceID(node.nodeName) - if err != nil { - return nil, err - } - - vm, found, err := getter(node.nodeName, crt) + vm, found, err := getter(crt) if err != nil { return nil, err } if !found { + cacheKey := getVMSSVMCacheKey(node.resourceGroup, node.vmssName) // lock and try find nodeName from cache again, refresh cache if still not found ss.lockMap.LockEntry(cacheKey) defer ss.lockMap.UnlockEntry(cacheKey) - vm, found, err = getter(node.nodeName, crt) + vm, found, err = getter(crt) if err == nil && found && vm != nil { klog.V(2).Infof("found VMSS VM with nodeName %s after retry", node.nodeName) return vm, nil } klog.V(2).Infof("Couldn't find VMSS VM with nodeName %s, refreshing the cache(vmss: %s, rg: %s)", node.nodeName, node.vmssName, node.resourceGroup) - vm, found, err = getter(node.nodeName, azcache.CacheReadTypeForceRefresh) + vm, found, err = getter(azcache.CacheReadTypeForceRefresh) if err != nil { return nil, err } @@ -321,18 +322,12 @@ func (ss *ScaleSet) GetProvisioningStateByNodeName(name string) (provisioningSta // getCachedVirtualMachineByInstanceID gets scaleSetVMInfo from cache. // The node must belong to one of scale sets. func (ss *ScaleSet) getVmssVMByInstanceID(resourceGroup, scaleSetName, instanceID string, crt azcache.AzureCacheReadType) (*compute.VirtualMachineScaleSetVM, error) { - cacheKey, cache, err := ss.getVMSSVMCache(resourceGroup, scaleSetName) - if err != nil { - return nil, err - } - getter := func(crt azcache.AzureCacheReadType) (vm *compute.VirtualMachineScaleSetVM, found bool, err error) { - cached, err := cache.Get(cacheKey, crt) + virtualMachines, err := ss.getVMSSVMsFromCache(resourceGroup, scaleSetName, crt) if err != nil { return nil, false, err } - virtualMachines := cached.(*sync.Map) virtualMachines.Range(func(key, value interface{}) bool { vmEntry := value.(*VMSSVirtualMachinesEntry) if strings.EqualFold(vmEntry.ResourceGroup, resourceGroup) && @@ -1812,10 +1807,10 @@ func (ss *ScaleSet) ensureBackendPoolDeletedFromVmssUniform(backendPoolID, vmSet } // ensureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. 
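getVmssVMByNodeIdentity above follows a check / lock / re-check / force-refresh sequence, so that after a cache miss only one goroutine per key pays for the refresh. A generic sketch of that sequence (the mutex and getter below stand in for the provider's lockMap and TimedCache):

package main

import (
	"fmt"
	"sync"
)

type readType int

const (
	readDefault readType = iota
	readForceRefresh
)

// lookupWithRefresh re-checks the cache under the lock before forcing a
// refresh, because another goroutine may have refreshed it while we waited.
func lookupWithRefresh(mu *sync.Mutex, get func(readType) (string, bool)) (string, bool) {
	if v, ok := get(readDefault); ok {
		return v, true
	}

	mu.Lock() // the real code locks per cache key via lockMap
	defer mu.Unlock()

	if v, ok := get(readDefault); ok {
		return v, true
	}
	return get(readForceRefresh)
}

func main() {
	var mu sync.Mutex
	backing := map[string]string{}
	get := func(rt readType) (string, bool) {
		if rt == readForceRefresh {
			backing["vmss-1000001"] = "instance-1" // pretend the refresh re-listed the VMSS VMs
		}
		v, ok := backing["vmss-1000001"]
		return v, ok
	}
	fmt.Println(lookupWithRefresh(&mu, get))
}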
-func (ss *ScaleSet) ensureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) error { +func (ss *ScaleSet) ensureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool) (bool, error) { // Returns nil if backend address pools already deleted. if backendAddressPools == nil { - return nil + return false, nil } mc := metrics.NewMetricContext("services", "vmss_ensure_backend_pool_deleted", ss.ResourceGroup, ss.SubscriptionID, getServiceName(service)) @@ -1900,6 +1895,7 @@ func (ss *ScaleSet) ensureBackendPoolDeleted(service *v1.Service, backendPoolID, } // Update VMs with best effort that have already been added to nodeUpdates. + var updatedVM bool for meta, update := range nodeUpdates { // create new instance of meta and update for passing to anonymous function meta := meta @@ -1928,27 +1924,28 @@ func (ss *ScaleSet) ensureBackendPoolDeleted(service *v1.Service, backendPoolID, return rerr.Error() } + updatedVM = true return nil }) } errs := utilerrors.AggregateGoroutines(hostUpdates...) if errs != nil { - return utilerrors.Flatten(errs) + return updatedVM, utilerrors.Flatten(errs) } // Fail if there are other errors. if len(allErrs) > 0 { - return utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) + return updatedVM, utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) } isOperationSucceeded = true - return nil + return updatedVM, nil } // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. -func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) error { +func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) (bool, error) { if backendAddressPools == nil { - return nil + return false, nil } vmssUniformBackendIPConfigurations := []network.InterfaceIPConfiguration{} vmssFlexBackendIPConfigurations := []network.InterfaceIPConfiguration{} @@ -1987,10 +1984,11 @@ func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, if deleteFromVMSet { err := ss.ensureBackendPoolDeletedFromVMSS(backendPoolID, vmSetName) if err != nil { - return err + return false, err } } + var updated bool if len(vmssUniformBackendIPConfigurations) > 0 { vmssUniformBackendPools := &[]network.BackendAddressPool{ { @@ -2000,9 +1998,12 @@ func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, }, }, } - err := ss.ensureBackendPoolDeleted(service, backendPoolID, vmSetName, vmssUniformBackendPools, false) + updatedVM, err := ss.ensureBackendPoolDeleted(service, backendPoolID, vmSetName, vmssUniformBackendPools) if err != nil { - return err + return false, err + } + if updatedVM { + updated = true } } @@ -2015,9 +2016,12 @@ func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, }, }, } - err := ss.flexScaleSet.EnsureBackendPoolDeleted(service, backendPoolID, vmSetName, vmssFlexBackendPools, false) + updatedNIC, err := ss.flexScaleSet.EnsureBackendPoolDeleted(service, backendPoolID, vmSetName, vmssFlexBackendPools, false) if err != nil { - return err + return false, err + } + if updatedNIC { + updated = true } } @@ -2030,13 +2034,16 @@ func (ss *ScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, }, 
}, } - err := ss.availabilitySet.EnsureBackendPoolDeleted(service, backendPoolID, vmSetName, avSetBackendPools, false) + updatedNIC, err := ss.availabilitySet.EnsureBackendPoolDeleted(service, backendPoolID, vmSetName, avSetBackendPools, false) if err != nil { - return err + return false, err + } + if updatedNIC { + updated = true } } - return nil + return updated, nil } diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go index 62ec948e9e..44ec739fc2 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmss_cache.go @@ -104,19 +104,13 @@ func (ss *ScaleSet) newVMSSCache() (*azcache.TimedCache, error) { if resourceGroupNotFound { // gc vmss vm cache when there is resource group not found - removed := map[string]bool{} - ss.vmssVMCache.Range(func(key, value interface{}) bool { - cacheKey := key.(string) - vlistIdx := cacheKey[strings.LastIndex(cacheKey, "/")+1:] - if _, ok := localCache.Load(vlistIdx); !ok { - klog.V(2).Infof("remove vmss %s from cache due to rg not found", cacheKey) - removed[cacheKey] = true + vmssVMKeys := ss.vmssVMCache.Store.ListKeys() + for _, cacheKey := range vmssVMKeys { + vmssName := cacheKey[strings.LastIndex(cacheKey, "/")+1:] + if _, ok := localCache.Load(vmssName); !ok { + klog.V(2).Infof("remove vmss %s from vmssVMCache due to rg not found", cacheKey) + _ = ss.vmssVMCache.Delete(cacheKey) } - return true - }) - - for key := range removed { - ss.vmssVMCache.Delete(key) } } return localCache, nil @@ -142,25 +136,20 @@ func extractVmssVMName(name string) (string, string, error) { return ssName, instanceID, nil } -// getVMSSVMCache returns an *azcache.TimedCache and cache key for a VMSS (creating that cache if new). -func (ss *ScaleSet) getVMSSVMCache(resourceGroup, vmssName string) (string, *azcache.TimedCache, error) { - cacheKey := strings.ToLower(fmt.Sprintf("%s/%s", resourceGroup, vmssName)) - if entry, ok := ss.vmssVMCache.Load(cacheKey); ok { - return cacheKey, entry.(*azcache.TimedCache), nil +func (ss *ScaleSet) getVMSSVMsFromCache(resourceGroup, vmssName string, crt azcache.AzureCacheReadType) (*sync.Map, error) { + cacheKey := getVMSSVMCacheKey(resourceGroup, vmssName) + entry, err := ss.vmssVMCache.Get(cacheKey, crt) + if err != nil { + return nil, err } - v, err, _ := ss.group.Do(cacheKey, func() (interface{}, error) { - cache, err := ss.newVMSSVirtualMachinesCache(resourceGroup, vmssName, cacheKey) - if err != nil { - return nil, err - } - ss.vmssVMCache.Store(cacheKey, cache) - return cache, nil - }) - if err != nil { - return "", nil, err + if entry == nil { + err = fmt.Errorf("vmssVMCache entry for resourceGroup (%s), vmssName (%s) returned nil data", resourceGroup, vmssName) + return nil, err } - return cacheKey, v.(*azcache.TimedCache), nil + + virtualMachines := entry.(*sync.Map) + return virtualMachines, nil } // gcVMSSVMCache delete stale VMSS VMs caches from deleted VMSSes. @@ -169,33 +158,37 @@ func (ss *ScaleSet) gcVMSSVMCache() error { } // newVMSSVirtualMachinesCache instantiates a new VMs cache for VMs belonging to the provided VMSS. 
-func (ss *ScaleSet) newVMSSVirtualMachinesCache(resourceGroupName, vmssName, cacheKey string) (*azcache.TimedCache, error) { +func (ss *ScaleSet) newVMSSVirtualMachinesCache() (*azcache.TimedCache, error) { vmssVirtualMachinesCacheTTL := time.Duration(ss.Config.VmssVirtualMachinesCacheTTLInSeconds) * time.Second - getter := func(key string) (interface{}, error) { + getter := func(cacheKey string) (interface{}, error) { localCache := &sync.Map{} // [nodeName]*VMSSVirtualMachinesEntry - oldCache := make(map[string]VMSSVirtualMachinesEntry) + oldCache := make(map[string]*VMSSVirtualMachinesEntry) - if vmssCache, ok := ss.vmssVMCache.Load(cacheKey); ok { - // get old cache before refreshing the cache - cache := vmssCache.(*azcache.TimedCache) - entry, exists, err := cache.Store.GetByKey(cacheKey) - if err != nil { - return nil, err - } - if exists { - cached := entry.(*azcache.AzureCacheEntry).Data - if cached != nil { - virtualMachines := cached.(*sync.Map) - virtualMachines.Range(func(key, value interface{}) bool { - oldCache[key.(string)] = *value.(*VMSSVirtualMachinesEntry) - return true - }) - } + entry, exists, err := ss.vmssVMCache.Store.GetByKey(cacheKey) + if err != nil { + return nil, err + } + if exists { + cached := entry.(*azcache.AzureCacheEntry).Data + if cached != nil { + virtualMachines := cached.(*sync.Map) + virtualMachines.Range(func(key, value interface{}) bool { + oldCache[key.(string)] = value.(*VMSSVirtualMachinesEntry) + return true + }) } } + result := strings.Split(cacheKey, "/") + if len(result) < 2 { + err = fmt.Errorf("Invalid cacheKey (%s)", cacheKey) + return nil, err + } + + resourceGroupName, vmssName := result[0], result[1] + vms, err := ss.listScaleSetVMs(vmssName, resourceGroupName) if err != nil { return nil, err @@ -287,15 +280,9 @@ func (ss *ScaleSet) DeleteCacheForNode(nodeName string) error { return err } - cacheKey, timedcache, err := ss.getVMSSVMCache(node.resourceGroup, node.vmssName) + err = ss.vmssVMCache.Delete(getVMSSVMCacheKey(node.resourceGroup, node.vmssName)) if err != nil { - klog.Errorf("DeleteCacheForNode(%s) failed with error: %v", nodeName, err) - return err - } - - err = timedcache.Delete(cacheKey) - if err != nil { - klog.Errorf("DeleteCacheForNode(%s) failed with error: %v", nodeName, err) + klog.Errorf("DeleteCacheForNode(%s) failed to remove from vmssVMCache with error: %v", nodeName, err) return err } @@ -308,9 +295,9 @@ func (ss *ScaleSet) DeleteCacheForNode(nodeName string) error { } func (ss *ScaleSet) updateCache(nodeName, resourceGroupName, vmssName, instanceID string, updatedVM *compute.VirtualMachineScaleSetVM) error { - cacheKey, timedCache, err := ss.getVMSSVMCache(resourceGroupName, vmssName) + virtualMachines, err := ss.getVMSSVMsFromCache(resourceGroupName, vmssName, azcache.CacheReadTypeUnsafe) if err != nil { - klog.Errorf("updateCache(%s, %s, %s) failed with error: %v", vmssName, resourceGroupName, nodeName, err) + err = fmt.Errorf("updateCache(%s, %s, %s) failed getting vmCache with error: %v", vmssName, resourceGroupName, nodeName, err) return err } @@ -326,14 +313,6 @@ func (ss *ScaleSet) updateCache(nodeName, resourceGroupName, vmssName, instanceI localCache.Store(nodeName, vmssVMCacheEntry) - vmCache, err := timedCache.Get(cacheKey, azcache.CacheReadTypeUnsafe) - if err != nil { - return err - } - if vmCache == nil { - return fmt.Errorf("nil vmCache") - } - virtualMachines := vmCache.(*sync.Map) virtualMachines.Range(func(key, value interface{}) bool { if key.(string) != nodeName { 
localCache.Store(key.(string), value.(*VMSSVirtualMachinesEntry)) @@ -341,7 +320,8 @@ func (ss *ScaleSet) updateCache(nodeName, resourceGroupName, vmssName, instanceI return true }) - timedCache.Update(cacheKey, localCache) + cacheKey := getVMSSVMCacheKey(resourceGroupName, vmssName) + ss.vmssVMCache.Update(cacheKey, localCache) klog.V(4).Infof("updateCache(%s, %s, %s) for cacheKey(%s) updated successfully", vmssName, resourceGroupName, nodeName, cacheKey) return nil } diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex.go index 5806338f76..0605324294 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_vmssflex.go @@ -913,10 +913,10 @@ func (fs *FlexScaleSet) EnsureBackendPoolDeletedFromVMSets(vmssNamesMap map[stri } // EnsureBackendPoolDeleted ensures the loadBalancer backendAddressPools deleted from the specified nodes. -func (fs *FlexScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) error { +func (fs *FlexScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoolID, vmSetName string, backendAddressPools *[]network.BackendAddressPool, deleteFromVMSet bool) (bool, error) { // Returns nil if backend address pools already deleted. if backendAddressPools == nil { - return nil + return false, nil } mc := metrics.NewMetricContext("services", "vmssflex_ensure_backend_pool_deleted", fs.ResourceGroup, fs.SubscriptionID, getServiceName(service)) @@ -981,25 +981,26 @@ func (fs *FlexScaleSet) EnsureBackendPoolDeleted(service *v1.Service, backendPoo klog.V(2).Infof("2. Ensure the backendPoolID is deleted from the VMSS VMs.") // 2. Ensure the backendPoolID is deleted from the VMSS VMs. 
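The nicUpdaters/hostUpdates slices in these hunks are executed through k8s.io/apimachinery's error aggregation helpers. A compact sketch of that fan-out-and-aggregate pattern, with dummy updaters standing in for the NIC update calls:

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	nicUpdated := false
	updaters := []func() error{
		func() error { nicUpdated = true; return nil }, // pretend this NIC PUT succeeded
		func() error { return fmt.Errorf("nic-2: provisioning conflict") },
	}

	// AggregateGoroutines runs every func in its own goroutine and collects
	// the non-nil errors; nil means all of them succeeded.
	if errs := utilerrors.AggregateGoroutines(updaters...); errs != nil {
		fmt.Println(nicUpdated, utilerrors.Flatten(errs))
		return
	}
	fmt.Println(nicUpdated, "all NICs updated")
}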
klog.V(2).Infof("go into fs.ensureBackendPoolDeletedFromNode, vmssFlexVMNameMap: %s, size: %s", vmssFlexVMNameMap, len(vmssFlexVMNameMap)) - err := fs.ensureBackendPoolDeletedFromNode(vmssFlexVMNameMap, backendPoolID) + nicUpdated, err := fs.ensureBackendPoolDeletedFromNode(vmssFlexVMNameMap, backendPoolID) klog.V(2).Infof("exit from fs.ensureBackendPoolDeletedFromNode") if err != nil { allErrs = append(allErrs, err) } if len(allErrs) > 0 { - return utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) + return nicUpdated, utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) } isOperationSucceeded = true - return nil + return nicUpdated, nil } -func (fs *FlexScaleSet) ensureBackendPoolDeletedFromNode(vmssFlexVMNameMap map[string]string, backendPoolID string) error { +func (fs *FlexScaleSet) ensureBackendPoolDeletedFromNode(vmssFlexVMNameMap map[string]string, backendPoolID string) (bool, error) { nicUpdaters := make([]func() error, 0) allErrs := make([]error, 0) i := 0 + var nicUpdated bool for nodeName, nicName := range vmssFlexVMNameMap { i++ klog.V(2).Infof("i = %s", i) @@ -1008,7 +1009,7 @@ func (fs *FlexScaleSet) ensureBackendPoolDeletedFromNode(vmssFlexVMNameMap map[s defer cancel() nic, rerr := fs.InterfacesClient.Get(ctx, fs.ResourceGroup, nicName, "") if rerr != nil { - return fmt.Errorf("ensureBackendPoolDeletedFromNode: failed to get interface of name %s: %w", nicName, rerr.Error()) + return false, fmt.Errorf("ensureBackendPoolDeletedFromNode: failed to get interface of name %s: %w", nicName, rerr.Error()) } if nic.ProvisioningState == consts.NicFailedState { @@ -1046,6 +1047,7 @@ func (fs *FlexScaleSet) ensureBackendPoolDeletedFromNode(vmssFlexVMNameMap map[s klog.Errorf("EnsureBackendPoolDeleted CreateOrUpdate for NIC(%s, %s) failed with error %v", fs.resourceGroup, to.String(nic.Name), rerr.Error()) return rerr.Error() } + nicUpdated = true klog.V(2).Infof("EnsureBackendPoolDeleted done") return nil }) @@ -1057,7 +1059,7 @@ func (fs *FlexScaleSet) ensureBackendPoolDeletedFromNode(vmssFlexVMNameMap map[s allErrs = append(allErrs, utilerrors.Flatten(errs)) } if len(allErrs) > 0 { - return utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) + return nicUpdated, utilerrors.Flatten(utilerrors.NewAggregate(allErrs)) } - return nil + return nicUpdated, nil } diff --git a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_wrap.go b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_wrap.go index 3ed56e2b18..71a86852df 100644 --- a/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_wrap.go +++ b/vendor/sigs.k8s.io/cloud-provider-azure/pkg/provider/azure_wrap.go @@ -145,7 +145,7 @@ func (az *Cloud) getSubnet(virtualNetworkName string, subnetName string) (networ return subnet, exists, nil } -func (az *Cloud) getAzureLoadBalancer(name string, crt azcache.AzureCacheReadType) (lb network.LoadBalancer, exists bool, err error) { +func (az *Cloud) getAzureLoadBalancer(name string, crt azcache.AzureCacheReadType) (lb *network.LoadBalancer, exists bool, err error) { cachedLB, err := az.lbCache.Get(name, crt) if err != nil { return lb, false, err @@ -155,7 +155,7 @@ func (az *Cloud) getAzureLoadBalancer(name string, crt azcache.AzureCacheReadTyp return lb, false, nil } - return *(cachedLB.(*network.LoadBalancer)), true, nil + return cachedLB.(*network.LoadBalancer), true, nil } func (az *Cloud) getSecurityGroup(crt azcache.AzureCacheReadType) (network.SecurityGroup, error) {
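The last hunk changes getAzureLoadBalancer to hand back the cached *network.LoadBalancer directly instead of copying the value, which is what lets callers such as reconcileLoadBalancer and reconcilePublicIP reassign lb in place. A stand-in sketch of the two getter shapes (a plain struct instead of the ARM type):

package main

import "fmt"

type loadBalancer struct{ Name string }

// before: dereference and copy the cached value
func getLBByValue(cached map[string]*loadBalancer, name string) (loadBalancer, bool) {
	lb, ok := cached[name]
	if !ok {
		return loadBalancer{}, false
	}
	return *lb, true
}

// after: return the cached pointer; nil doubles as "not found"
func getLBByPointer(cached map[string]*loadBalancer, name string) (*loadBalancer, bool) {
	lb, ok := cached[name]
	if !ok {
		return nil, false
	}
	return lb, true
}

func main() {
	cached := map[string]*loadBalancer{"kubernetes": {Name: "kubernetes"}}
	v, _ := getLBByValue(cached, "kubernetes")
	p, _ := getLBByPointer(cached, "kubernetes")
	fmt.Println(v.Name, p.Name)
}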