From fdafc23fc34fcddeec07f098693a75f5596a0509 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Mon, 31 Jul 2023 12:51:38 -0700 Subject: [PATCH] balancer: add SubConn.Shutdown; deprecate Balancer.RemoveSubConn --- balancer/balancer.go | 7 +++++ balancer/base/balancer_test.go | 2 ++ balancer_conn_wrappers.go | 8 ++++- balancer_test.go | 30 +++++++++++++++++++ internal/testutils/balancer.go | 16 ++++++---- .../balancer/ringhash/ringhash_test.go | 14 ++++----- 6 files changed, 64 insertions(+), 13 deletions(-) create mode 100644 balancer_test.go diff --git a/balancer/balancer.go b/balancer/balancer.go index b78401eb4cf2..4076db299c6b 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -115,6 +115,11 @@ type SubConn interface { // creates a new one and returns it. Returns a close function which must // be called when the Producer is no longer needed. GetOrBuildProducer(ProducerBuilder) (p Producer, close func()) + // Shutdown shuts down the SubConn. No future calls should be made on the + // SubConn. One final state update will be delivered to the StateListener + // (or UpdateSubConnState; deprecated) with ConnectivityState of Shutdown + // to indicate the shutdown is complete. + Shutdown() } // NewSubConnOptions contains options to create new SubConn. @@ -161,6 +166,8 @@ type ClientConn interface { NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error) // RemoveSubConn removes the SubConn from ClientConn. // The SubConn will be shutdown. + // + // Deprecated: use SubConn.Shutdown instead. RemoveSubConn(SubConn) // UpdateAddresses updates the addresses used in the passed in SubConn. // gRPC checks if the currently connected address is still in the new list. diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go index b50abf8526e6..7bf4d92f8f0a 100644 --- a/balancer/base/balancer_test.go +++ b/balancer/base/balancer_test.go @@ -44,6 +44,8 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {} func (sc *testSubConn) Connect() {} +func (sc *testSubConn) Shutdown() {} + func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) { return nil, nil } diff --git a/balancer_conn_wrappers.go b/balancer_conn_wrappers.go index 338d106098fb..0e4b08c0a9f5 100644 --- a/balancer_conn_wrappers.go +++ b/balancer_conn_wrappers.go @@ -303,6 +303,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer return nil, err } acbw := &acBalancerWrapper{ + ccb: ccb, ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer), stateListener: opts.StateListener, @@ -372,7 +373,8 @@ func (ccb *ccBalancerWrapper) Target() string { // acBalancerWrapper is a wrapper on top of ac for balancers. // It implements balancer.SubConn interface. type acBalancerWrapper struct { - ac *addrConn // read-only + ac *addrConn // read-only + ccb *ccBalancerWrapper // read-only stateListener func(balancer.SubConnState) mu sync.Mutex @@ -391,6 +393,10 @@ func (acbw *acBalancerWrapper) Connect() { go acbw.ac.connect() } +func (acbw *acBalancerWrapper) Shutdown() { + acbw.ccb.RemoveSubConn(acbw) +} + // NewStream begins a streaming RPC on the addrConn. If the addrConn is not // ready, blocks until it is or ctx expires. Returns an error when the context // expires or the addrConn is shut down. diff --git a/balancer_test.go b/balancer_test.go new file mode 100644 index 000000000000..7d1289b90ef4 --- /dev/null +++ b/balancer_test.go @@ -0,0 +1,30 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpc + +import ( + "context" + "testing" +) + +func (s) TestSubConnShutdown(t *testing.T) { + _, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + +} diff --git a/internal/testutils/balancer.go b/internal/testutils/balancer.go index babce5f19497..2f1afa8fff0c 100644 --- a/internal/testutils/balancer.go +++ b/internal/testutils/balancer.go @@ -37,6 +37,7 @@ type testingLogger interface { // TestSubConn implements the SubConn interface, to be used in tests. type TestSubConn struct { + tcc *TestClientConn // the CC that owns this SubConn id string ConnectCh chan struct{} stateListener func(balancer.SubConnState) @@ -66,6 +67,14 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) { } } +func (tsc *TestSubConn) Shutdown() { + tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc) + select { + case tsc.tcc.RemoveSubConnCh <- tsc: + default: + } +} + // String implements stringer to print human friendly error message. func (tsc *TestSubConn) String() string { return tsc.id @@ -106,6 +115,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn { // NewSubConn creates a new SubConn. func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) { sc := &TestSubConn{ + tcc: tcc, id: fmt.Sprintf("sc%d", tcc.subConnIdx), ConnectCh: make(chan struct{}, 1), stateListener: o.StateListener, @@ -127,11 +137,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon // RemoveSubConn removes the SubConn. func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) { - tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc) - select { - case tcc.RemoveSubConnCh <- sc.(*TestSubConn): - default: - } + sc.(*TestSubConn).Shutdown() } // UpdateAddresses updates the addresses on the SubConn. diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index 3db8a258124b..ea0ff63705cc 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -170,7 +170,7 @@ func (s) TestOneSubConn(t *testing.T) { p1 := <-cc.NewPickerCh for i := 0; i < 5; i++ { gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) - if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc0 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) } } @@ -212,7 +212,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) { p1 := <-cc.NewPickerCh for i := 0; i < 5; i++ { gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) - if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc0 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) } } @@ -241,7 +241,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) { p3 := <-cc.NewPickerCh for i := 0; i < 5; i++ { gotSCSt, _ := p3.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc1 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } @@ -263,7 +263,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) { p4 := <-cc.NewPickerCh for i := 0; i < 5; i++ { gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) - if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc0 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) } } @@ -306,7 +306,7 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) { p1 := <-cc.NewPickerCh for i := 0; i < 5; i++ { gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) - if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc0 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) } } @@ -330,14 +330,14 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) { p2 := <-cc.NewPickerCh for i := 0; i < 5; i++ { gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}) - if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc1 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) } } // But the first hash still picks sc0. for i := 0; i < 5; i++ { gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) - if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + if gotSCSt.SubConn != sc0 { t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) } }