Batched Packed and Distributed Allocations
This implements a batching algorithm for Packed and Distributed
GameServerAllocations (GSA).

This ensures that we can still get high throughput on GSAs, while
retaining a tight packing of Allocated GameServers on each node with
the Packed strategy.

The Distributed strategy is now implemented with a randomised Allocation
process, to ensure distributed load under this strategy.

Closes #783
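
In outline: each call to allocate now hands its GameServerAllocation to a single batching goroutine over a buffered channel and blocks on a per-request response channel, while the batch worker reuses one sorted snapshot of the Ready GameServer list for up to maxBatchBeforeRefresh requests. The following is a minimal, self-contained sketch of that request/response pattern only — the GameServer and Allocation types, the ready() helper and the selection rules are simplified stand-ins, not the real Agones API.

package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sort"
)

// Simplified stand-ins for the real Agones types.
type GameServer struct{ Name, Node string }
type Allocation struct{ Packed bool }

// request/response mirror the async hand-off used by the controller.
type request struct {
	gsa      Allocation
	response chan response
}
type response struct {
	gs  *GameServer
	err error
}

// allocate queues a request and blocks until the batch worker answers.
func allocate(pending chan<- request, gsa Allocation) (*GameServer, error) {
	req := request{gsa: gsa, response: make(chan response)}
	pending <- req
	res := <-req.response
	return res.gs, res.err
}

const maxBatchBeforeRefresh = 100 // illustrative mirror of the real constant

// runBatch drains pending requests against one snapshot of the Ready list,
// refreshing it every maxBatchBeforeRefresh requests. Packed takes the front
// of the sorted list; Distributed picks an entry at random.
func runBatch(pending <-chan request, ready func() []*GameServer) {
	var list []*GameServer
	requestCount := 0
	for req := range pending {
		requestCount++
		if list == nil || requestCount >= maxBatchBeforeRefresh {
			requestCount = 0
			list = ready()
			// stand-in ordering: the real controller sorts nodes by most Allocated first
			sort.Slice(list, func(i, j int) bool { return list[i].Node < list[j].Node })
		}
		if len(list) == 0 {
			req.response <- response{err: errors.New("no Ready GameServer")}
			continue
		}
		i := 0
		if !req.gsa.Packed {
			i = rand.Intn(len(list))
		}
		gs := list[i]
		list = append(list[:i], list[i+1:]...) // remove the allocated GameServer from the snapshot
		req.response <- response{gs: gs}
	}
}

func main() {
	pending := make(chan request, 100)
	ready := func() []*GameServer {
		return []*GameServer{{"gs-a", "node-1"}, {"gs-b", "node-1"}, {"gs-c", "node-2"}}
	}
	go runBatch(pending, ready)

	gs, err := allocate(pending, Allocation{Packed: true})
	fmt.Println(gs.Name, err)
}

Batching a burst of requests against one snapshot is what lets the Packed strategy keep nodes tightly filled without re-listing and re-sorting the cache for every allocation, while the random pick keeps Distributed load spread out.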
markmandel committed Jun 4, 2019
1 parent 9a528f7 commit 604a8f8
Showing 5 changed files with 854 additions and 450 deletions.
7 changes: 7 additions & 0 deletions pkg/gameserverallocations/cache.go
@@ -70,3 +70,10 @@ func (e *gameServerCacheEntry) Range(f func(key string, gs *stablev1alpha1.GameS
}
}
}

// Len returns the current length of the cache
func (e *gameServerCacheEntry) Len() int {
e.mu.RLock()
defer e.mu.RUnlock()
return len(e.cache)
}
277 changes: 160 additions & 117 deletions pkg/gameserverallocations/controller.go
@@ -23,10 +23,10 @@ import (
"io/ioutil"
"math/rand"
"net/http"
"sort"
"strconv"
"time"

"agones.dev/agones/pkg/apis"
"agones.dev/agones/pkg/apis/allocation/v1alpha1"
multiclusterv1alpha1 "agones.dev/agones/pkg/apis/multicluster/v1alpha1"
"agones.dev/agones/pkg/apis/stable"
@@ -70,11 +70,27 @@
)

const (
secretClientCertName = "tls.crt"
secretClientKeyName = "tls.key"
secretCaCertName = "ca.crt"
secretClientCertName = "tls.crt"
secretClientKeyName = "tls.key"
secretCaCertName = "ca.crt"
maxBatchQueue = 100
maxBatchBeforeRefresh = 100
batchWaitTime = 500 * time.Millisecond
)

// request is an async request for allocation
type request struct {
gsa *v1alpha1.GameServerAllocation
response chan response
}

// response is an async response for a matching request
type response struct {
request request
gs *stablev1alpha1.GameServer
err error
}

// Controller is the GameServerAllocation controller
type Controller struct {
baseLogger *logrus.Entry
@@ -93,13 +109,9 @@ type Controller struct {
stop <-chan struct{}
workerqueue *workerqueue.WorkerQueue
recorder record.EventRecorder
pendingRequests chan request
}

// findComparator is a comparator function specifically for the
// findReadyGameServerForAllocation method for determining
// scheduling strategy
type findComparator func(bestCount, currentCount gameservers.NodeCount) bool

var allocationRetry = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
@@ -129,6 +141,7 @@ func NewController(apiServer *apiserver.APIServer,
allocationPolicySynced: agonesInformerFactory.Multicluster().V1alpha1().GameServerAllocationPolicies().Informer().HasSynced,
secretLister: kubeInformerFactory.Core().V1().Secrets().Lister(),
secretSynced: kubeInformerFactory.Core().V1().Secrets().Informer().HasSynced,
pendingRequests: make(chan request, maxBatchQueue),
}
c.baseLogger = runtime.NewLoggerWithType(c)
c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServers, c.baseLogger, logfields.GameServerKey, stable.GroupName+".GameServerUpdateController")
@@ -191,6 +204,9 @@ func (c *Controller) Run(_ int, stop <-chan struct{}) error {
return err
}

// workers and logic for batching allocations
go c.runLocalAllocations(maxBatchQueue)

// we don't want multiple workers refreshing the cache at the same time, so one worker is better.
// Also we don't expect to have too many failures when allocating
c.workerqueue.Run(1, stop)
@@ -493,44 +509,154 @@ func (c *Controller) serialisation(r *http.Request, w http.ResponseWriter, obj k
return errors.Wrapf(err, "error encoding %T", obj)
}

// allocate allocates a GameServer from a given Fleet
// allocate allocates a GameServer from a given GameServerAllocation
// this sets up allocation through a batch process.
func (c *Controller) allocate(gsa *v1alpha1.GameServerAllocation) (*stablev1alpha1.GameServer, error) {
var allocation *stablev1alpha1.GameServer
var comparator findComparator
req := request{gsa: gsa, response: make(chan response)}

switch gsa.Spec.Scheduling {
case apis.Packed:
comparator = packedComparator
case apis.Distributed:
comparator = distributedComparator
c.pendingRequests <- req

select {
case res := <-req.response:
return res.gs, res.err
case <-c.stop:
return nil, errors.New("shutting down")
}
}

allocation, err := c.findReadyGameServerForAllocation(gsa, comparator)
if err != nil {
return allocation, err
// runLocalAllocations is a blocking function that runs in a loop
// looking at c.pendingRequests for requests that are coming through.
func (c *Controller) runLocalAllocations(updateWorkerCount int) {
updateQueue := make(chan response)
// setup workers for allocation updates
c.allocationUpdateWorkers(updateQueue, updateWorkerCount)

var list []*stablev1alpha1.GameServer
requestCount := 0

for {
select {
case req := <-c.pendingRequests:
// refresh the list after every 100 allocations made in a single batch
requestCount++
if requestCount >= maxBatchBeforeRefresh {
list = nil
requestCount = 0
}

if list == nil {
list = c.listSortedReadyGameServers()
}

gs, index, err := findGameServerForAllocation(req.gsa, list)
if err != nil {
req.response <- response{request: req, gs: nil, err: err}
continue
}
// remove the game server that has been allocated
list = append(list[:index], list[index+1:]...)

key, _ := cache.MetaNamespaceKeyFunc(gs)
if ok := c.readyGameServers.Delete(key); !ok {
// this seems unlikely, but let's handle it just in case
req.response <- response{request: req, gs: nil, err: ErrConflictInGameServerSelection}
continue
}

updateQueue <- response{request: req, gs: gs.DeepCopy(), err: nil}

case <-c.stop:
return
default:
list = nil
requestCount = 0
// slow down cpu churn, and allow items to batch
time.Sleep(batchWaitTime)
}
}
}

// allocationUpdateWorkers runs workerCount goroutines that update each allocated GameServer
// to the Allocated state and send the result back on the request's response channel
func (c *Controller) allocationUpdateWorkers(updateQueue chan response, workerCount int) {
for i := 0; i < workerCount; i++ {
go func() {
for {
select {
case res := <-updateQueue:
gsCopy := res.gs.DeepCopy()
c.patchMetadata(gsCopy, res.request.gsa.Spec.MetaPatch)
gsCopy.Status.State = stablev1alpha1.GameServerStateAllocated

gs, err := c.gameServerGetter.GameServers(res.gs.ObjectMeta.Namespace).Update(gsCopy)
if err != nil {
key, _ := cache.MetaNamespaceKeyFunc(gs)
// since we could not allocate, we should put it back
c.readyGameServers.Store(key, gs)
res.err = errors.Wrap(err, "error updating allocated gameserver")
} else {
res.gs = gs
c.recorder.Event(res.gs, corev1.EventTypeNormal, string(res.gs.Status.State), "Allocated")
}

key, _ := cache.MetaNamespaceKeyFunc(allocation)
if ok := c.readyGameServers.Delete(key); !ok {
return allocation, ErrConflictInGameServerSelection
res.request.response <- res
case <-c.stop:
return
}
}
}()
}
}

gsCopy := allocation.DeepCopy()
gsCopy.Status.State = stablev1alpha1.GameServerStateAllocated
// listSortedReadyGameServers returns a list of the cached Ready GameServers,
// sorted from most Allocated to least
func (c *Controller) listSortedReadyGameServers() []*stablev1alpha1.GameServer {
length := c.readyGameServers.Len()
if length == 0 {
return []*stablev1alpha1.GameServer{}
}

c.patchMetadata(gsCopy, gsa.Spec.MetaPatch)
list := make([]*stablev1alpha1.GameServer, 0, length)
c.readyGameServers.Range(func(_ string, gs *stablev1alpha1.GameServer) bool {
list = append(list, gs)
return true
})
counts := c.counter.Counts()

gs, err := c.gameServerGetter.GameServers(gsCopy.ObjectMeta.Namespace).Update(gsCopy)
sort.Slice(list, func(i, j int) bool {
gs1 := list[i]
gs2 := list[j]

if err != nil {
// since we could not allocate, we should put it back
c.readyGameServers.Store(key, gs)
return gs, errors.Wrapf(err, "error updating GameServer %s", gsCopy.ObjectMeta.Name)
}
c1, ok := counts[gs1.Status.NodeName]
if !ok {
return false
}

c2, ok := counts[gs2.Status.NodeName]
if !ok {
return true
}

c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Allocated")
if c1.Allocated > c2.Allocated {
return true
}
if c1.Allocated < c2.Allocated {
return false
}

return gs, nil
// prefer nodes that have the most Ready gameservers on them - they are most likely to be
// completely filled and least likely target for scale down.
if c1.Ready < c2.Ready {
return false
}
if c1.Ready > c2.Ready {
return true
}

// finally sort lexicographically, so we have a stable order
return gs1.Status.NodeName < gs2.Status.NodeName
})

return list
}

// patch the labels and annotations of an allocated GameServer with metadata from a GameServerAllocation
@@ -555,78 +681,6 @@ func (c *Controller) patchMetadata(gs *stablev1alpha1.GameServer, fam v1alpha1.M
}
}

// findReadyGameServerForAllocation returns the most appropriate GameServer from the set, taking into account
// preferred selectors, as well as the passed in comparator
func (c *Controller) findReadyGameServerForAllocation(gsa *v1alpha1.GameServerAllocation, comparator findComparator) (*stablev1alpha1.GameServer, error) {
// track the best node count
var bestCount *gameservers.NodeCount
// the current GameServer from the node with the most GameServers (allocated, ready)
var bestGS *stablev1alpha1.GameServer

selector, err := metav1.LabelSelectorAsSelector(&gsa.Spec.Required)
if err != nil {
return bestGS, errors.Wrapf(err, "could not convert GameServer %s GameServerAllocation selector", gsa.ObjectMeta.Name)
}

gsList := c.selectGameServers(selector)

preferred, err := gsa.Spec.PreferredSelectors()
if err != nil {
return bestGS, errors.Wrapf(err, "could not create preferred selectors for GameServerAllocation %s", gsa.ObjectMeta.Name)
}

counts := c.counter.Counts()

// track potential GameServers, one for each node
allocatableRequired := map[string]*stablev1alpha1.GameServer{}
allocatablePreferred := make([]map[string]*stablev1alpha1.GameServer, len(preferred))

// build the index of possible allocatable GameServers
for _, gs := range gsList {
if gs.DeletionTimestamp.IsZero() && gs.Status.State == stablev1alpha1.GameServerStateReady {
allocatableRequired[gs.Status.NodeName] = gs

for i, p := range preferred {
if p.Matches(labels.Set(gs.Labels)) {
if allocatablePreferred[i] == nil {
allocatablePreferred[i] = map[string]*stablev1alpha1.GameServer{}
}
allocatablePreferred[i][gs.Status.NodeName] = gs
}
}
}
}

allocationSet := allocatableRequired

// check if there is any preferred options available
for _, set := range allocatablePreferred {
if len(set) > 0 {
allocationSet = set
break
}
}

var bestGSList []stablev1alpha1.GameServer
for nodeName, gs := range allocationSet {
count := counts[nodeName]
// bestGS == nil: if there is no best GameServer, then this node & GameServer is always the best
if bestGS == nil || comparator(*bestCount, count) {
bestCount = &count
bestGS = gs
bestGSList = append(bestGSList, *gs)
}
}

if bestGS == nil {
err = ErrNoGameServerReady
} else {
bestGS = c.getRandomlySelectedGS(gsa, bestGSList)
}

return bestGS, err
}

// syncGameServers synchronises the GameServers to Gameserver cache. This is called when a failure
// happened during the allocation. This method will sync and make sure the cache is up to date.
func (c *Controller) syncGameServers(key string) error {
@@ -685,17 +739,6 @@ func (c *Controller) syncReadyGSServerCache() error {
return nil
}

// selectGameServers selects the appropriate gameservers from cache based on selector.
func (c *Controller) selectGameServers(selector labels.Selector) (res []*stablev1alpha1.GameServer) {
c.readyGameServers.Range(func(key string, gs *stablev1alpha1.GameServer) bool {
if selector.Matches(labels.Set(gs.ObjectMeta.GetLabels())) {
res = append(res, gs)
}
return true
})
return res
}

// getKey extract the key of gameserver object
func (c *Controller) getKey(gs *stablev1alpha1.GameServer) (string, bool) {
var key string