From 7bd4979aefca23127e843c5682ce2ad43649762a Mon Sep 17 00:00:00 2001 From: Matthew Wong Date: Wed, 16 Jan 2019 14:27:18 -0500 Subject: [PATCH] Add RateLimiter arg so that callers can pass their own workqueue.RateLimiter --- controller/controller.go | 34 ++++++++++++++++++++++++++-------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/controller/controller.go b/controller/controller.go index 0b5b866..ecbf129 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -127,6 +127,7 @@ type ProvisionController struct { resyncPeriod time.Duration + rateLimiter workqueue.RateLimiter exponentialBackOffOnError bool threadiness int @@ -212,6 +213,18 @@ func Threadiness(threadiness int) func(*ProvisionController) error { } } +// RateLimiter is the workqueue.RateLimiter to use for the provisioning and +// deleting work queues. If set, ExponentialBackOffOnError is ignored. +func RateLimiter(rateLimiter workqueue.RateLimiter) func(*ProvisionController) error { + return func(c *ProvisionController) error { + if c.HasRun() { + return errRuntime + } + c.rateLimiter = rateLimiter + return nil + } +} + // ExponentialBackOffOnError determines whether to exponentially back off from // failures of Provision and Delete. Defaults to true. func ExponentialBackOffOnError(exponentialBackOffOnError bool) func(*ProvisionController) error { @@ -469,18 +482,23 @@ func NewProvisionController( option(controller) } - ratelimiter := workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ) - if !controller.exponentialBackOffOnError { - ratelimiter = workqueue.NewMaxOfRateLimiter( + var rateLimiter workqueue.RateLimiter + if controller.rateLimiter != nil { + // rateLimiter set via parameter takes precedence + rateLimiter = controller.rateLimiter + } else if controller.exponentialBackOffOnError { + rateLimiter = workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ) + } else { + rateLimiter = workqueue.NewMaxOfRateLimiter( workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second), &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, ) } - controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims") - controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes") + controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims") + controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes") informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)