From 8b49cc233b105635a8005dc273e0e07d2f02caf8 Mon Sep 17 00:00:00 2001 From: Jan Kisel Date: Fri, 20 Sep 2024 14:31:47 +0200 Subject: [PATCH 1/2] Refactor PeerProvider & hashring interaction As the result: - hashring never misses updated (no "channel is full" situation possible) - subscribers of MultiringResolver (history, matching) will always get sane ChangedEvent which reflects the REAL changes (see details below) Previously there were problems: - hashring subscribed to PeerProvider (ringpop/uns) with non-buffered channel which led to failures to write every time `ring` was doing something else than reading the channel (happened 60% of times based on error-logs). Switched to calling handlers instead which are implementing schedule-update with channel with cap=1 approach (see `signalSelf`). This approach never skips updates. - PeerProvider supplies ChangedEvent to `ring`, but in reality, we do not use it - we refresh everything from scratch. This makes very misleading to even rely on the ChangedEvent. Basically, we might be triggered by some event (host "c" appeared), but during refresh() we realise there are more changes (host "a" removed, host "c" added as well, etc.), and we notify our Subscribers with an absolutely irrelevant data. - Same misleading took place in other methods like `emitHashIdentifier`. It retrieved list of members from PeerProvider **independantly**, which could lead to emitting hash of a different state than members we just retrieved in refresh(). - Some tests were working "by mistake": like `TestFailedLookupWillAskProvider` and `TestRefreshUpdatesRingOnlyWhenRingHasChanged`. All in all, not methods are more synchronised, called more expectedly (`compareMembers` should not make a new map), and notifiying subscribers is **inseparable** from ring::refresh() like it should be. --- common/membership/hashring.go | 131 +++++++++------- common/membership/hashring_test.go | 144 ++++++++++-------- common/membership/peerprovider_mock.go | 8 +- common/membership/resolver.go | 4 + common/membership/resolver_test.go | 1 + .../peerprovider/ringpopprovider/provider.go | 26 ++-- 6 files changed, 178 insertions(+), 136 deletions(-) diff --git a/common/membership/hashring.go b/common/membership/hashring.go index 751f6031ded..94da372bcf4 100644 --- a/common/membership/hashring.go +++ b/common/membership/hashring.go @@ -22,6 +22,7 @@ package membership import ( "fmt" + "slices" "sort" "strings" "sync" @@ -44,7 +45,6 @@ import ( // ErrInsufficientHosts is thrown when there are not enough hosts to serve the request var ErrInsufficientHosts = &types.InternalServiceError{Message: "Not enough hosts to serve the request"} -var emptyEvent = &ChangedEvent{} const ( minRefreshInternal = time.Second * 4 @@ -59,14 +59,14 @@ type PeerProvider interface { GetMembers(service string) ([]HostInfo, error) WhoAmI() (HostInfo, error) SelfEvict() error - Subscribe(name string, notifyChannel chan<- *ChangedEvent) error + Subscribe(name string, handler func(ChangedEvent)) error } type ring struct { status int32 service string peerProvider PeerProvider - refreshChan chan *ChangedEvent + refreshChan chan struct{} shutdownCh chan struct{} shutdownWG sync.WaitGroup timeSource clock.TimeSource @@ -99,7 +99,7 @@ func newHashring( service: service, peerProvider: provider, shutdownCh: make(chan struct{}), - refreshChan: make(chan *ChangedEvent), + refreshChan: make(chan struct{}, 1), timeSource: timeSource, logger: logger, scope: scope, @@ -125,11 +125,11 @@ func (r *ring) Start() { ) { return } - if err := r.peerProvider.Subscribe(r.service, r.refreshChan); err != nil { + if err := r.peerProvider.Subscribe(r.service, r.handleUpdates); err != nil { r.logger.Fatal("subscribing to peer provider", tag.Error(err)) } - if _, err := r.refresh(); err != nil { + if err := r.refresh(); err != nil { r.logger.Fatal("failed to start service resolver", tag.Error(err)) } @@ -166,12 +166,10 @@ func (r *ring) Lookup( ) (HostInfo, error) { addr, found := r.ring().Lookup(key) if !found { - select { - case r.refreshChan <- emptyEvent: - default: - } + r.signalSelf() return HostInfo{}, ErrInsufficientHosts } + r.members.RLock() defer r.members.RUnlock() host, ok := r.members.keys[addr] @@ -195,12 +193,25 @@ func (r *ring) Subscribe(watcher string, notifyChannel chan<- *ChangedEvent) err return nil } -func (r *ring) notifySubscribers(msg *ChangedEvent) { +func (r *ring) handleUpdates(event ChangedEvent) { + r.signalSelf() +} + +func (r *ring) signalSelf() { + var event struct{} + select { + case r.refreshChan <- event: + default: // channel already has an event, don't block + } +} + +func (r *ring) notifySubscribers(msg ChangedEvent) { r.subscribers.Lock() defer r.subscribers.Unlock() + for name, ch := range r.subscribers.keys { select { - case ch <- msg: + case ch <- &msg: default: r.logger.Error("subscriber notification failed", tag.Name(name)) } @@ -240,43 +251,43 @@ func (r *ring) Members() []HostInfo { return hosts } -func (r *ring) refresh() (refreshed bool, err error) { +func (r *ring) refresh() error { if r.members.refreshed.After(r.timeSource.Now().Add(-minRefreshInternal)) { // refreshed too frequently - return false, nil + return nil } members, err := r.peerProvider.GetMembers(r.service) if err != nil { - return false, fmt.Errorf("getting members from peer provider: %w", err) + return fmt.Errorf("getting members from peer provider: %w", err) } - r.members.Lock() - defer r.members.Unlock() - newMembersMap, changed := r.compareMembers(members) - if !changed { - return false, nil + newMembersMap := r.makeMembersMap(members) + + diff := r.diffMembers(newMembersMap) + if diff.Empty() { + return nil } ring := emptyHashring() ring.AddMembers(castToMembers(members)...) - - r.members.keys = newMembersMap - r.members.refreshed = r.timeSource.Now() r.value.Store(ring) r.logger.Info("refreshed ring members", tag.Value(members)) - return true, nil + r.updateMembersMap(newMembersMap) + + r.emitHashIdentifier() + r.notifySubscribers(diff) + + return nil } -func (r *ring) refreshAndNotifySubscribers(event *ChangedEvent) { - refreshed, err := r.refresh() - if err != nil { - r.logger.Error("refreshing ring", tag.Error(err)) - } - if refreshed { - r.notifySubscribers(event) - } +func (r *ring) updateMembersMap(newMembers map[string]HostInfo) { + r.members.Lock() + defer r.members.Unlock() + + r.members.keys = newMembers + r.members.refreshed = r.timeSource.Now() } func (r *ring) refreshRingWorker() { @@ -284,15 +295,17 @@ func (r *ring) refreshRingWorker() { refreshTicker := r.timeSource.NewTicker(defaultRefreshInterval) defer refreshTicker.Stop() + for { select { case <-r.shutdownCh: return - case event := <-r.refreshChan: // local signal or signal from provider - r.refreshAndNotifySubscribers(event) - case <-refreshTicker.Chan(): // periodically refresh membership - r.emitHashIdentifier() - r.refreshAndNotifySubscribers(emptyEvent) + case <-r.refreshChan: // local signal or signal from provider + if err := r.refresh(); err != nil { + r.logger.Error("refreshing ring", tag.Error(err)) + } + case <-refreshTicker.Chan(): // periodically force refreshing membership + r.signalSelf() } } } @@ -302,11 +315,7 @@ func (r *ring) ring() *hashring.HashRing { } func (r *ring) emitHashIdentifier() float64 { - members, err := r.peerProvider.GetMembers(r.service) - if err != nil { - r.logger.Error("Observed a problem getting peer members while emitting hash identifier metrics", tag.Error(err)) - return -1 - } + members := r.Members() self, err := r.peerProvider.WhoAmI() if err != nil { r.logger.Error("Observed a problem looking up self from the membership provider while emitting hash identifier metrics", tag.Error(err)) @@ -343,22 +352,38 @@ func (r *ring) emitHashIdentifier() float64 { return trimmedForMetric } -func (r *ring) compareMembers(members []HostInfo) (map[string]HostInfo, bool) { - changed := false - newMembersMap := make(map[string]HostInfo, len(members)) - for _, member := range members { - newMembersMap[member.GetAddress()] = member - if _, ok := r.members.keys[member.GetAddress()]; !ok { - changed = true +func (r *ring) makeMembersMap(members []HostInfo) map[string]HostInfo { + membersMap := make(map[string]HostInfo, len(members)) + for _, m := range members { + membersMap[m.GetAddress()] = m + } + return membersMap +} + +func (r *ring) diffMembers(newMembers map[string]HostInfo) ChangedEvent { + r.members.RLock() + defer r.members.RUnlock() + + var combinedChange ChangedEvent + + // find newly added hosts + for addr := range newMembers { + if _, found := r.members.keys[addr]; !found { + combinedChange.HostsAdded = append(combinedChange.HostsAdded, addr) } } + // find removed hosts for addr := range r.members.keys { - if _, ok := newMembersMap[addr]; !ok { - changed = true - break + if _, found := newMembers[addr]; !found { + combinedChange.HostsRemoved = append(combinedChange.HostsRemoved, addr) } } - return newMembersMap, changed + + // order since it will most probably used in logs + slices.Sort(combinedChange.HostsAdded) + slices.Sort(combinedChange.HostsUpdated) + slices.Sort(combinedChange.HostsRemoved) + return combinedChange } func castToMembers[T membership.Member](members []T) []membership.Member { diff --git a/common/membership/hashring_test.go b/common/membership/hashring_test.go index f4f4b757877..222612855ac 100644 --- a/common/membership/hashring_test.go +++ b/common/membership/hashring_test.go @@ -31,6 +31,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.uber.org/goleak" "go.uber.org/zap/zaptest/observer" @@ -42,6 +43,8 @@ import ( var letters = []rune("abcdefghijklmnopqrstuvwxyz") +const maxTestDuration = 5 * time.Second + func randSeq(n int) string { b := make([]rune, n) for i := range b { @@ -58,46 +61,56 @@ func randomHostInfo(n int) []HostInfo { return res } -func testCompareMembers(t *testing.T, curr []HostInfo, new []HostInfo, hasDiff bool) { - hashring := &ring{} - currMembers := make(map[string]HostInfo, len(curr)) - for _, m := range curr { - currMembers[m.GetAddress()] = m - } - hashring.members.keys = currMembers - newMembers, changed := hashring.compareMembers(new) - assert.Equal(t, hasDiff, changed) - assert.Equal(t, len(new), len(newMembers)) - for _, m := range new { - _, ok := newMembers[m.GetAddress()] - assert.True(t, ok) - } -} - -func Test_ring_compareMembers(t *testing.T) { - +func TestDiffMemberMakesCorrectDiff(t *testing.T) { tests := []struct { - curr []HostInfo - new []HostInfo - hasDiff bool + name string + curr []HostInfo + new []HostInfo + expectedChange ChangedEvent }{ - {curr: []HostInfo{}, new: []HostInfo{NewHostInfo("a")}, hasDiff: true}, - {curr: []HostInfo{}, new: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, hasDiff: true}, - {curr: []HostInfo{NewHostInfo("a")}, new: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, hasDiff: true}, - {curr: []HostInfo{}, new: []HostInfo{}, hasDiff: false}, - {curr: []HostInfo{NewHostInfo("a")}, new: []HostInfo{NewHostInfo("a")}, hasDiff: false}, - // order doesn't matter. - {curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, new: []HostInfo{NewHostInfo("b"), NewHostInfo("a")}, hasDiff: false}, - // member has left the ring - {curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, new: []HostInfo{NewHostInfo("b"), NewHostInfo("a")}, hasDiff: true}, - // ring becomes empty - {curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, new: []HostInfo{}, hasDiff: true}, + { + name: "empty and one added", + curr: []HostInfo{}, + new: []HostInfo{NewHostInfo("a")}, + expectedChange: ChangedEvent{HostsAdded: []string{"a"}}, + }, + { + name: "non-empty and added", + curr: []HostInfo{NewHostInfo("a")}, + new: []HostInfo{NewHostInfo("a"), NewHostInfo("b")}, + expectedChange: ChangedEvent{HostsAdded: []string{"b"}}, + }, + { + name: "empty and nothing has changed", + curr: []HostInfo{}, + new: []HostInfo{}, + expectedChange: ChangedEvent{}, + }, + { + name: "multiple hosts, but no change", + curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, + new: []HostInfo{NewHostInfo("c"), NewHostInfo("b"), NewHostInfo("a")}, + expectedChange: ChangedEvent{}, + }, + + { + name: "multiple hosts, add/delete", + curr: []HostInfo{NewHostInfo("a"), NewHostInfo("b"), NewHostInfo("c")}, + new: []HostInfo{NewHostInfo("b"), NewHostInfo("e"), NewHostInfo("f")}, + expectedChange: ChangedEvent{HostsRemoved: []string{"a", "c"}, HostsAdded: []string{"e", "f"}}, + }, } for _, tt := range tests { - testCompareMembers(t, tt.curr, tt.new, tt.hasDiff) - } + t.Run(tt.name, func(t *testing.T) { + r := &ring{} + currMembers := r.makeMembersMap(tt.curr) + r.members.keys = currMembers + combinedChange := r.diffMembers(r.makeMembersMap(tt.new)) + assert.Equal(t, tt.expectedChange, combinedChange) + }) + } } type hashringTestData struct { @@ -145,13 +158,21 @@ func (td *hashringTestData) startHashRing() { func TestFailedLookupWillAskProvider(t *testing.T) { td := newHashringTestData(t) + var wg sync.WaitGroup + wg.Add(2) td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) - td.mockPeerProvider.EXPECT().GetMembers("test-service").Times(1) + td.mockPeerProvider.EXPECT().GetMembers("test-service"). + Do(func(string) { + // we expect first call on hashring creation + // the second call should be initiated by failed Lookup + wg.Done() + }).Times(2) td.startHashRing() _, err := td.hashRing.Lookup("a") - assert.Error(t, err) + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "Failed Lookup should lead to refresh") } func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) { @@ -159,15 +180,13 @@ func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) { td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) td.mockPeerProvider.EXPECT().GetMembers("test-service").Times(1).Return(randomHostInfo(3), nil) + td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() // Start will also call .refresh() td.startHashRing() updatedAt := td.hashRing.members.refreshed - td.hashRing.refresh() - refreshed, err := td.hashRing.refresh() - assert.NoError(t, err) - assert.False(t, refreshed) + assert.NoError(t, td.hashRing.refresh()) assert.Equal(t, updatedAt, td.hashRing.members.refreshed) } @@ -176,16 +195,11 @@ func TestRefreshWillNotifySubscribers(t *testing.T) { var hostsToReturn []HostInfo td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) - td.mockPeerProvider.EXPECT().GetMembers("test-service").Times(2).DoAndReturn(func(service string) ([]HostInfo, error) { + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { hostsToReturn = randomHostInfo(5) return hostsToReturn, nil - }) - - changed := &ChangedEvent{ - HostsAdded: []string{"a"}, - HostsUpdated: []string{"b"}, - HostsRemoved: []string{"c"}, - } + }).Times(2) + td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() td.startHashRing() @@ -200,14 +214,17 @@ func TestRefreshWillNotifySubscribers(t *testing.T) { defer wg.Done() changedEvent := <-changeCh changedEvent2 := <-changeCh - assert.Equal(t, changed, changedEvent) - assert.Equal(t, changed, changedEvent2) + assert.NotEmpty(t, changedEvent, "changed event should never be empty") + assert.NotEmpty(t, changedEvent2, "changed event should never be empty") }() // to bypass internal check td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) - td.hashRing.refreshChan <- changed - wg.Wait() // wait until both subscribers will get notification + td.hashRing.signalSelf() + + // wait until both subscribers will get notification + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration)) + // Test if internal members are updated assert.ElementsMatch(t, td.hashRing.Members(), hostsToReturn, "members should contain just-added nodes") } @@ -218,11 +235,11 @@ func TestSubscribersAreNotifiedPeriodically(t *testing.T) { var hostsToReturn []HostInfo td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) - td.mockPeerProvider.EXPECT().GetMembers("test-service").Times(3).DoAndReturn(func(service string) ([]HostInfo, error) { + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { // we have to change members since subscribers are only notified on change hostsToReturn = randomHostInfo(5) return hostsToReturn, nil - }) + }).Times(2) td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() td.startHashRing() @@ -235,13 +252,13 @@ func TestSubscribersAreNotifiedPeriodically(t *testing.T) { go func() { defer wg.Done() event := <-changeCh - assert.Empty(t, event, "event should be empty when periodical update happens") + assert.NotEmpty(t, event, "changed event should never be empty") }() td.mockTimeSource.BlockUntil(1) // we should wait until ticker(defaultRefreshInterval) is created td.mockTimeSource.Advance(defaultRefreshInterval) // and only then to advance time - wg.Wait() // wait until subscriber will get notification + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration)) // wait until subscriber will get notification // Test if internal members are updated assert.ElementsMatch(t, td.hashRing.Members(), hostsToReturn, "members should contain just-added nodes") @@ -276,7 +293,6 @@ func TestUnsubcribeDeletes(t *testing.T) { assert.Equal(t, 1, len(td.hashRing.subscribers.keys)) assert.NoError(t, td.hashRing.Unsubscribe("testservice1")) assert.Equal(t, 0, len(td.hashRing.subscribers.keys)) - } func TestMemberCountReturnsNumber(t *testing.T) { @@ -296,10 +312,9 @@ func TestMemberCountReturnsNumber(t *testing.T) { func TestErrorIsPropagatedWhenProviderFails(t *testing.T) { td := newHashringTestData(t) - td.mockPeerProvider.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("error")) + td.mockPeerProvider.EXPECT().GetMembers(gomock.Any()).Return(nil, errors.New("provider failure")) - _, err := td.hashRing.refresh() - assert.Error(t, err) + assert.ErrorContains(t, td.hashRing.refresh(), "provider failure") } func TestStopWillStopProvider(t *testing.T) { @@ -319,6 +334,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) { td.mockPeerProvider.EXPECT().GetMembers("test-service").AnyTimes().DoAndReturn(func(service string) ([]HostInfo, error) { return randomHostInfo(5), nil }) + td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() td.startHashRing() wg.Add(2) @@ -332,8 +348,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) { for i := 0; i < 50; i++ { // to bypass internal check td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) - _, err := td.hashRing.refresh() - assert.NoError(t, err) + assert.NoError(t, td.hashRing.refresh()) } wg.Done() }() @@ -386,10 +401,11 @@ func TestEmitHashringView(t *testing.T) { td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { return testInput.hosts, testInput.lookuperr }) - td.mockPeerProvider.EXPECT().WhoAmI().DoAndReturn(func() (HostInfo, error) { return testInput.selfInfo, testInput.selfErr - }) + }).AnyTimes() + + require.NoError(t, td.hashRing.refresh()) assert.Equal(t, testInput.expectedResult, td.hashRing.emitHashIdentifier()) }) } diff --git a/common/membership/peerprovider_mock.go b/common/membership/peerprovider_mock.go index d7b48deb405..dc6c6d53c3f 100644 --- a/common/membership/peerprovider_mock.go +++ b/common/membership/peerprovider_mock.go @@ -109,17 +109,17 @@ func (mr *MockPeerProviderMockRecorder) Stop() *gomock.Call { } // Subscribe mocks base method. -func (m *MockPeerProvider) Subscribe(name string, notifyChannel chan<- *ChangedEvent) error { +func (m *MockPeerProvider) Subscribe(name string, handler func(ChangedEvent)) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Subscribe", name, notifyChannel) + ret := m.ctrl.Call(m, "Subscribe", name, handler) ret0, _ := ret[0].(error) return ret0 } // Subscribe indicates an expected call of Subscribe. -func (mr *MockPeerProviderMockRecorder) Subscribe(name, notifyChannel interface{}) *gomock.Call { +func (mr *MockPeerProviderMockRecorder) Subscribe(name, handler interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockPeerProvider)(nil).Subscribe), name, notifyChannel) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockPeerProvider)(nil).Subscribe), name, handler) } // WhoAmI mocks base method. diff --git a/common/membership/resolver.go b/common/membership/resolver.go index 86e237012ff..96443c7d493 100644 --- a/common/membership/resolver.go +++ b/common/membership/resolver.go @@ -233,3 +233,7 @@ func (rpo *MultiringResolver) MemberCount(service string) (int, error) { } return ring.MemberCount(), nil } + +func (ce *ChangedEvent) Empty() bool { + return len(ce.HostsAdded) == 0 && len(ce.HostsUpdated) == 0 && len(ce.HostsRemoved) == 0 +} diff --git a/common/membership/resolver_test.go b/common/membership/resolver_test.go index 794f621e507..810a5045190 100644 --- a/common/membership/resolver_test.go +++ b/common/membership/resolver_test.go @@ -65,6 +65,7 @@ func TestMethodsAreRoutedToARing(t *testing.T) { } pp.EXPECT().GetMembers("test-worker").Return(hosts, nil).Times(1) + pp.EXPECT().WhoAmI().AnyTimes() r, err := a.getRing("test-worker") r.refresh() diff --git a/common/peerprovider/ringpopprovider/provider.go b/common/peerprovider/ringpopprovider/provider.go index 1a21f1ecd5c..050dda3c546 100644 --- a/common/peerprovider/ringpopprovider/provider.go +++ b/common/peerprovider/ringpopprovider/provider.go @@ -52,7 +52,7 @@ type ( logger log.Logger portmap membership.PortMap mu sync.RWMutex - subscribers map[string]chan<- *membership.ChangedEvent + subscribers map[string]func(membership.ChangedEvent) } ) @@ -118,7 +118,7 @@ func NewRingpopProvider( logger: logger, portmap: portMap, ringpop: rp, - subscribers: map[string]chan<- *membership.ChangedEvent{}, + subscribers: map[string]func(membership.ChangedEvent){}, } } @@ -165,26 +165,22 @@ func (r *Provider) HandleEvent(event events.Event) { return } - r.logger.Info("Received a ringpop ring changed event") - // Marshall the event object into the required type - change := &membership.ChangedEvent{ + change := membership.ChangedEvent{ HostsAdded: e.ServersAdded, HostsUpdated: e.ServersUpdated, HostsRemoved: e.ServersRemoved, } + r.logger.Info("Received a ringpop ring changed event", tag.MembershipChangeEvent(change)) + r.notifySubscribers(change) +} - // Notify subscribers +func (r *Provider) notifySubscribers(event membership.ChangedEvent) { r.mu.RLock() defer r.mu.RUnlock() - for name, ch := range r.subscribers { - select { - case ch <- change: - default: - r.logger.Error("Failed to send listener notification, channel full", tag.Subscriber(name)) - } + for _, handler := range r.subscribers { + handler(event) } - } func (r *Provider) SelfEvict() error { @@ -288,7 +284,7 @@ func (r *Provider) Stop() { } // Subscribe allows to be subscribed for ring changes -func (r *Provider) Subscribe(name string, notifyChannel chan<- *membership.ChangedEvent) error { +func (r *Provider) Subscribe(name string, handler func(membership.ChangedEvent)) error { r.mu.Lock() defer r.mu.Unlock() @@ -297,7 +293,7 @@ func (r *Provider) Subscribe(name string, notifyChannel chan<- *membership.Chang return fmt.Errorf("%q already subscribed to ringpop provider", name) } - r.subscribers[name] = notifyChannel + r.subscribers[name] = handler return nil } From 8d31c74c758ef52b6ca5197a4fc1cb48aaf68035 Mon Sep 17 00:00:00 2001 From: Jan Kisel Date: Thu, 26 Sep 2024 15:36:41 +0200 Subject: [PATCH 2/2] Adding more tests to ringpop provider Also fixed race-condition: we were waiting for ring to stop, but in order to read stop-channel it sometimes had to finish notifying subscribers which took the same lock. We need to be careful with lock-scope. --- common/membership/hashring.go | 4 +- common/membership/hashring_test.go | 112 +++++++++++++++++- .../ringpopprovider/provider_test.go | 49 ++++++++ 3 files changed, 159 insertions(+), 6 deletions(-) diff --git a/common/membership/hashring.go b/common/membership/hashring.go index 94da372bcf4..6eb1f77fba2 100644 --- a/common/membership/hashring.go +++ b/common/membership/hashring.go @@ -151,8 +151,8 @@ func (r *ring) Stop() { r.value.Store(emptyHashring()) r.subscribers.Lock() - defer r.subscribers.Unlock() r.subscribers.keys = make(map[string]chan<- *ChangedEvent) + r.subscribers.Unlock() close(r.shutdownCh) if success := common.AwaitWaitGroup(&r.shutdownWG, time.Minute); !success { @@ -302,7 +302,7 @@ func (r *ring) refreshRingWorker() { return case <-r.refreshChan: // local signal or signal from provider if err := r.refresh(); err != nil { - r.logger.Error("refreshing ring", tag.Error(err)) + r.logger.Error("failed to refresh ring", tag.Error(err)) } case <-refreshTicker.Chan(): // periodically force refreshing membership r.signalSelf() diff --git a/common/membership/hashring_test.go b/common/membership/hashring_test.go index 222612855ac..507715c4dc6 100644 --- a/common/membership/hashring_test.go +++ b/common/membership/hashring_test.go @@ -24,19 +24,23 @@ package membership import ( "errors" + "fmt" "math/rand" + "runtime" "sync" "testing" "time" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/goleak" "go.uber.org/zap/zaptest/observer" "github.com/uber/cadence/common" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/metrics" ) @@ -155,6 +159,10 @@ func (td *hashringTestData) startHashRing() { td.hashRing.Start() } +func (td *hashringTestData) bypassRefreshRatelimiter() { + td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) +} + func TestFailedLookupWillAskProvider(t *testing.T) { td := newHashringTestData(t) @@ -175,6 +183,104 @@ func TestFailedLookupWillAskProvider(t *testing.T) { require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "Failed Lookup should lead to refresh") } +func TestFailingToSubscribeIsFatal(t *testing.T) { + defer goleak.VerifyNone(t) + td := newHashringTestData(t) + + // we need to intercept logger calls, use mock + mockLogger := &log.MockLogger{} + td.hashRing.logger = mockLogger + + mockLogger.On("Fatal", mock.Anything, mock.Anything).Run( + func(arguments mock.Arguments) { + // we need to stop goroutine like log.Fatal() does with an entire program + runtime.Goexit() + }, + ).Times(1) + + td.mockPeerProvider.EXPECT(). + Subscribe(gomock.Any(), gomock.Any()). + Return(errors.New("can't subscribe")) + + // because we use runtime.Goexit() we need to call .Start in a separate goroutine + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + td.hashRing.Start() + }() + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "must be finished - failed to subscribe") + require.True(t, mockLogger.AssertExpectations(t), "log.Fatal must be called") +} + +func TestHandleUpdatesNeverBlocks(t *testing.T) { + td := newHashringTestData(t) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + td.hashRing.handleUpdates(ChangedEvent{}) + wg.Done() + }() + } + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "handleUpdates should never block") +} + +func TestHandlerSchedulesUpdates(t *testing.T) { + td := newHashringTestData(t) + + var wg sync.WaitGroup + td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { + wg.Done() + fmt.Println("GetMembers called") + return randomHostInfo(5), nil + }).Times(2) + td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() + + wg.Add(1) // we expect 1st GetMembers to be called during hashring start + td.startHashRing() + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called") + + wg.Add(1) // another call to GetMembers should happen because of handleUpdate + td.bypassRefreshRatelimiter() + td.hashRing.handleUpdates(ChangedEvent{}) + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called again") +} + +func TestFailedRefreshLogsError(t *testing.T) { + td := newHashringTestData(t) + + var wg sync.WaitGroup + td.mockPeerProvider.EXPECT().Subscribe(gomock.Any(), gomock.Any()).Times(1) + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { + wg.Done() + return randomHostInfo(5), nil + }).Times(1) + td.mockPeerProvider.EXPECT().WhoAmI().AnyTimes() + + wg.Add(1) // we expect 1st GetMembers to be called during hashring start + td.startHashRing() + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called") + + td.mockPeerProvider.EXPECT().GetMembers("test-service").DoAndReturn(func(service string) ([]HostInfo, error) { + wg.Done() + return nil, errors.New("GetMembers failed") + }).Times(1) + + wg.Add(1) // another call to GetMembers should happen because of handleUpdate + td.bypassRefreshRatelimiter() + td.hashRing.handleUpdates(ChangedEvent{}) + + require.True(t, common.AwaitWaitGroup(&wg, maxTestDuration), "GetMembers must be called again (and fail)") + td.hashRing.Stop() + assert.Equal(t, 1, td.observedLogs.FilterMessageSnippet("failed to refresh ring").Len()) +} + func TestRefreshUpdatesRingOnlyWhenRingHasChanged(t *testing.T) { td := newHashringTestData(t) @@ -218,8 +324,7 @@ func TestRefreshWillNotifySubscribers(t *testing.T) { assert.NotEmpty(t, changedEvent2, "changed event should never be empty") }() - // to bypass internal check - td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) + td.bypassRefreshRatelimiter() td.hashRing.signalSelf() // wait until both subscribers will get notification @@ -346,8 +451,7 @@ func TestLookupAndRefreshRaceCondition(t *testing.T) { }() go func() { for i := 0; i < 50; i++ { - // to bypass internal check - td.hashRing.members.refreshed = time.Now().AddDate(0, 0, -1) + td.bypassRefreshRatelimiter() assert.NoError(t, td.hashRing.refresh()) } wg.Done() diff --git a/common/peerprovider/ringpopprovider/provider_test.go b/common/peerprovider/ringpopprovider/provider_test.go index 9f10ff3a7c4..ba45742fa45 100644 --- a/common/peerprovider/ringpopprovider/provider_test.go +++ b/common/peerprovider/ringpopprovider/provider_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/ringpop-go/events" "github.com/uber/tchannel-go" "go.uber.org/goleak" @@ -34,6 +37,8 @@ import ( "github.com/uber/cadence/common/membership" ) +const testServiceName = "test-service" + type srvAndCh struct { service string ch *tchannel.Channel @@ -136,6 +141,50 @@ func TestRingpopProvider(t *testing.T) { } } +func TestSubscribeAndNotify(t *testing.T) { + provider := NewRingpopProvider(testServiceName, nil, nil, nil, testlogger.New(t)) + + ringpopEvent := events.RingChangedEvent{ + ServersAdded: []string{"aa", "bb", "cc"}, + ServersUpdated: []string{"dd"}, + ServersRemoved: []string{"ee", "ff"}, + } + expectedEvent := membership.ChangedEvent{ + HostsAdded: ringpopEvent.ServersAdded, + HostsUpdated: ringpopEvent.ServersUpdated, + HostsRemoved: ringpopEvent.ServersRemoved, + } + + var calls1, calls2 int + require.NoError(t, + provider.Subscribe("subscriber1", + func(ev membership.ChangedEvent) { + calls1++ + assert.Equal(t, ev, expectedEvent) + }, + )) + + require.NoError(t, + provider.Subscribe("subscriber2", + func(ev membership.ChangedEvent) { + calls2++ + assert.Equal(t, ev, expectedEvent) + }, + )) + + require.Error(t, + provider.Subscribe( + "subscriber2", + func(membership.ChangedEvent) { t.Error("Should never be called") }, + ), + "Subscribe doesn't allow duplicate names", + ) + + provider.HandleEvent(ringpopEvent) + assert.Equal(t, 1, calls1, "every subscriber must have been called once") + assert.Equal(t, 1, calls2, "every subscriber must have been called once") +} + func createAndListenChannels(serviceName string, n int) ([]*srvAndCh, func(), error) { var res []*srvAndCh cleanupFn := func(srvs []*srvAndCh) func() {