From f002b7eb89152869cc2d38407b1d48966c044dc7 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Thu, 5 Nov 2020 12:38:11 -0800 Subject: [PATCH 01/28] 1 --- xds/internal/client/client.go | 12 +++++++++++ xds/internal/client/client_xds.go | 35 +++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index c2bbeafbdd8d..d615ae9f2226 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -215,6 +215,16 @@ type SecurityConfig struct { AcceptedSANs []string } +// CircuitBreakerThreshold contains the circuit breakers configuration +// received as part of the Cluster resource. +type CircuitBreakerThreshold struct { + // RoutingPriority is the routing priority for the threshold. DEFAULT or + // HIGH. + RoutingPriority string + // MaxRequests is the maximum requests for the threshold. + MaxRequests uint32 +} + // ClusterUpdate contains information from a received CDS response, which is of // interest to the registered CDS watcher. type ClusterUpdate struct { @@ -225,6 +235,8 @@ type ClusterUpdate struct { EnableLRS bool // SecurityCfg contains security configuration sent by the xDS server. SecurityCfg *SecurityConfig + // Thresholds contains the circuit breaker thresholds, if any. + Thresholds []CircuitBreakerThreshold } // OverloadDropConfig contains the config to drop overloads. diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index b8598c0247d9..c49cdf2e73e4 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -309,10 +309,15 @@ func validateCluster(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { if err != nil { return emptyUpdate, err } + cbt, err := circuitBreakersFromCluster(cluster) + if err != nil { + return emptyUpdate, err + } return ClusterUpdate{ ServiceName: cluster.GetEdsClusterConfig().GetServiceName(), EnableLRS: cluster.GetLrsServer().GetSelf() != nil, SecurityCfg: sc, + Thresholds: cbt, }, nil } @@ -382,6 +387,36 @@ func securityConfigFromCluster(cluster *v3clusterpb.Cluster) (*SecurityConfig, e return sc, nil } +// circuitBreakersFromCluster extracts the circuit breakers configuration from +// the received cluster resource. Returns nil if no CircuitBreakers or no +// Thresholds in CircuitBreakers. +func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) ([]CircuitBreakerThreshold, error) { + circuitBreakers := cluster.GetCircuitBreakers() + if circuitBreakers == nil { + return nil, nil + } + thresholds := circuitBreakers.GetThresholds() + if thresholds == nil { + return nil, nil + } + cbt := make([]CircuitBreakerThreshold, 0) + for _, threshold := range thresholds { + maxRequestsPb := threshold.GetMaxRequests() + var maxRequests uint32 = 1024 + if maxRequestsPb != nil { + maxRequests = maxRequestsPb.GetValue() + } + cbt = append(cbt, CircuitBreakerThreshold{ + RoutingPriority: threshold.GetPriority().String(), + MaxRequests: maxRequests, + }) + } + if len(cbt) == 0 { + return nil, nil + } + return cbt, nil +} + // UnmarshalEndpoints processes resources received in an EDS response, // validates them, and transforms them into a native struct which contains only // fields we are interested in. From 81840c40fae607b8f2b26c5b10987d342c2eee06 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 12:24:00 -0800 Subject: [PATCH 02/28] implementation --- .../balancer/cdsbalancer/cdsbalancer.go | 13 ++- xds/internal/balancer/edsbalancer/config.go | 8 ++ xds/internal/balancer/edsbalancer/eds.go | 4 + xds/internal/balancer/edsbalancer/eds_impl.go | 52 ++++++++--- .../balancer/edsbalancer/eds_impl_test.go | 2 +- xds/internal/balancer/edsbalancer/eds_test.go | 2 + xds/internal/client/client_max_requests.go | 91 +++++++++++++++++++ 7 files changed, 155 insertions(+), 17 deletions(-) create mode 100644 xds/internal/client/client_max_requests.go diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 55d6e8c9f0aa..b0ff1e8dbafe 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -305,6 +305,15 @@ func (b *cdsBalancer) handleSecurityConfig(config *xdsclient.SecurityConfig) err return nil } +func getCircuitBreaking(update xdsclient.ClusterUpdate) (circuitBreaking bool, maxRequests uint32) { + for _, threshold := range update.Thresholds { + if threshold.RoutingPriority == "DEFAULT" { + return true, threshold.MaxRequests + } + } + return false, 0 +} + // handleWatchUpdate handles a watch update from the xDS Client. Good updates // lead to clientConn updates being invoked on the underlying edsBalancer. func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { @@ -342,13 +351,13 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { b.edsLB = edsLB b.logger.Infof("Created child policy %p of type %s", b.edsLB, edsName) } - lbCfg := &edsbalancer.EDSConfig{EDSServiceName: update.cds.ServiceName} + circuitBreaking, maxRequests := getCircuitBreaking(update.cds) + lbCfg := &edsbalancer.EDSConfig{EDSServiceName: update.cds.ServiceName, CircuitBreaking: circuitBreaking, MaxRequests: maxRequests} if update.cds.EnableLRS { // An empty string here indicates that the edsBalancer should use the // same xDS server for load reporting as it does for EDS // requests/responses. lbCfg.LrsLoadReportingServerName = new(string) - } ccState := balancer.ClientConnState{ ResolverState: resolver.State{Attributes: attributes.New(xdsinternal.XDSClientID, b.xdsClient)}, diff --git a/xds/internal/balancer/edsbalancer/config.go b/xds/internal/balancer/edsbalancer/config.go index 9b59cfa01ab3..787d4c6b8f82 100644 --- a/xds/internal/balancer/edsbalancer/config.go +++ b/xds/internal/balancer/edsbalancer/config.go @@ -42,6 +42,10 @@ type EDSConfig struct { // will be disabled. If set to the empty string, load reporting will // be sent to the same server that we obtained CDS data from. LrsLoadReportingServerName *string + // If circuit breaking is enabled. + CircuitBreaking bool + // Max requests when circuit breaking. + MaxRequests uint32 } // edsConfigJSON is the intermediate unmarshal result of EDSConfig. ChildPolicy @@ -52,6 +56,8 @@ type edsConfigJSON struct { FallbackPolicy []*loadBalancingConfig EDSServiceName string LRSLoadReportingServerName *string + CircuitBreaking bool + MaxRequests uint32 } // UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. @@ -65,6 +71,8 @@ func (l *EDSConfig) UnmarshalJSON(data []byte) error { l.EDSServiceName = configJSON.EDSServiceName l.LrsLoadReportingServerName = configJSON.LRSLoadReportingServerName + l.CircuitBreaking = configJSON.CircuitBreaking + l.MaxRequests = configJSON.MaxRequests for _, lbcfg := range configJSON.ChildPolicy { if balancer.Get(lbcfg.Name) != nil { diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 9312b0532cbf..505a2337800c 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -93,6 +93,8 @@ type edsBalancerImplInterface interface { handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) // updateState handle a balancer state update from the priority. updateState(priority priorityType, s balancer.State) + // updateConfig ??? + updateConfig(edsConfig *EDSConfig) // close closes the eds balancer. close() } @@ -181,6 +183,8 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { x.logger.Warningf("failed to update xds clients: %v", err) } + x.edsImpl.updateConfig(cfg) + if x.config == nil { x.config = cfg return diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 1ce79bf61bb6..561caec98b27 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -18,6 +18,7 @@ package edsbalancer import ( "encoding/json" + "fmt" "reflect" "sync" "time" @@ -35,6 +36,7 @@ import ( "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/balancergroup" "google.golang.org/grpc/xds/internal/balancer/weightedtarget/weightedaggregator" + "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" ) @@ -92,10 +94,11 @@ type edsBalancerImpl struct { subConnMu sync.Mutex subConnToPriority map[balancer.SubConn]priorityType - pickerMu sync.Mutex - dropConfig []xdsclient.OverloadDropConfig - drops []*dropper - innerState balancer.State // The state of the picker without drop support. + pickerMu sync.Mutex + dropConfig []xdsclient.OverloadDropConfig + drops []*dropper + innerState balancer.State // The state of the picker without drop support. + edsServiceName string } // newEDSBalancerImpl create a new edsBalancerImpl. @@ -170,7 +173,7 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropC // Update picker with old inner picker, new drops. edsImpl.cc.UpdateState(balancer.State{ ConnectivityState: edsImpl.innerState.ConnectivityState, - Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.xdsClient.loadStore())}, + Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.xdsClient.loadStore(), edsImpl.edsServiceName)}, ) } edsImpl.pickerMu.Unlock() @@ -389,6 +392,12 @@ func (edsImpl *edsBalancerImpl) handleSubConnStateChange(sc balancer.SubConn, s } } +func (edsImpl *edsBalancerImpl) updateConfig(edsConfig *EDSConfig) { + fmt.Println(edsImpl) + edsImpl.edsServiceName = edsConfig.EDSServiceName + client.UpdateService(edsConfig.EDSServiceName, edsConfig.CircuitBreaking, edsConfig.MaxRequests) +} + // updateState first handles priority, and then wraps picker in a drop picker // before forwarding the update. func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.State) { @@ -403,7 +412,7 @@ func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.St defer edsImpl.pickerMu.Unlock() edsImpl.innerState = s // Don't reset drops when it's a state change. - edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.xdsClient.loadStore())}) + edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.xdsClient.loadStore(), edsImpl.edsServiceName)}) } } @@ -452,16 +461,18 @@ func (edsImpl *edsBalancerImpl) close() { } type dropPicker struct { - drops []*dropper - p balancer.Picker - loadStore load.PerClusterReporter + drops []*dropper + p balancer.Picker + loadStore load.PerClusterReporter + serviceName string } -func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter) *dropPicker { +func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter, serviceName string) *dropPicker { return &dropPicker{ - drops: drops, - p: p, - loadStore: loadStore, + drops: drops, + p: p, + loadStore: loadStore, + serviceName: serviceName, } } @@ -485,5 +496,18 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } // TODO: (eds) don't drop unless the inner picker is READY. Similar to // https://github.com/grpc/grpc-go/issues/2622. - return d.p.Pick(info) + pr, err := d.p.Pick(info) + if err != nil { + if err := client.StartRequest(d.serviceName); err != nil { + return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) + } + oldDone := pr.Done + pr.Done = func(doneInfo balancer.DoneInfo) { + client.EndRequest(d.serviceName) + if oldDone != nil { + oldDone(doneInfo) + } + } + } + return pr, err } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 292ea4f80a70..5343e5707607 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -656,7 +656,7 @@ func (s) TestDropPicker(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := newDropPicker(constPicker, tt.drops, nil) + p := newDropPicker(constPicker, tt.drops, nil, "") // scCount is the number of sc's returned by pick. The opposite of // drop-count. diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 085310d44b39..1cee053c1bed 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -112,6 +112,8 @@ func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) { func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} +func (f *fakeEDSBalancer) updateConfig(edsConfig *EDSConfig) {} + func (f *fakeEDSBalancer) close() {} func (f *fakeEDSBalancer) waitForChildPolicy(wantPolicy *loadBalancingConfig) error { diff --git a/xds/internal/client/client_max_requests.go b/xds/internal/client/client_max_requests.go new file mode 100644 index 000000000000..5eddba4c989f --- /dev/null +++ b/xds/internal/client/client_max_requests.go @@ -0,0 +1,91 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package client + +import ( + "fmt" + "sync" +) + +type clientMaxRequests struct { + mu sync.Mutex + services map[string]serviceInfo +} + +type serviceInfo struct { + circuitBreaking bool + maxRequests uint32 + numRequests uint32 +} + +var c clientMaxRequests + +func init() { + c.services = make(map[string]serviceInfo) + UpdateService("", false, 0) +} + +// UpdateService updates the service with the provided service name, or creates +// it if it doesn't exist. +func UpdateService(serviceName string, circuitBreaking bool, maxRequests uint32) { + fmt.Println("UpdateService", serviceName) + c.mu.Lock() + defer c.mu.Unlock() + sInfo, ok := c.services[serviceName] + if !ok { + sInfo = serviceInfo{numRequests: 0} + } + sInfo.circuitBreaking = circuitBreaking + sInfo.maxRequests = maxRequests + c.services[serviceName] = sInfo +} + +// StartRequest starts a request for a service, incrementing its number of +// requests by 1. Returns an error if circuit brekaing is on and the max number +// of requests is exceeded. +func StartRequest(serviceName string) error { + fmt.Println("StartRequest", serviceName) + c.mu.Lock() + defer c.mu.Unlock() + sInfo, ok := c.services[serviceName] + if !ok { + return fmt.Errorf("service name %v not identified", serviceName) + } + sInfo.numRequests++ + if sInfo.circuitBreaking && sInfo.numRequests > sInfo.maxRequests { + return fmt.Errorf("max requests %v exceeded on service %v", sInfo.maxRequests, serviceName) + } + c.services[serviceName] = sInfo + return nil +} + +// EndRequest ends a request for a service, decrementing its number of requests +// by 1. +func EndRequest(serviceName string) error { + fmt.Println("EndRequest", serviceName) + c.mu.Lock() + defer c.mu.Unlock() + sInfo, ok := c.services[serviceName] + if !ok { + return fmt.Errorf("service name %v not identified", serviceName) + } + sInfo.numRequests-- + c.services[serviceName] = sInfo + return nil +} From 027f3b9da19be533424e66ac2ededbef00d51b0d Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 14:25:14 -0800 Subject: [PATCH 03/28] Refactored to counter struct --- xds/internal/balancer/edsbalancer/eds_impl.go | 51 +++++++------- .../balancer/edsbalancer/eds_impl_test.go | 2 +- xds/internal/client/client_max_requests.go | 68 +++++++++++-------- 3 files changed, 68 insertions(+), 53 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 561caec98b27..f474ea91e524 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -18,7 +18,6 @@ package edsbalancer import ( "encoding/json" - "fmt" "reflect" "sync" "time" @@ -94,11 +93,11 @@ type edsBalancerImpl struct { subConnMu sync.Mutex subConnToPriority map[balancer.SubConn]priorityType - pickerMu sync.Mutex - dropConfig []xdsclient.OverloadDropConfig - drops []*dropper - innerState balancer.State // The state of the picker without drop support. - edsServiceName string + pickerMu sync.Mutex + dropConfig []xdsclient.OverloadDropConfig + drops []*dropper + innerState balancer.State // The state of the picker without drop support. + counter *client.ServiceRequestsCounter } // newEDSBalancerImpl create a new edsBalancerImpl. @@ -173,7 +172,7 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropC // Update picker with old inner picker, new drops. edsImpl.cc.UpdateState(balancer.State{ ConnectivityState: edsImpl.innerState.ConnectivityState, - Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.xdsClient.loadStore(), edsImpl.edsServiceName)}, + Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.xdsClient.loadStore(), edsImpl.counter)}, ) } edsImpl.pickerMu.Unlock() @@ -392,10 +391,16 @@ func (edsImpl *edsBalancerImpl) handleSubConnStateChange(sc balancer.SubConn, s } } +// updateConfig handles changes to the circuit breaking configuration. func (edsImpl *edsBalancerImpl) updateConfig(edsConfig *EDSConfig) { - fmt.Println(edsImpl) - edsImpl.edsServiceName = edsConfig.EDSServiceName - client.UpdateService(edsConfig.EDSServiceName, edsConfig.CircuitBreaking, edsConfig.MaxRequests) + if edsImpl.counter == nil || edsImpl.counter.ServiceName != edsConfig.EDSServiceName { + edsImpl.counter = &client.ServiceRequestsCounter{ServiceName: edsConfig.EDSServiceName} + } + edsImpl.counter.UpdateService(edsConfig.CircuitBreaking, edsConfig.MaxRequests) + if !edsConfig.CircuitBreaking { + // counter should be nil to prevent overhead in dropPicker. + edsImpl.counter = nil + } } // updateState first handles priority, and then wraps picker in a drop picker @@ -412,7 +417,7 @@ func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.St defer edsImpl.pickerMu.Unlock() edsImpl.innerState = s // Don't reset drops when it's a state change. - edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.xdsClient.loadStore(), edsImpl.edsServiceName)}) + edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.xdsClient.loadStore(), edsImpl.counter)}) } } @@ -461,18 +466,18 @@ func (edsImpl *edsBalancerImpl) close() { } type dropPicker struct { - drops []*dropper - p balancer.Picker - loadStore load.PerClusterReporter - serviceName string + drops []*dropper + p balancer.Picker + loadStore load.PerClusterReporter + counter *client.ServiceRequestsCounter } -func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter, serviceName string) *dropPicker { +func newDropPicker(p balancer.Picker, drops []*dropper, loadStore load.PerClusterReporter, counter *client.ServiceRequestsCounter) *dropPicker { return &dropPicker{ - drops: drops, - p: p, - loadStore: loadStore, - serviceName: serviceName, + drops: drops, + p: p, + loadStore: loadStore, + counter: counter, } } @@ -497,13 +502,13 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // TODO: (eds) don't drop unless the inner picker is READY. Similar to // https://github.com/grpc/grpc-go/issues/2622. pr, err := d.p.Pick(info) - if err != nil { - if err := client.StartRequest(d.serviceName); err != nil { + if d.counter != nil && err != nil { + if err := d.counter.StartRequest(); err != nil { return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) } oldDone := pr.Done pr.Done = func(doneInfo balancer.DoneInfo) { - client.EndRequest(d.serviceName) + d.counter.EndRequest() if oldDone != nil { oldDone(doneInfo) } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 5343e5707607..1d4e72d605b0 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -656,7 +656,7 @@ func (s) TestDropPicker(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := newDropPicker(constPicker, tt.drops, nil, "") + p := newDropPicker(constPicker, tt.drops, nil, nil) // scCount is the number of sc's returned by pick. The opposite of // drop-count. diff --git a/xds/internal/client/client_max_requests.go b/xds/internal/client/client_max_requests.go index 5eddba4c989f..e8524d32345b 100644 --- a/xds/internal/client/client_max_requests.go +++ b/xds/internal/client/client_max_requests.go @@ -23,7 +23,11 @@ import ( "sync" ) -type clientMaxRequests struct { +func init() { + src.services = make(map[string]serviceInfo) +} + +type servicesRequestsCounter struct { mu sync.Mutex services map[string]serviceInfo } @@ -34,58 +38,64 @@ type serviceInfo struct { numRequests uint32 } -var c clientMaxRequests +var src servicesRequestsCounter -func init() { - c.services = make(map[string]serviceInfo) - UpdateService("", false, 0) +// ServiceRequestsCounter is used to track the total inflight requests for a +// service with the provided name. +type ServiceRequestsCounter struct { + ServiceName string } -// UpdateService updates the service with the provided service name, or creates -// it if it doesn't exist. -func UpdateService(serviceName string, circuitBreaking bool, maxRequests uint32) { - fmt.Println("UpdateService", serviceName) - c.mu.Lock() - defer c.mu.Unlock() - sInfo, ok := c.services[serviceName] +// UpdateService updates the configuration for a service, or creates it if it +// doesn't exist. +func (c *ServiceRequestsCounter) UpdateService(circuitBreaking bool, maxRequests uint32) { + src.mu.Lock() + defer src.mu.Unlock() + sInfo, ok := src.services[c.ServiceName] if !ok { sInfo = serviceInfo{numRequests: 0} } sInfo.circuitBreaking = circuitBreaking sInfo.maxRequests = maxRequests - c.services[serviceName] = sInfo + src.services[c.ServiceName] = sInfo } // StartRequest starts a request for a service, incrementing its number of -// requests by 1. Returns an error if circuit brekaing is on and the max number +// requests by 1. Returns an error if circuit breaking is on and the max number // of requests is exceeded. -func StartRequest(serviceName string) error { - fmt.Println("StartRequest", serviceName) - c.mu.Lock() - defer c.mu.Unlock() - sInfo, ok := c.services[serviceName] +func (c *ServiceRequestsCounter) StartRequest() error { + src.mu.Lock() + defer src.mu.Unlock() + sInfo, ok := src.services[c.ServiceName] if !ok { - return fmt.Errorf("service name %v not identified", serviceName) + return fmt.Errorf("service name %v not identified", c.ServiceName) } sInfo.numRequests++ if sInfo.circuitBreaking && sInfo.numRequests > sInfo.maxRequests { - return fmt.Errorf("max requests %v exceeded on service %v", sInfo.maxRequests, serviceName) + return fmt.Errorf("max requests %v exceeded on service %v", sInfo.maxRequests, c.ServiceName) } - c.services[serviceName] = sInfo + src.services[c.ServiceName] = sInfo return nil } // EndRequest ends a request for a service, decrementing its number of requests // by 1. -func EndRequest(serviceName string) error { - fmt.Println("EndRequest", serviceName) - c.mu.Lock() - defer c.mu.Unlock() - sInfo, ok := c.services[serviceName] +func (c *ServiceRequestsCounter) EndRequest() error { + src.mu.Lock() + defer src.mu.Unlock() + sInfo, ok := src.services[c.ServiceName] if !ok { - return fmt.Errorf("service name %v not identified", serviceName) + return fmt.Errorf("service name %v not identified", c.ServiceName) } sInfo.numRequests-- - c.services[serviceName] = sInfo + src.services[c.ServiceName] = sInfo return nil } + +// Copy copies the counter, or nil if it is nil. +func (c *ServiceRequestsCounter) Copy() *ServiceRequestsCounter { + if c == nil { + return nil + } + return &ServiceRequestsCounter{ServiceName: c.ServiceName} +} From 45960e9470a5e61c2f267dbad702bc08c1a1dc1b Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 14:49:52 -0800 Subject: [PATCH 04/28] Added ClusterUpdate test --- xds/internal/client/client_cds_test.go | 44 ++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/xds/internal/client/client_cds_test.go b/xds/internal/client/client_cds_test.go index 6cba7ef12a08..69b35c8e3ee6 100644 --- a/xds/internal/client/client_cds_test.go +++ b/xds/internal/client/client_cds_test.go @@ -32,6 +32,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "google.golang.org/grpc/xds/internal/version" + "google.golang.org/protobuf/types/known/wrapperspb" ) const ( @@ -169,6 +170,49 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, wantUpdate: ClusterUpdate{ServiceName: serviceName, EnableLRS: true}, }, + { + name: "happiest-case-with-circuitbreakers", + cluster: &v3clusterpb.Cluster{ + Name: clusterName, + ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, + EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ + EdsConfig: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{ + Ads: &v3corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: serviceName, + }, + LbPolicy: v3clusterpb.Cluster_ROUND_ROBIN, + CircuitBreakers: &v3clusterpb.CircuitBreakers{ + Thresholds: []*v3clusterpb.CircuitBreakers_Thresholds{ + { + Priority: v3corepb.RoutingPriority_DEFAULT, + MaxRequests: wrapperspb.UInt32(512), + }, + { + Priority: v3corepb.RoutingPriority_HIGH, + MaxRequests: nil, + }, + }, + }, + LrsServer: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Self{ + Self: &v3corepb.SelfConfigSource{}, + }, + }, + }, + wantUpdate: ClusterUpdate{ServiceName: serviceName, EnableLRS: true, Thresholds: []CircuitBreakerThreshold{ + { + RoutingPriority: "DEFAULT", + MaxRequests: 512, + }, + { + RoutingPriority: "HIGH", + MaxRequests: 1024, + }, + }}, + }, } for _, test := range tests { From 7ce9958d0057fb5520650945705468ce1ce1d7b7 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 15:39:34 -0800 Subject: [PATCH 05/28] testing counter --- xds/internal/client/client_max_requests.go | 8 -- .../client/client_max_requests_test.go | 108 ++++++++++++++++++ 2 files changed, 108 insertions(+), 8 deletions(-) create mode 100644 xds/internal/client/client_max_requests_test.go diff --git a/xds/internal/client/client_max_requests.go b/xds/internal/client/client_max_requests.go index e8524d32345b..ac63b2b9d22e 100644 --- a/xds/internal/client/client_max_requests.go +++ b/xds/internal/client/client_max_requests.go @@ -91,11 +91,3 @@ func (c *ServiceRequestsCounter) EndRequest() error { src.services[c.ServiceName] = sInfo return nil } - -// Copy copies the counter, or nil if it is nil. -func (c *ServiceRequestsCounter) Copy() *ServiceRequestsCounter { - if c == nil { - return nil - } - return &ServiceRequestsCounter{ServiceName: c.ServiceName} -} diff --git a/xds/internal/client/client_max_requests_test.go b/xds/internal/client/client_max_requests_test.go new file mode 100644 index 000000000000..e146b3c6f9f4 --- /dev/null +++ b/xds/internal/client/client_max_requests_test.go @@ -0,0 +1,108 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package client_test + +import ( + "math/rand" + "sync" + "testing" + "time" + + "google.golang.org/grpc/xds/internal/client" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} + +type counterTest struct { + name string + circuitBreaking bool + maxRequests uint32 + numRequests uint32 + errorExpected bool +} + +func testCounter(t *testing.T, test counterTest) { + counter := client.ServiceRequestsCounter{ServiceName: test.name} + counter.UpdateService(test.circuitBreaking, test.maxRequests) + wg := sync.WaitGroup{} + wg.Add(int(test.numRequests)) + var firstError error = nil + errorMu := sync.Mutex{} + fail := func(err error) { + errorMu.Lock() + defer errorMu.Unlock() + if firstError == nil { + firstError = err + } + } + for i := 0; i < int(test.numRequests); i++ { + go func() { + defer wg.Done() + if err := counter.StartRequest(); err != nil { + fail(err) + return + } + time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + if err := counter.EndRequest(); err != nil { + fail(err) + return + } + }() + } + wg.Wait() + if test.errorExpected && firstError == nil { + t.Error("no error when error expected") + } + if !test.errorExpected && firstError != nil { + t.Errorf("error starting request: %v", firstError) + } +} + +func (s) TestRequestsCounter(t *testing.T) { + tests := []counterTest{ + { + name: "cb-on-no-exceed", + circuitBreaking: true, + maxRequests: 1024, + numRequests: 1024, + errorExpected: false, + }, + { + name: "cb-off-exceeds", + circuitBreaking: false, + maxRequests: 32, + numRequests: 64, + errorExpected: false, + }, + { + name: "cb-on-exceeds", + circuitBreaking: true, + maxRequests: 32, + numRequests: 64, + errorExpected: true, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + testCounter(t, test) + }) + } +} From 6efeb417df1fcacb8472a7b4312d0ce74e7d2174 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 16:14:12 -0800 Subject: [PATCH 06/28] Testing edsconfig --- .../balancer/cdsbalancer/cdsbalancer_test.go | 28 +++++++++++++++++++ ...requests.go => client_requests_counter.go} | 0 ...est.go => client_requests_counter_test.go} | 0 3 files changed, 28 insertions(+) rename xds/internal/client/{client_max_requests.go => client_requests_counter.go} (100%) rename xds/internal/client/{client_max_requests_test.go => client_requests_counter_test.go} (100%) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 2257929e40cd..6723912c0e2b 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -205,6 +205,16 @@ func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.Clie } } +// edsCCScb is edsCCS except it also adds circuit breaking configuration. +func edsCCScb(service string, enableLRS bool, xdsClient interface{}, maxRequests uint32) balancer.ClientConnState { + ccs := edsCCS(service, enableLRS, xdsClient) + if edsConfig, ok := ccs.BalancerConfig.(*edsbalancer.EDSConfig); ok { + edsConfig.CircuitBreaking = true + edsConfig.MaxRequests = maxRequests + } + return ccs +} + // setup creates a cdsBalancer and an edsBalancer (and overrides the // newEDSBalancer function to return it), and also returns a cleanup function. func setup(t *testing.T) (*cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) { @@ -377,6 +387,24 @@ func (s) TestHandleClusterUpdate(t *testing.T) { cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName}, wantCCS: edsCCS(serviceName, false, xdsC), }, + { + name: "happy-case-with-circuit-breakers", + cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName, Thresholds: []xdsclient.CircuitBreakerThreshold{ + { + RoutingPriority: "HIGH", + MaxRequests: 1024, + }, + { + RoutingPriority: "DEFAULT", + MaxRequests: 512, + }, + { + RoutingPriority: "DEFAULT", + MaxRequests: 256, + }, + }}, + wantCCS: edsCCScb(serviceName, false, xdsC, 512), + }, } for _, test := range tests { diff --git a/xds/internal/client/client_max_requests.go b/xds/internal/client/client_requests_counter.go similarity index 100% rename from xds/internal/client/client_max_requests.go rename to xds/internal/client/client_requests_counter.go diff --git a/xds/internal/client/client_max_requests_test.go b/xds/internal/client/client_requests_counter_test.go similarity index 100% rename from xds/internal/client/client_max_requests_test.go rename to xds/internal/client/client_requests_counter_test.go From 4021d5dcb6c7a015fc41fe42edf27c8c4e754356 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 17:48:57 -0800 Subject: [PATCH 07/28] added marshall test --- xds/internal/balancer/edsbalancer/eds_test.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 1cee053c1bed..0cd23e2c85f4 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -568,6 +568,19 @@ func (s) TestXDSBalancerConfigParsing(t *testing.T) { }, wantErr: false, }, + { + name: "circuit-breaking", + js: json.RawMessage(` +{ + "circuitBreaking": true, + "maxRequests": 1024 +}`), + want: &EDSConfig{ + CircuitBreaking: true, + MaxRequests: 1024, + }, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From a60e3b600850d4ac0a26072ea7a09859dc042a4c Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 18:42:33 -0800 Subject: [PATCH 08/28] Added picker test --- xds/internal/balancer/edsbalancer/eds_impl.go | 2 +- .../balancer/edsbalancer/eds_impl_test.go | 57 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index f474ea91e524..62e36aa3d895 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -502,7 +502,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { // TODO: (eds) don't drop unless the inner picker is READY. Similar to // https://github.com/grpc/grpc-go/issues/2622. pr, err := d.p.Pick(info) - if d.counter != nil && err != nil { + if d.counter != nil && err == nil { if err := d.counter.StartRequest(); err != nil { return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 1d4e72d605b0..fd3289fc3381 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -550,6 +550,63 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { } } +func (s) TestEDS_CircuitBreaking(t *testing.T) { + cc := testutils.NewTestClientConn(t) + edsb := newEDSBalancerImpl(cc, nil, nil, nil) + edsb.enqueueChildBalancerStateUpdate = edsb.updateState + edsb.updateConfig(&EDSConfig{EDSServiceName: "test", CircuitBreaking: true, MaxRequests: 50}) + + // One locality with one backend. + clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) + clab1.AddLocality(testSubZones[0], 1, 0, testEndpointAddrs[:1], nil) + edsb.handleEDSResponse(parseEDSRespProtoForTesting(clab1.Build())) + sc1 := <-cc.NewSubConnCh + edsb.handleSubConnStateChange(sc1, connectivity.Connecting) + edsb.handleSubConnStateChange(sc1, connectivity.Ready) + + // Picks with drops. + dones := []func(){} + p := <-cc.NewPickerCh + for i := 0; i < 100; i++ { + pr, err := p.Pick(balancer.PickInfo{}) + // TODO: the dropping algorithm needs a design. When the dropping algorithm + // is fixed, this test also needs fix. + if i < 50 && err != nil { + t.Errorf("The first 50%% picks should be non-drops, got error %v", err) + } else if i > 50 && err == nil { + t.Errorf("The second 50%% picks should be drops, got error ") + } + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } + + for _, done := range dones { + done() + } + dones = []func(){} + + // Pick without drops. + for i := 0; i < 50; i++ { + pr, err := p.Pick(balancer.PickInfo{}) + if err != nil { + t.Errorf("The third 50%% picks should be non-drops, got error %v", err) + } + dones = append(dones, func() { + if pr.Done != nil { + pr.Done(balancer.DoneInfo{}) + } + }) + } + + // Without this, future tests with the same service name will fail. + for _, done := range dones { + done() + } +} + func init() { balancer.Register(&testInlineUpdateBalancerBuilder{}) } From ceb64d4c9f3e53d01ade8a5af406c967a34939d0 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 18:46:02 -0800 Subject: [PATCH 09/28] working this time --- xds/internal/balancer/cdsbalancer/cdsbalancer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 3b68abccd7e1..52bfff136736 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -285,7 +285,6 @@ func (b *cdsBalancer) handleSecurityConfig(config *xdsclient.SecurityConfig) err return nil } -<<<<<<< HEAD func getCircuitBreaking(update xdsclient.ClusterUpdate) (circuitBreaking bool, maxRequests uint32) { for _, threshold := range update.Thresholds { if threshold.RoutingPriority == "DEFAULT" { From 8bc9b0238c4bd5756e628b93d930f11f34fbb40b Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Wed, 18 Nov 2020 18:48:39 -0800 Subject: [PATCH 10/28] added comment --- xds/internal/balancer/edsbalancer/eds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 505a2337800c..8658c6dc550a 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -93,7 +93,7 @@ type edsBalancerImplInterface interface { handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) // updateState handle a balancer state update from the priority. updateState(priority priorityType, s balancer.State) - // updateConfig ??? + // updateConfig handles an update to the eds configuration. updateConfig(edsConfig *EDSConfig) // close closes the eds balancer. close() From 63ca25bc8deabdce6f18f454fac15a81fc2a41b6 Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Thu, 19 Nov 2020 08:09:43 -0800 Subject: [PATCH 11/28] removed rand --- xds/internal/client/client_requests_counter_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index e146b3c6f9f4..2d51f5574868 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -19,7 +19,6 @@ package client_test import ( - "math/rand" "sync" "testing" "time" @@ -27,10 +26,6 @@ import ( "google.golang.org/grpc/xds/internal/client" ) -func init() { - rand.Seed(time.Now().UnixNano()) -} - type counterTest struct { name string circuitBreaking bool @@ -60,7 +55,7 @@ func testCounter(t *testing.T, test counterTest) { fail(err) return } - time.Sleep(time.Duration(rand.Intn(10)) * time.Millisecond) + time.Sleep(time.Duration(1) * time.Millisecond) if err := counter.EndRequest(); err != nil { fail(err) return From 23e96d4d77dc8c2e52a0aa192c01a50d32d4688c Mon Sep 17 00:00:00 2001 From: "gargut@google.com" Date: Thu, 19 Nov 2020 09:27:35 -0800 Subject: [PATCH 12/28] using waitgroups in counter test --- .../client/client_requests_counter.go | 2 ++ .../client/client_requests_counter_test.go | 20 ++++++++++++------- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index ac63b2b9d22e..c6340b621a0f 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -71,6 +71,7 @@ func (c *ServiceRequestsCounter) StartRequest() error { return fmt.Errorf("service name %v not identified", c.ServiceName) } sInfo.numRequests++ + fmt.Println("StartRequest:", c.ServiceName, sInfo.maxRequests, sInfo.numRequests) if sInfo.circuitBreaking && sInfo.numRequests > sInfo.maxRequests { return fmt.Errorf("max requests %v exceeded on service %v", sInfo.maxRequests, c.ServiceName) } @@ -88,6 +89,7 @@ func (c *ServiceRequestsCounter) EndRequest() error { return fmt.Errorf("service name %v not identified", c.ServiceName) } sInfo.numRequests-- + fmt.Println("EndRequest:", c.ServiceName, sInfo.maxRequests, sInfo.numRequests) src.services[c.ServiceName] = sInfo return nil } diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index 2d51f5574868..25b6f26ba8f5 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -21,7 +21,6 @@ package client_test import ( "sync" "testing" - "time" "google.golang.org/grpc/xds/internal/client" ) @@ -37,8 +36,12 @@ type counterTest struct { func testCounter(t *testing.T, test counterTest) { counter := client.ServiceRequestsCounter{ServiceName: test.name} counter.UpdateService(test.circuitBreaking, test.maxRequests) - wg := sync.WaitGroup{} - wg.Add(int(test.numRequests)) + requestsStartedWg := sync.WaitGroup{} + requestsStartedWg.Add(1) + requestsSent := sync.WaitGroup{} + requestsSent.Add(int(test.numRequests)) + requestsDoneWg := sync.WaitGroup{} + requestsDoneWg.Add(int(test.numRequests)) var firstError error = nil errorMu := sync.Mutex{} fail := func(err error) { @@ -50,19 +53,22 @@ func testCounter(t *testing.T, test counterTest) { } for i := 0; i < int(test.numRequests); i++ { go func() { - defer wg.Done() + defer requestsDoneWg.Done() if err := counter.StartRequest(); err != nil { fail(err) + requestsSent.Done() return } - time.Sleep(time.Duration(1) * time.Millisecond) + requestsSent.Done() + requestsStartedWg.Wait() if err := counter.EndRequest(); err != nil { fail(err) - return } }() } - wg.Wait() + requestsSent.Wait() + requestsStartedWg.Done() + requestsDoneWg.Wait() if test.errorExpected && firstError == nil { t.Error("no error when error expected") } From f3beb8496086304e0ba516bddf22a5c5b271ea15 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Mon, 30 Nov 2020 13:15:42 -0800 Subject: [PATCH 13/28] Using *uint32 as max requests in various confis --- .../balancer/cdsbalancer/cdsbalancer.go | 12 +--------- .../balancer/cdsbalancer/cdsbalancer_test.go | 22 ++++--------------- xds/internal/balancer/edsbalancer/config.go | 12 +++++----- xds/internal/balancer/edsbalancer/eds_impl.go | 22 +++++++++++++------ .../balancer/edsbalancer/eds_impl_test.go | 7 +++++- xds/internal/balancer/edsbalancer/eds_test.go | 3 +-- xds/internal/balancer/edsbalancer/util.go | 5 +++++ xds/internal/client/client.go | 14 ++---------- xds/internal/client/client_cds_test.go | 11 +--------- .../client/client_requests_counter.go | 14 +++++++----- .../client/client_requests_counter_test.go | 5 ++++- xds/internal/client/client_xds.go | 20 +++++++---------- xds/internal/env/env.go | 9 ++++++-- 13 files changed, 68 insertions(+), 88 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 52bfff136736..da211a29242f 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -285,15 +285,6 @@ func (b *cdsBalancer) handleSecurityConfig(config *xdsclient.SecurityConfig) err return nil } -func getCircuitBreaking(update xdsclient.ClusterUpdate) (circuitBreaking bool, maxRequests uint32) { - for _, threshold := range update.Thresholds { - if threshold.RoutingPriority == "DEFAULT" { - return true, threshold.MaxRequests - } - } - return false, 0 -} - func buildProviderFunc(configs map[string]*certprovider.BuildableConfig, instanceName, certName string, wantIdentity, wantRoot bool) (certprovider.Provider, error) { cfg, ok := configs[instanceName] if !ok { @@ -351,8 +342,7 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { b.edsLB = edsLB b.logger.Infof("Created child policy %p of type %s", b.edsLB, edsName) } - circuitBreaking, maxRequests := getCircuitBreaking(update.cds) - lbCfg := &edsbalancer.EDSConfig{EDSServiceName: update.cds.ServiceName, CircuitBreaking: circuitBreaking, MaxRequests: maxRequests} + lbCfg := &edsbalancer.EDSConfig{EDSServiceName: update.cds.ServiceName, MaxRequests: update.cds.MaxRequests} if update.cds.EnableLRS { // An empty string here indicates that the edsBalancer should use the // same xDS server for load reporting as it does for EDS diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 6723912c0e2b..e94a3b353b8f 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -209,8 +209,7 @@ func edsCCS(service string, enableLRS bool, xdsClient interface{}) balancer.Clie func edsCCScb(service string, enableLRS bool, xdsClient interface{}, maxRequests uint32) balancer.ClientConnState { ccs := edsCCS(service, enableLRS, xdsClient) if edsConfig, ok := ccs.BalancerConfig.(*edsbalancer.EDSConfig); ok { - edsConfig.CircuitBreaking = true - edsConfig.MaxRequests = maxRequests + edsConfig.MaxRequests = func() *uint32 { i := uint32(maxRequests); return &i }() } return ccs } @@ -388,22 +387,9 @@ func (s) TestHandleClusterUpdate(t *testing.T) { wantCCS: edsCCS(serviceName, false, xdsC), }, { - name: "happy-case-with-circuit-breakers", - cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName, Thresholds: []xdsclient.CircuitBreakerThreshold{ - { - RoutingPriority: "HIGH", - MaxRequests: 1024, - }, - { - RoutingPriority: "DEFAULT", - MaxRequests: 512, - }, - { - RoutingPriority: "DEFAULT", - MaxRequests: 256, - }, - }}, - wantCCS: edsCCScb(serviceName, false, xdsC, 512), + name: "happy-case-with-circuit-breakers", + cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName, MaxRequests: func() *uint32 { i := uint32(512); return &i }()}, + wantCCS: edsCCScb(serviceName, false, xdsC, 512), }, } diff --git a/xds/internal/balancer/edsbalancer/config.go b/xds/internal/balancer/edsbalancer/config.go index 787d4c6b8f82..c25a571cf064 100644 --- a/xds/internal/balancer/edsbalancer/config.go +++ b/xds/internal/balancer/edsbalancer/config.go @@ -42,10 +42,8 @@ type EDSConfig struct { // will be disabled. If set to the empty string, load reporting will // be sent to the same server that we obtained CDS data from. LrsLoadReportingServerName *string - // If circuit breaking is enabled. - CircuitBreaking bool - // Max requests when circuit breaking. - MaxRequests uint32 + // Max requests when circuit breaking, if any (otherwise nil). + MaxRequests *uint32 } // edsConfigJSON is the intermediate unmarshal result of EDSConfig. ChildPolicy @@ -71,8 +69,10 @@ func (l *EDSConfig) UnmarshalJSON(data []byte) error { l.EDSServiceName = configJSON.EDSServiceName l.LrsLoadReportingServerName = configJSON.LRSLoadReportingServerName - l.CircuitBreaking = configJSON.CircuitBreaking - l.MaxRequests = configJSON.MaxRequests + l.MaxRequests = nil + if configJSON.CircuitBreaking { + l.MaxRequests = &configJSON.MaxRequests + } for _, lbcfg := range configJSON.ChildPolicy { if balancer.Get(lbcfg.Name) != nil { diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 62e36aa3d895..1ee35bd6a692 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -38,6 +38,7 @@ import ( "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" + "google.golang.org/grpc/xds/internal/env" ) // TODO: make this a environment variable? @@ -393,11 +394,14 @@ func (edsImpl *edsBalancerImpl) handleSubConnStateChange(sc balancer.SubConn, s // updateConfig handles changes to the circuit breaking configuration. func (edsImpl *edsBalancerImpl) updateConfig(edsConfig *EDSConfig) { + if !env.CircuitBreakingSupport { + return + } if edsImpl.counter == nil || edsImpl.counter.ServiceName != edsConfig.EDSServiceName { edsImpl.counter = &client.ServiceRequestsCounter{ServiceName: edsConfig.EDSServiceName} } - edsImpl.counter.UpdateService(edsConfig.CircuitBreaking, edsConfig.MaxRequests) - if !edsConfig.CircuitBreaking { + edsImpl.counter.UpdateCounter(edsConfig.MaxRequests) + if edsConfig.MaxRequests == nil { // counter should be nil to prevent overhead in dropPicker. edsImpl.counter = nil } @@ -499,13 +503,14 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } return balancer.PickResult{}, status.Errorf(codes.Unavailable, "RPC is dropped") } - // TODO: (eds) don't drop unless the inner picker is READY. Similar to - // https://github.com/grpc/grpc-go/issues/2622. - pr, err := d.p.Pick(info) - if d.counter != nil && err == nil { + if d.counter != nil { if err := d.counter.StartRequest(); err != nil { return balancer.PickResult{}, status.Errorf(codes.Unavailable, err.Error()) } + pr, err := d.p.Pick(info) + if err != nil { + return pr, err + } oldDone := pr.Done pr.Done = func(doneInfo balancer.DoneInfo) { d.counter.EndRequest() @@ -513,6 +518,9 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { oldDone(doneInfo) } } + return pr, err } - return pr, err + // TODO: (eds) don't drop unless the inner picker is READY. Similar to + // https://github.com/grpc/grpc-go/issues/2622. + return d.p.Pick(info) } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index fd3289fc3381..e595c40a1482 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/xds/internal/balancer/balancergroup" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" + "google.golang.org/grpc/xds/internal/env" "google.golang.org/grpc/xds/internal/testutils" ) @@ -551,10 +552,14 @@ func (s) TestEDS_UpdateSubBalancerName(t *testing.T) { } func (s) TestEDS_CircuitBreaking(t *testing.T) { + origCircuitBreakingSupport := env.CircuitBreakingSupport + env.CircuitBreakingSupport = true + defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }() + cc := testutils.NewTestClientConn(t) edsb := newEDSBalancerImpl(cc, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState - edsb.updateConfig(&EDSConfig{EDSServiceName: "test", CircuitBreaking: true, MaxRequests: 50}) + edsb.updateConfig(&EDSConfig{EDSServiceName: "test", MaxRequests: newUint32(50)}) // One locality with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 0cd23e2c85f4..8bc13cb2e0ac 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -576,8 +576,7 @@ func (s) TestXDSBalancerConfigParsing(t *testing.T) { "maxRequests": 1024 }`), want: &EDSConfig{ - CircuitBreaking: true, - MaxRequests: 1024, + MaxRequests: newUint32(1024), }, wantErr: false, }, diff --git a/xds/internal/balancer/edsbalancer/util.go b/xds/internal/balancer/edsbalancer/util.go index 132950426466..bcd54113b8d1 100644 --- a/xds/internal/balancer/edsbalancer/util.go +++ b/xds/internal/balancer/edsbalancer/util.go @@ -42,3 +42,8 @@ func newDropper(c xdsclient.OverloadDropConfig) *dropper { func (d *dropper) drop() (ret bool) { return d.w.Next().(bool) } + +func newUint32(value uint32) *uint32 { + i := uint32(value) + return &i +} diff --git a/xds/internal/client/client.go b/xds/internal/client/client.go index ebecd511841d..52897ac248f1 100644 --- a/xds/internal/client/client.go +++ b/xds/internal/client/client.go @@ -219,16 +219,6 @@ type SecurityConfig struct { AcceptedSANs []string } -// CircuitBreakerThreshold contains the circuit breakers configuration -// received as part of the Cluster resource. -type CircuitBreakerThreshold struct { - // RoutingPriority is the routing priority for the threshold. DEFAULT or - // HIGH. - RoutingPriority string - // MaxRequests is the maximum requests for the threshold. - MaxRequests uint32 -} - // ClusterUpdate contains information from a received CDS response, which is of // interest to the registered CDS watcher. type ClusterUpdate struct { @@ -239,8 +229,8 @@ type ClusterUpdate struct { EnableLRS bool // SecurityCfg contains security configuration sent by the xDS server. SecurityCfg *SecurityConfig - // Thresholds contains the circuit breaker thresholds, if any. - Thresholds []CircuitBreakerThreshold + // MaxRequests for circuit breaking, if any (otherwise nil). + MaxRequests *uint32 } // OverloadDropConfig contains the config to drop overloads. diff --git a/xds/internal/client/client_cds_test.go b/xds/internal/client/client_cds_test.go index 69b35c8e3ee6..accc048b76fa 100644 --- a/xds/internal/client/client_cds_test.go +++ b/xds/internal/client/client_cds_test.go @@ -202,16 +202,7 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, }, }, - wantUpdate: ClusterUpdate{ServiceName: serviceName, EnableLRS: true, Thresholds: []CircuitBreakerThreshold{ - { - RoutingPriority: "DEFAULT", - MaxRequests: 512, - }, - { - RoutingPriority: "HIGH", - MaxRequests: 1024, - }, - }}, + wantUpdate: ClusterUpdate{ServiceName: serviceName, EnableLRS: true, MaxRequests: func() *uint32 { i := uint32(512); return &i }()}, }, } diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index c6340b621a0f..681022319ab4 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -46,17 +46,19 @@ type ServiceRequestsCounter struct { ServiceName string } -// UpdateService updates the configuration for a service, or creates it if it -// doesn't exist. -func (c *ServiceRequestsCounter) UpdateService(circuitBreaking bool, maxRequests uint32) { +// UpdateCounter updates the configuration for a service, or creates it if it +// doesn't exist. Pass nil to disable circuit breaking for a service. +func (c *ServiceRequestsCounter) UpdateCounter(maxRequests *uint32) { src.mu.Lock() defer src.mu.Unlock() sInfo, ok := src.services[c.ServiceName] if !ok { sInfo = serviceInfo{numRequests: 0} } - sInfo.circuitBreaking = circuitBreaking - sInfo.maxRequests = maxRequests + sInfo.circuitBreaking = maxRequests != nil + if maxRequests != nil { + sInfo.maxRequests = *maxRequests + } src.services[c.ServiceName] = sInfo } @@ -71,7 +73,7 @@ func (c *ServiceRequestsCounter) StartRequest() error { return fmt.Errorf("service name %v not identified", c.ServiceName) } sInfo.numRequests++ - fmt.Println("StartRequest:", c.ServiceName, sInfo.maxRequests, sInfo.numRequests) + fmt.Println("StartRequest:", c.ServiceName, sInfo.circuitBreaking, sInfo.maxRequests, sInfo.numRequests) if sInfo.circuitBreaking && sInfo.numRequests > sInfo.maxRequests { return fmt.Errorf("max requests %v exceeded on service %v", sInfo.maxRequests, c.ServiceName) } diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index 25b6f26ba8f5..b60205981508 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -35,7 +35,10 @@ type counterTest struct { func testCounter(t *testing.T, test counterTest) { counter := client.ServiceRequestsCounter{ServiceName: test.name} - counter.UpdateService(test.circuitBreaking, test.maxRequests) + counter.UpdateCounter(&test.maxRequests) + if !test.circuitBreaking { + counter.UpdateCounter(nil) + } requestsStartedWg := sync.WaitGroup{} requestsStartedWg.Add(1) requestsSent := sync.WaitGroup{} diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 5a6aef6987ce..c124087cc192 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -309,7 +309,7 @@ func validateCluster(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { if err != nil { return emptyUpdate, err } - cbt, err := circuitBreakersFromCluster(cluster) + mr, err := circuitBreakersFromCluster(cluster) if err != nil { return emptyUpdate, err } @@ -317,7 +317,7 @@ func validateCluster(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { ServiceName: cluster.GetEdsClusterConfig().GetServiceName(), EnableLRS: cluster.GetLrsServer().GetSelf() != nil, SecurityCfg: sc, - Thresholds: cbt, + MaxRequests: mr, }, nil } @@ -390,7 +390,7 @@ func securityConfigFromCluster(cluster *v3clusterpb.Cluster) (*SecurityConfig, e // circuitBreakersFromCluster extracts the circuit breakers configuration from // the received cluster resource. Returns nil if no CircuitBreakers or no // Thresholds in CircuitBreakers. -func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) ([]CircuitBreakerThreshold, error) { +func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) (*uint32, error) { circuitBreakers := cluster.GetCircuitBreakers() if circuitBreakers == nil { return nil, nil @@ -399,22 +399,18 @@ func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) ([]CircuitBreakerT if thresholds == nil { return nil, nil } - cbt := make([]CircuitBreakerThreshold, 0) for _, threshold := range thresholds { + if threshold.GetPriority().String() != "DEFAULT" { + continue + } maxRequestsPb := threshold.GetMaxRequests() var maxRequests uint32 = 1024 if maxRequestsPb != nil { maxRequests = maxRequestsPb.GetValue() } - cbt = append(cbt, CircuitBreakerThreshold{ - RoutingPriority: threshold.GetPriority().String(), - MaxRequests: maxRequests, - }) - } - if len(cbt) == 0 { - return nil, nil + return &maxRequests, nil } - return cbt, nil + return nil, nil } // UnmarshalEndpoints processes resources received in an EDS response, diff --git a/xds/internal/env/env.go b/xds/internal/env/env.go index 3b338eae05fd..577c6737c4d4 100644 --- a/xds/internal/env/env.go +++ b/xds/internal/env/env.go @@ -26,8 +26,9 @@ import ( ) const ( - bootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP" - xdsV3SupportEnv = "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" + bootstrapFileNameEnv = "GRPC_XDS_BOOTSTRAP" + xdsV3SupportEnv = "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" + circuitBreakingSupportEnv = "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" ) var ( @@ -39,4 +40,8 @@ var ( // done by setting the environment variable // "GRPC_XDS_EXPERIMENTAL_V3_SUPPORT" to "true". V3Support = strings.EqualFold(os.Getenv(xdsV3SupportEnv), "true") + // CircuitBreakingSupport indicates whether circuit breaking support is + // enabled, which can be done by setting the environment variable + // "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "true". + CircuitBreakingSupport = strings.EqualFold(os.Getenv(xdsV3SupportEnv), "true") ) From eff8dfa57ca607d9694f044bf7ddc0e5f0c3316d Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Mon, 30 Nov 2020 13:20:59 -0800 Subject: [PATCH 14/28] Removed error return value --- xds/internal/client/client_xds.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index c124087cc192..46b6fa18b9c8 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -390,14 +390,14 @@ func securityConfigFromCluster(cluster *v3clusterpb.Cluster) (*SecurityConfig, e // circuitBreakersFromCluster extracts the circuit breakers configuration from // the received cluster resource. Returns nil if no CircuitBreakers or no // Thresholds in CircuitBreakers. -func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) (*uint32, error) { +func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) *uint32 { circuitBreakers := cluster.GetCircuitBreakers() if circuitBreakers == nil { - return nil, nil + return nil } thresholds := circuitBreakers.GetThresholds() if thresholds == nil { - return nil, nil + return nil } for _, threshold := range thresholds { if threshold.GetPriority().String() != "DEFAULT" { @@ -408,9 +408,9 @@ func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) (*uint32, error) { if maxRequestsPb != nil { maxRequests = maxRequestsPb.GetValue() } - return &maxRequests, nil + return &maxRequests } - return nil, nil + return nil } // UnmarshalEndpoints processes resources received in an EDS response, From 4cd2b4f705f7022f77b3a56d902bd384bb7eee06 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Mon, 30 Nov 2020 13:23:14 -0800 Subject: [PATCH 15/28] fixed error --- xds/internal/client/client_xds.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 46b6fa18b9c8..22b85257ec10 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -309,10 +309,7 @@ func validateCluster(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { if err != nil { return emptyUpdate, err } - mr, err := circuitBreakersFromCluster(cluster) - if err != nil { - return emptyUpdate, err - } + mr := circuitBreakersFromCluster(cluster) return ClusterUpdate{ ServiceName: cluster.GetEdsClusterConfig().GetServiceName(), EnableLRS: cluster.GetLrsServer().GetSelf() != nil, From 3308919ed98d11f16ea4ce6f4114c20d70fafa69 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Mon, 30 Nov 2020 13:55:16 -0800 Subject: [PATCH 16/28] Reworked counter --- xds/internal/balancer/edsbalancer/eds_impl.go | 7 +- .../client/client_requests_counter.go | 72 ++++++++----------- .../client/client_requests_counter_test.go | 58 ++++++--------- 3 files changed, 54 insertions(+), 83 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 1ee35bd6a692..092a4b085f6e 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -398,10 +398,11 @@ func (edsImpl *edsBalancerImpl) updateConfig(edsConfig *EDSConfig) { return } if edsImpl.counter == nil || edsImpl.counter.ServiceName != edsConfig.EDSServiceName { - edsImpl.counter = &client.ServiceRequestsCounter{ServiceName: edsConfig.EDSServiceName} + edsImpl.counter = client.NewServiceRequestsCounter(edsConfig.EDSServiceName) } - edsImpl.counter.UpdateCounter(edsConfig.MaxRequests) - if edsConfig.MaxRequests == nil { + if edsConfig.MaxRequests != nil { + edsImpl.counter.SetMaxRequests(*edsConfig.MaxRequests) + } else { // counter should be nil to prevent overhead in dropPicker. edsImpl.counter = nil } diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 681022319ab4..1ddf7a038e40 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -24,18 +24,12 @@ import ( ) func init() { - src.services = make(map[string]serviceInfo) + src.services = make(map[string]*ServiceRequestsCounter) } type servicesRequestsCounter struct { mu sync.Mutex - services map[string]serviceInfo -} - -type serviceInfo struct { - circuitBreaking bool - maxRequests uint32 - numRequests uint32 + services map[string]*ServiceRequestsCounter } var src servicesRequestsCounter @@ -43,55 +37,49 @@ var src servicesRequestsCounter // ServiceRequestsCounter is used to track the total inflight requests for a // service with the provided name. type ServiceRequestsCounter struct { + mu sync.Mutex ServiceName string + maxRequests uint32 + numRequests uint32 } -// UpdateCounter updates the configuration for a service, or creates it if it -// doesn't exist. Pass nil to disable circuit breaking for a service. -func (c *ServiceRequestsCounter) UpdateCounter(maxRequests *uint32) { +// NewServiceRequestsCounter creates a new ServiceRequestsCounter that is +// internally tracked by this package and returns a pointer to it. If one with +// the serviceName already exists, returns a pointer to it. +func NewServiceRequestsCounter(serviceName string) *ServiceRequestsCounter { src.mu.Lock() defer src.mu.Unlock() - sInfo, ok := src.services[c.ServiceName] + c, ok := src.services[serviceName] if !ok { - sInfo = serviceInfo{numRequests: 0} + c = &ServiceRequestsCounter{ServiceName: serviceName} + src.services[serviceName] = c } - sInfo.circuitBreaking = maxRequests != nil - if maxRequests != nil { - sInfo.maxRequests = *maxRequests - } - src.services[c.ServiceName] = sInfo + return c +} + +// SetMax updates the max requests for a service's counter. +func (c *ServiceRequestsCounter) SetMaxRequests(maxRequests uint32) { + c.mu.Lock() + defer c.mu.Unlock() + c.maxRequests = maxRequests } // StartRequest starts a request for a service, incrementing its number of -// requests by 1. Returns an error if circuit breaking is on and the max number -// of requests is exceeded. +// requests by 1. Returns an error if the max number of requests is exceeded. func (c *ServiceRequestsCounter) StartRequest() error { - src.mu.Lock() - defer src.mu.Unlock() - sInfo, ok := src.services[c.ServiceName] - if !ok { - return fmt.Errorf("service name %v not identified", c.ServiceName) - } - sInfo.numRequests++ - fmt.Println("StartRequest:", c.ServiceName, sInfo.circuitBreaking, sInfo.maxRequests, sInfo.numRequests) - if sInfo.circuitBreaking && sInfo.numRequests > sInfo.maxRequests { - return fmt.Errorf("max requests %v exceeded on service %v", sInfo.maxRequests, c.ServiceName) + c.mu.Lock() + defer c.mu.Unlock() + if c.numRequests+1 > c.maxRequests { + return fmt.Errorf("max requests %v exceeded on service %v", c.maxRequests, c.ServiceName) } - src.services[c.ServiceName] = sInfo + c.numRequests++ return nil } // EndRequest ends a request for a service, decrementing its number of requests // by 1. -func (c *ServiceRequestsCounter) EndRequest() error { - src.mu.Lock() - defer src.mu.Unlock() - sInfo, ok := src.services[c.ServiceName] - if !ok { - return fmt.Errorf("service name %v not identified", c.ServiceName) - } - sInfo.numRequests-- - fmt.Println("EndRequest:", c.ServiceName, sInfo.maxRequests, sInfo.numRequests) - src.services[c.ServiceName] = sInfo - return nil +func (c *ServiceRequestsCounter) EndRequest() { + c.mu.Lock() + defer c.mu.Unlock() + c.numRequests-- } diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index b60205981508..60c0dfebe572 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -26,19 +26,15 @@ import ( ) type counterTest struct { - name string - circuitBreaking bool - maxRequests uint32 - numRequests uint32 - errorExpected bool + name string + maxRequests uint32 + numRequests uint32 + errorExpected bool } func testCounter(t *testing.T, test counterTest) { - counter := client.ServiceRequestsCounter{ServiceName: test.name} - counter.UpdateCounter(&test.maxRequests) - if !test.circuitBreaking { - counter.UpdateCounter(nil) - } + counter := client.NewServiceRequestsCounter(test.name) + counter.SetMaxRequests(test.maxRequests) requestsStartedWg := sync.WaitGroup{} requestsStartedWg.Add(1) requestsSent := sync.WaitGroup{} @@ -47,26 +43,21 @@ func testCounter(t *testing.T, test counterTest) { requestsDoneWg.Add(int(test.numRequests)) var firstError error = nil errorMu := sync.Mutex{} - fail := func(err error) { - errorMu.Lock() - defer errorMu.Unlock() - if firstError == nil { - firstError = err - } - } for i := 0; i < int(test.numRequests); i++ { go func() { defer requestsDoneWg.Done() if err := counter.StartRequest(); err != nil { - fail(err) + errorMu.Lock() + defer errorMu.Unlock() + if firstError == nil { + firstError = err + } requestsSent.Done() return } requestsSent.Done() requestsStartedWg.Wait() - if err := counter.EndRequest(); err != nil { - fail(err) - } + counter.EndRequest() }() } requestsSent.Wait() @@ -83,25 +74,16 @@ func testCounter(t *testing.T, test counterTest) { func (s) TestRequestsCounter(t *testing.T) { tests := []counterTest{ { - name: "cb-on-no-exceed", - circuitBreaking: true, - maxRequests: 1024, - numRequests: 1024, - errorExpected: false, - }, - { - name: "cb-off-exceeds", - circuitBreaking: false, - maxRequests: 32, - numRequests: 64, - errorExpected: false, + name: "does-not-exceed-max-requests", + maxRequests: 1024, + numRequests: 1024, + errorExpected: false, }, { - name: "cb-on-exceeds", - circuitBreaking: true, - maxRequests: 32, - numRequests: 64, - errorExpected: true, + name: "exceeds-max-requests", + maxRequests: 32, + numRequests: 64, + errorExpected: true, }, } for _, test := range tests { From 13c8c414089688570307edf53c53c22a1a1e9d6f Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Mon, 30 Nov 2020 16:55:45 -0800 Subject: [PATCH 17/28] vet fix --- xds/internal/client/client_requests_counter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 1ddf7a038e40..5aedce4ae6d6 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -57,7 +57,7 @@ func NewServiceRequestsCounter(serviceName string) *ServiceRequestsCounter { return c } -// SetMax updates the max requests for a service's counter. +// SetMaxRequests updates the max requests for a service's counter. func (c *ServiceRequestsCounter) SetMaxRequests(maxRequests uint32) { c.mu.Lock() defer c.mu.Unlock() From 96282cb729d0ab063d342d42c7354ac3471d889b Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Tue, 1 Dec 2020 13:12:44 -0800 Subject: [PATCH 18/28] Fixed error from merge master --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 66a6861d3848..25544a14f228 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -204,7 +204,7 @@ func edsCCS(service string, enableLRS bool) balancer.ClientConnState { // edsCCScb is edsCCS except it also adds circuit breaking configuration. func edsCCScb(service string, enableLRS bool, xdsClient interface{}, maxRequests uint32) balancer.ClientConnState { - ccs := edsCCS(service, enableLRS, xdsClient) + ccs := edsCCS(service, enableLRS) if edsConfig, ok := ccs.BalancerConfig.(*edsbalancer.EDSConfig); ok { edsConfig.MaxRequests = func() *uint32 { i := uint32(maxRequests); return &i }() } From dbe85ecb31ff73ec0c83cff73c18a4129081f81a Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Wed, 2 Dec 2020 01:05:33 -0800 Subject: [PATCH 19/28] Removed edsconfig change, counter updated in CDS balancer --- .../balancer/cdsbalancer/cdsbalancer.go | 7 +++++- .../balancer/cdsbalancer/cdsbalancer_test.go | 14 ------------ xds/internal/balancer/edsbalancer/config.go | 8 ------- xds/internal/balancer/edsbalancer/eds_impl.go | 6 ----- .../balancer/edsbalancer/eds_impl_test.go | 5 ++++- xds/internal/balancer/edsbalancer/eds_test.go | 12 ---------- xds/internal/balancer/edsbalancer/util.go | 5 ----- .../client/client_requests_counter.go | 22 ++++++++++++++----- .../client/client_requests_counter_test.go | 2 +- 9 files changed, 28 insertions(+), 53 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index b4032e87f6f9..82433a1ceb20 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -34,6 +34,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/edsbalancer" + "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/bootstrap" xdsclient "google.golang.org/grpc/xds/internal/client" @@ -340,16 +341,20 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { b.edsLB = edsLB b.logger.Infof("Created child policy %p of type %s", b.edsLB, edsName) } - lbCfg := &edsbalancer.EDSConfig{EDSServiceName: update.cds.ServiceName, MaxRequests: update.cds.MaxRequests} + lbCfg := &edsbalancer.EDSConfig{EDSServiceName: update.cds.ServiceName} if update.cds.EnableLRS { // An empty string here indicates that the edsBalancer should use the // same xDS server for load reporting as it does for EDS // requests/responses. lbCfg.LrsLoadReportingServerName = new(string) + } ccState := balancer.ClientConnState{ BalancerConfig: lbCfg, } + if update.cds.MaxRequests != nil { + client.SetMaxRequests(update.cds.ServiceName, update.cds.MaxRequests) + } if err := b.edsLB.UpdateClientConnState(ccState); err != nil { b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err) } diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 25544a14f228..ccd1699afaac 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -202,15 +202,6 @@ func edsCCS(service string, enableLRS bool) balancer.ClientConnState { } } -// edsCCScb is edsCCS except it also adds circuit breaking configuration. -func edsCCScb(service string, enableLRS bool, xdsClient interface{}, maxRequests uint32) balancer.ClientConnState { - ccs := edsCCS(service, enableLRS) - if edsConfig, ok := ccs.BalancerConfig.(*edsbalancer.EDSConfig); ok { - edsConfig.MaxRequests = func() *uint32 { i := uint32(maxRequests); return &i }() - } - return ccs -} - // setup creates a cdsBalancer and an edsBalancer (and overrides the // newEDSBalancer function to return it), and also returns a cleanup function. func setup(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDSBalancer, *xdstestutils.TestClientConn, func()) { @@ -365,11 +356,6 @@ func (s) TestHandleClusterUpdate(t *testing.T) { cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName}, wantCCS: edsCCS(serviceName, false), }, - { - name: "happy-case-with-circuit-breakers", - cdsUpdate: xdsclient.ClusterUpdate{ServiceName: serviceName, MaxRequests: func() *uint32 { i := uint32(512); return &i }()}, - wantCCS: edsCCScb(serviceName, false, xdsC, 512), - }, } for _, test := range tests { diff --git a/xds/internal/balancer/edsbalancer/config.go b/xds/internal/balancer/edsbalancer/config.go index c25a571cf064..9b59cfa01ab3 100644 --- a/xds/internal/balancer/edsbalancer/config.go +++ b/xds/internal/balancer/edsbalancer/config.go @@ -42,8 +42,6 @@ type EDSConfig struct { // will be disabled. If set to the empty string, load reporting will // be sent to the same server that we obtained CDS data from. LrsLoadReportingServerName *string - // Max requests when circuit breaking, if any (otherwise nil). - MaxRequests *uint32 } // edsConfigJSON is the intermediate unmarshal result of EDSConfig. ChildPolicy @@ -54,8 +52,6 @@ type edsConfigJSON struct { FallbackPolicy []*loadBalancingConfig EDSServiceName string LRSLoadReportingServerName *string - CircuitBreaking bool - MaxRequests uint32 } // UnmarshalJSON parses the JSON-encoded byte slice in data and stores it in l. @@ -69,10 +65,6 @@ func (l *EDSConfig) UnmarshalJSON(data []byte) error { l.EDSServiceName = configJSON.EDSServiceName l.LrsLoadReportingServerName = configJSON.LRSLoadReportingServerName - l.MaxRequests = nil - if configJSON.CircuitBreaking { - l.MaxRequests = &configJSON.MaxRequests - } for _, lbcfg := range configJSON.ChildPolicy { if balancer.Get(lbcfg.Name) != nil { diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 092a4b085f6e..0200793f7bdb 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -400,12 +400,6 @@ func (edsImpl *edsBalancerImpl) updateConfig(edsConfig *EDSConfig) { if edsImpl.counter == nil || edsImpl.counter.ServiceName != edsConfig.EDSServiceName { edsImpl.counter = client.NewServiceRequestsCounter(edsConfig.EDSServiceName) } - if edsConfig.MaxRequests != nil { - edsImpl.counter.SetMaxRequests(*edsConfig.MaxRequests) - } else { - // counter should be nil to prevent overhead in dropPicker. - edsImpl.counter = nil - } } // updateState first handles priority, and then wraps picker in a drop picker diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index e595c40a1482..0601c68c6917 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -32,6 +32,7 @@ import ( "google.golang.org/grpc/connectivity" "google.golang.org/grpc/xds/internal" "google.golang.org/grpc/xds/internal/balancer/balancergroup" + "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client" "google.golang.org/grpc/xds/internal/client/load" "google.golang.org/grpc/xds/internal/env" @@ -559,7 +560,9 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { cc := testutils.NewTestClientConn(t) edsb := newEDSBalancerImpl(cc, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState - edsb.updateConfig(&EDSConfig{EDSServiceName: "test", MaxRequests: newUint32(50)}) + edsb.updateConfig(&EDSConfig{EDSServiceName: "test"}) + var maxRequests uint32 = 50 + client.SetMaxRequests("test", &maxRequests) // One locality with one backend. clab1 := testutils.NewClusterLoadAssignmentBuilder(testClusterNames[0], nil) diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index dd963525f0ea..0914fa363456 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -561,18 +561,6 @@ func (s) TestXDSBalancerConfigParsing(t *testing.T) { }, wantErr: false, }, - { - name: "circuit-breaking", - js: json.RawMessage(` -{ - "circuitBreaking": true, - "maxRequests": 1024 -}`), - want: &EDSConfig{ - MaxRequests: newUint32(1024), - }, - wantErr: false, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/xds/internal/balancer/edsbalancer/util.go b/xds/internal/balancer/edsbalancer/util.go index bcd54113b8d1..132950426466 100644 --- a/xds/internal/balancer/edsbalancer/util.go +++ b/xds/internal/balancer/edsbalancer/util.go @@ -42,8 +42,3 @@ func newDropper(c xdsclient.OverloadDropConfig) *dropper { func (d *dropper) drop() (ret bool) { return d.w.Next().(bool) } - -func newUint32(value uint32) *uint32 { - i := uint32(value) - return &i -} diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 5aedce4ae6d6..633891f4bb27 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -41,6 +41,7 @@ type ServiceRequestsCounter struct { ServiceName string maxRequests uint32 numRequests uint32 + enableMax bool } // NewServiceRequestsCounter creates a new ServiceRequestsCounter that is @@ -58,10 +59,21 @@ func NewServiceRequestsCounter(serviceName string) *ServiceRequestsCounter { } // SetMaxRequests updates the max requests for a service's counter. -func (c *ServiceRequestsCounter) SetMaxRequests(maxRequests uint32) { - c.mu.Lock() - defer c.mu.Unlock() - c.maxRequests = maxRequests +func SetMaxRequests(serviceName string, maxRequests *uint32) *ServiceRequestsCounter { + src.mu.Lock() + defer src.mu.Unlock() + c, ok := src.services[serviceName] + if !ok { + c = &ServiceRequestsCounter{ServiceName: serviceName} + src.services[serviceName] = c + } + if maxRequests != nil { + c.enableMax = true + c.maxRequests = *maxRequests + } else { + c.enableMax = false + } + return c } // StartRequest starts a request for a service, incrementing its number of @@ -69,7 +81,7 @@ func (c *ServiceRequestsCounter) SetMaxRequests(maxRequests uint32) { func (c *ServiceRequestsCounter) StartRequest() error { c.mu.Lock() defer c.mu.Unlock() - if c.numRequests+1 > c.maxRequests { + if c.enableMax && c.numRequests+1 > c.maxRequests { return fmt.Errorf("max requests %v exceeded on service %v", c.maxRequests, c.ServiceName) } c.numRequests++ diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index 60c0dfebe572..c6fce23935e3 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -34,7 +34,7 @@ type counterTest struct { func testCounter(t *testing.T, test counterTest) { counter := client.NewServiceRequestsCounter(test.name) - counter.SetMaxRequests(test.maxRequests) + client.SetMaxRequests(test.name, &test.maxRequests) requestsStartedWg := sync.WaitGroup{} requestsStartedWg.Add(1) requestsSent := sync.WaitGroup{} From bb4e67422daa40b0cff450b1ded62d9fcf8d0096 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Wed, 2 Dec 2020 01:13:30 -0800 Subject: [PATCH 20/28] Fixed drop picker bug --- xds/internal/balancer/cdsbalancer/cdsbalancer.go | 4 +--- xds/internal/balancer/edsbalancer/eds_impl.go | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 82433a1ceb20..96da4548e0ee 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -352,9 +352,7 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { ccState := balancer.ClientConnState{ BalancerConfig: lbCfg, } - if update.cds.MaxRequests != nil { - client.SetMaxRequests(update.cds.ServiceName, update.cds.MaxRequests) - } + client.SetMaxRequests(update.cds.ServiceName, update.cds.MaxRequests) if err := b.edsLB.UpdateClientConnState(ccState); err != nil { b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err) } diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 0200793f7bdb..69918e5846bc 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -504,6 +504,7 @@ func (d *dropPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { } pr, err := d.p.Pick(info) if err != nil { + d.counter.EndRequest() return pr, err } oldDone := pr.Done From d00cc36723d8d438d44241236d0b82aa2c6c2598 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Wed, 2 Dec 2020 12:35:05 -0800 Subject: [PATCH 21/28] Using atomics in counter --- .../balancer/cdsbalancer/cdsbalancer.go | 3 +- xds/internal/balancer/edsbalancer/eds.go | 6 ++-- xds/internal/balancer/edsbalancer/eds_impl.go | 20 ++++++------- .../balancer/edsbalancer/eds_impl_test.go | 2 +- xds/internal/balancer/edsbalancer/eds_test.go | 2 +- .../client/client_requests_counter.go | 28 ++++++++----------- .../client/client_requests_counter_test.go | 2 +- xds/internal/client/client_xds.go | 20 ++++--------- xds/internal/env/env.go | 2 +- 9 files changed, 35 insertions(+), 50 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 96da4548e0ee..3a276c034f2a 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -329,6 +329,8 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { return } + client.SetMaxRequests(update.cds.ServiceName, update.cds.MaxRequests) + // The first good update from the watch API leads to the instantiation of an // edsBalancer. Further updates/errors are propagated to the existing // edsBalancer. @@ -352,7 +354,6 @@ func (b *cdsBalancer) handleWatchUpdate(update *watchUpdate) { ccState := balancer.ClientConnState{ BalancerConfig: lbCfg, } - client.SetMaxRequests(update.cds.ServiceName, update.cds.MaxRequests) if err := b.edsLB.UpdateClientConnState(ccState); err != nil { b.logger.Errorf("xds: edsBalancer.UpdateClientConnState(%+v) returned error: %v", ccState, err) } diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 7f4ade9b8c15..a6d145960a8f 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -101,8 +101,8 @@ type edsBalancerImplInterface interface { handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) // updateState handle a balancer state update from the priority. updateState(priority priorityType, s balancer.State) - // updateConfig handles an update to the eds configuration. - updateConfig(edsConfig *EDSConfig) + // updateServiceRequestsCounter handles an update to the eds configuration. + updateServiceRequestsCounter(serviceName string) // close closes the eds balancer. close() } @@ -191,7 +191,7 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { x.logger.Warningf("failed to update xds clients: %v", err) } - x.edsImpl.updateConfig(cfg) + x.edsImpl.updateServiceRequestsCounter(cfg.EDSServiceName) if x.config == nil { x.config = cfg diff --git a/xds/internal/balancer/edsbalancer/eds_impl.go b/xds/internal/balancer/edsbalancer/eds_impl.go index 69918e5846bc..837d08f496d5 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl.go +++ b/xds/internal/balancer/edsbalancer/eds_impl.go @@ -94,11 +94,11 @@ type edsBalancerImpl struct { subConnMu sync.Mutex subConnToPriority map[balancer.SubConn]priorityType - pickerMu sync.Mutex - dropConfig []xdsclient.OverloadDropConfig - drops []*dropper - innerState balancer.State // The state of the picker without drop support. - counter *client.ServiceRequestsCounter + pickerMu sync.Mutex + dropConfig []xdsclient.OverloadDropConfig + drops []*dropper + innerState balancer.State // The state of the picker without drop support. + serviceRequestsCounter *client.ServiceRequestsCounter } // newEDSBalancerImpl create a new edsBalancerImpl. @@ -173,7 +173,7 @@ func (edsImpl *edsBalancerImpl) updateDrops(dropConfig []xdsclient.OverloadDropC // Update picker with old inner picker, new drops. edsImpl.cc.UpdateState(balancer.State{ ConnectivityState: edsImpl.innerState.ConnectivityState, - Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.xdsClient.loadStore(), edsImpl.counter)}, + Picker: newDropPicker(edsImpl.innerState.Picker, newDrops, edsImpl.xdsClient.loadStore(), edsImpl.serviceRequestsCounter)}, ) } edsImpl.pickerMu.Unlock() @@ -393,12 +393,12 @@ func (edsImpl *edsBalancerImpl) handleSubConnStateChange(sc balancer.SubConn, s } // updateConfig handles changes to the circuit breaking configuration. -func (edsImpl *edsBalancerImpl) updateConfig(edsConfig *EDSConfig) { +func (edsImpl *edsBalancerImpl) updateServiceRequestsCounter(serviceName string) { if !env.CircuitBreakingSupport { return } - if edsImpl.counter == nil || edsImpl.counter.ServiceName != edsConfig.EDSServiceName { - edsImpl.counter = client.NewServiceRequestsCounter(edsConfig.EDSServiceName) + if edsImpl.serviceRequestsCounter == nil || edsImpl.serviceRequestsCounter.ServiceName != serviceName { + edsImpl.serviceRequestsCounter = client.GetServiceRequestsCounter(serviceName) } } @@ -416,7 +416,7 @@ func (edsImpl *edsBalancerImpl) updateState(priority priorityType, s balancer.St defer edsImpl.pickerMu.Unlock() edsImpl.innerState = s // Don't reset drops when it's a state change. - edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.xdsClient.loadStore(), edsImpl.counter)}) + edsImpl.cc.UpdateState(balancer.State{ConnectivityState: s.ConnectivityState, Picker: newDropPicker(s.Picker, edsImpl.drops, edsImpl.xdsClient.loadStore(), edsImpl.serviceRequestsCounter)}) } } diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 0601c68c6917..746155b2d521 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -560,7 +560,7 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { cc := testutils.NewTestClientConn(t) edsb := newEDSBalancerImpl(cc, nil, nil, nil) edsb.enqueueChildBalancerStateUpdate = edsb.updateState - edsb.updateConfig(&EDSConfig{EDSServiceName: "test"}) + edsb.updateServiceRequestsCounter("test") var maxRequests uint32 = 50 client.SetMaxRequests("test", &maxRequests) diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 0914fa363456..084a083b63c7 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -109,7 +109,7 @@ func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) { func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} -func (f *fakeEDSBalancer) updateConfig(edsConfig *EDSConfig) {} +func (f *fakeEDSBalancer) updateServiceRequestsCounter(serviceName string) {} func (f *fakeEDSBalancer) close() {} diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 633891f4bb27..4ff41ad6e7e5 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -21,18 +21,17 @@ package client import ( "fmt" "sync" + "sync/atomic" ) -func init() { - src.services = make(map[string]*ServiceRequestsCounter) -} - type servicesRequestsCounter struct { mu sync.Mutex services map[string]*ServiceRequestsCounter } -var src servicesRequestsCounter +var src = &servicesRequestsCounter{ + services: make(map[string]*ServiceRequestsCounter), +} // ServiceRequestsCounter is used to track the total inflight requests for a // service with the provided name. @@ -41,18 +40,16 @@ type ServiceRequestsCounter struct { ServiceName string maxRequests uint32 numRequests uint32 - enableMax bool } -// NewServiceRequestsCounter creates a new ServiceRequestsCounter that is -// internally tracked by this package and returns a pointer to it. If one with -// the serviceName already exists, returns a pointer to it. -func NewServiceRequestsCounter(serviceName string) *ServiceRequestsCounter { +// GetServiceRequestsCounter returns the ServiceRequestsCounter with the +// provided serviceName. If one does not exist, it creates it. +func GetServiceRequestsCounter(serviceName string) *ServiceRequestsCounter { src.mu.Lock() defer src.mu.Unlock() c, ok := src.services[serviceName] if !ok { - c = &ServiceRequestsCounter{ServiceName: serviceName} + c = &ServiceRequestsCounter{ServiceName: serviceName, maxRequests: 1024} src.services[serviceName] = c } return c @@ -68,10 +65,9 @@ func SetMaxRequests(serviceName string, maxRequests *uint32) *ServiceRequestsCou src.services[serviceName] = c } if maxRequests != nil { - c.enableMax = true c.maxRequests = *maxRequests } else { - c.enableMax = false + c.maxRequests = 1024 } return c } @@ -79,12 +75,10 @@ func SetMaxRequests(serviceName string, maxRequests *uint32) *ServiceRequestsCou // StartRequest starts a request for a service, incrementing its number of // requests by 1. Returns an error if the max number of requests is exceeded. func (c *ServiceRequestsCounter) StartRequest() error { - c.mu.Lock() - defer c.mu.Unlock() - if c.enableMax && c.numRequests+1 > c.maxRequests { + if atomic.LoadUint32(&c.numRequests)+1 > atomic.LoadUint32(&c.maxRequests) { return fmt.Errorf("max requests %v exceeded on service %v", c.maxRequests, c.ServiceName) } - c.numRequests++ + atomic.AddUint32(&c.numRequests, 1) return nil } diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index c6fce23935e3..b97694440515 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -33,7 +33,7 @@ type counterTest struct { } func testCounter(t *testing.T, test counterTest) { - counter := client.NewServiceRequestsCounter(test.name) + counter := client.GetServiceRequestsCounter(test.name) client.SetMaxRequests(test.name, &test.maxRequests) requestsStartedWg := sync.WaitGroup{} requestsStartedWg.Add(1) diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 22b85257ec10..612a8198bd97 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -309,12 +309,11 @@ func validateCluster(cluster *v3clusterpb.Cluster) (ClusterUpdate, error) { if err != nil { return emptyUpdate, err } - mr := circuitBreakersFromCluster(cluster) return ClusterUpdate{ ServiceName: cluster.GetEdsClusterConfig().GetServiceName(), EnableLRS: cluster.GetLrsServer().GetSelf() != nil, SecurityCfg: sc, - MaxRequests: mr, + MaxRequests: circuitBreakersFromCluster(cluster), }, nil } @@ -388,24 +387,15 @@ func securityConfigFromCluster(cluster *v3clusterpb.Cluster) (*SecurityConfig, e // the received cluster resource. Returns nil if no CircuitBreakers or no // Thresholds in CircuitBreakers. func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) *uint32 { - circuitBreakers := cluster.GetCircuitBreakers() - if circuitBreakers == nil { - return nil - } - thresholds := circuitBreakers.GetThresholds() - if thresholds == nil { - return nil - } - for _, threshold := range thresholds { - if threshold.GetPriority().String() != "DEFAULT" { + for _, threshold := range cluster.GetCircuitBreakers().GetThresholds() { + if threshold.GetPriority().String() != v3corepb.RoutingPriority_DEFAULT.String() { continue } maxRequestsPb := threshold.GetMaxRequests() - var maxRequests uint32 = 1024 if maxRequestsPb != nil { - maxRequests = maxRequestsPb.GetValue() + maxRequests := maxRequestsPb.GetValue() + return &maxRequests } - return &maxRequests } return nil } diff --git a/xds/internal/env/env.go b/xds/internal/env/env.go index 577c6737c4d4..c0fa0e65b7a3 100644 --- a/xds/internal/env/env.go +++ b/xds/internal/env/env.go @@ -43,5 +43,5 @@ var ( // CircuitBreakingSupport indicates whether circuit breaking support is // enabled, which can be done by setting the environment variable // "GRPC_XDS_EXPERIMENTAL_CIRCUIT_BREAKING" to "true". - CircuitBreakingSupport = strings.EqualFold(os.Getenv(xdsV3SupportEnv), "true") + CircuitBreakingSupport = strings.EqualFold(os.Getenv(circuitBreakingSupportEnv), "true") ) From 62648b9a2fe7b27a65f51b030a13e15cc18f2e22 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Wed, 2 Dec 2020 13:04:38 -0800 Subject: [PATCH 22/28] added atomic to EndRequest --- xds/internal/client/client_requests_counter.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 4ff41ad6e7e5..2560c79cb035 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -36,7 +36,6 @@ var src = &servicesRequestsCounter{ // ServiceRequestsCounter is used to track the total inflight requests for a // service with the provided name. type ServiceRequestsCounter struct { - mu sync.Mutex ServiceName string maxRequests uint32 numRequests uint32 @@ -85,7 +84,5 @@ func (c *ServiceRequestsCounter) StartRequest() error { // EndRequest ends a request for a service, decrementing its number of requests // by 1. func (c *ServiceRequestsCounter) EndRequest() { - c.mu.Lock() - defer c.mu.Unlock() - c.numRequests-- + atomic.AddUint32(&c.numRequests, ^uint32(0)) } From abd41f84bc1c61c94c5bbad34bf4753b553f459d Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Fri, 4 Dec 2020 12:59:44 -0800 Subject: [PATCH 23/28] Added testing --- .../balancer/cdsbalancer/cdsbalancer_test.go | 36 +++++ xds/internal/balancer/edsbalancer/eds.go | 2 +- .../balancer/edsbalancer/eds_impl_test.go | 2 - xds/internal/balancer/edsbalancer/eds_test.go | 6 +- xds/internal/client/client_cds_test.go | 4 + .../client/client_requests_counter.go | 2 +- .../client/client_requests_counter_test.go | 149 +++++++++++++----- xds/internal/client/client_xds.go | 13 +- 8 files changed, 164 insertions(+), 50 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index ccd1699afaac..3458ca68ba84 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/xds/internal/balancer/edsbalancer" + "google.golang.org/grpc/xds/internal/client" xdsclient "google.golang.org/grpc/xds/internal/client" xdstestutils "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/testutils/fakeclient" @@ -574,6 +575,41 @@ func (s) TestUpdateSubConnState(t *testing.T) { } } +// TestCircuitBreaking verifies that the CDS balancer correctly updates a +// service's counter on watch updates. +func (s) TestCircuitBreaking(t *testing.T) { + // This creates a CDS balancer, pushes a ClientConnState update with a fake + // xdsClient, and makes sure that the CDS balancer registers a watch on the + // provided xdsClient. + xdsC, cdsB, edsB, _, cancel := setupWithXDSCreds(t) + defer func() { + cancel() + cdsB.Close() + }() + + // Here we invoke the watch callback registered on the fake xdsClient. This + // will trigger the watch handler on the CDS balancer, which will update + // the service's counter with the new max requests. + var maxRequests uint32 = 1 + cdsUpdate := xdsclient.ClusterUpdate{ServiceName: serviceName, MaxRequests: &maxRequests} + wantCCS := edsCCS(serviceName, false) + ctx, ctxCancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer ctxCancel() + if err := invokeWatchCbAndWait(ctx, xdsC, cdsWatchInfo{cdsUpdate, nil}, wantCCS, edsB); err != nil { + t.Fatal(err) + } + + // Since the counter's max requests was set to 1, the first request should + // succeed and the second should fail. + counter := client.GetServiceRequestsCounter(serviceName) + if err := counter.StartRequest(); err != nil { + t.Fatal(err) + } + if err := counter.StartRequest(); err == nil { + t.Fatal("unexpected success on start request over max") + } +} + // TestClose verifies the Close() method in the the CDS balancer. func (s) TestClose(t *testing.T) { // This creates a CDS balancer, pushes a ClientConnState update with a fake diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index a6d145960a8f..f80379755539 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -101,7 +101,7 @@ type edsBalancerImplInterface interface { handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) // updateState handle a balancer state update from the priority. updateState(priority priorityType, s balancer.State) - // updateServiceRequestsCounter handles an update to the eds configuration. + // updateServiceRequestsCounter handles an update to the service name. updateServiceRequestsCounter(serviceName string) // close closes the eds balancer. close() diff --git a/xds/internal/balancer/edsbalancer/eds_impl_test.go b/xds/internal/balancer/edsbalancer/eds_impl_test.go index 746155b2d521..332023eef469 100644 --- a/xds/internal/balancer/edsbalancer/eds_impl_test.go +++ b/xds/internal/balancer/edsbalancer/eds_impl_test.go @@ -577,8 +577,6 @@ func (s) TestEDS_CircuitBreaking(t *testing.T) { p := <-cc.NewPickerCh for i := 0; i < 100; i++ { pr, err := p.Pick(balancer.PickInfo{}) - // TODO: the dropping algorithm needs a design. When the dropping algorithm - // is fixed, this test also needs fix. if i < 50 && err != nil { t.Errorf("The first 50%% picks should be non-drops, got error %v", err) } else if i > 50 && err == nil { diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 084a083b63c7..8d653818948c 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -93,6 +93,7 @@ type fakeEDSBalancer struct { childPolicy *testutils.Channel subconnStateChange *testutils.Channel edsUpdate *testutils.Channel + serviceName *testutils.Channel } func (f *fakeEDSBalancer) handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) { @@ -109,7 +110,9 @@ func (f *fakeEDSBalancer) handleEDSResponse(edsResp xdsclient.EndpointsUpdate) { func (f *fakeEDSBalancer) updateState(priority priorityType, s balancer.State) {} -func (f *fakeEDSBalancer) updateServiceRequestsCounter(serviceName string) {} +func (f *fakeEDSBalancer) updateServiceRequestsCounter(serviceName string) { + f.serviceName.Send(serviceName) +} func (f *fakeEDSBalancer) close() {} @@ -164,6 +167,7 @@ func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface { childPolicy: testutils.NewChannelWithSize(10), subconnStateChange: testutils.NewChannelWithSize(10), edsUpdate: testutils.NewChannelWithSize(10), + serviceName: testutils.NewChannelWithSize(10), } } diff --git a/xds/internal/client/client_cds_test.go b/xds/internal/client/client_cds_test.go index accc048b76fa..4d0801dd8058 100644 --- a/xds/internal/client/client_cds_test.go +++ b/xds/internal/client/client_cds_test.go @@ -31,6 +31,7 @@ import ( anypb "github.com/golang/protobuf/ptypes/any" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/xds/internal/env" "google.golang.org/grpc/xds/internal/version" "google.golang.org/protobuf/types/known/wrapperspb" ) @@ -206,6 +207,9 @@ func (s) TestValidateCluster_Success(t *testing.T) { }, } + origCircuitBreakingSupport := env.CircuitBreakingSupport + env.CircuitBreakingSupport = true + defer func() { env.CircuitBreakingSupport = origCircuitBreakingSupport }() for _, test := range tests { t.Run(test.name, func(t *testing.T) { update, err := validateCluster(test.cluster) diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index 2560c79cb035..b431226d0089 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -74,7 +74,7 @@ func SetMaxRequests(serviceName string, maxRequests *uint32) *ServiceRequestsCou // StartRequest starts a request for a service, incrementing its number of // requests by 1. Returns an error if the max number of requests is exceeded. func (c *ServiceRequestsCounter) StartRequest() error { - if atomic.LoadUint32(&c.numRequests)+1 > atomic.LoadUint32(&c.maxRequests) { + if atomic.LoadUint32(&c.numRequests) >= atomic.LoadUint32(&c.maxRequests) { return fmt.Errorf("max requests %v exceeded on service %v", c.maxRequests, c.ServiceName) } atomic.AddUint32(&c.numRequests, 1) diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index b97694440515..04bdb221d4fc 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -16,13 +16,12 @@ * */ -package client_test +package client import ( "sync" + "sync/atomic" "testing" - - "google.golang.org/grpc/xds/internal/client" ) type counterTest struct { @@ -32,63 +31,131 @@ type counterTest struct { errorExpected bool } +var tests = []counterTest{ + { + name: "does-not-exceed-max-requests", + maxRequests: 1024, + numRequests: 1024, + errorExpected: false, + }, + { + name: "exceeds-max-requests", + maxRequests: 32, + numRequests: 64, + errorExpected: true, + }, +} + +func counterTestInit() { + src = &servicesRequestsCounter{ + services: make(map[string]*ServiceRequestsCounter), + } +} + func testCounter(t *testing.T, test counterTest) { - counter := client.GetServiceRequestsCounter(test.name) - client.SetMaxRequests(test.name, &test.maxRequests) - requestsStartedWg := sync.WaitGroup{} - requestsStartedWg.Add(1) + SetMaxRequests(test.name, &test.maxRequests) + requestsStarted := make(chan struct{}) requestsSent := sync.WaitGroup{} requestsSent.Add(int(test.numRequests)) - requestsDoneWg := sync.WaitGroup{} - requestsDoneWg.Add(int(test.numRequests)) - var firstError error = nil - errorMu := sync.Mutex{} + requestsDone := sync.WaitGroup{} + requestsDone.Add(int(test.numRequests)) + var lastError atomic.Value + var successes, errors uint32 for i := 0; i < int(test.numRequests); i++ { go func() { - defer requestsDoneWg.Done() - if err := counter.StartRequest(); err != nil { - errorMu.Lock() - defer errorMu.Unlock() - if firstError == nil { - firstError = err - } - requestsSent.Done() - return + counter := GetServiceRequestsCounter(test.name) + defer requestsDone.Done() + err := counter.StartRequest() + if err == nil { + atomic.AddUint32(&successes, 1) + } else { + atomic.AddUint32(&errors, 1) + lastError.Store(err) } requestsSent.Done() - requestsStartedWg.Wait() - counter.EndRequest() + if err == nil { + <-requestsStarted + counter.EndRequest() + } }() } requestsSent.Wait() - requestsStartedWg.Done() - requestsDoneWg.Wait() - if test.errorExpected && firstError == nil { + close(requestsStarted) + requestsDone.Wait() + loadedError := lastError.Load() + if test.errorExpected && loadedError == nil { t.Error("no error when error expected") } - if !test.errorExpected && firstError != nil { - t.Errorf("error starting request: %v", firstError) + if !test.errorExpected && loadedError != nil { + t.Errorf("error starting request: %v", loadedError.(error)) + } + expectedSuccesses := test.numRequests + var expectedErrors uint32 + if test.maxRequests < test.numRequests { + expectedSuccesses = test.maxRequests + expectedErrors = test.numRequests - test.maxRequests + } + if successes != expectedSuccesses || errors != expectedErrors { + t.Errorf("unexpected number of (successes, errors), expected (%v, %v), encountered (%v, %v)", expectedSuccesses, expectedErrors, successes, errors) } } func (s) TestRequestsCounter(t *testing.T) { - tests := []counterTest{ - { - name: "does-not-exceed-max-requests", - maxRequests: 1024, - numRequests: 1024, - errorExpected: false, - }, - { - name: "exceeds-max-requests", - maxRequests: 32, - numRequests: 64, - errorExpected: true, - }, - } + counterTestInit() for _, test := range tests { t.Run(test.name, func(t *testing.T) { testCounter(t, test) }) } } + +func (s) TestGetServiceRequestsCounter(t *testing.T) { + counterTestInit() + for _, test := range tests { + counterA := GetServiceRequestsCounter(test.name) + counterB := GetServiceRequestsCounter(test.name) + if counterA != counterB { + t.Errorf("counter %v %v != counter %v %v", counterA, *counterA, counterB, *counterB) + } + } +} + +func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsCounter) { + if counterB := SetMaxRequests(counter.ServiceName, &max); counterB != counter { + t.Fatalf("counter %v %v != counter %v %v", counter, *counter, counterB, *counterB) + } + for i := uint32(0); i < n; i++ { + if err := counter.StartRequest(); err != nil { + t.Fatalf("error starting intial request: %v", err) + } + } +} + +func (s) TestSetMaxRequestsIncreased(t *testing.T) { + counterTestInit() + const serviceName string = "set-max-requests-increased" + var initialMax uint32 = 16 + counter := GetServiceRequestsCounter(serviceName) + startRequests(t, initialMax, initialMax, counter) + if err := counter.StartRequest(); err == nil { + t.Fatal("unexpected success on start request after max met") + } + newMax := initialMax + 1 + SetMaxRequests(counter.ServiceName, &newMax) + if err := counter.StartRequest(); err != nil { + t.Fatalf("unexpected error on start request after max increased: %v", err) + } +} + +func (s) TestSetMaxRequestsDecreased(t *testing.T) { + counterTestInit() + const serviceName string = "set-max-requests-decreased" + var initialMax uint32 = 16 + counter := GetServiceRequestsCounter(serviceName) + startRequests(t, initialMax-1, initialMax, counter) + newMax := initialMax - 1 + SetMaxRequests(counter.ServiceName, &newMax) + if err := counter.StartRequest(); err == nil { + t.Fatalf("unexpected success on start request after max decreased: %v", err) + } +} diff --git a/xds/internal/client/client_xds.go b/xds/internal/client/client_xds.go index 612a8198bd97..7767af480698 100644 --- a/xds/internal/client/client_xds.go +++ b/xds/internal/client/client_xds.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/xds/internal" + "google.golang.org/grpc/xds/internal/env" "google.golang.org/grpc/xds/internal/version" ) @@ -387,15 +388,19 @@ func securityConfigFromCluster(cluster *v3clusterpb.Cluster) (*SecurityConfig, e // the received cluster resource. Returns nil if no CircuitBreakers or no // Thresholds in CircuitBreakers. func circuitBreakersFromCluster(cluster *v3clusterpb.Cluster) *uint32 { + if !env.CircuitBreakingSupport { + return nil + } for _, threshold := range cluster.GetCircuitBreakers().GetThresholds() { - if threshold.GetPriority().String() != v3corepb.RoutingPriority_DEFAULT.String() { + if threshold.GetPriority() != v3corepb.RoutingPriority_DEFAULT { continue } maxRequestsPb := threshold.GetMaxRequests() - if maxRequestsPb != nil { - maxRequests := maxRequestsPb.GetValue() - return &maxRequests + if maxRequestsPb == nil { + return nil } + maxRequests := maxRequestsPb.GetValue() + return &maxRequests } return nil } From fad40e7a32b54937802539fd21a64d3f44e56637 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Fri, 4 Dec 2020 13:12:17 -0800 Subject: [PATCH 24/28] merge fix --- xds/internal/balancer/edsbalancer/eds.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index 615d48d19b12..b209b2223124 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -212,7 +212,7 @@ func (x *edsBalancer) handleGRPCUpdate(update interface{}) { if err := x.handleServiceConfigUpdate(cfg); err != nil { x.logger.Warningf("failed to update xDS client: %v", err) - } + } x.edsImpl.updateServiceRequestsCounter(cfg.EDSServiceName) From 67e95630328bde818bdecad40aa9e470610675cd Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Fri, 4 Dec 2020 13:15:26 -0800 Subject: [PATCH 25/28] merge fix 2 --- xds/internal/client/client_requests_counter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index 04bdb221d4fc..11291ba70d32 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -126,7 +126,7 @@ func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsC } for i := uint32(0); i < n; i++ { if err := counter.StartRequest(); err != nil { - t.Fatalf("error starting intial request: %v", err) + t.Fatalf("error starting initial request: %v", err) } } } From 3406b649416773fec2af99269587d5f769ca958c Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Fri, 4 Dec 2020 13:29:36 -0800 Subject: [PATCH 26/28] test fix --- xds/internal/balancer/cdsbalancer/cdsbalancer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go index 3458ca68ba84..9d7c4a7bff55 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_test.go @@ -608,6 +608,7 @@ func (s) TestCircuitBreaking(t *testing.T) { if err := counter.StartRequest(); err == nil { t.Fatal("unexpected success on start request over max") } + counter.EndRequest() } // TestClose verifies the Close() method in the the CDS balancer. From 1c16e6862fdf2db64190b1aed75234ff29776b20 Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Fri, 4 Dec 2020 15:47:24 -0800 Subject: [PATCH 27/28] Reworking testing --- xds/internal/balancer/edsbalancer/eds.go | 3 +- xds/internal/balancer/edsbalancer/eds_test.go | 18 ++++++ .../client/client_requests_counter.go | 9 +-- .../client/client_requests_counter_test.go | 59 +++++++++---------- 4 files changed, 54 insertions(+), 35 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/eds.go b/xds/internal/balancer/edsbalancer/eds.go index b209b2223124..12a1251abe9f 100644 --- a/xds/internal/balancer/edsbalancer/eds.go +++ b/xds/internal/balancer/edsbalancer/eds.go @@ -113,7 +113,8 @@ type edsBalancerImplInterface interface { handleSubConnStateChange(sc balancer.SubConn, state connectivity.State) // updateState handle a balancer state update from the priority. updateState(priority priorityType, s balancer.State) - // updateServiceRequestsCounter handles an update to the service name. + // updateServiceRequestsCounter updates the service requests counter to the + // one for the given service name. updateServiceRequestsCounter(serviceName string) // close closes the eds balancer. close() diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index 7481c1f0e4cc..fdeb71f52c29 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -174,6 +174,18 @@ func (f *fakeEDSBalancer) waitForEDSResponse(ctx context.Context, wantUpdate xds return nil } +func (f *fakeEDSBalancer) waitForCounterUpdate(ctx context.Context, wantServiceName string) error { + val, err := f.serviceName.Receive(ctx) + if err != nil { + return err + } + gotServiceName := val.(string) + if gotServiceName != wantServiceName { + return fmt.Errorf("got serviceName %v, want %v", gotServiceName, wantServiceName) + } + return nil +} + func newFakeEDSBalancer(cc balancer.ClientConn) edsBalancerImplInterface { return &fakeEDSBalancer{ cc: cc, @@ -313,6 +325,9 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) { if err := edsLB.waitForChildPolicy(ctx, lbCfgA); err != nil { t.Fatal(err) } + if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil { + t.Fatal(err) + } lbCfgB := &loadBalancingConfig{ Name: fakeBalancerB, @@ -329,6 +344,9 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) { if err := edsLB.waitForChildPolicy(ctx, lbCfgB); err != nil { t.Fatal(err) } + if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil { + t.Fatal(err) + } } // TestSubConnStateChange verifies if the top-level edsBalancer passes on diff --git a/xds/internal/client/client_requests_counter.go b/xds/internal/client/client_requests_counter.go index b431226d0089..1e28fc003ff3 100644 --- a/xds/internal/client/client_requests_counter.go +++ b/xds/internal/client/client_requests_counter.go @@ -24,6 +24,8 @@ import ( "sync/atomic" ) +const defaultMaxRequests uint32 = 1024 + type servicesRequestsCounter struct { mu sync.Mutex services map[string]*ServiceRequestsCounter @@ -48,14 +50,14 @@ func GetServiceRequestsCounter(serviceName string) *ServiceRequestsCounter { defer src.mu.Unlock() c, ok := src.services[serviceName] if !ok { - c = &ServiceRequestsCounter{ServiceName: serviceName, maxRequests: 1024} + c = &ServiceRequestsCounter{ServiceName: serviceName, maxRequests: defaultMaxRequests} src.services[serviceName] = c } return c } // SetMaxRequests updates the max requests for a service's counter. -func SetMaxRequests(serviceName string, maxRequests *uint32) *ServiceRequestsCounter { +func SetMaxRequests(serviceName string, maxRequests *uint32) { src.mu.Lock() defer src.mu.Unlock() c, ok := src.services[serviceName] @@ -66,9 +68,8 @@ func SetMaxRequests(serviceName string, maxRequests *uint32) *ServiceRequestsCou if maxRequests != nil { c.maxRequests = *maxRequests } else { - c.maxRequests = 1024 + c.maxRequests = defaultMaxRequests } - return c } // StartRequest starts a request for a service, incrementing its number of diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index 11291ba70d32..165870a07c19 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -19,34 +19,38 @@ package client import ( + "fmt" "sync" "sync/atomic" "testing" ) type counterTest struct { - name string - maxRequests uint32 - numRequests uint32 - errorExpected bool + name string + maxRequests uint32 + numRequests uint32 + expectedSuccesses uint32 + expectedErrors uint32 } var tests = []counterTest{ { - name: "does-not-exceed-max-requests", - maxRequests: 1024, - numRequests: 1024, - errorExpected: false, + name: "does-not-exceed-max-requests", + maxRequests: 1024, + numRequests: 1024, + expectedSuccesses: 1024, + expectedErrors: 0, }, { - name: "exceeds-max-requests", - maxRequests: 32, - numRequests: 64, - errorExpected: true, + name: "exceeds-max-requests", + maxRequests: 32, + numRequests: 64, + expectedSuccesses: 32, + expectedErrors: 32, }, } -func counterTestInit() { +func resetServiceRequestsCounter() { src = &servicesRequestsCounter{ services: make(map[string]*ServiceRequestsCounter), } @@ -83,25 +87,22 @@ func testCounter(t *testing.T, test counterTest) { close(requestsStarted) requestsDone.Wait() loadedError := lastError.Load() - if test.errorExpected && loadedError == nil { + if test.expectedErrors > 0 && loadedError == nil { t.Error("no error when error expected") } - if !test.errorExpected && loadedError != nil { - t.Errorf("error starting request: %v", loadedError.(error)) + if loadedError != nil { + fmt.Println(loadedError.(error)) } - expectedSuccesses := test.numRequests - var expectedErrors uint32 - if test.maxRequests < test.numRequests { - expectedSuccesses = test.maxRequests - expectedErrors = test.numRequests - test.maxRequests + if test.expectedErrors == 0 && loadedError != nil { + t.Errorf("error starting request: %v", loadedError.(error)) } - if successes != expectedSuccesses || errors != expectedErrors { - t.Errorf("unexpected number of (successes, errors), expected (%v, %v), encountered (%v, %v)", expectedSuccesses, expectedErrors, successes, errors) + if successes != test.expectedSuccesses || errors != test.expectedErrors { + t.Errorf("unexpected number of (successes, errors), expected (%v, %v), encountered (%v, %v)", test.expectedSuccesses, test.expectedErrors, successes, errors) } } func (s) TestRequestsCounter(t *testing.T) { - counterTestInit() + resetServiceRequestsCounter() for _, test := range tests { t.Run(test.name, func(t *testing.T) { testCounter(t, test) @@ -110,7 +111,7 @@ func (s) TestRequestsCounter(t *testing.T) { } func (s) TestGetServiceRequestsCounter(t *testing.T) { - counterTestInit() + resetServiceRequestsCounter() for _, test := range tests { counterA := GetServiceRequestsCounter(test.name) counterB := GetServiceRequestsCounter(test.name) @@ -121,9 +122,7 @@ func (s) TestGetServiceRequestsCounter(t *testing.T) { } func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsCounter) { - if counterB := SetMaxRequests(counter.ServiceName, &max); counterB != counter { - t.Fatalf("counter %v %v != counter %v %v", counter, *counter, counterB, *counterB) - } + SetMaxRequests(counter.ServiceName, &max) for i := uint32(0); i < n; i++ { if err := counter.StartRequest(); err != nil { t.Fatalf("error starting initial request: %v", err) @@ -132,7 +131,7 @@ func startRequests(t *testing.T, n uint32, max uint32, counter *ServiceRequestsC } func (s) TestSetMaxRequestsIncreased(t *testing.T) { - counterTestInit() + resetServiceRequestsCounter() const serviceName string = "set-max-requests-increased" var initialMax uint32 = 16 counter := GetServiceRequestsCounter(serviceName) @@ -148,7 +147,7 @@ func (s) TestSetMaxRequestsIncreased(t *testing.T) { } func (s) TestSetMaxRequestsDecreased(t *testing.T) { - counterTestInit() + resetServiceRequestsCounter() const serviceName string = "set-max-requests-decreased" var initialMax uint32 = 16 counter := GetServiceRequestsCounter(serviceName) From 3d4130d8262afee2d9a4ff1459b2652b39af2dba Mon Sep 17 00:00:00 2001 From: GarrettGutierrez1 Date: Tue, 8 Dec 2020 12:32:41 -0800 Subject: [PATCH 28/28] Added counter update test --- xds/internal/balancer/edsbalancer/eds_test.go | 29 +++++++++++++++++++ .../client/client_requests_counter_test.go | 4 --- 2 files changed, 29 insertions(+), 4 deletions(-) diff --git a/xds/internal/balancer/edsbalancer/eds_test.go b/xds/internal/balancer/edsbalancer/eds_test.go index fdeb71f52c29..3ee19a00debe 100644 --- a/xds/internal/balancer/edsbalancer/eds_test.go +++ b/xds/internal/balancer/edsbalancer/eds_test.go @@ -345,6 +345,8 @@ func (s) TestConfigChildPolicyUpdate(t *testing.T) { t.Fatal(err) } if err := edsLB.waitForCounterUpdate(ctx, testServiceName); err != nil { + // Counter is updated even though the service name didn't change. The + // eds_impl will compare the service names, and skip if it didn't change. t.Fatal(err) } } @@ -590,6 +592,33 @@ func (s) TestClientWatchEDS(t *testing.T) { } } +// TestCounterUpdate verifies that the counter update is triggered with the +// service name from an update's config. +func (s) TestCounterUpdate(t *testing.T) { + edsLBCh := testutils.NewChannel() + _, cleanup := setup(edsLBCh) + defer cleanup() + + builder := balancer.Get(edsName) + edsB := builder.Build(newNoopTestClientConn(), balancer.BuildOptions{Target: resolver.Target{Endpoint: testServiceName}}) + if edsB == nil { + t.Fatalf("builder.Build(%s) failed and returned nil", edsName) + } + defer edsB.Close() + + // Update should trigger counter update with provided service name. + if err := edsB.UpdateClientConnState(balancer.ClientConnState{ + BalancerConfig: &EDSConfig{EDSServiceName: "foobar-1"}, + }); err != nil { + t.Fatal(err) + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := edsB.(*edsBalancer).edsImpl.(*fakeEDSBalancer).waitForCounterUpdate(ctx, "foobar-1"); err != nil { + t.Fatal(err) + } +} + func (s) TestBalancerConfigParsing(t *testing.T) { const testEDSName = "eds.service" var testLRSName = "lrs.server" diff --git a/xds/internal/client/client_requests_counter_test.go b/xds/internal/client/client_requests_counter_test.go index 165870a07c19..09295b982ecd 100644 --- a/xds/internal/client/client_requests_counter_test.go +++ b/xds/internal/client/client_requests_counter_test.go @@ -19,7 +19,6 @@ package client import ( - "fmt" "sync" "sync/atomic" "testing" @@ -90,9 +89,6 @@ func testCounter(t *testing.T, test counterTest) { if test.expectedErrors > 0 && loadedError == nil { t.Error("no error when error expected") } - if loadedError != nil { - fmt.Println(loadedError.(error)) - } if test.expectedErrors == 0 && loadedError != nil { t.Errorf("error starting request: %v", loadedError.(error)) }