Skip to content

Commit

Permalink
add unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
taylanisikdemir committed Oct 15, 2024
1 parent 66d0142 commit c8e3874
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 31 deletions.
50 changes: 41 additions & 9 deletions common/rpc/direct_peer_chooser.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type directPeerChooser struct {
t peer.Transport
enableConnRetainMode dynamicconfig.BoolPropertyFn
legacyChooser peer.Chooser
legacyChooserErr error
mu sync.RWMutex
}

Expand All @@ -56,17 +57,32 @@ func newDirectChooser(serviceName string, t peer.Transport, logger log.Logger, e
}

// Start statisfies the peer.Chooser interface.
func (*directPeerChooser) Start() error {
func (g *directPeerChooser) Start() error {
c, ok := g.getLegacyChooser()
if ok {
return c.Start()
}

return nil // no-op
}

// Stop statisfies the peer.Chooser interface.
func (*directPeerChooser) Stop() error {
func (g *directPeerChooser) Stop() error {
c, ok := g.getLegacyChooser()
if ok {
return c.Stop()
}

return nil // no-op
}

// IsRunning statisfies the peer.Chooser interface.
func (*directPeerChooser) IsRunning() bool {
func (g *directPeerChooser) IsRunning() bool {
c, ok := g.getLegacyChooser()
if ok {
return c.IsRunning()
}

return true // no-op
}

Expand All @@ -90,23 +106,39 @@ func (g *directPeerChooser) UpdatePeers(members []membership.HostInfo) {
}

func (g *directPeerChooser) chooseFromLegacyDirectPeerChooser(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
c, ok := g.getLegacyChooser()
if !ok {
return nil, nil, yarpcerrors.InternalErrorf("failed to get legacy direct peer chooser")
}

return c.Choose(ctx, req)
}

func (g *directPeerChooser) getLegacyChooser() (peer.Chooser, bool) {
g.mu.RLock()

if g.legacyChooser != nil {
// Legacy chooser already created, return it
g.mu.RUnlock()
return g.legacyChooser, true
}

if g.legacyChooserErr != nil {
// There was an error creating the legacy chooser, return false
g.mu.RUnlock()
return g.legacyChooser.Choose(ctx, req)
return nil, false
}

g.mu.RUnlock()

var err error
g.mu.Lock()
g.legacyChooser, err = direct.New(direct.Configuration{}, g.t)
g.legacyChooser, g.legacyChooserErr = direct.New(direct.Configuration{}, g.t)
g.mu.Unlock()

if err != nil {
return nil, nil, err
if g.legacyChooserErr != nil {
g.logger.Error("failed to create legacy direct peer chooser", tag.Error(g.legacyChooserErr))
return nil, false
}

return g.legacyChooser.Choose(ctx, req)
return g.legacyChooser, true
}
96 changes: 96 additions & 0 deletions common/rpc/direct_peer_chooser_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc

import (
"context"
"testing"

"go.uber.org/goleak"
"go.uber.org/yarpc/api/transport"
"go.uber.org/yarpc/transport/grpc"

"github.com/stretchr/testify/assert"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log/testlogger"
)

func TestDirectChooser(t *testing.T) {
req := &transport.Request{
Caller: "caller",
Service: "service",
ShardKey: "shard1",
}

tests := []struct {
desc string
retainConn bool
req *transport.Request
wantChooseErr bool
}{
{
desc: "don't retain connection",
retainConn: false,
req: req,
},
{
desc: "retain connection",
retainConn: true,
req: req,
wantChooseErr: true,
},
}

for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
defer goleak.VerifyNone(t)

logger := testlogger.New(t)
serviceName := "service"
directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return tc.retainConn }
grpcTransport := grpc.NewTransport()

chooser := newDirectChooser(serviceName, grpcTransport, logger, directConnRetainFn)
if err := chooser.Start(); err != nil {
t.Fatalf("failed to start direct peer chooser: %v", err)
}

assert.True(t, chooser.IsRunning())

peer, onFinish, err := chooser.Choose(context.Background(), tc.req)
if tc.wantChooseErr != (err != nil) {
t.Fatalf("Choose() err = %v, wantChooseErr = %v", err, tc.wantChooseErr)
}

if err == nil {
assert.NotNil(t, peer)
assert.NotNil(t, onFinish)

// call onFinish to release the peer
onFinish(nil)
}

if err := chooser.Stop(); err != nil {
t.Fatalf("failed to stop direct peer chooser: %v", err)
}
})
}
}
119 changes: 119 additions & 0 deletions common/rpc/factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package rpc

