Skip to content

Commit

Permalink
make use of ringpop LookN method
Browse files Browse the repository at this point in the history
  • Loading branch information
ast2023 committed Nov 10, 2023
1 parent 1da93ba commit 4fc6b21
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 35 deletions.
3 changes: 3 additions & 0 deletions common/membership/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type (
ServiceResolver interface {
// Lookup looks up the host that currently owns the resource identified by the given key.
Lookup(key string) (HostInfo, error)
// LookupN looks n hosts that owns the resource identified by the given key, if n greater than total number
// of hosts total number of hosts will be returned
LookupN(key string, n int) []HostInfo
// AddListener adds a listener which will get notified on the given channel whenever membership changes.
AddListener(name string, notifyChannel chan<- *ChangedEvent) error
// RemoveListener removes a listener for this service.
Expand Down
14 changes: 14 additions & 0 deletions common/membership/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions common/membership/ringpop/service_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/primitives"
"go.temporal.io/server/common/util"
)

const (
Expand Down Expand Up @@ -154,6 +155,15 @@ func (r *serviceResolver) Lookup(key string) (membership.HostInfo, error) {
return newHostInfo(addr, r.getLabelsMap()), nil
}

func (r *serviceResolver) LookupN(key string, n int) []membership.HostInfo {
addresses := r.ring().LookupN(key, n)
if len(addresses) == 0 {
r.RequestRefresh()
return []membership.HostInfo{}
}
return util.MapSlice(addresses, membership.NewHostInfoFromAddress)
}

func (r *serviceResolver) AddListener(
name string,
notifyChannel chan<- *membership.ChangedEvent,
Expand Down
13 changes: 11 additions & 2 deletions common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([
return results, nil
}

// MapSlice given slice xs []T and f(T) S produces slice []S by applying f to every element of xs
func MapSlice[T, S any](xs []T, f func(T) S) []S {
var result []S
for _, s := range xs {
result = append(result, f(s))
}
return result
}

// FilterSlice iterates over elements of a slice, returning a new slice of all elements predicate returns true for.
func FilterSlice[T any](in []T, predicate func(T) bool) []T {
var out []T
Expand All @@ -128,8 +137,8 @@ func FilterSlice[T any](in []T, predicate func(T) bool) []T {
return out
}

// ReduceSlice reduces a slice using given reducer function and initial value.
func ReduceSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A {
// FoldSlice folds left a slice using given reducer function and initial value.
func FoldSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A {
acc := initializer
for _, val := range in {
acc = reducer(acc, val)
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/task_reachability.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (wh *WorkflowHandler) getTaskQueueReachability(ctx context.Context, request
reachableByNewWorkflows = true
} else {
// If the queue became versioned just recently, consider the unversioned build id reachable.
queueBecameVersionedAt := util.ReduceSlice(versionSets, hlc.Clock{WallClock: math.MaxInt64}, func(c hlc.Clock, set *persistencespb.CompatibleVersionSet) hlc.Clock {
queueBecameVersionedAt := util.FoldSlice(versionSets, hlc.Clock{WallClock: math.MaxInt64}, func(c hlc.Clock, set *persistencespb.CompatibleVersionSet) hlc.Clock {
return hlc.Min(c, *set.BecameDefaultTimestamp)
})
reachableByNewWorkflows = time.Since(hlc.UTC(queueBecameVersionedAt)) < wh.config.ReachabilityQuerySetDurationSinceDefault()
Expand Down
25 changes: 11 additions & 14 deletions service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,21 +262,18 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {

func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) {
totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String())
// This can result in fewer than the intended number of workers if totalWorkers > 1, because
// multiple lookups might land on the same node. To compensate, we increase the number of
// pollers in that case, but it would be better to try to spread them across our nodes.
// TODO: implement this properly using LookupN in serviceResolver
multiplicity := 0
for i := 0; i < totalWorkers; i++ {
key := fmt.Sprintf("%s/%d", ns.ID().String(), i)
target, err := wm.serviceResolver.Lookup(key)
if err != nil {
return 0, 0, err
key := ns.ID().String()
targets := wm.serviceResolver.LookupN(key, totalWorkers)
if len(targets) == 0 {
return 0, 0, membership.ErrInsufficientHosts
}
IsLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() }
multiplicity := util.FoldSlice(targets, 0, func(acc int, t membership.HostInfo) int {
if IsLocal(t) {
acc++
}
if target.Identity() == wm.self.Identity() {
multiplicity++
}
}
return acc
})
return multiplicity, totalWorkers, nil
}

Expand Down
51 changes: 33 additions & 18 deletions service/worker/pernamespaceworker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() {
Enabled: false,
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other1"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other1")})

s.manager.namespaceCallback(ns, false)
// main work happens in a goroutine
Expand All @@ -176,7 +176,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() {
Enabled: false,
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
cli1 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
Expand All @@ -202,9 +202,11 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() {
Enabled: false,
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup("ns3/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().Lookup("ns3/1").Return(membership.NewHostInfoFromAddress("other"), nil)
s.serviceResolver.EXPECT().Lookup("ns3/2").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{
membership.NewHostInfoFromAddress("self"),
membership.NewHostInfoFromAddress("other"),
membership.NewHostInfoFromAddress("self"),
})
cli1 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
Expand Down Expand Up @@ -237,7 +239,18 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
Enabled: false,
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup(gomock.Any()).Return(membership.NewHostInfoFromAddress("self"), nil).AnyTimes()
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{
membership.NewHostInfoFromAddress("self"),
}).AnyTimes()
s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{
membership.NewHostInfoFromAddress("self"),
membership.NewHostInfoFromAddress("self"),
}).AnyTimes()
s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{
membership.NewHostInfoFromAddress("self"),
membership.NewHostInfoFromAddress("self"),
membership.NewHostInfoFromAddress("self"),
}).AnyTimes()
cli1 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
cli2 := mocksdk.NewMockClient(s.controller)
Expand Down Expand Up @@ -288,9 +301,11 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() {
return &workercommon.PerNSDedicatedWorkerOptions{Enabled: ns.Name().String() == "ns1"}
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().Lookup("ns2/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().Lookup("ns2/1").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{
membership.NewHostInfoFromAddress("self"),
membership.NewHostInfoFromAddress("self"),
})

cli1 := mocksdk.NewMockClient(s.controller)
cli2 := mocksdk.NewMockClient(s.controller)
Expand Down Expand Up @@ -331,7 +346,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
Enabled: false,
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
cli1 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
Expand All @@ -351,7 +366,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {

// restore it
nsRestored := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED)
s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
cli2 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
wkr2 := mocksdk.NewMockWorker(s.controller)
Expand Down Expand Up @@ -380,13 +395,13 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() {
}).AnyTimes()

// we don't own it at first
s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")})

s.manager.namespaceCallback(ns, false)
time.Sleep(50 * time.Millisecond)

// now we own it
s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
cli1 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
wkr1 := mocksdk.NewMockWorker(s.controller)
Expand All @@ -398,7 +413,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() {
time.Sleep(50 * time.Millisecond)

// now we don't own it anymore
s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("other"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")})
wkr1.EXPECT().Stop()
cli1.EXPECT().Close()

Expand All @@ -416,9 +431,9 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() {
Enabled: false,
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error"))
s.serviceResolver.EXPECT().Lookup("ns1/0").Return(nil, errors.New("resolver error again"))
s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil)
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{})
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{})
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})

cli1 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
Expand All @@ -445,7 +460,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
Enabled: false,
}).AnyTimes()

s.serviceResolver.EXPECT().Lookup("ns1/0").Return(membership.NewHostInfoFromAddress("self"), nil).AnyTimes()
s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}).AnyTimes()

cli1 := mocksdk.NewMockClient(s.controller)
s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
Expand Down
8 changes: 8 additions & 0 deletions tests/simple_service_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ func (s *simpleResolver) Lookup(key string) (membership.HostInfo, error) {
return s.hostInfos[idx], nil
}

func (s *simpleResolver) LookupN(key string, _ int) []membership.HostInfo {
info, err := s.Lookup(key)
if err != nil {
return []membership.HostInfo{}
}
return []membership.HostInfo{info}
}

func (s *simpleResolver) AddListener(name string, notifyChannel chan<- *membership.ChangedEvent) error {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down

0 comments on commit 4fc6b21

Please sign in to comment.