From e63355915050f79fb4da58c34528944b1b40d609 Mon Sep 17 00:00:00 2001 From: Jarek Kowalski Date: Mon, 14 Oct 2019 08:16:15 -0700 Subject: [PATCH] Improve fleet controller response times in busy clusters. (#1108) In a cluster with a significant number of fleets (> workers in Fleet controller) and many configuration updates in flight, it is possible for change notifications to be queued with ever-increasing exponential back-off (up to 1000 seconds). Because fleet controller listens for both user-triggered changes and GSS changes, it can get really busy when allocations are in flight, leading to slower and slower fleet reaction times to both scaling and reflecting GSS status. When this happens and a user script scales a fleet, it can take multiple minutes for Fleet controller to respond. This change caps the queueing time for each fleet at 3 seconds to avoid spinning CPU cycles for really busy GSSs, but still give reasonable latency for user-triggered changes. Fixes #1107 --- pkg/fleetautoscalers/controller.go | 2 +- pkg/fleets/controller.go | 3 ++- pkg/gameserversets/controller.go | 3 ++- pkg/util/workerqueue/workerqueue.go | 8 ++++++++ 4 files changed, 13 insertions(+), 3 deletions(-) diff --git a/pkg/fleetautoscalers/controller.go b/pkg/fleetautoscalers/controller.go index 3fbea54e0c..b635568ee3 100644 --- a/pkg/fleetautoscalers/controller.go +++ b/pkg/fleetautoscalers/controller.go @@ -85,7 +85,7 @@ func NewController( fleetAutoscalerSynced: autoscaler.Informer().HasSynced, } c.baseLogger = runtime.NewLoggerWithType(c) - c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleetAutoscaler, c.baseLogger, logfields.FleetAutoscalerKey, autoscaling.GroupName+".FleetAutoscalerController") + c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncFleetAutoscaler, c.baseLogger, logfields.FleetAutoscalerKey, autoscaling.GroupName+".FleetAutoscalerController", workerqueue.FastRateLimiter(3*time.Second)) health.AddLivenessCheck("fleetautoscaler-workerqueue", 
healthcheck.Check(c.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() diff --git a/pkg/fleets/controller.go b/pkg/fleets/controller.go index 496128bcfd..3b04d6ddb1 100644 --- a/pkg/fleets/controller.go +++ b/pkg/fleets/controller.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "reflect" + "time" "agones.dev/agones/pkg/apis/agones" agonesv1 "agones.dev/agones/pkg/apis/agones/v1" @@ -89,7 +90,7 @@ func NewController( } c.baseLogger = runtime.NewLoggerWithType(c) - c.workerqueue = workerqueue.NewWorkerQueue(c.syncFleet, c.baseLogger, logfields.FleetKey, agones.GroupName+".FleetController") + c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncFleet, c.baseLogger, logfields.FleetKey, agones.GroupName+".FleetController", workerqueue.FastRateLimiter(3*time.Second)) health.AddLivenessCheck("fleet-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() diff --git a/pkg/gameserversets/controller.go b/pkg/gameserversets/controller.go index 5037452170..98cc073728 100644 --- a/pkg/gameserversets/controller.go +++ b/pkg/gameserversets/controller.go @@ -17,6 +17,7 @@ package gameserversets import ( "encoding/json" "sync" + "time" "agones.dev/agones/pkg/apis" "agones.dev/agones/pkg/apis/agones" @@ -109,7 +110,7 @@ func NewController( } c.baseLogger = runtime.NewLoggerWithType(c) - c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServerSet, c.baseLogger, logfields.GameServerSetKey, agones.GroupName+".GameServerSetController") + c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServerSet, c.baseLogger, logfields.GameServerSetKey, agones.GroupName+".GameServerSetController", workerqueue.FastRateLimiter(3*time.Second)) health.AddLivenessCheck("gameserverset-workerqueue", healthcheck.Check(c.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() diff --git a/pkg/util/workerqueue/workerqueue.go b/pkg/util/workerqueue/workerqueue.go index c8bf68ba3e..6fde02d5bf 100644 --- 
a/pkg/util/workerqueue/workerqueue.go +++ b/pkg/util/workerqueue/workerqueue.go @@ -54,6 +54,14 @@ type WorkerQueue struct { running int } +// FastRateLimiter returns a rate limiter without exponential back-off, with specified maximum per-item retry delay. +func FastRateLimiter(maxDelay time.Duration) workqueue.RateLimiter { + const numFastRetries = 5 + const fastDelay = 200 * time.Millisecond // first few retries up to 'numFastRetries' are fast + + return workqueue.NewItemFastSlowRateLimiter(fastDelay, maxDelay, numFastRetries) +} + // NewWorkerQueue returns a new worker queue for a given name func NewWorkerQueue(handler Handler, logger *logrus.Entry, keyName logfields.ResourceType, queueName string) *WorkerQueue { return NewWorkerQueueWithRateLimiter(handler, logger, keyName, queueName, workqueue.DefaultControllerRateLimiter())