From eb31547092a37fab875856ed34090172016a82d1 Mon Sep 17 00:00:00 2001
From: sasha
Date: Mon, 13 Nov 2023 12:51:30 -0800
Subject: [PATCH] ensure correct configuration of workers count per namespace +
 minor refactoring

---
 common/membership/ringpop/service_resolver.go | 12 +--
 common/util/util.go                           | 22 ++++-
 common/util/util_test.go                      | 91 +++++++++++++++++++
 service/worker/pernamespaceworker.go          | 82 +++++++++++------
 service/worker/pernamespaceworker_test.go     | 52 ++++++-----
 5 files changed, 202 insertions(+), 57 deletions(-)
 create mode 100644 common/util/util_test.go

diff --git a/common/membership/ringpop/service_resolver.go b/common/membership/ringpop/service_resolver.go
index f3bd1b854289..4dbedb135bf6 100644
--- a/common/membership/ringpop/service_resolver.go
+++ b/common/membership/ringpop/service_resolver.go
@@ -63,10 +63,6 @@ const (
 	replicaPoints = 100
 )
 
-type membershipManager interface {
-	AddListener()
-}
-
 type serviceResolver struct {
 	service primitives.ServiceName
 	port    int
@@ -156,12 +152,16 @@ func (r *serviceResolver) Lookup(key string) (membership.HostInfo, error) {
 }
 
 func (r *serviceResolver) LookupN(key string, n int) []membership.HostInfo {
+	if n <= 0 {
+		return nil
+	}
 	addresses := r.ring().LookupN(key, n)
 	if len(addresses) == 0 {
 		r.RequestRefresh()
-		return []membership.HostInfo{}
+		return nil
 	}
-	return util.MapSlice(addresses, membership.NewHostInfoFromAddress)
+	labels := r.getLabelsMap()
+	return util.MapSlice(addresses, func(address string) membership.HostInfo { return newHostInfo(address, labels) })
 }
 
 func (r *serviceResolver) AddListener(
diff --git a/common/util/util.go b/common/util/util.go
index d5fead52d4da..87d8b8d1680c 100644
--- a/common/util/util.go
+++ b/common/util/util.go
@@ -119,9 +119,12 @@ func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([
 
 // MapSlice given slice xs []T and f(T) S produces slice []S by applying f to every element of xs
 func MapSlice[T, S any](xs []T, f func(T) S) []S {
-	var result []S
-	for _, s := range xs {
-		result = append(result, f(s))
+	if xs == nil {
+		return nil
+	}
+	result := make([]S, len(xs))
+	for i, s := range xs {
+		result[i] = f(s)
 	}
 	return result
 }
@@ -146,6 +149,19 @@ func FoldSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A {
 	return acc
 }
 
+// RepeatSlice returns a new slice containing the contents of xs repeated n times.
+// If xs is nil or n is non-positive, it returns nil.
+func RepeatSlice[T any](xs []T, n int) []T {
+	if xs == nil || n <= 0 {
+		return nil
+	}
+	ys := make([]T, n*len(xs))
+	for i := 0; i < n; i++ {
+		copy(ys[i*len(xs):], xs)
+	}
+	return ys
+}
+
 // Coalesce returns the first non-zero value of its arguments, or the zero value for the type
 // if all are zero.
 func Coalesce[T comparable](vals ...T) T {
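A note on the new helper: `RepeatSlice` plus a truncation is how the worker manager later in this patch spreads a desired worker count across hosts round-robin. A minimal sketch of that composition, using the patch's real `common/util` package (the host names and counts here are made up):

```go
package main

import (
	"fmt"

	"go.temporal.io/server/common/util"
)

func main() {
	hosts := []string{"A", "B", "C"}
	desired := 5
	// desired/len(hosts) + 1 repetitions always produce at least `desired` elements.
	perHost := desired/len(hosts) + 1
	distribution := util.RepeatSlice(hosts, perHost)[:desired]
	fmt.Println(distribution) // [A B C A B]: hosts receive 2, 2, and 1 workers
}
```
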
diff --git a/common/util/util_test.go b/common/util/util_test.go
new file mode 100644
index 000000000000..fffff88734f5
--- /dev/null
+++ b/common/util/util_test.go
@@ -0,0 +1,91 @@
+// The MIT License
+//
+// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved.
+//
+// Copyright (c) 2020 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package util
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestRepeatSlice(t *testing.T) {
+	t.Run("when input slice is nil should return nil", func(t *testing.T) {
+		got := RepeatSlice[int](nil, 5)
+		require.Nil(t, got, "RepeatSlice produced non-nil slice from nil input")
+	})
+	t.Run("when input slice is empty should return empty", func(t *testing.T) {
+		empty := []int{}
+		got := RepeatSlice(empty, 5)
+		require.Len(t, got, 0, "RepeatSlice filled empty slice")
+	})
+	t.Run("when requested repeat number equals 0 should return empty slice", func(t *testing.T) {
+		xs := []int{1, 2, 3, 4, 5}
+		got := RepeatSlice(xs, 0)
+		require.Len(t, got, 0, "RepeatSlice with repeat count 0 returned non-empty slice")
+	})
+	t.Run("when requested repeat number is less than 0 should return empty slice", func(t *testing.T) {
+		xs := []int{1, 2, 3, 4, 5}
+		got := RepeatSlice(xs, -1)
+		require.Len(t, got, 0, "RepeatSlice with repeat count -1 returned non-empty slice")
+	})
+	t.Run("when requested repeat number is 3 should return slice three times the input", func(t *testing.T) {
+		xs := []int{1, 2, 3, 4, 5}
+		got := RepeatSlice(xs, 3)
+		require.Len(t, got, len(xs)*3, "RepeatSlice produced slice of wrong length: expected %d got %d", len(xs)*3, len(got))
+		for i, v := range got {
+			require.Equal(t, xs[i%len(xs)], v, "RepeatSlice wrong value in result: expected %d at index %d but got %d", xs[i%len(xs)], i, v)
+		}
+	})
+	t.Run("should not change the input slice when truncating", func(t *testing.T) {
+		xs := []int{1, 2, 3, 4, 5}
+		_ = RepeatSlice(xs, 0)
+		require.Len(t, xs, 5, "RepeatSlice truncated the original slice: expected {1, 2, 3, 4, 5}, got %v", xs)
+	})
+	t.Run("should not change the input slice when replicating", func(t *testing.T) {
+		xs := []int{1, 2, 3, 4, 5}
+		_ = RepeatSlice(xs, 5)
+		require.Len(t, xs, 5, "RepeatSlice changed the original slice: expected {1, 2, 3, 4, 5}, got %v", xs)
+	})
+}
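The two "should not change the input slice" cases above pin down that `RepeatSlice` copies rather than aliases its input. The same property in isolation, as a runnable sketch (the values are invented):

```go
package main

import (
	"fmt"

	"go.temporal.io/server/common/util"
)

func main() {
	xs := []int{1, 2, 3}
	ys := util.RepeatSlice(xs, 2)
	ys[0] = 99         // mutate the result...
	fmt.Println(xs)    // [1 2 3]: ...the input's backing array is untouched
	fmt.Println(ys)    // [99 2 3 1 2 3]
}
```
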
+ }) + t.Run("when given a slice and a function should apply function to every element of the original slice", func(t *testing.T) { + xs := []int{1, 2, 3, 4, 5} + ys := MapSlice(xs, func(x int) int { return x + 1 }) + for i, y := range ys { + require.Equal(t, xs[i]+1, y, "mapping over slice did not apply function expected {2, 3, 4, 5} got %v", ys) + } + }) +} diff --git a/service/worker/pernamespaceworker.go b/service/worker/pernamespaceworker.go index f9221c71a27c..17d036747cb3 100644 --- a/service/worker/pernamespaceworker.go +++ b/service/worker/pernamespaceworker.go @@ -123,10 +123,17 @@ type ( StickyScheduleToStartTimeout string // parse into time.Duration StickyScheduleToStartTimeoutDuration time.Duration } + + workerAllocation struct { + Total int + Local int + } ) var ( errNoWorkerNeeded = errors.New("no worker needed") // sentinel value, not a real error + // errInvalidConfiguration indicates that the value provided by dynamic config is not legal + errInvalidConfiguration = errors.New("invalid dynamic configuration") ) func NewPerNamespaceWorkerManager(params perNamespaceWorkerManagerInitParams) *perNamespaceWorkerManager { @@ -260,21 +267,43 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) { delete(wm.workers, ns.ID()) } -func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) { +func (wm *perNamespaceWorkerManager) getWorkerAllocation(ns *namespace.Namespace) (*workerAllocation, error) { + desiredWorkersCount, err := wm.getConfiguredWorkersCountFor(ns) + if err != nil { + return nil, err + } + if desiredWorkersCount == 0 { + return &workerAllocation{0, 0}, nil + } + localCount, err := wm.getLocallyDesiredWorkersCount(ns, desiredWorkersCount) + if err != nil { + return nil, err + } + return &workerAllocation{desiredWorkersCount, localCount}, nil +} + +func (wm *perNamespaceWorkerManager) getConfiguredWorkersCountFor(ns *namespace.Namespace) (int, error) { totalWorkers := wm.config.PerNamespaceWorkerCount(ns.Name().String()) + if totalWorkers < 0 { + err := fmt.Errorf("%w namespace %s, workers count %d", errInvalidConfiguration, ns.Name(), totalWorkers) + return 0, err + } + return totalWorkers, nil +} + +func (wm *perNamespaceWorkerManager) getLocallyDesiredWorkersCount(ns *namespace.Namespace, desiredNumberOfWorkers int) (int, error) { key := ns.ID().String() - targets := wm.serviceResolver.LookupN(key, totalWorkers) - if len(targets) == 0 { - return 0, 0, membership.ErrInsufficientHosts - } - IsLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() } - multiplicity := util.FoldSlice(targets, 0, func(acc int, t membership.HostInfo) int { - if IsLocal(t) { - acc++ - } - return acc - }) - return multiplicity, totalWorkers, nil + availableHosts := wm.serviceResolver.LookupN(key, desiredNumberOfWorkers) + hostsCount := len(availableHosts) + if hostsCount == 0 { + return 0, membership.ErrInsufficientHosts + } + maxWorkersPerHost := desiredNumberOfWorkers/hostsCount + 1 + desiredDistribution := util.RepeatSlice(availableHosts, maxWorkersPerHost)[:desiredNumberOfWorkers] + + isLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() } + result := len(util.FilterSlice(desiredDistribution, isLocal)) + return result, nil } func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions { @@ -390,18 +419,18 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error { } // check if we are responsible 
@@ -390,18 +419,18 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
 	}
 
 	// check if we are responsible for this namespace at all
-	multiplicity, totalWorkers, err := w.wm.getWorkerMultiplicity(ns)
+	workerAllocation, err := w.wm.getWorkerAllocation(ns)
 	if err != nil {
 		w.logger.Error("Failed to look up hosts", tag.Error(err))
 		// TODO: add metric also
 		return err
 	}
-	if multiplicity == 0 {
+	if workerAllocation.Local == 0 {
 		// not ours, don't need a worker
 		return errNoWorkerNeeded
 	}
-	// ensure this changes if multiplicity changes
-	componentSet += fmt.Sprintf(",%d", multiplicity)
+	// ensure this changes if the local allocation changes
+	componentSet += fmt.Sprintf(",%d", workerAllocation.Local)
 
 	// get sdk worker options
 	dcOptions := w.wm.getWorkerOptions(ns)
@@ -421,7 +450,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
 	// create new one. note that even before startWorker returns, the worker may have started
 	// and already called the fatal error handler. we need to set w.client+worker+componentSet
 	// before releasing the lock to keep our state consistent.
-	client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, totalWorkers, dcOptions)
+	client, worker, err := w.startWorker(ns, enabledComponents, workerAllocation, dcOptions)
 	if err != nil {
 		// TODO: add metric also
 		return err
@@ -436,8 +465,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
 func (w *perNamespaceWorker) startWorker(
 	ns *namespace.Namespace,
 	components []workercommon.PerNSWorkerComponent,
-	multiplicity int,
-	totalWorkers int,
+	allocation *workerAllocation,
 	dcOptions sdkWorkerOptions,
 ) (sdkclient.Client, sdkworker.Worker, error) {
 	nsName := ns.Name().String()
@@ -462,19 +490,19 @@ func (w *perNamespaceWorker) startWorker(
 	sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String()))
 	sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName)
 
-	// increase these if we're supposed to run with more multiplicity
-	sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity
-	sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity
-	sdkoptions.MaxConcurrentLocalActivityExecutionSize *= multiplicity
-	sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= multiplicity
-	sdkoptions.MaxConcurrentActivityExecutionSize *= multiplicity
+	// scale these up if we're supposed to run more than one worker locally
+	sdkoptions.MaxConcurrentWorkflowTaskPollers *= allocation.Local
+	sdkoptions.MaxConcurrentActivityTaskPollers *= allocation.Local
+	sdkoptions.MaxConcurrentLocalActivityExecutionSize *= allocation.Local
+	sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= allocation.Local
+	sdkoptions.MaxConcurrentActivityExecutionSize *= allocation.Local
 	sdkoptions.OnFatalError = w.onFatalError
 
 	// this should not block because the client already has server capabilities
 	worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
 	details := workercommon.RegistrationDetails{
-		TotalWorkers: totalWorkers,
-		Multiplicity: multiplicity,
+		TotalWorkers: allocation.Total,
+		Multiplicity: allocation.Local,
 	}
 	for _, cmp := range components {
 		cmp.Register(worker, ns, details)
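The multiplications in `startWorker` mean a host that owns several of a namespace's workers runs a single SDK worker that polls and executes proportionally harder, instead of several worker instances. A sketch of the arithmetic; the base values below are assumed SDK defaults used for illustration only and are not set anywhere in this patch:

```go
package main

import "fmt"

func main() {
	// Assumed defaults, for illustration only; the SDK supplies the real ones.
	maxWorkflowTaskPollers := 2
	maxLocalActivities := 1000

	local := 2 // allocation.Local: this host owns two of the namespace's workers
	fmt.Println(maxWorkflowTaskPollers * local) // 4
	fmt.Println(maxLocalActivities * local)     // 2000, the value TestMultiplicity expects
}
```
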
2, "ns3": 3}[ns]) + return max(1, map[string]int{"ns1": 1, "ns2": 2, "ns3": 6}[ns]) }, PerNamespaceWorkerOptions: func(ns string) map[string]any { switch ns { @@ -192,8 +192,22 @@ func (s *perNsWorkerManagerSuite) TestEnabled() { cli1.EXPECT().Close() } +/* + Given machine has ownership of the namespace + When name space change reported + Then worker should be started with configuration proportional to administratively set configuration ensuring + fair distribution across all the machines owning the namespace +*/ func (s *perNsWorkerManagerSuite) TestMultiplicity() { - ns := testns("ns3", enumspb.NAMESPACE_STATE_REGISTERED) // three workers + machinesOwningNamespace := []membership.HostInfo{ + membership.NewHostInfoFromAddress("other-1"), + membership.NewHostInfoFromAddress("other-2"), + membership.NewHostInfoFromAddress("self"), + } + desiredWorkersNumber := 6 // should match mock configuration in line 88 + expectedMultiplicity := 2 + + ns := testns("ns3", enumspb.NAMESPACE_STATE_REGISTERED) s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: true, @@ -202,11 +216,7 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() { Enabled: false, }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{ - membership.NewHostInfoFromAddress("self"), - membership.NewHostInfoFromAddress("other"), - membership.NewHostInfoFromAddress("self"), - }) + s.serviceResolver.EXPECT().LookupN("ns3", desiredWorkersNumber).Return(machinesOwningNamespace) cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli1) wkr1 := mocksdk.NewMockWorker(s.controller) @@ -217,10 +227,11 @@ func (s *perNsWorkerManagerSuite) TestMultiplicity() { s.Equal(2000, options.MaxConcurrentLocalActivityExecutionSize) s.Equal(2000, options.MaxConcurrentActivityExecutionSize) }).Return(wkr1) - s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: 3, Multiplicity: 2}) + s.cmp1.EXPECT().Register(wkr1, ns, workercommon.RegistrationDetails{TotalWorkers: desiredWorkersNumber, Multiplicity: expectedMultiplicity}) wkr1.EXPECT().Start() s.manager.namespaceCallback(ns, false) + time.Sleep(50 * time.Millisecond) wkr1.EXPECT().Stop() @@ -235,6 +246,7 @@ func (s *perNsWorkerManagerSuite) TestOptions() { s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: true, }).AnyTimes() + s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{ Enabled: false, }).AnyTimes() @@ -244,32 +256,32 @@ func (s *perNsWorkerManagerSuite) TestOptions() { }).AnyTimes() s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{ membership.NewHostInfoFromAddress("self"), - membership.NewHostInfoFromAddress("self"), }).AnyTimes() - s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{ - membership.NewHostInfoFromAddress("self"), - membership.NewHostInfoFromAddress("self"), + s.serviceResolver.EXPECT().LookupN("ns3", 6).Return([]membership.HostInfo{ membership.NewHostInfoFromAddress("self"), }).AnyTimes() + cli1 := mocksdk.NewMockClient(s.controller) s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1) - cli2 := mocksdk.NewMockClient(s.controller) - s.cfactory.EXPECT().NewClient(matchOptions("ns2")).Return(cli2) - cli3 := mocksdk.NewMockClient(s.controller) - s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli3) wkr := 
@@ -235,6 +246,7 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
 	s.cmp1.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{
 		Enabled: true,
 	}).AnyTimes()
+
 	s.cmp2.EXPECT().DedicatedWorkerOptions(gomock.Any()).Return(&workercommon.PerNSDedicatedWorkerOptions{
 		Enabled: false,
 	}).AnyTimes()
@@ -244,32 +256,32 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
 	}).AnyTimes()
 	s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{
 		membership.NewHostInfoFromAddress("self"),
-		membership.NewHostInfoFromAddress("self"),
 	}).AnyTimes()
-	s.serviceResolver.EXPECT().LookupN("ns3", 3).Return([]membership.HostInfo{
-		membership.NewHostInfoFromAddress("self"),
-		membership.NewHostInfoFromAddress("self"),
+	s.serviceResolver.EXPECT().LookupN("ns3", 6).Return([]membership.HostInfo{
 		membership.NewHostInfoFromAddress("self"),
 	}).AnyTimes()
+
 	cli1 := mocksdk.NewMockClient(s.controller)
 	s.cfactory.EXPECT().NewClient(matchOptions("ns1")).Return(cli1)
-	cli2 := mocksdk.NewMockClient(s.controller)
-	s.cfactory.EXPECT().NewClient(matchOptions("ns2")).Return(cli2)
-	cli3 := mocksdk.NewMockClient(s.controller)
-	s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli3)
 	wkr := mocksdk.NewMockWorker(s.controller)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli1}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) {
 		s.Equal(100, options.MaxConcurrentWorkflowTaskPollers)
 		s.Equal(2, options.MaxConcurrentActivityTaskPollers)
 		s.Equal(0.0, options.WorkerLocalActivitiesPerSecond)
 	}).Return(wkr)
+
+	cli2 := mocksdk.NewMockClient(s.controller)
+	s.cfactory.EXPECT().NewClient(matchOptions("ns2")).Return(cli2)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli2}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) {
 		s.Equal(4, options.MaxConcurrentWorkflowTaskPollers)
 		s.Equal(200.0, options.WorkerLocalActivitiesPerSecond)
 		s.Equal(7500*time.Millisecond, options.StickyScheduleToStartTimeout)
 	}).Return(wkr)
+
+	cli3 := mocksdk.NewMockClient(s.controller)
+	s.cfactory.EXPECT().NewClient(matchOptions("ns3")).Return(cli3)
 	s.cfactory.EXPECT().NewWorker(matchStrict{cli3}, primitives.PerNSWorkerTaskQueue, gomock.Any()).Do(func(_, _ any, options sdkworker.Options) {
-		s.Equal(6, options.MaxConcurrentWorkflowTaskPollers)
+		s.Equal(12, options.MaxConcurrentWorkflowTaskPollers)
 		s.Equal(0.0, options.WorkerLocalActivitiesPerSecond)
 		s.Equal(0*time.Millisecond, options.StickyScheduleToStartTimeout)
 	}).Return(wkr)
@@ -279,6 +291,7 @@ func (s *perNsWorkerManagerSuite) TestOptions() {
 	s.manager.namespaceCallback(ns1, false)
 	s.manager.namespaceCallback(ns2, false)
 	s.manager.namespaceCallback(ns3, false)
+
 	time.Sleep(50 * time.Millisecond)
 
 	wkr.EXPECT().Stop().AnyTimes()
@@ -302,10 +315,7 @@ func (s *perNsWorkerManagerSuite) TestTwoNamespacesTwoComponents() {
 	}).AnyTimes()
 
 	s.serviceResolver.EXPECT().LookupN("ns1", 1).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
-	s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{
-		membership.NewHostInfoFromAddress("self"),
-		membership.NewHostInfoFromAddress("self"),
-	})
+	s.serviceResolver.EXPECT().LookupN("ns2", 2).Return([]membership.HostInfo{membership.NewHostInfoFromAddress("self")})
 
 	cli1 := mocksdk.NewMockClient(s.controller)
 	cli2 := mocksdk.NewMockClient(s.controller)
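Finally, `RegistrationDetails` now carries both the configured total and the local share, which lets a per-namespace component scale its own budgets. A hypothetical use (the struct is redeclared locally so the sketch is self-contained; `shareOfNamespaceRPS` is not part of the patch):

```go
package main

import "fmt"

// RegistrationDetails mirrors the fields the patch passes to components.
type RegistrationDetails struct {
	TotalWorkers int
	Multiplicity int
}

// shareOfNamespaceRPS divides a namespace-wide budget across all workers and
// returns the portion owned by this host's share of them.
func shareOfNamespaceRPS(d RegistrationDetails, namespaceRPS float64) float64 {
	return namespaceRPS * float64(d.Multiplicity) / float64(d.TotalWorkers)
}

func main() {
	d := RegistrationDetails{TotalWorkers: 6, Multiplicity: 2}
	fmt.Println(shareOfNamespaceRPS(d, 30.0)) // 10: a third of the budget runs here
}
```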