From 4f005aac29ce4df56eb96147e639bc922bce705d Mon Sep 17 00:00:00 2001 From: sasha Date: Mon, 13 Nov 2023 12:51:30 -0800 Subject: [PATCH] ensure correct configuration of workers count per namespace + minor refactoring --- common/dynamicconfig/collection.go | 3 + common/membership/interfaces.go | 2 +- common/membership/interfaces_mock.go | 2 +- common/membership/ringpop/service_resolver.go | 6 +- common/util/util.go | 23 ++++- common/util/util_test.go | 85 ++++++++++++++++++ service/worker/pernamespaceworker.go | 86 +++++++++++++------ service/worker/pernamespaceworker_test.go | 34 ++++---- tests/simple_service_resolver.go | 2 +- 9 files changed, 192 insertions(+), 51 deletions(-) create mode 100644 common/util/util_test.go diff --git a/common/dynamicconfig/collection.go b/common/dynamicconfig/collection.go index e3980b807e3d..20f1f89d989e 100644 --- a/common/dynamicconfig/collection.go +++ b/common/dynamicconfig/collection.go @@ -92,6 +92,9 @@ const ( errCountLogThreshold = 1000 ) +// ErrInvalidConfiguration indicates that the value provided by dynamic config is not legal +var ErrInvalidConfiguration = errors.New("invalid dynamic configuration") + var ( errKeyNotPresent = errors.New("key not present") errNoMatchingConstraint = errors.New("no matching constraint in key") diff --git a/common/membership/interfaces.go b/common/membership/interfaces.go index 66d93d1b580f..0c026a39be57 100644 --- a/common/membership/interfaces.go +++ b/common/membership/interfaces.go @@ -84,7 +84,7 @@ type ( Lookup(key string) (HostInfo, error) // LookupN looks n hosts that owns the resource identified by the given key, if n greater than total number // of hosts total number of hosts will be returned - LookupN(key string, n int) []HostInfo + LookupN(key string, n uint16) []HostInfo // AddListener adds a listener which will get notified on the given channel whenever membership changes. 
AddListener(name string, notifyChannel chan<- *ChangedEvent) error // RemoveListener removes a listener for this service. diff --git a/common/membership/interfaces_mock.go b/common/membership/interfaces_mock.go index a7c8811010df..f795fac2d2e4 100644 --- a/common/membership/interfaces_mock.go +++ b/common/membership/interfaces_mock.go @@ -182,7 +182,7 @@ func (mr *MockServiceResolverMockRecorder) Lookup(key interface{}) *gomock.Call } // LookupN mocks base method. -func (m *MockServiceResolver) LookupN(key string, n int) []HostInfo { +func (m *MockServiceResolver) LookupN(key string, n uint16) []HostInfo { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LookupN", key, n) ret0, _ := ret[0].([]HostInfo) diff --git a/common/membership/ringpop/service_resolver.go b/common/membership/ringpop/service_resolver.go index f3bd1b854289..72fc6d4808e8 100644 --- a/common/membership/ringpop/service_resolver.go +++ b/common/membership/ringpop/service_resolver.go @@ -155,11 +155,11 @@ func (r *serviceResolver) Lookup(key string) (membership.HostInfo, error) { return newHostInfo(addr, r.getLabelsMap()), nil } -func (r *serviceResolver) LookupN(key string, n int) []membership.HostInfo { - addresses := r.ring().LookupN(key, n) +func (r *serviceResolver) LookupN(key string, n uint16) []membership.HostInfo { + addresses := r.ring().LookupN(key, int(n)) if len(addresses) == 0 { r.RequestRefresh() - return []membership.HostInfo{} + return nil } return util.MapSlice(addresses, membership.NewHostInfoFromAddress) } diff --git a/common/util/util.go b/common/util/util.go index d5fead52d4da..293e99d6f6a2 100644 --- a/common/util/util.go +++ b/common/util/util.go @@ -119,9 +119,12 @@ func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([ // MapSlice given slice xs []T and f(T) S produces slice []S by applying f to every element of xs func MapSlice[T, S any](xs []T, f func(T) S) []S { - var result []S - for _, s := range xs { - result = append(result, f(s)) + if xs == nil { + 
return nil + } + result := make([]S, len(xs)) + for i, s := range xs { + result[i] = f(s) } return result } @@ -146,6 +149,20 @@ func FoldSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A { return acc } +func RepeatSlice[T any](xs []T, n int) []T { + if xs == nil { + return xs + } + if n <= 0 { + return []T{} + } + ys := make([]T, n*len(xs)) + for i := 0; i < n; i++ { + copy(ys[i*len(xs):(i+1)*len(xs)], xs) + } + return ys +} + // Coalesce returns the first non-zero value of its arguments, or the zero value for the type // if all are zero. func Coalesce[T comparable](vals ...T) T { diff --git a/common/util/util_test.go b/common/util/util_test.go new file mode 100644 index 000000000000..d42413eaf390 --- /dev/null +++ b/common/util/util_test.go @@ -0,0 +1,85 @@ +package util + +import "testing" + +func TestRepeatSlice(t *testing.T) { + t.Run("when input slice is nil should return nil", func(t *testing.T) { + got := RepeatSlice[int](nil, 5) + if got != nil { + t.Errorf("RepeatSlice produced non-nil slice from nil input") + } + }) + t.Run("when input slice is empty should return empty", func(t *testing.T) { + empty := []int{} + got := RepeatSlice(empty, 5) + if len(got) != 0 { + t.Errorf("RepeatSlice filled empty slice") + } + }) + t.Run("when requested repeat number equal 0 should return empty slice", func(t *testing.T) { + xs := []int{1, 2, 3, 4, 5} + got := RepeatSlice(xs, 0) + if len(got) != 0 { + t.Errorf("RepeatSlice with repeat count 0 returned non-empty slice") + } + }) + t.Run("when requested repeat number is less than 0 should return empty slice", func(t *testing.T) { + xs := []int{1, 2, 3, 4, 5} + got := RepeatSlice(xs, -1) + if len(got) != 0 { + t.Errorf("RepeatSlice with repeat count -1 returned non-empty slice") + } + }) + t.Run("when requested repeat number is 3 should return slice three times the input", func(t *testing.T) { + xs := []int{1, 2, 3, 4, 5} + got := RepeatSlice(xs, 3) + if len(got) != len(xs)*3 { + t.Errorf("RepeatSlice 
produced slice of wrong length: expected %d got %d", len(xs)*3, len(got)) + } + for i, v := range got { + if v != xs[i%len(xs)] { + t.Errorf("RepeatSlice wrong value in result: expected %d at index %d but got %d", xs[i%len(xs)], i, v) + } + } + }) + t.Run("should not change the input slice when truncating", func(t *testing.T) { + xs := []int{1, 2, 3, 4, 5} + _ = RepeatSlice(xs, 0) + if len(xs) != 5 { + t.Errorf("Repeat slice truncated the original slice: expected {1, 2, 3, 4, 5}, got %v", xs) + } + }) + t.Run("should not change the input slice when replicating", func(t *testing.T) { + xs := []int{1, 2, 3, 4, 5} + _ = RepeatSlice(xs, 5) + if len(xs) != 5 { + t.Errorf("Repeat slice changed the original slice: expected {1, 2, 3, 4, 5}, got %v", xs) + } + }) +} + +func TestMapSlice(t *testing.T) { + t.Run("when given nil as slice should return nil", func(t *testing.T) { + ys := MapSlice(nil, func(x int) uint32 { return uint32(x) }) + if ys != nil { + t.Errorf("mapping over nil produced non nil got %v", ys) + } + }) + t.Run("when given an empty slice should return empty slice", func(t *testing.T) { + xs := []int{} + var ys []uint32 + ys = MapSlice(xs, func(x int) uint32 { return uint32(x) }) + if len(ys) != 0 { + t.Errorf("mapping over empty slice produced non empty slice got %v", ys) + } + }) + t.Run("when given a slice and a function should apply function to every element of the original slice", func(t *testing.T) { + xs := []int{1, 2, 3, 4, 5} + ys := MapSlice(xs, func(x int) int { return x + 2 }) + for i, y := range ys { + if y != (xs[i] + 2) { + t.Fatalf("mapping over slice did not apply function expected {3, 4, 5, 6, 7} got %v", ys) + } + } + }) +} diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index f9221c71a27c..79cd3796f8c0 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -29,6 +29,7 @@ import ( "encoding/json" "errors" "fmt" + "math" "os" "sync" "sync/atomic" @@ -44,6 +45,7 
@@ import ( "go.temporal.io/server/common" "go.temporal.io/server/common/backoff" "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" "go.temporal.io/server/common/log/tag" @@ -123,12 +125,25 @@ type ( StickyScheduleToStartTimeout string // parse into time.Duration StickyScheduleToStartTimeoutDuration time.Duration } + + workerAllocation struct { + total uint16 + local uint16 + } ) var ( errNoWorkerNeeded = errors.New("no worker needed") // sentinel value, not a real error ) +func (c *workerAllocation) Local() int { + return int(c.local) +} + +func (c *workerAllocation) Total() int { + return int(c.total) +} + func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager { return &perNamespaceWorkerManager{ logger: log.With(params.Logger, tag.ComponentPerNSWorkerManager), @@ -260,21 +275,43 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) { delete(wm.workers, ns.ID()) } -func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) { +func (wm *perNamespaceWorkerManager) getWorkerAllocation(ns *namespace.Namespace) (*workerAllocation, error) { + desiredWorkersCount, err := wm.getConfiguredWorkersCountFor(ns) + if err != nil { + return nil, err + } + if desiredWorkersCount == 0 { + return &workerAllocation{0, 0}, nil + } + localCount, err := wm.getLocallyDesiredWorkersCount(ns, desiredWorkersCount) + if err != nil { + return nil, err + } + return &workerAllocation{desiredWorkersCount, localCount}, nil +} + +func (wm *perNamespaceWorkerManager) getConfiguredWorkersCountFor(ns *namespace.Namespace) (uint16, error) { totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String()) + if totalWorkers < 0 || totalWorkers > math.MaxUint16 { + err := fmt.Errorf("%w namespace %s, workers count %d", dynamicconfig.ErrInvalidConfiguration, ns.Name(), 
totalWorkers) + return 0, err + } + return uint16(totalWorkers), nil +} + +func (wm *perNamespaceWorkerManager) getLocallyDesiredWorkersCount(ns *namespace.Namespace, desiredNumberOfWorkers uint16) (uint16, error) { key := ns.ID().String() - targets := wm.serviceResolver.LookupN(key, totalWorkers) - if len(targets) == 0 { - return 0, 0, membership.ErrInsufficientHosts + hostsInNamespace := wm.serviceResolver.LookupN(key, desiredNumberOfWorkers) + hostsCount := uint16(len(hostsInNamespace)) + if hostsCount == 0 { + return 0, membership.ErrInsufficientHosts } + maxWorkersPerHost := int(math.Ceil(float64(desiredNumberOfWorkers) / float64(hostsCount))) + desiredDistribution := util.RepeatSlice(hostsInNamespace, maxWorkersPerHost)[:desiredNumberOfWorkers] + IsLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() } - multiplicity := util.FoldSlice(targets, 0, func(acc int, t membership.HostInfo) int { - if IsLocal(t) { - acc++ - } - return acc - }) - return multiplicity, totalWorkers, nil + result := len(util.FilterSlice(desiredDistribution, IsLocal)) + return uint16(result), nil } func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions { @@ -390,18 +427,18 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { } // check if we are responsible for this namespace at all - multiplicity, totalWorkers, err := w.wm.getWorkerMultiplicity(ns) + workerAllocation, err := w.wm.getWorkerAllocation(ns) if err != nil { w.logger.Error("Failed to look up hosts", tag.Error(err)) // TODO: add metric also return err } - if multiplicity == 0 { + if workerAllocation.Local() == 0 { // not ours, don't need a worker return errNoWorkerNeeded } // ensure this changes if multiplicity changes - componentSet += fmt.Sprintf(",%d", multiplicity) + componentSet += fmt.Sprintf(",%d", workerAllocation.Local()) // get sdk worker options dcOptions := w.wm.getWorkerOptions(ns) @@ -421,7 +458,7 @@ func (w 
*perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { // create new one. note that even before startWorker returns, the worker may have started // and already called the fatal error handler. we need to set w.client+worker+componentSet // before releasing the lock to keep our state consistent. - client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, totalWorkers, dcOptions) + client, worker, err := w.startWorker(ns, enabledComponents, workerAllocation, dcOptions) if err != nil { // TODO: add metric also return err @@ -436,8 +473,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { func (w *perNamespaceWorker) startWorker( ns *namespace.Namespace, components []workercommon.PerNSWorkerComponent, - multiplicity int, - totalWorkers int, + allocation *workerAllocation, dcOptions sdkWorkerOptions, ) (sdkclient.Client, sdkworker.Worker, error) { nsName := ns.Name().String() @@ -462,19 +498,19 @@ func (w *perNamespaceWorker) startWorker( sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String())) sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName) - // increase these if we're supposed to run with more multiplicity - sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity - sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity - sdkoptions.MaxConcurrentLocalActivityExecutionSize *= multiplicity - sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= multiplicity - sdkoptions.MaxConcurrentActivityExecutionSize *= multiplicity + // increase these if we're supposed to run with more allocation + sdkoptions.MaxConcurrentWorkflowTaskPollers *= allocation.Local() + sdkoptions.MaxConcurrentActivityTaskPollers *= allocation.Local() + sdkoptions.MaxConcurrentLocalActivityExecutionSize *= allocation.Local() + sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= allocation.Local() + 
sdkoptions.MaxConcurrentActivityExecutionSize *= allocation.Local() sdkoptions.OnFatalError = w.onFatalError // this should not block because the client already has server capabilities worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions) details := workercommon.RegistrationDetails{ - TotalWorkers: totalWorkers, - Multiplicity: multiplicity, + TotalWorkers: allocation.Total(), + Multiplicity: allocation.Local(), } for _, cmp := range components { cmp.Register(worker, ns, details) diff --git a/service/worker/pernamespaceworker_test.go b/service/worker/pernamespaceworker_test.go index 7fa8d25d2130..ea7a5affcf86 100644 --- a/service/worker/pernamespaceworker_test.go +++ b/service/worker/pernamespaceworker_test.go @@ -159,7 +159,7 @@ func (s *perNsWorkerManagerSuite) TestEnabledButResolvedToOther() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other1")}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other1")}) s.manager.namespaceCallback(ns, false) // main work happens in a goroutine @@ -176,7 +176,7 @@ func (s *perNsWorkerManagerSuite) TestEnabled() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -202,7 +202,7 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{ + s.serviceResolver.EXPECT().LookupN("ns3", uint16(3)).Return([]membership.HostInfo{ 
membership.NewHostInfoFromAddress("self"), membership.NewHostInfoFromAddress("other"), membership.NewHostInfoFromAddress("self"), @@ -239,14 +239,14 @@ func (s *perNsWorkerManagerSuite) TestOptions() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{ + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{ membership.NewHostInfoFromAddress("self"), }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{ + s.serviceResolver.EXPECT().LookupN("ns2", uint16(2)).Return([]membership.HostInfo{ membership.NewHostInfoFromAddress("self"), membership.NewHostInfoFromAddress("self"), }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{ + s.serviceResolver.EXPECT().LookupN("ns3", uint16(3)).Return([]membership.HostInfo{ membership.NewHostInfoFromAddress("self"), membership.NewHostInfoFromAddress("self"), membership.NewHostInfoFromAddress("self"), @@ -301,8 +301,8 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() { return &workercommon.PerNSDedicatedWorkerOptions{Enabled: ns.Name().String() == "ns1"} }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) - s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{ + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) + s.serviceResolver.EXPECT().LookupN("ns2", uint16(2)).Return([]membership.HostInfo{ membership.NewHostInfoFromAddress("self"), membership.NewHostInfoFromAddress("self"), }) @@ -346,7 +346,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) + s.serviceResolver.EXPECT().LookupN("ns1", 
uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -366,7 +366,7 @@ func (s *perNsWorkerManagerSuite) TestDeleteNs() { // restore it nsRestored := testns("ns1", enumspb.NAMESPACE_STATE_REGISTERED) - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli2 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli2) wkr2 := mocksdk.NewMockWorker(s.controller) @@ -395,13 +395,13 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { }).AnyTimes() // we don't own it at first - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")}) s.manager.namespaceCallback(ns, false) time.Sleep(50 * time.Millisecond) // now we own it - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -413,7 +413,7 @@ func (s *perNsWorkerManagerSuite) TestMembershipChanged() { time.Sleep(50 * time.Millisecond) // now we don't own it anymore - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")}) + s.serviceResolver.EXPECT().LookupN("ns1", 
uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("other")}) wkr1.EXPECT().Stop() cli1.EXPECT().Close() @@ -431,9 +431,9 @@ func (s *perNsWorkerManagerSuite) TestServiceResolverError() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{}) - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{}) - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{}) + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) @@ -460,7 +460,7 @@ func (s *perNsWorkerManagerSuite) TestStartWorkerError() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}).AnyTimes() + s.serviceResolver.EXPECT().LookupN("ns1", uint16(1)).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")}).AnyTimes() cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) diff --git a/tests/simple_service_resolver.go b/tests/simple_service_resolver.go index b894d4f336f3..d1bb3231e5e7 100644 --- a/tests/simple_service_resolver.go +++ b/tests/simple_service_resolver.go @@ -88,7 +88,7 @@ func (s *simpleResolver) Lookup(key string) (membership.HostInfo, error) { return s.hostInfos[idx], nil } -func (s *simpleResolver) LookupN(key string, _ int) []membership.HostInfo { +func (s *simpleResolver) LookupN(key string, _ uint16) []membership.HostInfo { info, err := s.Lookup(key) if err != nil { return []membership.HostInfo{}