Skip to content

Commit

Permalink
balancer: add SubConn.Shutdown; deprecate Balancer.RemoveSubConn
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley committed Aug 1, 2023
1 parent 0246373 commit fdafc23
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 13 deletions.
7 changes: 7 additions & 0 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ type SubConn interface {
// creates a new one and returns it. Returns a close function which must
// be called when the Producer is no longer needed.
GetOrBuildProducer(ProducerBuilder) (p Producer, close func())
// Shutdown shuts down the SubConn. No future calls should be made on the
// SubConn. One final state update will be delivered to the StateListener
// (or UpdateSubConnState; deprecated) with ConnectivityState of Shutdown
// to indicate the shutdown is complete.
Shutdown()
}

// NewSubConnOptions contains options to create new SubConn.
Expand Down Expand Up @@ -161,6 +166,8 @@ type ClientConn interface {
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
//
// Deprecated: use SubConn.Shutdown instead.
RemoveSubConn(SubConn)
// UpdateAddresses updates the addresses used in the passed in SubConn.
// gRPC checks if the currently connected address is still in the new list.
Expand Down
2 changes: 2 additions & 0 deletions balancer/base/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {}

func (sc *testSubConn) Connect() {}

func (sc *testSubConn) Shutdown() {}

func (sc *testSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, nil
}
Expand Down
8 changes: 7 additions & 1 deletion balancer_conn_wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
return nil, err
}
acbw := &acBalancerWrapper{
ccb: ccb,
ac: ac,
producers: make(map[balancer.ProducerBuilder]*refCountedProducer),
stateListener: opts.StateListener,
Expand Down Expand Up @@ -372,7 +373,8 @@ func (ccb *ccBalancerWrapper) Target() string {
// acBalancerWrapper is a wrapper on top of ac for balancers.
// It implements balancer.SubConn interface.
type acBalancerWrapper struct {
ac *addrConn // read-only
ac *addrConn // read-only
ccb *ccBalancerWrapper // read-only
stateListener func(balancer.SubConnState)

mu sync.Mutex
Expand All @@ -391,6 +393,10 @@ func (acbw *acBalancerWrapper) Connect() {
go acbw.ac.connect()
}

func (acbw *acBalancerWrapper) Shutdown() {
acbw.ccb.RemoveSubConn(acbw)
}

// NewStream begins a streaming RPC on the addrConn. If the addrConn is not
// ready, blocks until it is or ctx expires. Returns an error when the context
// expires or the addrConn is shut down.
Expand Down
30 changes: 30 additions & 0 deletions balancer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
*
* Copyright 2023 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpc

import (
"context"
"testing"
)

func (s) TestSubConnShutdown(t *testing.T) {
_, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

}
16 changes: 11 additions & 5 deletions internal/testutils/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type testingLogger interface {

// TestSubConn implements the SubConn interface, to be used in tests.
type TestSubConn struct {
tcc *TestClientConn // the CC that owns this SubConn
id string
ConnectCh chan struct{}
stateListener func(balancer.SubConnState)
Expand Down Expand Up @@ -66,6 +67,14 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
}
}

func (tsc *TestSubConn) Shutdown() {

Check failure on line 70 in internal/testutils/balancer.go

View workflow job for this annotation

GitHub Actions / tests (vet, 1.20)

exported method TestSubConn.Shutdown should have comment or be unexported
tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc)
select {
case tsc.tcc.RemoveSubConnCh <- tsc:
default:
}
}

// String implements stringer to print human friendly error message.
func (tsc *TestSubConn) String() string {
return tsc.id
Expand Down Expand Up @@ -106,6 +115,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
// NewSubConn creates a new SubConn.
func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubConnOptions) (balancer.SubConn, error) {
sc := &TestSubConn{
tcc: tcc,
id: fmt.Sprintf("sc%d", tcc.subConnIdx),
ConnectCh: make(chan struct{}, 1),
stateListener: o.StateListener,
Expand All @@ -127,11 +137,7 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon

// RemoveSubConn removes the SubConn.
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
tcc.logger.Logf("testClientConn: RemoveSubConn(%s)", sc)
select {
case tcc.RemoveSubConnCh <- sc.(*TestSubConn):
default:
}
sc.(*TestSubConn).Shutdown()
}

// UpdateAddresses updates the addresses on the SubConn.
Expand Down
14 changes: 7 additions & 7 deletions xds/internal/balancer/ringhash/ringhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (s) TestOneSubConn(t *testing.T) {
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
if gotSCSt.SubConn != sc0 {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
if gotSCSt.SubConn != sc0 {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
Expand Down Expand Up @@ -241,7 +241,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p3.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
if gotSCSt.SubConn != sc1 {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
Expand All @@ -263,7 +263,7 @@ func (s) TestThreeSubConnsAffinity(t *testing.T) {
p4 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
if gotSCSt.SubConn != sc0 {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
if gotSCSt.SubConn != sc0 {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
Expand All @@ -330,14 +330,14 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
if gotSCSt.SubConn != sc1 {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// But the first hash still picks sc0.
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
if gotSCSt.SubConn != sc0 {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
Expand Down

0 comments on commit fdafc23

Please sign in to comment.