diff --git a/common/dynamicconfig/collection.go b/common/dynamicconfig/collection.go
index e3980b807e3d..20f1f89d989e 100644
--- a/common/dynamicconfig/collection.go
+++ b/common/dynamicconfig/collection.go
@@ -92,6 +92,9 @@ const (
     errCountLogThreshold = 1000
 )
 
+// ErrInvalidConfiguration indicates that the value provided by dynamic config is not legal
+var ErrInvalidConfiguration = errors.New("invalid dynamic configuration")
+
 var (
     errKeyNotPresent        = errors.New("key not present")
     errNoMatchingConstraint = errors.New("no matching constraint in key")
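The new sentinel is wrapped with `%w` at its one call site later in this diff (`getConfiguredWorkersCountFor`), which lets callers branch on it with `errors.Is`. A minimal standalone sketch of that pattern, using a local stand-in variable rather than the real `dynamicconfig` package:

```go
package main

import (
    "errors"
    "fmt"
)

// Stand-in for dynamicconfig.ErrInvalidConfiguration so the sketch runs standalone.
var errInvalidConfiguration = errors.New("invalid dynamic configuration")

func workersCount(configured int) (uint, error) {
    if configured < 0 {
        // %w wraps the sentinel, preserving it through the error chain.
        return 0, fmt.Errorf("%w namespace %s, workers count %d", errInvalidConfiguration, "ns1", configured)
    }
    return uint(configured), nil
}

func main() {
    _, err := workersCount(-1)
    fmt.Println(errors.Is(err, errInvalidConfiguration)) // true
}
```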
diff --git a/common/membership/interfaces.go b/common/membership/interfaces.go
index 66d93d1b580f..4c88278d613e 100644
--- a/common/membership/interfaces.go
+++ b/common/membership/interfaces.go
@@ -84,7 +84,7 @@ type (
         Lookup(key string) (HostInfo, error)
         // LookupN looks n hosts that owns the resource identified by the given key, if n greater than total number
         // of hosts total number of hosts will be returned
-        LookupN(key string, n int) []HostInfo
+        LookupN(key string, n uint) []HostInfo
         // AddListener adds a listener which will get notified on the given channel whenever membership changes.
         AddListener(name string, notifyChannel chan<- *ChangedEvent) error
         // RemoveListener removes a listener for this service.
diff --git a/common/membership/interfaces_mock.go b/common/membership/interfaces_mock.go
index a7c8811010df..b4c5e329c955 100644
--- a/common/membership/interfaces_mock.go
+++ b/common/membership/interfaces_mock.go
@@ -182,7 +182,7 @@ func (mr *MockServiceResolverMockRecorder) Lookup(key interface{}) *gomock.Call
 }
 
 // LookupN mocks base method.
-func (m *MockServiceResolver) LookupN(key string, n int) []HostInfo {
+func (m *MockServiceResolver) LookupN(key string, n uint) []HostInfo {
     m.ctrl.T.Helper()
     ret := m.ctrl.Call(m, "LookupN", key, n)
     ret0, _ := ret[0].([]HostInfo)
diff --git a/common/membership/ringpop/service_resolver.go b/common/membership/ringpop/service_resolver.go
index f3bd1b854289..bc9d519b3198 100644
--- a/common/membership/ringpop/service_resolver.go
+++ b/common/membership/ringpop/service_resolver.go
@@ -155,11 +155,11 @@ func (r *serviceResolver) Lookup(key string) (membership.HostInfo, error) {
     return newHostInfo(addr, r.getLabelsMap()), nil
 }
 
-func (r *serviceResolver) LookupN(key string, n int) []membership.HostInfo {
-    addresses := r.ring().LookupN(key, n)
+func (r *serviceResolver) LookupN(key string, n uint) []membership.HostInfo {
+    addresses := r.ring().LookupN(key, int(n))
     if len(addresses) == 0 {
         r.RequestRefresh()
-        return []membership.HostInfo{}
+        return nil
     }
     return util.MapSlice(addresses, membership.NewHostInfoFromAddress)
 }
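Returning `nil` instead of `[]membership.HostInfo{}` on a miss is behavior-preserving for any caller that only uses `len` or `range`, since both are zero-length; the two differ only under an explicit nil comparison. A quick standalone illustration (not part of this diff):

```go
package main

import "fmt"

func main() {
    var nilSlice []string // nil slice: no backing array
    empty := []string{}   // non-nil, zero length

    fmt.Println(len(nilSlice), len(empty))     // 0 0: len treats them alike
    fmt.Println(nilSlice == nil, empty == nil) // true false: only nil checks differ
    for range nilSlice {                       // ranging over nil is a no-op, not a panic
        fmt.Println("never reached")
    }
}
```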
diff --git a/common/util/util.go b/common/util/util.go
index d5fead52d4da..f4f3f3b019c9 100644
--- a/common/util/util.go
+++ b/common/util/util.go
@@ -119,9 +119,12 @@ func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([]OUT, error) {
 
 // MapSlice given slice xs []T and f(T) S produces slice []S by applying f to every element of xs
 func MapSlice[T, S any](xs []T, f func(T) S) []S {
-    var result []S
-    for _, s := range xs {
-        result = append(result, f(s))
+    if xs == nil {
+        return nil
+    }
+    result := make([]S, len(xs))
+    for i, s := range xs {
+        result[i] = f(s)
     }
     return result
 }
@@ -146,6 +149,19 @@ func FoldSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A {
     return acc
 }
 
+// RepeatSlice given a slice and a count n produces a new slice containing the original slice repeated n times;
+// if n is non-positive it produces nil
+func RepeatSlice[T any](xs []T, n int) []T {
+    if xs == nil || n <= 0 {
+        return nil
+    }
+    ys := make([]T, n*len(xs))
+    for i := 0; i < n; i++ {
+        copy(ys[i*len(xs):(i+1)*len(xs)], xs)
+    }
+    return ys
+}
+
 // Coalesce returns the first non-zero value of its arguments, or the zero value for the type
 // if all are zero.
 func Coalesce[T comparable](vals ...T) T {
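For orientation, the call pattern `RepeatSlice` is built for; the outputs below follow from the implementation above, and the import path is the package this diff touches:

```go
package main

import (
    "fmt"

    "go.temporal.io/server/common/util"
)

func main() {
    hosts := []string{"host-a", "host-b"}

    // n copies of the input, laid out back to back.
    fmt.Println(util.RepeatSlice(hosts, 3))
    // [host-a host-b host-a host-b host-a host-b]

    // Non-positive n yields nil, per the doc comment.
    fmt.Println(util.RepeatSlice(hosts, 0) == nil) // true
}
```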
diff --git a/common/util/util_test.go b/common/util/util_test.go
new file mode 100644
index 000000000000..52c287eaecf6
--- /dev/null
+++ b/common/util/util_test.go
@@ -0,0 +1,85 @@
+package util
+
+import "testing"
+
+func TestRepeatSlice(t *testing.T) {
+    t.Run("when input slice is nil should return nil", func(t *testing.T) {
+        got := RepeatSlice[int](nil, 5)
+        if got != nil {
+            t.Errorf("RepeatSlice produced non-nil slice from nil input")
+        }
+    })
+    t.Run("when input slice is empty should return empty", func(t *testing.T) {
+        empty := []int{}
+        got := RepeatSlice(empty, 5)
+        if len(got) != 0 {
+            t.Errorf("RepeatSlice filled empty slice")
+        }
+    })
+    t.Run("when requested repeat number equal 0 should return empty slice", func(t *testing.T) {
+        xs := []int{1, 2, 3, 4, 5}
+        got := RepeatSlice(xs, 0)
+        if len(got) != 0 {
+            t.Errorf("RepeatSlice with repeat count 0 returned non-empty slice")
+        }
+    })
+    t.Run("when requested repeat number is less than 0 should return empty slice", func(t *testing.T) {
+        xs := []int{1, 2, 3, 4, 5}
+        got := RepeatSlice(xs, -1)
+        if len(got) != 0 {
+            t.Errorf("RepeatSlice with repeat count -1 returned non-empty slice")
+        }
+    })
+    t.Run("when requested repeat number is 3 should return slice three times the input", func(t *testing.T) {
+        xs := []int{1, 2, 3, 4, 5}
+        got := RepeatSlice(xs, 3)
+        if len(got) != len(xs)*3 {
+            t.Errorf("RepeatSlice produced slice of wrong length: expected %d got %d", len(xs)*3, len(got))
+        }
+        for i, v := range got {
+            if v != xs[i%len(xs)] {
+                t.Errorf("RepeatSlice wrong value in result: expected %d at index %d but got %d", xs[i%len(xs)], i, v)
+            }
+        }
+    })
+    t.Run("should not change the input slice when truncating", func(t *testing.T) {
+        xs := []int{1, 2, 3, 4, 5}
+        _ = RepeatSlice(xs, 0)
+        if len(xs) != 5 {
+            t.Errorf("RepeatSlice truncated the original slice: expected {1, 2, 3, 4, 5}, got %v", xs)
+        }
+    })
+    t.Run("should not change the input slice when replicating", func(t *testing.T) {
+        xs := []int{1, 2, 3, 4, 5}
+        _ = RepeatSlice(xs, 5)
+        if len(xs) != 5 {
+            t.Errorf("RepeatSlice changed the original slice: expected {1, 2, 3, 4, 5}, got %v", xs)
+        }
+    })
+}
+
+func TestMapSlice(t *testing.T) {
+    t.Run("when given nil as slice should return nil", func(t *testing.T) {
+        ys := MapSlice(nil, func(x int) uint32 { return uint32(x) })
+        if ys != nil {
+            t.Errorf("mapping over nil produced non-nil, got %v", ys)
+        }
+    })
+    t.Run("when given an empty slice should return empty slice", func(t *testing.T) {
+        xs := []int{}
+        var ys []uint32
+        ys = MapSlice(xs, func(x int) uint32 { return uint32(x) })
+        if len(ys) != 0 {
+            t.Errorf("mapping over empty slice produced non-empty slice, got %v", ys)
+        }
+    })
+    t.Run("when given a slice and a function should apply function to every element of the original slice", func(t *testing.T) {
+        xs := []int{1, 2, 3, 4, 5}
+        ys := MapSlice(xs, func(x int) int { return x + 1 })
+        for i, y := range ys {
+            if y != (xs[i] + 1) {
+                t.Fatalf("mapping over slice did not apply function: expected {2, 3, 4, 5, 6} got %v", ys)
+            }
+        }
+    })
+}
diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go
index f9221c71a27c..7c27da844a15 100644
--- a/service/worker/pernamespaceworker.go
+++ b/service/worker/pernamespaceworker.go
@@ -44,6 +44,7 @@ import (
     "go.temporal.io/server/common"
     "go.temporal.io/server/common/backoff"
     "go.temporal.io/server/common/cluster"
+    "go.temporal.io/server/common/dynamicconfig"
     "go.temporal.io/server/common/headers"
     "go.temporal.io/server/common/log"
     "go.temporal.io/server/common/log/tag"
@@ -123,6 +124,11 @@ type (
         StickyScheduleToStartTimeout         string // parse into time.Duration
         StickyScheduleToStartTimeoutDuration time.Duration
     }
+
+    workerAllocation struct {
+        Total uint
+        Local uint
+    }
 )
 
 var (
@@ -260,21 +266,43 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
     delete(wm.workers, ns.ID())
 }
 
-func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) {
+func (wm *perNamespaceWorkerManager) getWorkerAllocation(ns *namespace.Namespace) (*workerAllocation, error) {
+    desiredWorkersCount, err := wm.getConfiguredWorkersCountFor(ns)
+    if err != nil {
+        return nil, err
+    }
+    if desiredWorkersCount == 0 {
+        return &workerAllocation{0, 0}, nil
+    }
+    localCount, err := wm.getLocallyDesiredWorkersCount(ns, desiredWorkersCount)
+    if err != nil {
+        return nil, err
+    }
+    return &workerAllocation{desiredWorkersCount, localCount}, nil
+}
+
+func (wm *perNamespaceWorkerManager) getConfiguredWorkersCountFor(ns *namespace.Namespace) (uint, error) {
     totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String())
+    if totalWorkers < 0 {
+        err := fmt.Errorf("%w namespace %s, workers count %d", dynamicconfig.ErrInvalidConfiguration, ns.Name(), totalWorkers)
+        return 0, err
+    }
+    return uint(totalWorkers), nil
+}
+
+func (wm *perNamespaceWorkerManager) getLocallyDesiredWorkersCount(ns *namespace.Namespace, desiredNumberOfWorkers uint) (uint, error) {
     key := ns.ID().String()
-    targets := wm.serviceResolver.LookupN(key, totalWorkers)
-    if len(targets) == 0 {
-        return 0, 0, membership.ErrInsufficientHosts
-    }
-    IsLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() }
-    multiplicity := util.FoldSlice(targets, 0, func(acc int, t membership.HostInfo) int {
-        if IsLocal(t) {
-            acc++
-        }
-        return acc
-    })
-    return multiplicity, totalWorkers, nil
+    availableHosts := wm.serviceResolver.LookupN(key, desiredNumberOfWorkers)
+    hostsCount := uint(len(availableHosts))
+    if hostsCount == 0 {
+        return 0, membership.ErrInsufficientHosts
+    }
+    maxWorkersPerHost := desiredNumberOfWorkers/hostsCount + 1
+    desiredDistribution := util.RepeatSlice(availableHosts, int(maxWorkersPerHost))[:desiredNumberOfWorkers]
+
+    isLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() }
+    result := len(util.FilterSlice(desiredDistribution, isLocal))
+    return uint(result), nil
 }
 
 func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
@@ -390,18 +418,18 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
     }
 
     // check if we are responsible for this namespace at all
-    multiplicity, totalWorkers, err := w.wm.getWorkerMultiplicity(ns)
+    workerAllocation, err := w.wm.getWorkerAllocation(ns)
     if err != nil {
         w.logger.Error("Failed to look up hosts", tag.Error(err))
         // TODO: add metric also
         return err
     }
-    if multiplicity == 0 {
+    if workerAllocation.Local == 0 {
         // not ours, don't need a worker
         return errNoWorkerNeeded
     }
     // ensure this changes if multiplicity changes
-    componentSet += fmt.Sprintf(",%d", multiplicity)
+    componentSet += fmt.Sprintf(",%d", workerAllocation.Local)
 
     // get sdk worker options
     dcOptions := w.wm.getWorkerOptions(ns)
@@ -421,7 +449,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
     // create new one. note that even before startWorker returns, the worker may have started
     // and already called the fatal error handler. we need to set w.client+worker+componentSet
     // before releasing the lock to keep our state consistent.
-    client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, totalWorkers, dcOptions)
+    client, worker, err := w.startWorker(ns, enabledComponents, workerAllocation, dcOptions)
     if err != nil {
         // TODO: add metric also
         return err
@@ -436,8 +464,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
 func (w *perNamespaceWorker) startWorker(
     ns *namespace.Namespace,
     components []workercommon.PerNSWorkerComponent,
-    multiplicity int,
-    totalWorkers int,
+    allocation *workerAllocation,
     dcOptions sdkWorkerOptions,
 ) (sdkclient.Client, sdkworker.Worker, error) {
     nsName := ns.Name().String()
@@ -462,19 +489,19 @@ func (w *perNamespaceWorker) startWorker(
     sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String()))
     sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName)
 
-    // increase these if we're supposed to run with more multiplicity
-    sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity
-    sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity
-    sdkoptions.MaxConcurrentLocalActivityExecutionSize *= multiplicity
-    sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= multiplicity
-    sdkoptions.MaxConcurrentActivityExecutionSize *= multiplicity
+    // increase these if we're allocated more than one local worker
+    sdkoptions.MaxConcurrentWorkflowTaskPollers *= int(allocation.Local)
+    sdkoptions.MaxConcurrentActivityTaskPollers *= int(allocation.Local)
+    sdkoptions.MaxConcurrentLocalActivityExecutionSize *= int(allocation.Local)
+    sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= int(allocation.Local)
+    sdkoptions.MaxConcurrentActivityExecutionSize *= int(allocation.Local)
     sdkoptions.OnFatalError = w.onFatalError
 
     // this should not block because the client already has server capabilities
     worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
     details := workercommon.RegistrationDetails{
-        TotalWorkers: totalWorkers,
-        Multiplicity: multiplicity,
+        TotalWorkers: int(allocation.Total),
+        Multiplicity: int(allocation.Local),
     }
     for _, cmp := range components {
         cmp.Register(worker, ns, details)
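The allocation arithmetic above is worth a worked example: with a desired count n over h available hosts, the host list is repeated n/h + 1 times, truncated to exactly n slots, and the local host's share is the number of slots it occupies. A standalone restatement over plain strings (hypothetical helper name; mirrors getLocallyDesiredWorkersCount under the assumption that LookupN returns hosts in a stable ring order):

```go
package main

import "fmt"

// localShare restates the distribution logic from getLocallyDesiredWorkersCount
// over plain strings so it can run standalone. It assumes hosts is non-empty,
// mirroring the ErrInsufficientHosts early return in the real code.
func localShare(hosts []string, self string, desired uint) uint {
    hostsCount := uint(len(hosts))
    if hostsCount == 0 || desired == 0 {
        return 0
    }
    maxWorkersPerHost := desired/hostsCount + 1

    // Repeat the host list maxWorkersPerHost times, then keep exactly `desired` slots.
    var slots []string
    for i := uint(0); i < maxWorkersPerHost; i++ {
        slots = append(slots, hosts...)
    }
    var local uint
    for _, h := range slots[:desired] {
        if h == self {
            local++
        }
    }
    return local
}

func main() {
    hosts := []string{"other-1", "other-2", "self"}
    fmt.Println(localShare(hosts, "self", 6)) // 2: six workers split evenly across three hosts
    fmt.Println(localShare(hosts, "self", 5)) // 1: the remainder lands on hosts earlier in the ring
}
```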
diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go
index 7fa8d25d2130..410fe6f304e6 100644
--- a/service/worker/pernamespaceworker_test.go
+++ b/service/worker/pernamespaceworker_test.go
@@ -159,7 +159,7 @@ func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() {
         Enabled: false,
     }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other1")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other1")})
 
     s.manager.namespaceCallback(ns, false)
     // main work happens in a goroutine
@@ -176,7 +176,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() {
         Enabled: false,
     }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
     cli1 := mocksdk.NewMockClient(s.controller)
     s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
     wkr1 := mocksdk.NewMockWorker(s.controller)
@@ -192,8 +192,23 @@ func (s *perNsWorkerManagerSuite) TestEnabled() {
     cli1.EXPECT().Close()
 }
 
+/*
+    Given this machine owns the namespace
+    When a namespace change is reported
+    Then a worker should be started with configuration proportional to the administratively set worker count, ensuring
+    fair distribution across all the machines owning the namespace
+*/
 func (s *perNsWorkerManagerSuite) TestMultiplicity() {
-    ns := testns("ns3", enumspb.NAMESPACE_STATE_REGISTERED) // three workers
+    desiredWorkersNumber := 6
+    machinesOwningNamespace := []membership.HostInfo{
+        membership.NewHostInfoFromAddress("other-1"),
+        membership.NewHostInfoFromAddress("other-2"),
+        membership.NewHostInfoFromAddress("self"),
+    }
+    expectedMultiplicity := 2
+
+    ns := testns("ns3", enumspb.NAMESPACE_STATE_REGISTERED)
+    s.manager.config.PerNamespaceWorkerCount = func(_ string) int { return desiredWorkersNumber }
 
     s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{
         Enabled: true,
@@ -202,11 +217,7 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() {
         Enabled: false,
     }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{
-        membership.NewHostInfoFromAddress("self"),
-        membership.NewHostInfoFromAddress("other"),
-        membership.NewHostInfoFromAddress("self"),
-    })
+    s.serviceResolver.EXPECT().LookupN("ns3", uint(desiredWorkersNumber)).Return(machinesOwningNamespace)
     cli1 := mocksdk.NewMockClient(s.controller)
     s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli1)
     wkr1 := mocksdk.NewMockWorker(s.controller)
@@ -217,10 +228,11 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() {
         s.Equal(2000, options.MaxConcurrentLocalActivityExecutionSize)
         s.Equal(2000, options.MaxConcurrentActivityExecutionSize)
     }).Return(wkr1)
-    s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 3, Multiplicity: 2})
+    s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: desiredWorkersNumber, Multiplicity: expectedMultiplicity})
     wkr1.EXPECT().Start()
 
     s.manager.namespaceCallback(ns, false)
+
     time.Sleep(50 * time.Millisecond)
 
     wkr1.EXPECT().Stop()
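A note on why every expectation in these tests now spells `uint(1)` rather than `1`: gomock wraps plain expected values in its `Eq` matcher, which (in the gomock versions this PR targets) compares with `reflect.DeepEqual`, and `DeepEqual` is type-sensitive, so an `int` expectation can never match a `uint` argument after the signature change. Standalone illustration:

```go
package main

import (
    "fmt"
    "reflect"
)

func main() {
    // reflect.DeepEqual, the comparison behind gomock's default Eq matcher,
    // requires the static types to match, not just the numeric values:
    fmt.Println(reflect.DeepEqual(1, uint(1)))       // false: int vs uint
    fmt.Println(reflect.DeepEqual(uint(1), uint(1))) // true
}
```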
@@ -239,14 +251,14 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
         Enabled: false,
     }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{
         membership.NewHostInfoFromAddress("self"),
     }).AnyTimes()
-    s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{
+    s.serviceResolver.EXPECT().LookupN("ns2", uint(2)).Return([]membership.HostInfo{
         membership.NewHostInfoFromAddress("self"),
         membership.NewHostInfoFromAddress("self"),
     }).AnyTimes()
-    s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{
+    s.serviceResolver.EXPECT().LookupN("ns3", uint(3)).Return([]membership.HostInfo{
         membership.NewHostInfoFromAddress("self"),
         membership.NewHostInfoFromAddress("self"),
         membership.NewHostInfoFromAddress("self"),
@@ -301,8 +313,8 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() {
             return &workercommon.PerNSDedicatedWorkerOptions{Enabled: ns.Name().String() == "ns1"}
         }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
-    s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
+    s.serviceResolver.EXPECT().LookupN("ns2", uint(2)).Return([]membership.HostInfo{
         membership.NewHostInfoFromAddress("self"),
         membership.NewHostInfoFromAddress("self"),
     })
@@ -346,7 +358,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
         Enabled: false,
     }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
     cli1 := mocksdk.NewMockClient(s.controller)
     s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
     wkr1 := mocksdk.NewMockWorker(s.controller)
@@ -366,7 +378,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() {
 
     // restore it
     nsRestored := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED)
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
     cli2 := mocksdk.NewMockClient(s.controller)
     s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2)
     wkr2 := mocksdk.NewMockWorker(s.controller)
@@ -395,13 +407,13 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() {
     }).AnyTimes()
 
     // we don't own it at first
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")})
 
     s.manager.namespaceCallback(ns, false)
     time.Sleep(50 * time.Millisecond)
 
     // now we own it
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
     cli1 := mocksdk.NewMockClient(s.controller)
     s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
     wkr1 := mocksdk.NewMockWorker(s.controller)
@@ -413,7 +425,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() {
     time.Sleep(50 * time.Millisecond)
 
     // now we don't own it anymore
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")})
     wkr1.EXPECT().Stop()
     cli1.EXPECT().Close()
 
@@ -431,9 +443,9 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() {
         Enabled: false,
     }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{})
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{})
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{})
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
 
     cli1 := mocksdk.NewMockClient(s.controller)
    s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
@@ -460,7 +472,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() {
         Enabled: false,
     }).AnyTimes()
 
-    s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}).AnyTimes()
+    s.serviceResolver.EXPECT().LookupN("ns1", uint(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}).AnyTimes()
 
     cli1 := mocksdk.NewMockClient(s.controller)
     s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
diff --git a/tests/simple_service_resolver.go b/tests/simple_service_resolver.go
index b894d4f336f3..ca88c9cc91fa 100644
--- a/tests/simple_service_resolver.go
+++ b/tests/simple_service_resolver.go
@@ -88,7 +88,7 @@ func (s *simpleResolver) Lookup(key string) (membership.HostInfo, error) {
     return s.hostInfos[idx], nil
 }
 
-func (s *simpleResolver) LookupN(key string, _ int) []membership.HostInfo {
+func (s *simpleResolver) LookupN(key string, _ uint) []membership.HostInfo {
     info, err := s.Lookup(key)
     if err != nil {
         return []membership.HostInfo{}
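One caveat worth flagging: `simpleResolver` ignores `n` and returns at most one host, so integration tests built on it never exercise a multiplicity greater than one per namespace. If that coverage is ever wanted, an n-aware variant could look roughly like this (hypothetical standalone sketch; the names and hashing scheme are invented for illustration and are not part of this PR):

```go
package main

import (
    "fmt"
    "hash/fnv"
)

// lookupN walks the host list from the key's hash position, wrapping around,
// and returns min(n, len(hosts)) distinct hosts.
func lookupN(hosts []string, key string, n uint) []string {
    if len(hosts) == 0 || n == 0 {
        return nil
    }
    h := fnv.New32a()
    _, _ = h.Write([]byte(key))
    start := int(h.Sum32() % uint32(len(hosts)))
    if uint(len(hosts)) < n {
        n = uint(len(hosts))
    }
    out := make([]string, 0, n)
    for i := 0; i < int(n); i++ {
        out = append(out, hosts[(start+i)%len(hosts)])
    }
    return out
}

func main() {
    hosts := []string{"h1", "h2", "h3"}
    fmt.Println(lookupN(hosts, "ns1", 2)) // two distinct hosts, deterministic per key
}
```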