From 4fc6b2175963257af62bbb57d199b2d0844eab83 Mon Sep 17 00:00:00 2001 From: sasha Date: Thu, 9 Nov 2023 16:55:49 -0800 Subject: [PATCH] make use of ringpop LookN method --- common/membership/interfaces.go | 3 ++ common/membership/interfaces_mock.go | 14 +++++ common/membership/ringpop/service_resolver.go | 10 ++++ common/util/util.go | 13 ++++- service/frontend/task_reachability.go | 2 +- service/worker/pernamespaceworker.go | 25 ++++----- service/worker/pernamespaceworker_test.go | 51 ++++++++++++------- tests/simple_service_resolver.go | 8 +++ 8 files changed, 91 insertions(+), 35 deletions(-) diff --git a/common/membership/interfaces.go b/common/membership/interfaces.go index a1a225039dd9..66d93d1b580f 100644 --- a/common/membership/interfaces.go +++ b/common/membership/interfaces.go @@ -82,6 +82,9 @@ type ( ServiceResolver interface { // Lookup looks up the host that currently owns the resource identified by the given key. Lookup(key string) (HostInfo, error) + // LookupN looks n hosts that owns the resource identified by the given key, if n greater than total number + // of hosts total number of hosts will be returned + LookupN(key string, n int) []HostInfo // AddListener adds a listener which will get notified on the given channel whenever membership changes. AddListener(name string, notifyChannel chan<- *ChangedEvent) error // RemoveListener removes a listener for this service. diff --git a/common/membership/interfaces_mock.go b/common/membership/interfaces_mock.go index a64f44767436..a7c8811010df 100644 --- a/common/membership/interfaces_mock.go +++ b/common/membership/interfaces_mock.go @@ -181,6 +181,20 @@ func (mr *MockServiceResolverMockRecorder) Lookup(key interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lookup", reflect.TypeOf((*MockServiceResolver)(nil).Lookup), key) } +// LookupN mocks base method. +func (m *MockServiceResolver) LookupN(key string, n int) []HostInfo { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LookupN", key, n) + ret0, _ := ret[0].([]HostInfo) + return ret0 +} + +// LookupN indicates an expected call of LookupN. +func (mr *MockServiceResolverMockRecorder) LookupN(key, n interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LookupN", reflect.TypeOf((*MockServiceResolver)(nil).LookupN), key, n) +} + // MemberCount mocks base method. func (m *MockServiceResolver) MemberCount() int { m.ctrl.T.Helper() diff --git a/common/membership/ringpop/service_resolver.go b/common/membership/ringpop/service_resolver.go index f24c25b45bb0..f3bd1b854289 100644 --- a/common/membership/ringpop/service_resolver.go +++ b/common/membership/ringpop/service_resolver.go @@ -45,6 +45,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/membership" "go.temporal.io/server/common/primitives" + "go.temporal.io/server/common/util" ) const ( @@ -154,6 +155,15 @@ func (r *serviceResolver) Lookup(key string) (membership.HostInfo, error) { return newHostInfo(addr, r.getLabelsMap()), nil } +func (r *serviceResolver) LookupN(key string, n int) []membership.HostInfo { + addresses := r.ring().LookupN(key, n) + if len(addresses) == 0 { + r.RequestRefresh() + return []membership.HostInfo{} + } + return util.MapSlice(addresses, membership.NewHostInfoFromAddress) +} + func (r *serviceResolver) AddListener( name string, notifyChannel chan<- *membership.ChangedEvent, diff --git a/common/util/util.go b/common/util/util.go index e3375717b520..d5fead52d4da 100644 --- a/common/util/util.go +++ b/common/util/util.go @@ -117,6 +117,15 @@ func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([ return results, nil } +// MapSlice given slice xs []T and f(T) S produces slice []S by applying f to every element of xs +func MapSlice[T, S any](xs []T, f func(T) S) []S { + var result []S + for _, s := range xs { + result = append(result, f(s)) + } + return result +} + // FilterSlice iterates over elements of a slice, returning a new slice of all elements predicate returns true for. func FilterSlice[T any](in []T, predicate func(T) bool) []T { var out []T @@ -128,8 +137,8 @@ func FilterSlice[T any](in []T, predicate func(T) bool) []T { return out } -// ReduceSlice reduces a slice using given reducer function and initial value. -func ReduceSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A { +// FoldSlice folds left a slice using given reducer function and initial value. +func FoldSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A { acc := initializer for _, val := range in { acc = reducer(acc, val) diff --git a/service/frontend/task_reachability.go b/service/frontend/task_reachability.go index ec40b380c9c9..26b885cf9d65 100644 --- a/service/frontend/task_reachability.go +++ b/service/frontend/task_reachability.go @@ -187,7 +187,7 @@ func (wh *WorkflowHandler) getTaskQueueReachability(ctx context.Context, request reachableByNewWorkflows = true } else { // If the queue became versioned just recently, consider the unversioned build id reachable. - queueBecameVersionedAt := util.ReduceSlice(versionSets, hlc.Clock{WallClock: math.MaxInt64}, func(c hlc.Clock, set *persistencespb.CompatibleVersionSet) hlc.Clock { + queueBecameVersionedAt := util.FoldSlice(versionSets, hlc.Clock{WallClock: math.MaxInt64}, func(c hlc.Clock, set *persistencespb.CompatibleVersionSet) hlc.Clock { return hlc.Min(c, *set.BecameDefaultTimestamp) }) reachableByNewWorkflows = time.Since(hlc.UTC(queueBecameVersionedAt)) < wh.config.ReachabilityQuerySetDurationSinceDefault() diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index f163e9c227be..f9221c71a27c 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -262,21 +262,18 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) { func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) { totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String()) - // This can result in fewer than the intended number of workers if totalWorkers > 1, because - // multiple lookups might land on the same node. To compensate, we increase the number of - // pollers in that case, but it would be better to try to spread them across our nodes. - // TODO: implement this properly using LookupN in serviceResolver - multiplicity := 0 - for i := 0; i < totalWorkers; i++ { - key := fmt.Sprintf("%s/%d", ns.ID().String(), i) - target, err := wm.serviceResolver.Lookup(key) - if err != nil { - return 0, 0, err + key := ns.ID().String() + targets := wm.serviceResolver.LookupN(key, totalWorkers) + if len(targets) == 0 { + return 0, 0, membership.ErrInsufficientHosts + } + IsLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() } + multiplicity := util.FoldSlice(targets, 0, func(acc int, t membership.HostInfo) int { + if IsLocal(t) { + acc++ } - if target.Identity() == wm.self.Identity() { - multiplicity++ - } - } + return acc + }) return multiplicity, totalWorkers, nil } diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go index 14cf0ea04b81..7fa8d25d2130 100644 --- a/service/worker/pernamespaceworker_test.go +++ b/service/worker/pernamespaceworker_test.go @@ -159,7 +159,7 @@ func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other1"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other1")}) s.manager.namespaceCallback(ns, false) // main work happens in a goroutine @@ -176,7 +176,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -202,9 +202,11 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns3/0").Return(membership.NewHostInfoFromAddress("self"), nil) - s.serviceResolver.EXPECT().Lookup("ns3/1").Return(membership.NewHostInfoFromAddress("other"), nil) - s.serviceResolver.EXPECT().Lookup("ns3/2").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{ + membership.NewHostInfoFromAddress("self"), + membership.NewHostInfoFromAddress("other"), + membership.NewHostInfoFromAddress("self"), + }) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -237,7 +239,18 @@ func (s *perNsWorkerManagerSuite) TestOptions() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup(gomock.Any()).Return(membership.NewHostInfoFromAddress("self"), nil).AnyTimes() + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{ + membership.NewHostInfoFromAddress("self"), + }).AnyTimes() + s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{ + membership.NewHostInfoFromAddress("self"), + membership.NewHostInfoFromAddress("self"), + }).AnyTimes() + s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{ + membership.NewHostInfoFromAddress("self"), + membership.NewHostInfoFromAddress("self"), + membership.NewHostInfoFromAddress("self"), + }).AnyTimes() cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) cli2 := mocksdk.NewMockClient(s.controller) @@ -288,9 +301,11 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() { return &workercommon.PerNSDedicatedWorkerOptions{Enabled: ns.Name().String() == "ns1"} }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) - s.serviceResolver.EXPECT().Lookup("ns2/0").Return(membership.NewHostInfoFromAddress("self"), nil) - s.serviceResolver.EXPECT().Lookup("ns2/1").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) + s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{ + membership.NewHostInfoFromAddress("self"), + membership.NewHostInfoFromAddress("self"), + }) cli1 := mocksdk.NewMockClient(s.controller) cli2 := mocksdk.NewMockClient(s.controller) @@ -331,7 +346,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -351,7 +366,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { // restore it nsRestored := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli2 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2) wkr2 := mocksdk.NewMockWorker(s.controller) @@ -380,13 +395,13 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { }).AnyTimes() // we don't own it at first - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")}) s.manager.namespaceCallback(ns, false) time.Sleep(50 * time.Millisecond) // now we own it - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -398,7 +413,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { time.Sleep(50 * time.Millisecond) // now we don't own it anymore - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")}) wkr1.EXPECT().Stop() cli1.EXPECT().Close() @@ -416,9 +431,9 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error")) - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error again")) - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{}) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{}) + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) @@ -445,7 +460,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil).AnyTimes() + s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}).AnyTimes() cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) diff --git a/tests/simple_service_resolver.go b/tests/simple_service_resolver.go index ac0d15c78815..b894d4f336f3 100644 --- a/tests/simple_service_resolver.go +++ b/tests/simple_service_resolver.go @@ -88,6 +88,14 @@ func (s *simpleResolver) Lookup(key string) (membership.HostInfo, error) { return s.hostInfos[idx], nil } +func (s *simpleResolver) LookupN(key string, _ int) []membership.HostInfo { + info, err := s.Lookup(key) + if err != nil { + return []membership.HostInfo{} + } + return []membership.HostInfo{info} +} + func (s *simpleResolver) AddListener(name string, notifyChannel chan<- *membership.ChangedEvent) error { s.mu.Lock() defer s.mu.Unlock()