Skip to content

Commit

Permalink
ensure correct configuration of workers count per namespace
Browse files Browse the repository at this point in the history
+ minor refactoring
  • Loading branch information
ast2023 committed Nov 16, 2023
1 parent 4fc6b21 commit fba2f63
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 59 deletions.
3 changes: 3 additions & 0 deletions common/dynamicconfig/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const (
errCountLogThreshold = 1000
)

// ErrInvalidConfiguration indicates that the value provided by dynamic config is not legal
var ErrInvalidConfiguration = errors.New("invalid dynamic configuration")

var (
errKeyNotPresent = errors.New("key not present")
errNoMatchingConstraint = errors.New("no matching constraint in key")
Expand Down
2 changes: 1 addition & 1 deletion common/membership/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type (
Lookup(key string) (HostInfo, error)
// LookupN looks up n hosts that own the resource identified by the given key; if n is greater than the total number
// of hosts, all hosts will be returned
LookupN(key string, n int) []HostInfo
LookupN(key string, n uint) []HostInfo
// AddListener adds a listener which will get notified on the given channel whenever membership changes.
AddListener(name string, notifyChannel chan<- *ChangedEvent) error
// RemoveListener removes a listener for this service.
Expand Down
2 changes: 1 addition & 1 deletion common/membership/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions common/membership/ringpop/service_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,11 @@ func (r *serviceResolver) Lookup(key string) (membership.HostInfo, error) {
return newHostInfo(addr, r.getLabelsMap()), nil
}

func (r *serviceResolver) LookupN(key string, n int) []membership.HostInfo {
addresses := r.ring().LookupN(key, n)
func (r *serviceResolver) LookupN(key string, n uint) []membership.HostInfo {
addresses := r.ring().LookupN(key, int(n))
if len(addresses) == 0 {
r.RequestRefresh()
return []membership.HostInfo{}
return nil
}
return util.MapSlice(addresses, membership.NewHostInfoFromAddress)
}
Expand Down
22 changes: 19 additions & 3 deletions common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ func MapConcurrent[IN any, OUT any](input []IN, mapper func(IN) (OUT, error)) ([

// MapSlice given slice xs []T and f(T) S produces slice []S by applying f to every element of xs
func MapSlice[T, S any](xs []T, f func(T) S) []S {
var result []S
for _, s := range xs {
result = append(result, f(s))
if xs == nil {
return nil
}
result := make([]S, len(xs))
for i, s := range xs {
result[i] = f(s)
}
return result
}
Expand All @@ -146,6 +149,19 @@ func FoldSlice[T any, A any](in []T, initializer A, reducer func(A, T) A) A {
return acc
}

// RepeatSlice builds a new slice that holds n back-to-back copies of xs.
// A nil xs or a non-positive n yields nil. The input slice is only read,
// never modified, and the result does not share storage with it.
func RepeatSlice[T any](xs []T, n int) []T {
	if n <= 0 || xs == nil {
		return nil
	}
	width := len(xs)
	repeated := make([]T, n*width)
	// Slide a destination window over the result, one copy of xs at a time.
	// For an empty xs the result is empty and the loop body never runs.
	for dst := repeated; len(dst) > 0; dst = dst[width:] {
		copy(dst, xs)
	}
	return repeated
}

// Coalesce returns the first non-zero value of its arguments, or the zero value for the type
// if all are zero.
func Coalesce[T comparable](vals ...T) T {
Expand Down
85 changes: 85 additions & 0 deletions common/util/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package util

import "testing"

// TestRepeatSlice covers the nil, empty, non-positive-count and regular cases of
// RepeatSlice, and verifies that the input slice is neither modified nor aliased
// by the returned slice.
func TestRepeatSlice(t *testing.T) {
	// sameInts reports whether two int slices have identical length and contents.
	sameInts := func(a, b []int) bool {
		if len(a) != len(b) {
			return false
		}
		for i := range a {
			if a[i] != b[i] {
				return false
			}
		}
		return true
	}

	t.Run("when input slice is nil should return nil", func(t *testing.T) {
		got := RepeatSlice[int](nil, 5)
		if got != nil {
			t.Errorf("RepeatSlice produced non-nil slice from nil input")
		}
	})
	t.Run("when input slice is empty should return empty", func(t *testing.T) {
		empty := []int{}
		got := RepeatSlice(empty, 5)
		if len(got) != 0 {
			t.Errorf("RepeatSlice filled empty slice")
		}
	})
	t.Run("when requested repeat number equal 0 should return empty slice", func(t *testing.T) {
		xs := []int{1, 2, 3, 4, 5}
		got := RepeatSlice(xs, 0)
		if len(got) != 0 {
			t.Errorf("RepeatSlice with repeat count 0 returned non-empty slice")
		}
	})
	t.Run("when requested repeat number is less than 0 should return empty slice", func(t *testing.T) {
		xs := []int{1, 2, 3, 4, 5}
		got := RepeatSlice(xs, -1)
		if len(got) != 0 {
			t.Errorf("RepeatSlice with repeat count -1 returned non-empty slice")
		}
	})
	t.Run("when requested repeat number is 3 should return slice three times the input", func(t *testing.T) {
		xs := []int{1, 2, 3, 4, 5}
		got := RepeatSlice(xs, 3)
		if len(got) != len(xs)*3 {
			t.Errorf("RepeatSlice produced slice of wrong length: expected %d got %d", len(xs)*3, len(got))
		}
		for i, v := range got {
			if v != xs[i%len(xs)] {
				t.Errorf("RepeatSlice wrong value in result: expected %d at index %d but got %d", xs[i%len(xs)], i, v)
			}
		}
	})
	t.Run("should not change the input slice when truncating", func(t *testing.T) {
		xs := []int{1, 2, 3, 4, 5}
		want := append([]int(nil), xs...)
		_ = RepeatSlice(xs, 0)
		// The length of xs cannot change through a by-value call, so compare the contents.
		if !sameInts(xs, want) {
			t.Errorf("RepeatSlice truncated the original slice: expected %v, got %v", want, xs)
		}
	})
	t.Run("should not change the input slice when replicating", func(t *testing.T) {
		xs := []int{1, 2, 3, 4, 5}
		want := append([]int(nil), xs...)
		got := RepeatSlice(xs, 5)
		if !sameInts(xs, want) {
			t.Errorf("RepeatSlice changed the original slice: expected %v, got %v", want, xs)
		}
		// Writing to the result must not leak into the input: the result has its own storage.
		for i := range got {
			got[i] = -1
		}
		if !sameInts(xs, want) {
			t.Errorf("RepeatSlice result aliases the original slice: expected %v, got %v", want, xs)
		}
	})
}

// TestMapSlice verifies that MapSlice preserves nil-ness, preserves emptiness,
// and applies the mapping function to every element in order.
func TestMapSlice(t *testing.T) {
	t.Run("when given nil as slice should return nil", func(t *testing.T) {
		ys := MapSlice[int, uint32](nil, func(x int) uint32 { return uint32(x) })
		if ys != nil {
			t.Errorf("mapping over nil produced non nil got %v", ys)
		}
	})
	t.Run("when given an empty slice should return empty slice", func(t *testing.T) {
		xs := []int{}
		ys := MapSlice(xs, func(x int) uint32 { return uint32(x) })
		if len(ys) != 0 {
			t.Errorf("mapping over empty slice produced non empty slice got %v", ys)
		}
	})
	t.Run("when given a slice and a function should apply function to every element of the original slice", func(t *testing.T) {
		xs := []int{1, 2, 3, 4, 5}
		ys := MapSlice(xs, func(x int) int { return x + 1 })
		// Without the length check an empty or short result would pass the loop below vacuously.
		if len(ys) != len(xs) {
			t.Fatalf("mapping over slice produced wrong length: expected %d got %d", len(xs), len(ys))
		}
		for i, y := range ys {
			if y != (xs[i] + 1) {
				t.Fatalf("mapping over slice did not apply function expected {2, 3, 4, 5, 6} got %v", ys)
			}
		}
	})
}
81 changes: 54 additions & 27 deletions service/worker/pernamespaceworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand Down Expand Up @@ -123,6 +124,11 @@ type (
StickyScheduleToStartTimeout string // parse into time.Duration
StickyScheduleToStartTimeoutDuration time.Duration
}

// workerAllocation describes how many per-namespace workers are configured for a
// namespace and how many of them this host is expected to run.
workerAllocation struct {
// Total is the configured worker count for the namespace.
Total uint
// Local is the number of those workers assigned to this host; zero means this host needs no worker.
Local uint
}
)

var (
Expand Down Expand Up @@ -260,21 +266,43 @@ func (wm *perNamespaceWorkerManager) removeWorker(ns *namespace.Namespace) {
delete(wm.workers, ns.ID())
}

func (wm *perNamespaceWorkerManager) getWorkerMultiplicity(ns *namespace.Namespace) (int, int, error) {
// getWorkerAllocation resolves, for the given namespace, the total number of
// configured workers and how many of them this host should run. It returns an
// error when the configured count is invalid or when the local share cannot be
// determined.
func (wm *perNamespaceWorkerManager) getWorkerAllocation(ns *namespace.Namespace) (*workerAllocation, error) {
	total, err := wm.getConfiguredWorkersCountFor(ns)
	switch {
	case err != nil:
		return nil, err
	case total == 0:
		// nothing is configured for this namespace, so skip the host lookup entirely
		return &workerAllocation{Total: 0, Local: 0}, nil
	}

	local, err := wm.getLocallyDesiredWorkersCount(ns, total)
	if err != nil {
		return nil, err
	}
	return &workerAllocation{Total: total, Local: local}, nil
}

// getConfiguredWorkersCountFor reads the per-namespace worker count from dynamic
// config and returns it as an unsigned value. A negative configured value is
// rejected with an error wrapping dynamicconfig.ErrInvalidConfiguration.
func (wm *perNamespaceWorkerManager) getConfiguredWorkersCountFor(ns *namespace.Namespace) (uint, error) {
	configured := wm.config.PerNamespaceWorkerCount(ns.Name().String())
	if configured >= 0 {
		return uint(configured), nil
	}
	return 0, fmt.Errorf("%w namespace %s, workers count %d", dynamicconfig.ErrInvalidConfiguration, ns.Name(), configured)
}

func (wm *perNamespaceWorkerManager) getLocallyDesiredWorkersCount(ns *namespace.Namespace, desiredNumberOfWorkers uint) (uint, error) {
key := ns.ID().String()
targets := wm.serviceResolver.LookupN(key, totalWorkers)
if len(targets) == 0 {
return 0, 0, membership.ErrInsufficientHosts
}
IsLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() }
multiplicity := util.FoldSlice(targets, 0, func(acc int, t membership.HostInfo) int {
if IsLocal(t) {
acc++
}
return acc
})
return multiplicity, totalWorkers, nil
availableHosts := wm.serviceResolver.LookupN(key, desiredNumberOfWorkers)
hostsCount := uint(len(availableHosts))
if hostsCount == 0 {
return 0, membership.ErrInsufficientHosts
}
maxWorkersPerHost := desiredNumberOfWorkers/hostsCount + 1
desiredDistribution := util.RepeatSlice(availableHosts, int(maxWorkersPerHost))[:desiredNumberOfWorkers]

isLocal := func(info membership.HostInfo) bool { return info.Identity() == wm.self.Identity() }
result := len(util.FilterSlice(desiredDistribution, isLocal))
return uint(result), nil
}

func (wm *perNamespaceWorkerManager) getWorkerOptions(ns *namespace.Namespace) sdkWorkerOptions {
Expand Down Expand Up @@ -390,18 +418,18 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
}

// check if we are responsible for this namespace at all
multiplicity, totalWorkers, err := w.wm.getWorkerMultiplicity(ns)
workerAllocation, err := w.wm.getWorkerAllocation(ns)
if err != nil {
w.logger.Error("Failed to look up hosts", tag.Error(err))
// TODO: add metric also
return err
}
if multiplicity == 0 {
if workerAllocation.Local == 0 {
// not ours, don't need a worker
return errNoWorkerNeeded
}
// ensure this changes if multiplicity changes
componentSet += fmt.Sprintf(",%d", multiplicity)
componentSet += fmt.Sprintf(",%d", workerAllocation.Local)

// get sdk worker options
dcOptions := w.wm.getWorkerOptions(ns)
Expand All @@ -421,7 +449,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
// create new one. note that even before startWorker returns, the worker may have started
// and already called the fatal error handler. we need to set w.client+worker+componentSet
// before releasing the lock to keep our state consistent.
client, worker, err := w.startWorker(ns, enabledComponents, multiplicity, totalWorkers, dcOptions)
client, worker, err := w.startWorker(ns, enabledComponents, workerAllocation, dcOptions)
if err != nil {
// TODO: add metric also
return err
Expand All @@ -436,8 +464,7 @@ func (w *perNamespaceWorker) tryRefresh(ns *namespace.Namespace) error {
func (w *perNamespaceWorker) startWorker(
ns *namespace.Namespace,
components []workercommon.PerNSWorkerComponent,
multiplicity int,
totalWorkers int,
allocation *workerAllocation,
dcOptions sdkWorkerOptions,
) (sdkclient.Client, sdkworker.Worker, error) {
nsName := ns.Name().String()
Expand All @@ -462,19 +489,19 @@ func (w *perNamespaceWorker) startWorker(

sdkoptions.BackgroundActivityContext = headers.SetCallerInfo(context.Background(), headers.NewBackgroundCallerInfo(ns.Name().String()))
sdkoptions.Identity = fmt.Sprintf("server-worker@%d@%s@%s", os.Getpid(), w.wm.hostName, nsName)
// increase these if we're supposed to run with more multiplicity
sdkoptions.MaxConcurrentWorkflowTaskPollers *= multiplicity
sdkoptions.MaxConcurrentActivityTaskPollers *= multiplicity
sdkoptions.MaxConcurrentLocalActivityExecutionSize *= multiplicity
sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= multiplicity
sdkoptions.MaxConcurrentActivityExecutionSize *= multiplicity
// increase these if we're supposed to run with more allocation
sdkoptions.MaxConcurrentWorkflowTaskPollers *= int(allocation.Local)
sdkoptions.MaxConcurrentActivityTaskPollers *= int(allocation.Local)
sdkoptions.MaxConcurrentLocalActivityExecutionSize *= int(allocation.Local)
sdkoptions.MaxConcurrentWorkflowTaskExecutionSize *= int(allocation.Local)
sdkoptions.MaxConcurrentActivityExecutionSize *= int(allocation.Local)
sdkoptions.OnFatalError = w.onFatalError

// this should not block because the client already has server capabilities
worker := w.wm.sdkClientFactory.NewWorker(client, primitives.PerNSWorkerTaskQueue, sdkoptions)
details := workercommon.RegistrationDetails{
TotalWorkers: totalWorkers,
Multiplicity: multiplicity,
TotalWorkers: int(allocation.Total),
Multiplicity: int(allocation.Local),
}
for _, cmp := range components {
cmp.Register(worker, ns, details)
Expand Down
Loading

0 comments on commit fba2f63

Please sign in to comment.