From a671967dfbaab779d37fd7e597d9248f13806087 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Thu, 23 Sep 2021 11:09:09 -0700 Subject: [PATCH] backport PRs to v1.41.x (#4810) * xds: fix parent balancers to handle Idle children (#4801) * transport: fix log spam from Server Authentication Handshake errors (#4798) --- internal/transport/http2_server.go | 8 ++- server.go | 7 +- .../clustermanager/balancerstateaggregator.go | 9 ++- .../clustermanager/clustermanager_test.go | 65 +++++++++++++++++++ .../weightedaggregator/aggregator.go | 8 ++- .../weightedtarget/weightedtarget_test.go | 61 +++++++++++++++++ 6 files changed, 152 insertions(+), 6 deletions(-) diff --git a/internal/transport/http2_server.go b/internal/transport/http2_server.go index cd0ebed98845..19c13e041d3b 100644 --- a/internal/transport/http2_server.go +++ b/internal/transport/http2_server.go @@ -139,9 +139,11 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, var err error conn, authInfo, err = config.Credentials.ServerHandshake(rawConn) if err != nil { - // ErrConnDispatched means that the connection was dispatched away from - // gRPC; those connections should be left open. - if err == credentials.ErrConnDispatched { + // ErrConnDispatched means that the connection was dispatched away + // from gRPC; those connections should be left open. io.EOF means + // the connection was closed before handshaking completed, which can + // happen naturally from probers. Return these errors directly. + if err == credentials.ErrConnDispatched || err == io.EOF { return nil, err } return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) diff --git a/server.go b/server.go index d6155c0e8543..557f29559de1 100644 --- a/server.go +++ b/server.go @@ -887,7 +887,12 @@ func (s *Server) newHTTP2Transport(c net.Conn) transport.ServerTransport { if err != credentials.ErrConnDispatched { c.Close() } - channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) + // Don't log on ErrConnDispatched and io.EOF to prevent log spam. + if err != credentials.ErrConnDispatched { + if err != io.EOF { + channelz.Warning(logger, s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err) + } + } return nil } diff --git a/xds/internal/balancer/clustermanager/balancerstateaggregator.go b/xds/internal/balancer/clustermanager/balancerstateaggregator.go index 35eb86c3590c..6e0e03299f95 100644 --- a/xds/internal/balancer/clustermanager/balancerstateaggregator.go +++ b/xds/internal/balancer/clustermanager/balancerstateaggregator.go @@ -183,13 +183,18 @@ func (bsa *balancerStateAggregator) build() balancer.State { // handling the special connecting after ready, as in UpdateState(). Then a // function to calculate the aggregated connectivity state as in this // function. - var readyN, connectingN int + // + // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated + // state. + var readyN, connectingN, idleN int for _, ps := range bsa.idToPickerState { switch ps.stateToAggregate { case connectivity.Ready: readyN++ case connectivity.Connecting: connectingN++ + case connectivity.Idle: + idleN++ } } var aggregatedState connectivity.State @@ -198,6 +203,8 @@ func (bsa *balancerStateAggregator) build() balancer.State { aggregatedState = connectivity.Ready case connectingN > 0: aggregatedState = connectivity.Connecting + case idleN > 0: + aggregatedState = connectivity.Idle default: aggregatedState = connectivity.TransientFailure } diff --git a/xds/internal/balancer/clustermanager/clustermanager_test.go b/xds/internal/balancer/clustermanager/clustermanager_test.go index a40d954ad64f..d3475ea3f5d8 100644 --- a/xds/internal/balancer/clustermanager/clustermanager_test.go +++ b/xds/internal/balancer/clustermanager/clustermanager_test.go @@ -565,3 +565,68 @@ func TestClusterManagerForwardsBalancerBuildOptions(t *testing.T) { t.Fatal(err2) } } + +const initIdleBalancerName = "test-init-Idle-balancer" + +var errTestInitIdle = fmt.Errorf("init Idle balancer error 0") + +func init() { + stub.Register(initIdleBalancerName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error { + bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{}) + return nil + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + err := fmt.Errorf("wrong picker error") + if state.ConnectivityState == connectivity.Idle { + err = errTestInitIdle + } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &testutils.TestConstPicker{Err: err}, + }) + }, + }) +} + +// TestInitialIdle covers the case that if the child reports Idle, the overall +// state will be Idle. +func TestInitialIdle(t *testing.T) { + cc := testutils.NewTestClientConn(t) + rtb := rtBuilder.Build(cc, balancer.BuildOptions{}) + + configJSON1 := `{ +"children": { + "cds:cluster_1":{ "childPolicy": [{"test-init-Idle-balancer":""}] } +} +}` + + config1, err := rtParser.ParseConfig([]byte(configJSON1)) + if err != nil { + t.Fatalf("failed to parse balancer config: %v", err) + } + + // Send the config, and an address with hierarchy path ["cluster_1"]. + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0], Attributes: nil}, + } + if err := rtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), + }}, + BalancerConfig: config1, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + // Verify that a subconn is created with the address, and the hierarchy path + // in the address is cleared. + for range wantAddrs { + sc := <-cc.NewSubConnCh + rtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + } + + if state1 := <-cc.NewStateCh; state1 != connectivity.Idle { + t.Fatalf("Received aggregated state: %v, want Idle", state1) + } +} diff --git a/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go b/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go index 6c36e2a69cd9..7e1d106e9ff9 100644 --- a/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go +++ b/xds/internal/balancer/weightedtarget/weightedaggregator/aggregator.go @@ -200,7 +200,9 @@ func (wbsa *Aggregator) BuildAndUpdate() { func (wbsa *Aggregator) build() balancer.State { wbsa.logger.Infof("Child pickers with config: %+v", wbsa.idToPickerState) m := wbsa.idToPickerState - var readyN, connectingN int + // TODO: use balancer.ConnectivityStateEvaluator to calculate the aggregated + // state. + var readyN, connectingN, idleN int readyPickerWithWeights := make([]weightedPickerState, 0, len(m)) for _, ps := range m { switch ps.stateToAggregate { @@ -209,6 +211,8 @@ func (wbsa *Aggregator) build() balancer.State { readyPickerWithWeights = append(readyPickerWithWeights, *ps) case connectivity.Connecting: connectingN++ + case connectivity.Idle: + idleN++ } } var aggregatedState connectivity.State @@ -217,6 +221,8 @@ func (wbsa *Aggregator) build() balancer.State { aggregatedState = connectivity.Ready case connectingN > 0: aggregatedState = connectivity.Connecting + case idleN > 0: + aggregatedState = connectivity.Idle default: aggregatedState = connectivity.TransientFailure } diff --git a/xds/internal/balancer/weightedtarget/weightedtarget_test.go b/xds/internal/balancer/weightedtarget/weightedtarget_test.go index eeebab733d61..b0e4df895885 100644 --- a/xds/internal/balancer/weightedtarget/weightedtarget_test.go +++ b/xds/internal/balancer/weightedtarget/weightedtarget_test.go @@ -29,6 +29,7 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/balancer/stub" "google.golang.org/grpc/internal/hierarchy" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -263,3 +264,63 @@ func subConnFromPicker(p balancer.Picker) func() balancer.SubConn { return scst.SubConn } } + +const initIdleBalancerName = "test-init-Idle-balancer" + +var errTestInitIdle = fmt.Errorf("init Idle balancer error 0") + +func init() { + stub.Register(initIdleBalancerName, stub.BalancerFuncs{ + UpdateClientConnState: func(bd *stub.BalancerData, opts balancer.ClientConnState) error { + bd.ClientConn.NewSubConn(opts.ResolverState.Addresses, balancer.NewSubConnOptions{}) + return nil + }, + UpdateSubConnState: func(bd *stub.BalancerData, sc balancer.SubConn, state balancer.SubConnState) { + err := fmt.Errorf("wrong picker error") + if state.ConnectivityState == connectivity.Idle { + err = errTestInitIdle + } + bd.ClientConn.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &testutils.TestConstPicker{Err: err}, + }) + }, + }) +} + +// TestInitialIdle covers the case that if the child reports Idle, the overall +// state will be Idle. +func TestInitialIdle(t *testing.T) { + cc := testutils.NewTestClientConn(t) + wtb := wtbBuilder.Build(cc, balancer.BuildOptions{}) + + // Start with "cluster_1: round_robin". + config1, err := wtbParser.ParseConfig([]byte(`{"targets":{"cluster_1":{"weight":1,"childPolicy":[{"test-init-Idle-balancer":""}]}}}`)) + if err != nil { + t.Fatalf("failed to parse balancer config: %v", err) + } + + // Send the config, and an address with hierarchy path ["cluster_1"]. + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0], Attributes: nil}, + } + if err := wtb.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + hierarchy.Set(wantAddrs[0], []string{"cds:cluster_1"}), + }}, + BalancerConfig: config1, + }); err != nil { + t.Fatalf("failed to update ClientConn state: %v", err) + } + + // Verify that a subconn is created with the address, and the hierarchy path + // in the address is cleared. + for range wantAddrs { + sc := <-cc.NewSubConnCh + wtb.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + } + + if state1 := <-cc.NewStateCh; state1 != connectivity.Idle { + t.Fatalf("Received aggregated state: %v, want Idle", state1) + } +}