Skip to content

Commit

Permalink
Only enable weighted lb when backlog size is larrger than 100
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll committed Oct 2, 2024
1 parent f20b09e commit fb64dce
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 44 deletions.
43 changes: 12 additions & 31 deletions client/matching/weighted_loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ import (
"github.com/uber/cadence/common/types"
)

const _backlogThreshold = 100

type (
weightSelector struct {
sync.RWMutex
weights []int64
initialized bool
threshold int64
}

weightedLoadBalancer struct {
Expand All @@ -54,35 +57,14 @@ type (
}
)

func newWeightSelector(n int) *weightSelector {
pw := &weightSelector{weights: make([]int64, n)}
func newWeightSelector(n int, threshold int64) *weightSelector {
pw := &weightSelector{weights: make([]int64, n), threshold: threshold}
for i := range pw.weights {
pw.weights[i] = -1
}
return pw
}

func (pw *weightSelector) pickZero() int {
pw.RLock()
defer pw.RUnlock()
if !pw.initialized {
return -1
}
if rand.Int63n(1000) != 0 {
return -1
}
zeroWeightsIndices := make([]int, 0, len(pw.weights))
for i, w := range pw.weights {
if w == 0 {
zeroWeightsIndices = append(zeroWeightsIndices, i)
}
}
if len(zeroWeightsIndices) == 0 {
return -1
}
return zeroWeightsIndices[rand.Intn(len(zeroWeightsIndices))]
}

func (pw *weightSelector) pick() int {
cumulativeWeights := make([]int64, len(pw.weights))
totalWeight := int64(0)
Expand All @@ -91,11 +73,15 @@ func (pw *weightSelector) pick() int {
if !pw.initialized {
return -1
}
shouldDrain := false
for i, w := range pw.weights {
totalWeight += w
cumulativeWeights[i] = totalWeight
if w > pw.threshold {
shouldDrain = true // only enable weight selection if backlog size is larger than the threshold
}
}
if totalWeight <= 0 {
if totalWeight <= 0 || !shouldDrain {
return -1
}
r := rand.Int63n(totalWeight)
Expand Down Expand Up @@ -183,12 +169,7 @@ func (lb *weightedLoadBalancer) PickReadPartition(
if !ok {
return lb.fallbackLoadBalancer.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
}
p := w.pickZero()
if p >= 0 {
lb.logger.Debug("pick read partition with zero weight", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.GetName()), tag.WorkflowTaskListType(taskListType), tag.Dynamic("weights", w.weights), tag.Dynamic("tasklist-partition", p))
return getPartitionTaskListName(taskList.GetName(), p)
}
p = w.pick()
p := w.pick()
lb.logger.Debug("pick read partition", tag.WorkflowDomainID(domainID), tag.WorkflowTaskListName(taskList.GetName()), tag.WorkflowTaskListType(taskListType), tag.Dynamic("weights", w.weights), tag.Dynamic("tasklist-partition", p))
if p < 0 {
return lb.fallbackLoadBalancer.PickReadPartition(domainID, taskList, taskListType, forwardedFrom)
Expand Down Expand Up @@ -234,7 +215,7 @@ func (lb *weightedLoadBalancer) UpdateWeight(
}
wI := lb.weightCache.Get(taskListKey)
if wI == nil {
w := newWeightSelector(n)
w := newWeightSelector(n, _backlogThreshold)
wI, err = lb.weightCache.PutIfNotExist(taskListKey, w)
if err != nil {
return
Expand Down
26 changes: 13 additions & 13 deletions client/matching/weighted_loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (

func TestPollerWeight(t *testing.T) {
n := 4
pw := newWeightSelector(n)
pw := newWeightSelector(n, 100)
// uninitialized weights should return -1
assert.Equal(t, -1, pw.pick())
// all 0 weights should return -1
Expand All @@ -48,23 +48,23 @@ func TestPollerWeight(t *testing.T) {
assert.Equal(t, -1, pw.pick())
}
// if only one item has non-zero weight, always pick that item
pw.update(n, 3, 4)
pw.update(n, 3, 400)
for i := 0; i < 100; i++ {
assert.Equal(t, 3, pw.pick())
}
pw.update(n, 2, 3)
pw.update(n, 1, 2)
pw.update(n, 0, 1)
pw.update(n, 2, 300)
pw.update(n, 1, 200)
pw.update(n, 0, 100)
// test pick probabilities
testPickProbHelper(t, pw, time.Now().UnixNano())

// shrink size and test pick probabilities
pw.update(n-1, 2, 2)
pw.update(n-1, 2, 200)
testPickProbHelper(t, pw, time.Now().UnixNano())

// expand size and test pick probabilities
pw.update(n, 3, 3)
pw.update(n+1, 4, 4)
pw.update(n, 3, 300)
pw.update(n+1, 4, 400)
testPickProbHelper(t, pw, time.Now().UnixNano())
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func TestWeightedLoadBalancer_PickReadPartition(t *testing.T) {
name: "WeightSelector pick returns negative",
domainID: "domainC",
taskList: types.TaskList{Name: "taskListC"},
weightCacheReturn: newWeightSelector(2),
weightCacheReturn: newWeightSelector(2, 100),
fallbackReturn: "fallbackPartition",
expectedResult: "fallbackPartition",
expectFallbackCall: true,
Expand All @@ -203,9 +203,9 @@ func TestWeightedLoadBalancer_PickReadPartition(t *testing.T) {
domainID: "domainD",
taskList: types.TaskList{Name: "taskListD"},
weightCacheReturn: func() *weightSelector {
pw := newWeightSelector(2)
pw := newWeightSelector(2, 10)
pw.update(2, 0, 0)
pw.update(2, 1, 1)
pw.update(2, 1, 11)
return pw
}(),
expectedResult: getPartitionTaskListName("taskListD", 1),
Expand Down Expand Up @@ -318,7 +318,7 @@ func TestWeightedLoadBalancer_UpdateWeight(t *testing.T) {
domainID: "domainA",
taskListName: "a",
taskListType: 0,
}, newWeightSelector(2)).Return(newWeightSelector(2), nil)
}, newWeightSelector(2, 100)).Return(newWeightSelector(2, 100), nil)
},
},
{
Expand All @@ -333,7 +333,7 @@ func TestWeightedLoadBalancer_UpdateWeight(t *testing.T) {
domainID: "domainA",
taskListName: "a",
taskListType: 0,
}).Return(newWeightSelector(2))
}).Return(newWeightSelector(2, 100))
},
},
}
Expand Down

0 comments on commit fb64dce

Please sign in to comment.