import (
"sync"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"go.uber.org/goleak"

"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/membership"
)

func TestNewFactory(t *testing.T) {
ctrl := gomock.NewController(t)
logger := testlogger.New(t)
serviceName := "service"
ob := NewMockOutboundsBuilder(ctrl)
ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(&Outbounds{}, nil).Times(1)
grpcMsgSize := 4 * 1024 * 1024
f := NewFactory(logger, Params{
ServiceName: serviceName,
TChannelAddress: "localhost:0",
GRPCMaxMsgSize: grpcMsgSize,
GRPCAddress: "localhost:0",
HTTP: &httpParams{
Address: "localhost:0",
},
OutboundsBuilder: ob,
})

if f == nil {
t.Fatal("NewFactory returned nil")
}

assert.NotNil(t, f.GetDispatcher(), "GetDispatcher returned nil")
assert.NotNil(t, f.GetTChannel(), "GetTChannel returned nil")
assert.Equal(t, grpcMsgSize, f.GetMaxMessageSize(), "GetMaxMessageSize returned wrong value")
}

func TestStartStop(t *testing.T) {
defer goleak.VerifyNone(t)

ctrl := gomock.NewController(t)
logger := testlogger.New(t)
serviceName := "service"
ob := NewMockOutboundsBuilder(ctrl)
var mu sync.Mutex
var gotMembers []membership.HostInfo
outbounds := &Outbounds{
onUpdatePeers: func(members []membership.HostInfo) {
mu.Lock()
defer mu.Unlock()
gotMembers = members
},
}
ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(outbounds, nil).Times(1)
grpcMsgSize := 4 * 1024 * 1024
f := NewFactory(logger, Params{
ServiceName: serviceName,
TChannelAddress: "localhost:0",
GRPCMaxMsgSize: grpcMsgSize,
GRPCAddress: "localhost:0",
HTTP: &httpParams{
Address: "localhost:0",
},
OutboundsBuilder: ob,
})

members := []membership.HostInfo{
membership.NewHostInfo("localhost:9191"),
membership.NewHostInfo("localhost:9192"),
}
peerLister := membership.NewMockResolver(ctrl)
peerLister.EXPECT().Subscribe(serviceName, factoryComponentName, gomock.Any()).
DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error {
// Notify the channel once to validate listening logic is working
notifyChannel <- &membership.ChangedEvent{}
return nil
}).Times(1)
peerLister.EXPECT().Unsubscribe(serviceName, factoryComponentName).Return(nil).Times(1)
peerLister.EXPECT().Members(serviceName).Return(members, nil).Times(1)

if err := f.Start(peerLister); err != nil {
t.Fatalf("Factory.Start() returned error: %v", err)
}

// Wait for membership changes to be processed
time.Sleep(100 * time.Millisecond)
mu.Lock()
assert.Equal(t, members, gotMembers, "UpdatePeers not called with expected members")
mu.Unlock()

if err := f.Stop(); err != nil {
t.Fatalf("Factory.Stop() returned error: %v", err)
}
}
2 changes: 2 additions & 0 deletions common/rpc/outbounds.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination outbounds_mock.go -self_package github.com/uber/cadence/common/rpc

package rpc

import (
Expand Down
73 changes: 73 additions & 0 deletions common/rpc/outbounds_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit c8e3874

Please sign in to comment.