diff --git a/common/rpc/direct_peer_chooser.go b/common/rpc/direct_peer_chooser.go index dfed0867906..1e515e4ebc4 100644 --- a/common/rpc/direct_peer_chooser.go +++ b/common/rpc/direct_peer_chooser.go @@ -43,6 +43,7 @@ type directPeerChooser struct { t peer.Transport enableConnRetainMode dynamicconfig.BoolPropertyFn legacyChooser peer.Chooser + legacyChooserErr error mu sync.RWMutex } @@ -56,17 +57,32 @@ func newDirectChooser(serviceName string, t peer.Transport, logger log.Logger, e } // Start statisfies the peer.Chooser interface. -func (*directPeerChooser) Start() error { +func (g *directPeerChooser) Start() error { + c, ok := g.getLegacyChooser() + if ok { + return c.Start() + } + return nil // no-op } // Stop statisfies the peer.Chooser interface. -func (*directPeerChooser) Stop() error { +func (g *directPeerChooser) Stop() error { + c, ok := g.getLegacyChooser() + if ok { + return c.Stop() + } + return nil // no-op } // IsRunning statisfies the peer.Chooser interface. -func (*directPeerChooser) IsRunning() bool { +func (g *directPeerChooser) IsRunning() bool { + c, ok := g.getLegacyChooser() + if ok { + return c.IsRunning() + } + return true // no-op } @@ -90,23 +106,39 @@ func (g *directPeerChooser) UpdatePeers(members []membership.HostInfo) { } func (g *directPeerChooser) chooseFromLegacyDirectPeerChooser(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) { + c, ok := g.getLegacyChooser() + if !ok { + return nil, nil, yarpcerrors.InternalErrorf("failed to get legacy direct peer chooser") + } + + return c.Choose(ctx, req) +} + +func (g *directPeerChooser) getLegacyChooser() (peer.Chooser, bool) { g.mu.RLock() if g.legacyChooser != nil { + // Legacy chooser already created, return it + g.mu.RUnlock() + return g.legacyChooser, true + } + + if g.legacyChooserErr != nil { + // There was an error creating the legacy chooser, return false g.mu.RUnlock() - return g.legacyChooser.Choose(ctx, req) + return nil, false } g.mu.RUnlock() - var err error g.mu.Lock() - g.legacyChooser, err = direct.New(direct.Configuration{}, g.t) + g.legacyChooser, g.legacyChooserErr = direct.New(direct.Configuration{}, g.t) g.mu.Unlock() - if err != nil { - return nil, nil, err + if g.legacyChooserErr != nil { + g.logger.Error("failed to create legacy direct peer chooser", tag.Error(g.legacyChooserErr)) + return nil, false } - return g.legacyChooser.Choose(ctx, req) + return g.legacyChooser, true } diff --git a/common/rpc/direct_peer_chooser_test.go b/common/rpc/direct_peer_chooser_test.go new file mode 100644 index 00000000000..ee955e71ef4 --- /dev/null +++ b/common/rpc/direct_peer_chooser_test.go @@ -0,0 +1,96 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpc + +import ( + "context" + "testing" + + "go.uber.org/goleak" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/transport/grpc" + + "github.com/stretchr/testify/assert" + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/log/testlogger" +) + +func TestDirectChooser(t *testing.T) { + req := &transport.Request{ + Caller: "caller", + Service: "service", + ShardKey: "shard1", + } + + tests := []struct { + desc string + retainConn bool + req *transport.Request + wantChooseErr bool + }{ + { + desc: "don't retain connection", + retainConn: false, + req: req, + }, + { + desc: "retain connection", + retainConn: true, + req: req, + wantChooseErr: true, + }, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + defer goleak.VerifyNone(t) + + logger := testlogger.New(t) + serviceName := "service" + directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return tc.retainConn } + grpcTransport := grpc.NewTransport() + + chooser := newDirectChooser(serviceName, grpcTransport, logger, directConnRetainFn) + if err := chooser.Start(); err != nil { + t.Fatalf("failed to start direct peer chooser: %v", err) + } + + assert.True(t, chooser.IsRunning()) + + peer, onFinish, err := chooser.Choose(context.Background(), tc.req) + if tc.wantChooseErr != (err != nil) { + t.Fatalf("Choose() err = %v, wantChooseErr = %v", err, tc.wantChooseErr) + } + + if err == nil { + assert.NotNil(t, peer) + assert.NotNil(t, onFinish) + + // call onFinish to release the peer + onFinish(nil) + } + + if err := chooser.Stop(); err != nil { + t.Fatalf("failed to stop direct peer chooser: %v", err) + } + }) + } +} diff --git a/common/rpc/factory_test.go b/common/rpc/factory_test.go new file mode 100644 index 00000000000..6b039cc8116 --- /dev/null +++ b/common/rpc/factory_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2017 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package rpc + +import ( + "sync" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "go.uber.org/goleak" + + "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/membership" +) + +func TestNewFactory(t *testing.T) { + ctrl := gomock.NewController(t) + logger := testlogger.New(t) + serviceName := "service" + ob := NewMockOutboundsBuilder(ctrl) + ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(&Outbounds{}, nil).Times(1) + grpcMsgSize := 4 * 1024 * 1024 + f := NewFactory(logger, Params{ + ServiceName: serviceName, + TChannelAddress: "localhost:0", + GRPCMaxMsgSize: grpcMsgSize, + GRPCAddress: "localhost:0", + HTTP: &httpParams{ + Address: "localhost:0", + }, + OutboundsBuilder: ob, + }) + + if f == nil { + t.Fatal("NewFactory returned nil") + } + + assert.NotNil(t, f.GetDispatcher(), "GetDispatcher returned nil") + assert.NotNil(t, f.GetTChannel(), "GetTChannel returned nil") + assert.Equal(t, grpcMsgSize, f.GetMaxMessageSize(), "GetMaxMessageSize returned wrong value") +} + +func TestStartStop(t *testing.T) { + defer goleak.VerifyNone(t) + + ctrl := gomock.NewController(t) + logger := testlogger.New(t) + serviceName := "service" + ob := NewMockOutboundsBuilder(ctrl) + var mu sync.Mutex + var gotMembers []membership.HostInfo + outbounds := &Outbounds{ + onUpdatePeers: func(members []membership.HostInfo) { + mu.Lock() + defer mu.Unlock() + gotMembers = members + }, + } + ob.EXPECT().Build(gomock.Any(), gomock.Any()).Return(outbounds, nil).Times(1) + grpcMsgSize := 4 * 1024 * 1024 + f := NewFactory(logger, Params{ + ServiceName: serviceName, + TChannelAddress: "localhost:0", + GRPCMaxMsgSize: grpcMsgSize, + GRPCAddress: "localhost:0", + HTTP: &httpParams{ + Address: "localhost:0", + }, + OutboundsBuilder: ob, + }) + + members := []membership.HostInfo{ + membership.NewHostInfo("localhost:9191"), + membership.NewHostInfo("localhost:9192"), + } + peerLister := membership.NewMockResolver(ctrl) + peerLister.EXPECT().Subscribe(serviceName, factoryComponentName, gomock.Any()). + DoAndReturn(func(service, name string, notifyChannel chan<- *membership.ChangedEvent) error { + // Notify the channel once to validate listening logic is working + notifyChannel <- &membership.ChangedEvent{} + return nil + }).Times(1) + peerLister.EXPECT().Unsubscribe(serviceName, factoryComponentName).Return(nil).Times(1) + peerLister.EXPECT().Members(serviceName).Return(members, nil).Times(1) + + if err := f.Start(peerLister); err != nil { + t.Fatalf("Factory.Start() returned error: %v", err) + } + + // Wait for membership changes to be processed + time.Sleep(100 * time.Millisecond) + mu.Lock() + assert.Equal(t, members, gotMembers, "UpdatePeers not called with expected members") + mu.Unlock() + + if err := f.Stop(); err != nil { + t.Fatalf("Factory.Stop() returned error: %v", err) + } +} diff --git a/common/rpc/outbounds.go b/common/rpc/outbounds.go index 7b1dfe56431..8c17122744b 100644 --- a/common/rpc/outbounds.go +++ b/common/rpc/outbounds.go @@ -18,6 +18,8 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination outbounds_mock.go -self_package github.com/uber/cadence/common/rpc + package rpc import ( diff --git a/common/rpc/outbounds_mock.go b/common/rpc/outbounds_mock.go new file mode 100644 index 00000000000..ce1dd17cc6d --- /dev/null +++ b/common/rpc/outbounds_mock.go @@ -0,0 +1,73 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// Code generated by MockGen. DO NOT EDIT. +// Source: outbounds.go + +// Package rpc is a generated GoMock package. +package rpc + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + grpc "go.uber.org/yarpc/transport/grpc" + tchannel "go.uber.org/yarpc/transport/tchannel" +) + +// MockOutboundsBuilder is a mock of OutboundsBuilder interface. +type MockOutboundsBuilder struct { + ctrl *gomock.Controller + recorder *MockOutboundsBuilderMockRecorder +} + +// MockOutboundsBuilderMockRecorder is the mock recorder for MockOutboundsBuilder. +type MockOutboundsBuilderMockRecorder struct { + mock *MockOutboundsBuilder +} + +// NewMockOutboundsBuilder creates a new mock instance. +func NewMockOutboundsBuilder(ctrl *gomock.Controller) *MockOutboundsBuilder { + mock := &MockOutboundsBuilder{ctrl: ctrl} + mock.recorder = &MockOutboundsBuilderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockOutboundsBuilder) EXPECT() *MockOutboundsBuilderMockRecorder { + return m.recorder +} + +// Build mocks base method. +func (m *MockOutboundsBuilder) Build(arg0 *grpc.Transport, arg1 *tchannel.Transport) (*Outbounds, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Build", arg0, arg1) + ret0, _ := ret[0].(*Outbounds) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Build indicates an expected call of Build. +func (mr *MockOutboundsBuilderMockRecorder) Build(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Build", reflect.TypeOf((*MockOutboundsBuilder)(nil).Build), arg0, arg1) +} diff --git a/common/rpc/peer_chooser.go b/common/rpc/peer_chooser.go index a4380d2bb18..1b9b26757ef 100644 --- a/common/rpc/peer_chooser.go +++ b/common/rpc/peer_chooser.go @@ -94,8 +94,6 @@ func (f *dnsPeerChooserFactory) CreatePeerChooser(transport peer.Transport, opts return &defaultPeerChooser{Chooser: peerList}, nil } -func (f *dnsPeerChooserFactory) UpdatePeers(peers []membership.HostInfo) {} - func NewDirectPeerChooserFactory(serviceName string, logger log.Logger) PeerChooserFactory { return &directPeerChooserFactory{ logger: logger, @@ -107,9 +105,3 @@ func (f *directPeerChooserFactory) CreatePeerChooser(transport peer.Transport, o f.choosers = append(f.choosers, c) return c, nil } - -func (f *directPeerChooserFactory) UpdatePeers(peers []membership.HostInfo) { - for _, c := range f.choosers { - c.UpdatePeers(peers) - } -} diff --git a/common/rpc/peer_chooser_test.go b/common/rpc/peer_chooser_test.go index 0e41c1a07a0..efd24db99bb 100644 --- a/common/rpc/peer_chooser_test.go +++ b/common/rpc/peer_chooser_test.go @@ -29,10 +29,30 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/yarpc/api/peer" "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/transport/grpc" + "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/testlogger" ) +type ( + fakePeerTransport struct{} + fakePeer struct{} +) + +func (t *fakePeerTransport) RetainPeer(peer.Identifier, peer.Subscriber) (peer.Peer, error) { + return &fakePeer{}, nil +} +func (t *fakePeerTransport) ReleasePeer(peer.Identifier, peer.Subscriber) error { + return nil +} + +func (p *fakePeer) Identifier() string { return "fakePeer" } +func (p *fakePeer) Status() peer.Status { return peer.Status{ConnectionStatus: peer.Available} } +func (p *fakePeer) StartRequest() {} +func (p *fakePeer) EndRequest() {} + func TestDNSPeerChooserFactory(t *testing.T) { logger := log.NewNoop() ctx := context.Background() @@ -60,19 +80,24 @@ func TestDNSPeerChooserFactory(t *testing.T) { assert.Equal(t, "fakePeer", peer.Identifier()) } -type ( - fakePeerTransport struct{} - fakePeer struct{} -) +func TestDirectPeerChooserFactory(t *testing.T) { + logger := testlogger.New(t) + serviceName := "service" + pcf := NewDirectPeerChooserFactory(serviceName, logger) + directConnRetainFn := func(opts ...dynamicconfig.FilterOption) bool { return false } + grpcTransport := grpc.NewTransport() + chooser, err := pcf.CreatePeerChooser(grpcTransport, PeerChooserOptions{ + ServiceName: serviceName, + EnableConnectionRetainingDirectChooser: directConnRetainFn, + }) + if err != nil { + t.Fatalf("Failed to create direct peer chooser: %v", err) + } + if chooser == nil { + t.Fatal("Failed to create direct peer chooser: nil") + } -func (t *fakePeerTransport) RetainPeer(peer.Identifier, peer.Subscriber) (peer.Peer, error) { - return &fakePeer{}, nil + if _, dc := chooser.(*directPeerChooser); !dc { + t.Fatalf("Want chooser be of type (*directPeerChooser), got %d", chooser) + } } -func (t *fakePeerTransport) ReleasePeer(peer.Identifier, peer.Subscriber) error { - return nil -} - -func (p *fakePeer) Identifier() string { return "fakePeer" } -func (p *fakePeer) Status() peer.Status { return peer.Status{ConnectionStatus: peer.Available} } -func (p *fakePeer) StartRequest() {} -func (p *fakePeer) EndRequest() {}