Skip to content

Commit

Permalink
Introduced different factories for the different cases, and added tes…
Browse files Browse the repository at this point in the history
…ts for them
  • Loading branch information
jakobht committed Nov 20, 2023
1 parent dc0cb0c commit cab0845
Show file tree
Hide file tree
Showing 10 changed files with 183 additions and 64 deletions.
7 changes: 0 additions & 7 deletions common/quotas/dynamicratelimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ type DynamicRateLimiter struct {
rl *RateLimiter
}

// DynamicRateLimiterFactory creates a factory function for creating DynamicRateLimiters
func DynamicRateLimiterFactory(rps RPSKeyFunc) func(string) Limiter {
return func(key string) Limiter {
return NewDynamicRateLimiter(func() float64 { return rps(key) })
}
}

// NewDynamicRateLimiter returns a rate limiter which handles dynamic config
func NewDynamicRateLimiter(rps RPSFunc) *DynamicRateLimiter {
initialRps := rps()
Expand Down
8 changes: 5 additions & 3 deletions common/quotas/dynamicratelimiterfactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,22 @@

package quotas

import "github.com/uber/cadence/common/dynamicconfig"

type LimiterFactory interface {
GetLimiter(domain string) Limiter
}

func NewDynamicRateLimiterFactory(rps RPSKeyFunc) LimiterFactory {
func NewSimpleDynamicRateLimiterFactory(rps dynamicconfig.IntPropertyFnWithDomainFilter) LimiterFactory {
return dynamicRateLimiterFactory{
rps: rps,
}
}

type dynamicRateLimiterFactory struct {
rps RPSKeyFunc
rps dynamicconfig.IntPropertyFnWithDomainFilter
}

func (f dynamicRateLimiterFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 { return f.rps(domain) })
return NewDynamicRateLimiter(func() float64 { return float64(f.rps(domain)) })
}
56 changes: 56 additions & 0 deletions common/quotas/fallbackdynamicratelimiterfactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package quotas

import "github.com/uber/cadence/common/dynamicconfig"

func NewFallbackDynamicRateLimiterFactory(
primary dynamicconfig.IntPropertyFnWithDomainFilter,
secondary dynamicconfig.IntPropertyFn,
) LimiterFactory {
return fallbackDynamicRateLimiterFactory{
primary: primary,
secondary: secondary,
}
}

type fallbackDynamicRateLimiterFactory struct {
primary dynamicconfig.IntPropertyFnWithDomainFilter
// secondary is used when primary is not set
secondary dynamicconfig.IntPropertyFn
}

func (f fallbackDynamicRateLimiterFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 {
return limitWithFallback(
float64(f.primary(domain)),
float64(f.secondary()))
})
}

func limitWithFallback(primary, secondary float64) float64 {
if primary > 0 {
return primary
}
return secondary
}
36 changes: 36 additions & 0 deletions common/quotas/fallbackdynamicratelimiterfactory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package quotas

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/uber/cadence/common/dynamicconfig"
)

func TestNewFallbackDynamicRateLimiterFactory(t *testing.T) {
factory := NewFallbackDynamicRateLimiterFactory(
func(string) int { return 2 },
func(opts ...dynamicconfig.FilterOption) int { return 100 },
)

limiter := factory.GetLimiter("TestDomainName")

// The limiter should accept 2 requests per second
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, false, limiter.Allow())
}

func TestNewFallbackDynamicRateLimiterFactoryFallback(t *testing.T) {
factory := NewFallbackDynamicRateLimiterFactory(
func(string) int { return 0 },
func(opts ...dynamicconfig.FilterOption) int { return 2 },
)

limiter := factory.GetLimiter("TestDomainName")

// The limiter should accept 2 requests per second
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, false, limiter.Allow())
}
4 changes: 2 additions & 2 deletions common/quotas/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ func BenchmarkMultiStageRateLimiter1000Domains(b *testing.B) {
}
}

