diff --git a/common/quotas/dynamicratelimiter.go b/common/quotas/dynamicratelimiter.go index 50138fcef46..9fbfb1637b2 100644 --- a/common/quotas/dynamicratelimiter.go +++ b/common/quotas/dynamicratelimiter.go @@ -35,13 +35,6 @@ type DynamicRateLimiter struct { rl *RateLimiter } -// DynamicRateLimiterFactory creates a factory function for creating DynamicRateLimiters -func DynamicRateLimiterFactory(rps RPSKeyFunc) func(string) Limiter { - return func(key string) Limiter { - return NewDynamicRateLimiter(func() float64 { return rps(key) }) - } -} - // NewDynamicRateLimiter returns a rate limiter which handles dynamic config func NewDynamicRateLimiter(rps RPSFunc) *DynamicRateLimiter { initialRps := rps() diff --git a/common/quotas/dynamicratelimiterfactory.go b/common/quotas/dynamicratelimiterfactory.go index dc7cc67864e..4cee698c13e 100644 --- a/common/quotas/dynamicratelimiterfactory.go +++ b/common/quotas/dynamicratelimiterfactory.go @@ -22,20 +22,27 @@ package quotas +import "github.com/uber/cadence/common/dynamicconfig" + +// LimiterFactory is used to create a Limiter for a given domain type LimiterFactory interface { + // GetLimiter returns a new Limiter for the given domain GetLimiter(domain string) Limiter } -func NewDynamicRateLimiterFactory(rps RPSKeyFunc) LimiterFactory { +// NewSimpleDynamicRateLimiterFactory creates a new LimiterFactory which creates +// a new DynamicRateLimiter for each domain, the RPS for the DynamicRateLimiter is given by the dynamic config +func NewSimpleDynamicRateLimiterFactory(rps dynamicconfig.IntPropertyFnWithDomainFilter) LimiterFactory { return dynamicRateLimiterFactory{ rps: rps, } } type dynamicRateLimiterFactory struct { - rps RPSKeyFunc + rps dynamicconfig.IntPropertyFnWithDomainFilter } +// GetLimiter returns a new Limiter for the given domain func (f dynamicRateLimiterFactory) GetLimiter(domain string) Limiter { - return NewDynamicRateLimiter(func() float64 { return f.rps(domain) }) + return NewDynamicRateLimiter(func() float64 { return float64(f.rps(domain)) }) } diff --git a/common/quotas/fallbackdynamicratelimiterfactory.go b/common/quotas/fallbackdynamicratelimiterfactory.go new file mode 100644 index 00000000000..21d6831f043 --- /dev/null +++ b/common/quotas/fallbackdynamicratelimiterfactory.go @@ -0,0 +1,60 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package quotas + +import "github.com/uber/cadence/common/dynamicconfig" + +// LimiterFactory is used to create a Limiter for a given domain +// the created Limiter will use the primary dynamic config if it is set +// otherwise it will use the secondary dynamic config +func NewFallbackDynamicRateLimiterFactory( + primary dynamicconfig.IntPropertyFnWithDomainFilter, + secondary dynamicconfig.IntPropertyFn, +) LimiterFactory { + return fallbackDynamicRateLimiterFactory{ + primary: primary, + secondary: secondary, + } +} + +type fallbackDynamicRateLimiterFactory struct { + primary dynamicconfig.IntPropertyFnWithDomainFilter + // secondary is used when primary is not set + secondary dynamicconfig.IntPropertyFn +} + +// GetLimiter returns a new Limiter for the given domain +func (f fallbackDynamicRateLimiterFactory) GetLimiter(domain string) Limiter { + return NewDynamicRateLimiter(func() float64 { + return limitWithFallback( + float64(f.primary(domain)), + float64(f.secondary())) + }) +} + +func limitWithFallback(primary, secondary float64) float64 { + if primary > 0 { + return primary + } + return secondary +} diff --git a/common/quotas/fallbackdynamicratelimiterfactory_test.go b/common/quotas/fallbackdynamicratelimiterfactory_test.go new file mode 100644 index 00000000000..c87f5eb5dca --- /dev/null +++ b/common/quotas/fallbackdynamicratelimiterfactory_test.go @@ -0,0 +1,58 @@ +// The MIT License (MIT) + +// Copyright (c) 2017-2020 Uber Technologies Inc. + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package quotas + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/uber/cadence/common/dynamicconfig" +) + +func TestNewFallbackDynamicRateLimiterFactory(t *testing.T) { + factory := NewFallbackDynamicRateLimiterFactory( + func(string) int { return 2 }, + func(opts ...dynamicconfig.FilterOption) int { return 100 }, + ) + + limiter := factory.GetLimiter("TestDomainName") + + // The limiter should accept 2 requests per second + assert.Equal(t, true, limiter.Allow()) + assert.Equal(t, true, limiter.Allow()) + assert.Equal(t, false, limiter.Allow()) +} + +func TestNewFallbackDynamicRateLimiterFactoryFallback(t *testing.T) { + factory := NewFallbackDynamicRateLimiterFactory( + func(string) int { return 0 }, + func(opts ...dynamicconfig.FilterOption) int { return 2 }, + ) + + limiter := factory.GetLimiter("TestDomainName") + + // The limiter should accept 2 requests per second + assert.Equal(t, true, limiter.Allow()) + assert.Equal(t, true, limiter.Allow()) + assert.Equal(t, false, limiter.Allow()) +} diff --git a/common/quotas/limiter_test.go b/common/quotas/limiter_test.go index 1d3dfe2cda2..97a0173d5a7 100644 --- a/common/quotas/limiter_test.go +++ b/common/quotas/limiter_test.go @@ -136,12 +136,12 @@ func BenchmarkMultiStageRateLimiter1000Domains(b *testing.B) { } } -func newFixedRpsMultiStageRateLimiter(globalRps, domainRps float64) Policy { +func newFixedRpsMultiStageRateLimiter(globalRps float64, domainRps int) Policy { return NewMultiStageRateLimiter( NewDynamicRateLimiter(func() float64 { return globalRps }), - NewCollection(NewDynamicRateLimiterFactory(func(domain string) float64 { + NewCollection(NewSimpleDynamicRateLimiterFactory(func(domain string) int { return domainRps })), ) diff --git a/common/quotas/global.go b/common/quotas/permember.go similarity index 60% rename from common/quotas/global.go rename to common/quotas/permember.go index 64124ba832e..023bd2a8dca 100644 --- a/common/quotas/global.go +++ b/common/quotas/permember.go @@ -23,6 +23,7 @@ package quotas import ( "math" + "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/membership" ) @@ -41,3 +42,39 @@ func PerMember(service string, globalRPS, instanceRPS float64, resolver membersh avgQuota := math.Max(globalRPS/float64(memberCount), 1) return math.Min(avgQuota, instanceRPS) } + +// NewPerMemberDynamicRateLimiterFactory creates a new LimiterFactory which creates +// a new DynamicRateLimiter for each domain, the RPS for the DynamicRateLimiter is given +// by the globalRPS and averaged by member count for a given service. +// instanceRPS is used as a fallback if globalRPS is not provided. +func NewPerMemberDynamicRateLimiterFactory( + service string, + globalRPS dynamicconfig.IntPropertyFnWithDomainFilter, + instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter, + resolver membership.Resolver, +) LimiterFactory { + return perMemberFactory{ + service: service, + globalRPS: globalRPS, + instanceRPS: instanceRPS, + resolver: resolver, + } +} + +type perMemberFactory struct { + service string + globalRPS dynamicconfig.IntPropertyFnWithDomainFilter + instanceRPS dynamicconfig.IntPropertyFnWithDomainFilter + resolver membership.Resolver +} + +func (f perMemberFactory) GetLimiter(domain string) Limiter { + return NewDynamicRateLimiter(func() float64 { + return PerMember( + f.service, + float64(f.globalRPS(domain)), + float64(f.instanceRPS(domain)), + f.resolver, + ) + }) +} diff --git a/common/quotas/global_test.go b/common/quotas/permember_test.go similarity index 71% rename from common/quotas/global_test.go rename to common/quotas/permember_test.go index fa0b8e0ea24..a580849a0c9 100644 --- a/common/quotas/global_test.go +++ b/common/quotas/permember_test.go @@ -32,9 +32,9 @@ import ( func Test_PerMember(t *testing.T) { ctrl := gomock.NewController(t) resolver := membership.NewMockResolver(ctrl) - resolver.EXPECT().MemberCount("A").Return(10, nil).AnyTimes() - resolver.EXPECT().MemberCount("X").Return(0, assert.AnError).AnyTimes() - resolver.EXPECT().MemberCount("Y").Return(0, nil).AnyTimes() + resolver.EXPECT().MemberCount("A").Return(10, nil).MinTimes(1) + resolver.EXPECT().MemberCount("X").Return(0, assert.AnError).MinTimes(1) + resolver.EXPECT().MemberCount("Y").Return(0, nil).MinTimes(1) // Invalid service - fallback to instanceRPS assert.Equal(t, 3.0, PerMember("X", 20.0, 3.0, resolver)) @@ -51,3 +51,23 @@ func Test_PerMember(t *testing.T) { // Calculate average per member RPS (prefer instanceRPS - lower) assert.Equal(t, 3.0, PerMember("A", 100.0, 3.0, resolver)) } + +func Test_PerMemberFactory(t *testing.T) { + ctrl := gomock.NewController(t) + resolver := membership.NewMockResolver(ctrl) + resolver.EXPECT().MemberCount("A").Return(10, nil).MinTimes(1) + + factory := NewPerMemberDynamicRateLimiterFactory( + "A", + func(string) int { return 20 }, + func(string) int { return 3 }, + resolver, + ) + + limiter := factory.GetLimiter("TestDomainName") + + // The limit is 20 and there are 10 instances, so the per member limit is 2 + assert.Equal(t, true, limiter.Allow()) + assert.Equal(t, true, limiter.Allow()) + assert.Equal(t, false, limiter.Allow()) +} diff --git a/service/frontend/workflowHandler.go b/service/frontend/workflowHandler.go index 2c30dbe07df..89a3de74b70 100644 --- a/service/frontend/workflowHandler.go +++ b/service/frontend/workflowHandler.go @@ -174,39 +174,30 @@ func NewWorkflowHandler( tokenSerializer: common.NewJSONTaskTokenSerializer(), userRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()), - quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( - func(domain string) float64 { - return quotas.PerMember( - service.Frontend, - float64(config.GlobalDomainUserRPS(domain)), - float64(config.MaxDomainUserRPSPerInstance(domain)), - resource.GetMembershipResolver(), - ) - })), + quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( + service.Frontend, + config.GlobalDomainUserRPS, + config.MaxDomainUserRPSPerInstance, + resource.GetMembershipResolver(), + )), ), workerRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()), - quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( - func(domain string) float64 { - return quotas.PerMember( - service.Frontend, - float64(config.GlobalDomainWorkerRPS(domain)), - float64(config.MaxDomainWorkerRPSPerInstance(domain)), - resource.GetMembershipResolver(), - ) - })), + quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( + service.Frontend, + config.GlobalDomainWorkerRPS, + config.MaxDomainWorkerRPSPerInstance, + resource.GetMembershipResolver(), + )), ), visibilityRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.VisibilityRPS.AsFloat64()), - quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( - func(domain string) float64 { - return quotas.PerMember( - service.Frontend, - float64(config.GlobalDomainVisibilityRPS(domain)), - float64(config.MaxDomainVisibilityRPSPerInstance(domain)), - resource.GetMembershipResolver(), - ) - })), + quotas.NewCollection(quotas.NewPerMemberDynamicRateLimiterFactory( + service.Frontend, + config.GlobalDomainVisibilityRPS, + config.MaxDomainVisibilityRPSPerInstance, + resource.GetMembershipResolver(), + )), ), versionChecker: versionChecker, domainHandler: domainHandler, diff --git a/service/history/task/priority_assigner.go b/service/history/task/priority_assigner.go index c7249cb2520..8ca1d3b8df0 100644 --- a/service/history/task/priority_assigner.go +++ b/service/history/task/priority_assigner.go @@ -68,10 +68,8 @@ func NewPriorityAssigner( config: config, logger: logger, scope: metricClient.Scope(metrics.TaskPriorityAssignerScope), - rateLimiters: quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( - func(domain string) float64 { - return float64(config.TaskProcessRPS(domain)) - }, + rateLimiters: quotas.NewCollection(quotas.NewSimpleDynamicRateLimiterFactory( + config.TaskProcessRPS, )), } } diff --git a/service/matching/handler.go b/service/matching/handler.go index ac29de9d457..bb6f17a4d0a 100644 --- a/service/matching/handler.go +++ b/service/matching/handler.go @@ -85,27 +85,17 @@ func NewHandler( metricsClient: metricsClient, userRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.UserRPS.AsFloat64()), - quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( - func(domain string) float64 { - domainRPS := float64(config.DomainUserRPS(domain)) - if domainRPS > 0 { - return domainRPS - } - // if domain rps not set, use host rps to keep the old behavior - return float64(config.UserRPS()) - })), + quotas.NewCollection(quotas.NewFallbackDynamicRateLimiterFactory( + config.DomainUserRPS, + config.UserRPS, + )), ), workerRateLimiter: quotas.NewMultiStageRateLimiter( quotas.NewDynamicRateLimiter(config.WorkerRPS.AsFloat64()), - quotas.NewCollection(quotas.NewDynamicRateLimiterFactory( - func(domain string) float64 { - domainRPS := float64(config.DomainWorkerRPS(domain)) - if domainRPS > 0 { - return domainRPS - } - // if domain rps not set, use host rps to keep the old behavior - return float64(config.WorkerRPS()) - })), + quotas.NewCollection(quotas.NewFallbackDynamicRateLimiterFactory( + config.DomainWorkerRPS, + config.WorkerRPS, + )), ), engine: engine, logger: logger,