Skip to content

Commit

Permalink
Speed up creation/deletion of game servers in a set by:
Browse files Browse the repository at this point in the history
- parallelizing create/delete calls in GSS controller
- creating dedicated worker queues for creation/deletion requests
- replacing default rate limiters with ones that don't have global QPS limit
  • Loading branch information
jkowalski authored and markmandel committed Feb 6, 2019
1 parent 4af048a commit f9e7e81
Show file tree
Hide file tree
Showing 7 changed files with 809 additions and 182 deletions.
56 changes: 52 additions & 4 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import (
"encoding/json"
"fmt"
"sync"
"time"

"k8s.io/client-go/util/workqueue"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
Expand Down Expand Up @@ -73,6 +76,8 @@ type Controller struct {
portAllocator *PortAllocator
healthController *HealthController
workerqueue *workerqueue.WorkerQueue
creationWorkerQueue *workerqueue.WorkerQueue // handles creation only
deletionWorkerQueue *workerqueue.WorkerQueue // handles deletion only
allocationMutex *sync.Mutex
stop <-chan struct {
}
Expand Down Expand Up @@ -124,20 +129,24 @@ func NewController(
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
c.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "gameserver-controller"})

c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServer, c.logger, stable.GroupName+".GameServerController")
c.workerqueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerController", fastRateLimiter())
c.creationWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerControllerCreation", fastRateLimiter())
c.deletionWorkerQueue = workerqueue.NewWorkerQueueWithRateLimiter(c.syncGameServer, c.logger, stable.GroupName+".GameServerControllerDeletion", fastRateLimiter())
health.AddLivenessCheck("gameserver-workerqueue", healthcheck.Check(c.workerqueue.Healthy))
health.AddLivenessCheck("gameserver-creation-workerqueue", healthcheck.Check(c.creationWorkerQueue.Healthy))
health.AddLivenessCheck("gameserver-deletion-workerqueue", healthcheck.Check(c.deletionWorkerQueue.Healthy))

wh.AddHandler("/mutate", v1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationMutationHandler)
wh.AddHandler("/validate", v1alpha1.Kind("GameServer"), admv1beta1.Create, c.creationValidationHandler)

gsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.workerqueue.Enqueue,
AddFunc: c.enqueueGameServerBasedOnState,
UpdateFunc: func(oldObj, newObj interface{}) {
// no point in processing unless there is a State change
oldGs := oldObj.(*v1alpha1.GameServer)
newGs := newObj.(*v1alpha1.GameServer)
if oldGs.Status.State != newGs.Status.State || oldGs.ObjectMeta.DeletionTimestamp != newGs.ObjectMeta.DeletionTimestamp {
c.workerqueue.Enqueue(newGs)
c.enqueueGameServerBasedOnState(newGs)
}
},
})
Expand Down Expand Up @@ -168,6 +177,31 @@ func NewController(
return c
}

func (c *Controller) enqueueGameServerBasedOnState(item interface{}) {
gs := item.(*v1alpha1.GameServer)

switch gs.Status.State {
case v1alpha1.GameServerStatePortAllocation,
v1alpha1.GameServerStateCreating:
c.creationWorkerQueue.Enqueue(gs)

case v1alpha1.GameServerStateShutdown:
c.deletionWorkerQueue.Enqueue(gs)

default:
c.workerqueue.Enqueue(gs)
}
}

// fastRateLimiter returns a fast rate limiter, without exponential back-off.
func fastRateLimiter() workqueue.RateLimiter {
const numFastRetries = 5
const fastDelay = 20 * time.Millisecond // first few retries up to 'numFastRetries' are fast
const slowDelay = 500 * time.Millisecond // subsequent retries are slow

return workqueue.NewItemFastSlowRateLimiter(fastDelay, slowDelay, numFastRetries)
}

// creationMutationHandler is the handler for the mutating webhook that sets the
// the default values on the GameServer
// Should only be called on gameserver create operations.
Expand Down Expand Up @@ -268,7 +302,21 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
// Run the Health Controller
go c.healthController.Run(stop)

c.workerqueue.Run(workers, stop)
// start work queues
var wg sync.WaitGroup

startWorkQueue := func(wq *workerqueue.WorkerQueue) {
wg.Add(1)
go func() {
defer wg.Done()
wq.Run(workers, stop)
}()
}

startWorkQueue(c.workerqueue)
startWorkQueue(c.creationWorkerQueue)
startWorkQueue(c.deletionWorkerQueue)
wg.Wait()
return nil
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/gameservers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,12 +172,16 @@ func TestControllerWatchGameServers(t *testing.T) {
received := make(chan string)
defer close(received)

c.workerqueue.SyncHandler = func(name string) error {
h := func(name string) error {
assert.Equal(t, "default/test", name)
received <- name
return nil
}

c.workerqueue.SyncHandler = h
c.creationWorkerQueue.SyncHandler = h
c.deletionWorkerQueue.SyncHandler = h

stop, cancel := agtesting.StartInformers(m, c.gameServerSynced)
defer cancel()

Expand Down
Loading

0 comments on commit f9e7e81

Please sign in to comment.