Skip to content

Commit

Permalink
test: update client state subscriber test to be not flaky and more st…
Browse files Browse the repository at this point in the history
…ressful about rapid updates
  • Loading branch information
dfawley committed Aug 8, 2023
1 parent 2059c6e commit f85edd7
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 108 deletions.
82 changes: 82 additions & 0 deletions test/clientconn_state_transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/stub"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
Expand Down Expand Up @@ -547,3 +550,82 @@ func awaitNoStateChange(ctx context.Context, t *testing.T, cc *grpc.ClientConn,
t.Fatalf("State changed from %q to %q when no state change was expected", currState, cc.GetState())
}
}

type testConnectivityStateSubscriber struct {
onMsgCh chan connectivity.State
}

func (ts *testConnectivityStateSubscriber) OnMessage(msg interface{}) {
ts.onMsgCh <- msg.(connectivity.State)
if msg.(connectivity.State) == connectivity.Shutdown {
close(ts.onMsgCh)
}
}

// TestConnectivityStateSubscriber confirms updates sent by the balancer in
// rapid succession are not missed by the subscriber.
func (s) TestConnectivityStateSubscriber(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()

sendStates := []connectivity.State{
connectivity.Connecting,
connectivity.Ready,
connectivity.Idle,
connectivity.Connecting,
connectivity.Idle,
connectivity.Connecting,
connectivity.Ready,
}
wantStates := append(sendStates, connectivity.Shutdown)

const testBalName = "any"
bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, _ balancer.ClientConnState) error {
// Send the expected states in rapid succession.
for _, s := range sendStates {
t.Logf("Sending state update %s", s)
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: s})
}
return nil
},
}
stub.Register(testBalName, bf)

// Create the ClientConn
const testResName = "any"
rb := manual.NewBuilderWithScheme(testResName)
cc, err := grpc.Dial(testResName+":///",
grpc.WithResolvers(rb),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, testBalName)),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
t.Fatalf("Unexpected error from grpc.Dial: %v", err)
}

// Subscribe to state updates
s := &testConnectivityStateSubscriber{onMsgCh: make(chan connectivity.State)}
internal.SubscribeToConnectivityStateChanges.(func(cc *grpc.ClientConn, s grpcsync.Subscriber) func())(cc, s)

// Send an update from the resolver that will trigger the LB policy's UpdateClientConnState.
go rb.UpdateState(resolver.State{})

// Verify the resulting states.
for i, want := range wantStates {
if i == len(sendStates) {
// Trigger Shutdown to be sent by the channel.
cc.Close()
}
select {
case got := <-s.onMsgCh:
if got != want {
t.Errorf("Update %v was %s; want %s", i, got, want)
} else {
t.Logf("Update %v was %s as expected", i, got)
}
case <-ctx.Done():
t.Fatalf("Timed out waiting for state update %v: %s", i, want)
}
}
}
108 changes: 0 additions & 108 deletions test/connectivity_state_updates_test.go

This file was deleted.

0 comments on commit f85edd7

Please sign in to comment.