func newFixedRpsMultiStageRateLimiter(globalRps, domainRps float64) Policy {
func newFixedRpsMultiStageRateLimiter(globalRps float64, domainRps int) Policy {
return NewMultiStageRateLimiter(
NewDynamicRateLimiter(func() float64 {
return globalRps
}),
NewCollection(NewDynamicRateLimiterFactory(func(domain string) float64 {
NewCollection(NewSimpleDynamicRateLimiterFactory(func(domain string) int {
return domainRps
})),
)
Expand Down
33 changes: 33 additions & 0 deletions common/quotas/global.go → common/quotas/permember.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package quotas
import (
"math"

"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/membership"
)

Expand All @@ -41,3 +42,35 @@ func PerMember(service string, globalRPS, instanceRPS float64, resolver membersh
avgQuota := math.Max(globalRPS/float64(memberCount), 1)
return math.Min(avgQuota, instanceRPS)
}

func NewPerMemberDynamicRateLimiterFactory(
service string,
globalRPS dynamicconfig.IntPropertyFnWithDomainFilter,
instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter,
resolver membership.Resolver,
) LimiterFactory {
return perMemberFactory{
service: service,
globalRPS: globalRPS,
instanceRPS: instanceRPS,
resolver: resolver,
}
}

type perMemberFactory struct {
service string
globalRPS dynamicconfig.IntPropertyFnWithDomainFilter
instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter
resolver membership.Resolver
}

func (f perMemberFactory) GetLimiter(domain string) Limiter {
return NewDynamicRateLimiter(func() float64 {
return PerMember(
f.service,
float64(f.globalRPS(domain)),
float64(f.instanceRPS(domain)),
f.resolver,
)
})
}
26 changes: 23 additions & 3 deletions common/quotas/global_test.go → common/quotas/permember_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import (
func Test_PerMember(t *testing.T) {
ctrl := gomock.NewController(t)
resolver := membership.NewMockResolver(ctrl)
resolver.EXPECT().MemberCount("A").Return(10, nil).AnyTimes()
resolver.EXPECT().MemberCount("X").Return(0, assert.AnError).AnyTimes()
resolver.EXPECT().MemberCount("Y").Return(0, nil).AnyTimes()
resolver.EXPECT().MemberCount("A").Return(10, nil).MinTimes(1)
resolver.EXPECT().MemberCount("X").Return(0, assert.AnError).MinTimes(1)
resolver.EXPECT().MemberCount("Y").Return(0, nil).MinTimes(1)

// Invalid service - fallback to instanceRPS
assert.Equal(t, 3.0, PerMember("X", 20.0, 3.0, resolver))
Expand All @@ -51,3 +51,23 @@ func Test_PerMember(t *testing.T) {
// Calculate average per member RPS (prefer instanceRPS - lower)
assert.Equal(t, 3.0, PerMember("A", 100.0, 3.0, resolver))
}

func Test_PerMemberFactory(t *testing.T) {
ctrl := gomock.NewController(t)
resolver := membership.NewMockResolver(ctrl)
resolver.EXPECT().MemberCount("A").Return(10, nil).MinTimes(1)

factory := NewPerMemberDynamicRateLimiterFactory(
"A",
func(string) int { return 20 },
func(string) int { return 3 },
resolver,
)

limiter := factory.GetLimiter("TestDomainName")

// The limit is 20 and there are 10 instances, so the per member limit is 2
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, true, limiter.Allow())
assert.Equal(t, false, limiter.Allow())
}
45 changes: 18 additions & 27 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,39 +174,30 @@ func NewWorkflowHandler(
tokenSerializer: common.NewJSONTaskTokenSerializer(),
userRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainUserRPS(domain)),
float64(config.MaxDomainUserRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
config.GlobalDomainUserRPS,
config.MaxDomainUserRPSPerInstance,
resource.GetMembershipResolver(),
)),
),
workerRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainWorkerRPS(domain)),
float64(config.MaxDomainWorkerRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
config.GlobalDomainWorkerRPS,
config.MaxDomainWorkerRPSPerInstance,
resource.GetMembershipResolver(),
)),
),
visibilityRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return quotas.PerMember(
service.Frontend,
float64(config.GlobalDomainVisibilityRPS(domain)),
float64(config.MaxDomainVisibilityRPSPerInstance(domain)),
resource.GetMembershipResolver(),
)
})),
quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory(
service.Frontend,
config.GlobalDomainVisibilityRPS,
config.MaxDomainVisibilityRPSPerInstance,
resource.GetMembershipResolver(),
)),
),
versionChecker: versionChecker,
domainHandler: domainHandler,
Expand Down
6 changes: 2 additions & 4 deletions service/history/task/priority_assigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ func NewPriorityAssigner(
config: config,
logger: logger,
scope: metricClient.Scope(metrics.TaskPriorityAssignerScope),
rateLimiters: quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
return float64(config.TaskProcessRPS(domain))
},
rateLimiters: quotas.NewCollection(quotas.NewSimpleDynamicRateLimiterFactory(
config.TaskProcessRPS,
)),
}
}
Expand Down
26 changes: 8 additions & 18 deletions service/matching/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,27 +85,17 @@ func NewHandler(
metricsClient: metricsClient,
userRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
domainRPS := float64(config.DomainUserRPS(domain))
if domainRPS > 0 {
return domainRPS
}
// if domain rps not set, use host rps to keep the old behavior
return float64(config.UserRPS())
})),
quotas.NewCollection(quotas.NewFallbackDynamicRateLimiterFactory(
config.DomainUserRPS,
config.UserRPS,
)),
),
workerRateLimiter: quotas.NewMultiStageRateLimiter(
quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()),
quotas.NewCollection(quotas.NewDynamicRateLimiterFactory(
func(domain string) float64 {
domainRPS := float64(config.DomainWorkerRPS(domain))
if domainRPS > 0 {
return domainRPS
}
// if domain rps not set, use host rps to keep the old behavior
return float64(config.WorkerRPS())
})),
quotas.NewCollection(quotas.NewFallbackDynamicRateLimiterFactory(
config.DomainWorkerRPS,
config.WorkerRPS,
)),
),
engine: engine,
logger: logger,
Expand Down

0 comments on commit cab0845

Please sign in to comment.