diff --git a/pkg/gameserverallocations/cache.go b/pkg/gameserverallocations/cache.go index c4d8da3ce0..edf03e2b9a 100644 --- a/pkg/gameserverallocations/cache.go +++ b/pkg/gameserverallocations/cache.go @@ -70,3 +70,10 @@ func (e *gameServerCacheEntry) Range(f func(key string, gs *stablev1alpha1.GameS } } } + +// Len returns the current length of the cache +func (e *gameServerCacheEntry) Len() int { + e.mu.RLock() + defer e.mu.RUnlock() + return len(e.cache) +} diff --git a/pkg/gameserverallocations/controller.go b/pkg/gameserverallocations/controller.go index f0ba2b1cc0..216cb11a47 100644 --- a/pkg/gameserverallocations/controller.go +++ b/pkg/gameserverallocations/controller.go @@ -23,10 +23,10 @@ import ( "io/ioutil" "math/rand" "net/http" + "sort" "strconv" "time" - "agones.dev/agones/pkg/apis" "agones.dev/agones/pkg/apis/allocation/v1alpha1" multiclusterv1alpha1 "agones.dev/agones/pkg/apis/multicluster/v1alpha1" "agones.dev/agones/pkg/apis/stable" @@ -70,11 +70,27 @@ var ( ) const ( - secretClientCertName = "tls.crt" - secretClientKeyName = "tls.key" - secretCaCertName = "ca.crt" + secretClientCertName = "tls.crt" + secretClientKeyName = "tls.key" + secretCaCertName = "ca.crt" + maxBatchQueue = 100 + maxBatchBeforeRefresh = 100 + batchWaitTime = 500 * time.Millisecond ) +// request is an async request for allocation +type request struct { + gsa *v1alpha1.GameServerAllocation + response chan response +} + +// response is an async response for a matching request +type response struct { + request request + gs *stablev1alpha1.GameServer + err error +} + // Controller is a the GameServerAllocation controller type Controller struct { baseLogger *logrus.Entry @@ -93,13 +109,9 @@ type Controller struct { stop <-chan struct{} workerqueue *workerqueue.WorkerQueue recorder record.EventRecorder + pendingRequests chan request } -// findComparator is a comparator function specifically for the -// findReadyGameServerForAllocation method for determining -// scheduling strategy -type findComparator func(bestCount, currentCount gameservers.NodeCount) bool - var allocationRetry = wait.Backoff{ Steps: 5, Duration: 10 * time.Millisecond, @@ -129,6 +141,7 @@ func NewController(apiServer *apiserver.APIServer, allocationPolicySynced: agonesInformerFactory.Multicluster().V1alpha1().GameServerAllocationPolicies().Informer().HasSynced, secretLister: kubeInformerFactory.Core().V1().Secrets().Lister(), secretSynced: kubeInformerFactory.Core().V1().Secrets().Informer().HasSynced, + pendingRequests: make(chan request, maxBatchQueue), } c.baseLogger = runtime.NewLoggerWithType(c) c.workerqueue = workerqueue.NewWorkerQueue(c.syncGameServers, c.baseLogger, logfields.GameServerKey, stable.GroupName+".GameServerUpdateController") @@ -205,6 +218,9 @@ func (c *Controller) Run(_ int, stop <-chan struct{}) error { return err } + // workers and logic for batching allocations + go c.runLocalAllocations(maxBatchQueue) + // we don't want mutiple workers refresh cache at the same time so one worker will be better. // Also we don't expect to have too many failures when allocating c.workerqueue.Run(1, stop) @@ -507,44 +523,196 @@ func (c *Controller) serialisation(r *http.Request, w http.ResponseWriter, obj k return errors.Wrapf(err, "error encoding %T", obj) } -// allocate allocated a GameServer from a given Fleet +// allocate allocated a GameServer from a given GameServerAllocation +// this sets up allocation through a batch process. func (c *Controller) allocate(gsa *v1alpha1.GameServerAllocation) (*stablev1alpha1.GameServer, error) { - var allocation *stablev1alpha1.GameServer - var comparator findComparator + // creates an allocation request. This contains the requested GameServerAllocation, as well as the + // channel we expect the return values to come back for this GameServerAllocation + req := request{gsa: gsa, response: make(chan response)} - switch gsa.Spec.Scheduling { - case apis.Packed: - comparator = packedComparator - case apis.Distributed: - comparator = distributedComparator - } + // this pushes the request into the batching process + c.pendingRequests <- req - allocation, err := c.findReadyGameServerForAllocation(gsa, comparator) - if err != nil { - return allocation, err + select { + case res := <-req.response: // wait for the batch to be completed + return res.gs, res.err + case <-c.stop: + return nil, errors.New("shutting down") } +} + +// runLocalAllocations is a blocking function that runs in a loop +// looking at c.requestBatches for batches of requests that are coming through. +func (c *Controller) runLocalAllocations(updateWorkerCount int) { + // setup workers for allocation updates. Push response values into + // this queue for concurrent updating of GameServers to Allocated + updateQueue := c.allocationUpdateWorkers(updateWorkerCount) + + // Batch processing strategy: + // We constantly loop around the below for loop. If nothing is found in c.pendingRequests, we move to + // default: which will wait for half a second, to allow for some requests to backup in c.pendingRequests, + // providing us with a batch of Allocation requests in that channel + + // Once we have 1 or more requests in c.pendingRequests (which is buffered to 100), we can start the batch process. + + // Assuming this is the first run (either entirely, or for a while), list will be nil, and therefore the first + // thing that will be done is retrieving the Ready GameSerers and sorting them for this batch via + // c.listSortedReadyGameServers(). This list is maintained as we flow through the batch. + + // We then use findGameServerForAllocation to loop around the sorted list of Ready GameServers to look for matches + // against the preferred and required selectors of the GameServerAllocation. If there is an error, we immediately + // pass that straight back to the response channel for this GameServerAllocation. + + // Assuming we find a matching GameServer to our GameServerAllocation, we remove it from the list and the backing + // Ready GameServer cache. + + // We then pass the found GameServers into the updateQueue, where there are updateWorkerCount number of goroutines + // waiting to concurrently attempt to move the GameServer into an Allocated state, and return the result to + // GameServerAllocation request's response channel + + // Then we get the next item off the batch (c.pendingRequests), and do this all over again, but this time, we have + // an already sorted list of GameServers, so we only need to find one that matches our GameServerAllocation + // selectors, and put it into updateQueue + + // The tracking of requestCount >= maxBatchBeforeRefresh is necessary, because without it, at high enough load + // the list of GameServers that we are using to allocate would never get refreshed (list = nil) with an updated + // list of Ready GameServers, and you would eventually never be able to Allocate anything as long as the load + // continued. + + var list []*stablev1alpha1.GameServer + requestCount := 0 + + for { + select { + case req := <-c.pendingRequests: + // refresh the list after every 100 allocations made in a single batch + requestCount++ + if requestCount >= maxBatchBeforeRefresh { + list = nil + requestCount = 0 + } + + if list == nil { + list = c.listSortedReadyGameServers() + } + + gs, index, err := findGameServerForAllocation(req.gsa, list) + if err != nil { + req.response <- response{request: req, gs: nil, err: err} + continue + } + // remove the game server that has been allocated + list = append(list[:index], list[index+1:]...) + + key, _ := cache.MetaNamespaceKeyFunc(gs) + if ok := c.readyGameServers.Delete(key); !ok { + // this seems unlikely, but lets handle it just in case + req.response <- response{request: req, gs: nil, err: ErrConflictInGameServerSelection} + continue + } + + updateQueue <- response{request: req, gs: gs.DeepCopy(), err: nil} - key, _ := cache.MetaNamespaceKeyFunc(allocation) - if ok := c.readyGameServers.Delete(key); !ok { - return allocation, ErrConflictInGameServerSelection + case <-c.stop: + return + default: + list = nil + requestCount = 0 + // slow down cpu churn, and allow items to batch + time.Sleep(batchWaitTime) + } } +} - gsCopy := allocation.DeepCopy() - gsCopy.Status.State = stablev1alpha1.GameServerStateAllocated +// allocationUpdateWorkers runs workerCount number of goroutines as workers to +// process each GameServer passed into the returned updateQueue +// Each worker will concurrently attempt to move the GameServer to an Allocated +// state and then respond to the initial request's response channel with the +// details of that update +func (c *Controller) allocationUpdateWorkers(workerCount int) chan<- response { + updateQueue := make(chan response) + + for i := 0; i < workerCount; i++ { + go func() { + for { + select { + case res := <-updateQueue: + gsCopy := res.gs.DeepCopy() + c.patchMetadata(gsCopy, res.request.gsa.Spec.MetaPatch) + gsCopy.Status.State = stablev1alpha1.GameServerStateAllocated + + gs, err := c.gameServerGetter.GameServers(res.gs.ObjectMeta.Namespace).Update(gsCopy) + if err != nil { + key, _ := cache.MetaNamespaceKeyFunc(gs) + // since we could not allocate, we should put it back + c.readyGameServers.Store(key, gs) + res.err = errors.Wrap(err, "error updating allocated gameserver") + } else { + res.gs = gs + c.recorder.Event(res.gs, corev1.EventTypeNormal, string(res.gs.Status.State), "Allocated") + } - c.patchMetadata(gsCopy, gsa.Spec.MetaPatch) + res.request.response <- res + case <-c.stop: + return + } + } + }() + } - gs, err := c.gameServerGetter.GameServers(gsCopy.ObjectMeta.Namespace).Update(gsCopy) + return updateQueue +} - if err != nil { - // since we could not allocate, we should put it back - c.readyGameServers.Store(key, gs) - return gs, errors.Wrapf(err, "error updating GameServer %s", gsCopy.ObjectMeta.Name) +// listSortedReadyGameServers returns a list of the cache ready gameservers +// sorted by most allocated to least +func (c *Controller) listSortedReadyGameServers() []*stablev1alpha1.GameServer { + length := c.readyGameServers.Len() + if length == 0 { + return []*stablev1alpha1.GameServer{} } - c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Allocated") + list := make([]*stablev1alpha1.GameServer, 0, length) + c.readyGameServers.Range(func(_ string, gs *stablev1alpha1.GameServer) bool { + list = append(list, gs) + return true + }) + counts := c.counter.Counts() + + sort.Slice(list, func(i, j int) bool { + gs1 := list[i] + gs2 := list[j] - return gs, nil + c1, ok := counts[gs1.Status.NodeName] + if !ok { + return false + } + + c2, ok := counts[gs2.Status.NodeName] + if !ok { + return true + } + + if c1.Allocated > c2.Allocated { + return true + } + if c1.Allocated < c2.Allocated { + return false + } + + // prefer nodes that have the most Ready gameservers on them - they are most likely to be + // completely filled and least likely target for scale down. + if c1.Ready < c2.Ready { + return false + } + if c1.Ready > c2.Ready { + return true + } + + // finally sort lexicographically, so we have a stable order + return gs1.Status.NodeName < gs2.Status.NodeName + }) + + return list } // patch the labels and annotations of an allocated GameServer with metadata from a GameServerAllocation @@ -569,78 +737,6 @@ func (c *Controller) patchMetadata(gs *stablev1alpha1.GameServer, fam v1alpha1.M } } -// findReadyGameServerForAllocation returns the most appropriate GameServer from the set, taking into account -// preferred selectors, as well as the passed in comparator -func (c *Controller) findReadyGameServerForAllocation(gsa *v1alpha1.GameServerAllocation, comparator findComparator) (*stablev1alpha1.GameServer, error) { - // track the best node count - var bestCount *gameservers.NodeCount - // the current GameServer from the node with the most GameServers (allocated, ready) - var bestGS *stablev1alpha1.GameServer - - selector, err := metav1.LabelSelectorAsSelector(&gsa.Spec.Required) - if err != nil { - return bestGS, errors.Wrapf(err, "could not convert GameServer %s GameServerAllocation selector", gsa.ObjectMeta.Name) - } - - gsList := c.selectGameServers(selector) - - preferred, err := gsa.Spec.PreferredSelectors() - if err != nil { - return bestGS, errors.Wrapf(err, "could not create preferred selectors for GameServerAllocation %s", gsa.ObjectMeta.Name) - } - - counts := c.counter.Counts() - - // track potential GameServers, one for each node - allocatableRequired := map[string]*stablev1alpha1.GameServer{} - allocatablePreferred := make([]map[string]*stablev1alpha1.GameServer, len(preferred)) - - // build the index of possible allocatable GameServers - for _, gs := range gsList { - if gs.DeletionTimestamp.IsZero() && gs.Status.State == stablev1alpha1.GameServerStateReady { - allocatableRequired[gs.Status.NodeName] = gs - - for i, p := range preferred { - if p.Matches(labels.Set(gs.Labels)) { - if allocatablePreferred[i] == nil { - allocatablePreferred[i] = map[string]*stablev1alpha1.GameServer{} - } - allocatablePreferred[i][gs.Status.NodeName] = gs - } - } - } - } - - allocationSet := allocatableRequired - - // check if there is any preferred options available - for _, set := range allocatablePreferred { - if len(set) > 0 { - allocationSet = set - break - } - } - - var bestGSList []stablev1alpha1.GameServer - for nodeName, gs := range allocationSet { - count := counts[nodeName] - // bestGS == nil: if there is no best GameServer, then this node & GameServer is the always the best - if bestGS == nil || comparator(*bestCount, count) { - bestCount = &count - bestGS = gs - bestGSList = append(bestGSList, *gs) - } - } - - if bestGS == nil { - err = ErrNoGameServerReady - } else { - bestGS = c.getRandomlySelectedGS(gsa, bestGSList) - } - - return bestGS, err -} - // syncGameServers synchronises the GameServers to Gameserver cache. This is called when a failure // happened during the allocation. This method will sync and make sure the cache is up to date. func (c *Controller) syncGameServers(key string) error { @@ -699,17 +795,6 @@ func (c *Controller) syncReadyGSServerCache() error { return nil } -// selectGameServers selects the appropriate gameservers from cache based on selector. -func (c *Controller) selectGameServers(selector labels.Selector) (res []*stablev1alpha1.GameServer) { - c.readyGameServers.Range(func(key string, gs *stablev1alpha1.GameServer) bool { - if selector.Matches(labels.Set(gs.ObjectMeta.GetLabels())) { - res = append(res, gs) - } - return true - }) - return res -} - // getKey extract the key of gameserver object func (c *Controller) getKey(gs *stablev1alpha1.GameServer) (string, bool) { var key string diff --git a/pkg/gameserverallocations/controller_test.go b/pkg/gameserverallocations/controller_test.go index ccbb348dfc..a85db8662e 100644 --- a/pkg/gameserverallocations/controller_test.go +++ b/pkg/gameserverallocations/controller_test.go @@ -34,6 +34,7 @@ import ( agtesting "agones.dev/agones/pkg/testing" "agones.dev/agones/pkg/util/apiserver" "github.com/heptiolabs/healthcheck" + "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" @@ -75,15 +76,15 @@ func TestControllerAllocationHandler(t *testing.T) { return true, gs, nil }) - stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) + stop, cancel := agtesting.StartInformers(m) defer cancel() - // This call initializes the cache - err := c.syncReadyGSServerCache() - assert.Nil(t, err) - - err = c.counter.Run(0, stop) - assert.Nil(t, err) + go c.Run(1, stop) // nolint: errcheck + // wait for it to be up and running + err := wait.PollImmediate(time.Second, 10*time.Second, func() (done bool, err error) { + return c.workerqueue.RunCount() == 1, nil + }) + assert.NoError(t, err) test := func(gsa *v1alpha1.GameServerAllocation, expectedState v1alpha1.GameServerAllocationState) { buf := bytes.NewBuffer(nil) @@ -177,17 +178,17 @@ func TestControllerAllocate(t *testing.T) { return true, gs, nil }) - stop, cancel := agtesting.StartInformers(m, m.AgonesInformerFactory.Stable().V1alpha1().GameServers().Informer().HasSynced) + stop, cancel := agtesting.StartInformers(m) defer cancel() - // This call initializes the cache - err := c.syncReadyGSServerCache() - assert.Nil(t, err) - - err = c.counter.Run(0, stop) - assert.Nil(t, err) + go c.Run(1, stop) // nolint: errcheck + // wait for it to be up and running + err := wait.PollImmediate(time.Second, 10*time.Second, func() (done bool, err error) { + return c.workerqueue.RunCount() == 1, nil + }) + assert.NoError(t, err) - gsa := v1alpha1.GameServerAllocation{ObjectMeta: metav1.ObjectMeta{Name: "gsa-1"}, + gsa := v1alpha1.GameServerAllocation{ObjectMeta: metav1.ObjectMeta{Name: "gsa-1", Namespace: defaultNs}, Spec: v1alpha1.GameServerAllocationSpec{ Required: metav1.LabelSelector{MatchLabels: map[string]string{stablev1alpha1.FleetNameLabel: f.ObjectMeta.Name}}, MetaPatch: fam, @@ -254,17 +255,17 @@ func TestControllerAllocatePriority(t *testing.T) { return true, gs, nil }) - stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) + stop, cancel := agtesting.StartInformers(m) defer cancel() - // This call initializes the cache - err := c.syncReadyGSServerCache() - assert.Nil(t, err) - - err = c.counter.Run(0, stop) - assert.Nil(t, err) + go c.Run(1, stop) // nolint: errcheck + // wait for it to be up and running + err := wait.PollImmediate(time.Second, 10*time.Second, func() (done bool, err error) { + return c.workerqueue.RunCount() == 1, nil + }) + assert.NoError(t, err) - gas := &v1alpha1.GameServerAllocation{ObjectMeta: metav1.ObjectMeta{Name: "fa-1"}, + gas := &v1alpha1.GameServerAllocation{ObjectMeta: metav1.ObjectMeta{Name: "fa-1", Namespace: defaultNs}, Spec: v1alpha1.GameServerAllocationSpec{ Required: metav1.LabelSelector{MatchLabels: map[string]string{stablev1alpha1.FleetNameLabel: f.ObjectMeta.Name}}, }} @@ -277,150 +278,80 @@ func TestControllerAllocatePriority(t *testing.T) { run(t, "packed", func(t *testing.T, c *Controller, gas *v1alpha1.GameServerAllocation) { // priority should be node1, then node2 - gs, err := c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n1, gs.Status.NodeName) + gs1, err := c.allocate(gas) + assert.NoError(t, err) + assert.Equal(t, n1, gs1.Status.NodeName) - gs, err = c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n1, gs.Status.NodeName) + gs2, err := c.allocate(gas) + assert.NoError(t, err) + assert.Equal(t, n1, gs2.Status.NodeName) + assert.NotEqual(t, gs1.ObjectMeta.Name, gs2.ObjectMeta.Name) - gs, err = c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n1, gs.Status.NodeName) + gs3, err := c.allocate(gas) + assert.NoError(t, err) + assert.Equal(t, n1, gs3.Status.NodeName) + assert.NotContains(t, []string{gs1.ObjectMeta.Name, gs2.ObjectMeta.Name}, gs3.ObjectMeta.Name) - gs, err = c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n2, gs.Status.NodeName) + gs4, err := c.allocate(gas) + assert.NoError(t, err) + assert.Equal(t, n2, gs4.Status.NodeName) + assert.NotContains(t, []string{gs1.ObjectMeta.Name, gs2.ObjectMeta.Name, gs3.ObjectMeta.Name}, gs4.ObjectMeta.Name) // should have none left _, err = c.allocate(gas) - assert.NotNil(t, err) + assert.Equal(t, err, ErrNoGameServerReady) }) run(t, "distributed", func(t *testing.T, c *Controller, gas *v1alpha1.GameServerAllocation) { // make a copy, to avoid the race check gas = gas.DeepCopy() gas.Spec.Scheduling = apis.Distributed - // should go node2, then node1 - gs, err := c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n2, gs.Status.NodeName) - gs, err = c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n1, gs.Status.NodeName) + // distributed is randomised, so no set pattern - gs, err = c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n1, gs.Status.NodeName) + gs1, err := c.allocate(gas) + assert.NoError(t, err) - gs, err = c.allocate(gas) - assert.Nil(t, err) - assert.Equal(t, n1, gs.Status.NodeName) + gs2, err := c.allocate(gas) + assert.NoError(t, err) + assert.NotEqual(t, gs1.ObjectMeta.Name, gs2.ObjectMeta.Name) + + gs3, err := c.allocate(gas) + assert.NoError(t, err) + assert.NotContains(t, []string{gs1.ObjectMeta.Name, gs2.ObjectMeta.Name}, gs3.ObjectMeta.Name) + + gs4, err := c.allocate(gas) + assert.NoError(t, err) + assert.NotContains(t, []string{gs1.ObjectMeta.Name, gs2.ObjectMeta.Name, gs3.ObjectMeta.Name}, gs4.ObjectMeta.Name) // should have none left _, err = c.allocate(gas) - assert.NotNil(t, err) + assert.Equal(t, err, ErrNoGameServerReady) }) } -func TestControllerFindPackedReadyGameServer(t *testing.T) { +func TestControllerRunLocalAllocations(t *testing.T) { t.Parallel() - labels := map[string]string{"role": "gameserver"} - gsa := &v1alpha1.GameServerAllocation{ - Spec: v1alpha1.GameServerAllocationSpec{ - Required: metav1.LabelSelector{ - MatchLabels: labels, - }, - Scheduling: "", - }, - } + t.Run("no problems", func(t *testing.T) { + f, _, gsList := defaultFixtures(5) + gsList[0].Status.NodeName = "special" - t.Run("test just required", func(t *testing.T) { c, m := newFakeController() - watch := watch.NewFake() - m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(watch, nil)) - hasSync := m.AgonesInformerFactory.Stable().V1alpha1().GameServers().Informer().HasSynced - - n := metav1.Now() - gsList := []stablev1alpha1.GameServer{ - {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, Labels: labels, DeletionTimestamp: &n}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateError}}, - } - m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { return true, &stablev1alpha1.GameServerList{Items: gsList}, nil }) + updateCount := 0 + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + updateCount++ - stop, cancel := agtesting.StartInformers(m, hasSync) - defer cancel() - - // This call initializes the cache - err := c.syncReadyGSServerCache() - assert.Nil(t, err) - - err = c.counter.Run(0, stop) - assert.Nil(t, err) - - gs, err := c.findReadyGameServerForAllocation(gsa, packedComparator) - assert.Nil(t, err) - assert.Equal(t, "node1", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - // mock that the first game server is allocated - change := gsList[1].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(gsa, packedComparator) - assert.Nil(t, err) - assert.Equal(t, "node2", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - gsList[2].Status.State = stablev1alpha1.GameServerStateAllocated - change = gsList[2].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(gsa, packedComparator) - assert.Equal(t, ErrNoGameServerReady, err) - assert.Nil(t, gs) - }) - - t.Run("test preferred", func(t *testing.T) { - c, m := newFakeController() - watch := watch.NewFake() - m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(watch, nil)) - hasSync := m.AgonesInformerFactory.Stable().V1alpha1().GameServers().Informer().HasSynced - - prefLabels := map[string]string{"role": "gameserver", "preferred": "true"} - gsList := []stablev1alpha1.GameServer{ - {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, Labels: prefLabels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, Labels: prefLabels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - } - - m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { - return true, &stablev1alpha1.GameServerList{Items: gsList}, nil - }) + uo := action.(k8stesting.UpdateAction) + gs := uo.GetObject().(*stablev1alpha1.GameServer) - prefGsa := gsa.DeepCopy() - prefGsa.Spec.Preferred = append(prefGsa.Spec.Preferred, metav1.LabelSelector{ - MatchLabels: map[string]string{"preferred": "true"}, + return true, gs, nil }) - stop, cancel := agtesting.StartInformers(m, hasSync) + stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) defer cancel() // This call initializes the cache @@ -430,63 +361,54 @@ func TestControllerFindPackedReadyGameServer(t *testing.T) { err = c.counter.Run(0, stop) assert.Nil(t, err) - gs, err := c.findReadyGameServerForAllocation(prefGsa, packedComparator) - assert.Nil(t, err) - assert.Equal(t, "node1", gs.Status.NodeName) - assert.Equal(t, "gs1", gs.ObjectMeta.Name) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - change := gsList[0].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(prefGsa, packedComparator) - assert.Nil(t, err) - assert.Equal(t, "node2", gs.Status.NodeName) - assert.Equal(t, "gs4", gs.ObjectMeta.Name) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - change = gsList[3].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(prefGsa, packedComparator) - assert.Nil(t, err) - assert.Equal(t, "node1", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + gsa := &v1alpha1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + }, + Spec: v1alpha1.GameServerAllocationSpec{ + Required: metav1.LabelSelector{MatchLabels: map[string]string{stablev1alpha1.FleetNameLabel: f.ObjectMeta.Name}}, + }} + gsa.ApplyDefaults() + + // line up 3 in a batch + j1 := request{gsa: gsa.DeepCopy(), response: make(chan response)} + c.pendingRequests <- j1 + j2 := request{gsa: gsa.DeepCopy(), response: make(chan response)} + c.pendingRequests <- j2 + j3 := request{gsa: gsa.DeepCopy(), response: make(chan response)} + c.pendingRequests <- j3 + + go c.runLocalAllocations(3) + + res1 := <-j1.response + assert.NoError(t, res1.err) + assert.NotNil(t, res1.gs) + + // since we gave gsList[0] a different nodename, it should always come first + assert.Contains(t, []string{"gs2", "gs3", "gs4", "gs5"}, res1.gs.ObjectMeta.Name) + assert.Equal(t, stablev1alpha1.GameServerStateAllocated, res1.gs.Status.State) + + res2 := <-j2.response + assert.NoError(t, res2.err) + assert.NotNil(t, res2.gs) + assert.NotEqual(t, res1.gs.ObjectMeta.Name, res2.gs.ObjectMeta.Name) + assert.Equal(t, stablev1alpha1.GameServerStateAllocated, res2.gs.Status.State) + + res3 := <-j3.response + assert.NoError(t, res3.err) + assert.NotNil(t, res3.gs) + assert.Equal(t, stablev1alpha1.GameServerStateAllocated, res3.gs.Status.State) + assert.NotEqual(t, res1.gs.ObjectMeta.Name, res3.gs.ObjectMeta.Name) + assert.NotEqual(t, res2.gs.ObjectMeta.Name, res3.gs.ObjectMeta.Name) + + assert.Equal(t, 3, updateCount) }) - t.Run("allocation trap", func(t *testing.T) { + t.Run("no gameservers", func(t *testing.T) { c, m := newFakeController() - hasSync := m.AgonesInformerFactory.Stable().V1alpha1().GameServers().Informer().HasSynced - - gsList := []stablev1alpha1.GameServer{ - {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs7", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs8", Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - } - - m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { - return true, &stablev1alpha1.GameServerList{Items: gsList}, nil - }) - - stop, cancel := agtesting.StartInformers(m, hasSync) + stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) defer cancel() + c.stop = stop // This call initializes the cache err := c.syncReadyGSServerCache() @@ -495,126 +417,25 @@ func TestControllerFindPackedReadyGameServer(t *testing.T) { err = c.counter.Run(0, stop) assert.Nil(t, err) - gs, err := c.findReadyGameServerForAllocation(gsa, packedComparator) - assert.Nil(t, err) - assert.Equal(t, "node2", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - }) -} - -func TestControllerFindDistributedReadyGameServer(t *testing.T) { - t.Parallel() - - c, m := newFakeController() - watch := watch.NewFake() - m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(watch, nil)) - hasSync := m.AgonesInformerFactory.Stable().V1alpha1().GameServers().Informer().HasSynced - - labels := map[string]string{"role": "gameserver"} - - gsa := &v1alpha1.GameServerAllocation{ - Spec: v1alpha1.GameServerAllocationSpec{ - Required: metav1.LabelSelector{ - MatchLabels: labels, + gsa := &v1alpha1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, }, - Scheduling: "", - }, - } - - gsList := []stablev1alpha1.GameServer{ - {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateError}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, - {ObjectMeta: metav1.ObjectMeta{Name: "gs7", Namespace: defaultNs, Labels: labels}, - Status: stablev1alpha1.GameServerStatus{NodeName: "node3", State: stablev1alpha1.GameServerStateReady}}, - } - - m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { - return true, &stablev1alpha1.GameServerList{Items: gsList}, nil - }) - - stop, cancel := agtesting.StartInformers(m, hasSync) - defer cancel() - - // This call initializes the cache - err := c.syncReadyGSServerCache() - assert.Nil(t, err) - - err = c.counter.Run(0, stop) - assert.Nil(t, err) - - gs, err := c.findReadyGameServerForAllocation(gsa, distributedComparator) - assert.Nil(t, err) - assert.Equal(t, "node3", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - change := gsList[6].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(gsa, distributedComparator) - assert.Nil(t, err) - assert.Equal(t, "node2", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - change = gsList[4].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(gsa, distributedComparator) - assert.Nil(t, err) - assert.Equal(t, "node1", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - change = gsList[0].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(gsa, distributedComparator) - assert.Nil(t, err) - assert.Equal(t, "node2", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - change = gsList[5].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) - - gs, err = c.findReadyGameServerForAllocation(gsa, distributedComparator) - assert.Nil(t, err) - assert.Equal(t, "node1", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) - - change = gsList[1].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) + Spec: v1alpha1.GameServerAllocationSpec{ + Required: metav1.LabelSelector{MatchLabels: map[string]string{stablev1alpha1.FleetNameLabel: "thereisnofleet"}}, + }} + gsa.ApplyDefaults() - gs, err = c.findReadyGameServerForAllocation(gsa, distributedComparator) - assert.Nil(t, err) - assert.Equal(t, "node1", gs.Status.NodeName) - assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + j1 := request{gsa: gsa.DeepCopy(), response: make(chan response)} + c.pendingRequests <- j1 - change = gsList[2].DeepCopy() - change.Status.State = stablev1alpha1.GameServerStateAllocated - watch.Modify(change) - assert.True(t, cache.WaitForCacheSync(stop, hasSync)) + go c.runLocalAllocations(3) - gs, err = c.findReadyGameServerForAllocation(gsa, distributedComparator) - assert.Equal(t, ErrNoGameServerReady, err) - assert.Nil(t, gs) + res1 := <-j1.response + assert.Nil(t, res1.gs) + assert.Error(t, res1.err) + assert.Equal(t, ErrNoGameServerReady, res1.err) + }) } func TestAllocationApiResource(t *testing.T) { @@ -758,6 +579,201 @@ func TestGetRandomlySelectedGS(t *testing.T) { assert.Equal(t, "gs1", selectedGS.ObjectMeta.Name) } +func TestControllerAllocationUpdateWorkers(t *testing.T) { + t.Run("no error", func(t *testing.T) { + c, m := newFakeController() + + updated := false + gs1 := &stablev1alpha1.GameServer{ + ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, + } + r := response{ + request: request{ + gsa: &v1alpha1.GameServerAllocation{}, + response: make(chan response), + }, + gs: gs1, + } + + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + updated = true + + uo := action.(k8stesting.UpdateAction) + gs := uo.GetObject().(*stablev1alpha1.GameServer) + + assert.Equal(t, gs1.ObjectMeta.Name, gs.ObjectMeta.Name) + assert.Equal(t, stablev1alpha1.GameServerStateAllocated, gs.Status.State) + + return true, gs, nil + }) + + updateQueue := c.allocationUpdateWorkers(1) + + go func() { + updateQueue <- r + }() + + r = <-r.request.response + + assert.True(t, updated) + assert.NoError(t, r.err) + assert.Equal(t, gs1.ObjectMeta.Name, r.gs.ObjectMeta.Name) + assert.Equal(t, stablev1alpha1.GameServerStateAllocated, r.gs.Status.State) + + agtesting.AssertEventContains(t, m.FakeRecorder.Events, "Allocated") + + // make sure we can do more allocations than number of workers + gs2 := &stablev1alpha1.GameServer{ + ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, + } + r = response{ + request: request{ + gsa: &v1alpha1.GameServerAllocation{}, + response: make(chan response), + }, + gs: gs2, + } + + go func() { + updateQueue <- r + }() + + r = <-r.request.response + + assert.True(t, updated) + assert.NoError(t, r.err) + assert.Equal(t, gs2.ObjectMeta.Name, r.gs.ObjectMeta.Name) + assert.Equal(t, stablev1alpha1.GameServerStateAllocated, r.gs.Status.State) + + agtesting.AssertEventContains(t, m.FakeRecorder.Events, "Allocated") + }) + + t.Run("error on update", func(t *testing.T) { + c, m := newFakeController() + + updated := false + gs1 := &stablev1alpha1.GameServer{ + ObjectMeta: metav1.ObjectMeta{Name: "gs1"}, + } + key, err := cache.MetaNamespaceKeyFunc(gs1) + assert.NoError(t, err) + + _, ok := c.readyGameServers.Load(key) + assert.False(t, ok) + + r := response{ + request: request{ + gsa: &v1alpha1.GameServerAllocation{}, + response: make(chan response), + }, + gs: gs1, + } + + m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + updated = true + + uo := action.(k8stesting.UpdateAction) + gs := uo.GetObject().(*stablev1alpha1.GameServer) + assert.Equal(t, gs1.ObjectMeta.Name, gs.ObjectMeta.Name) + assert.Equal(t, stablev1alpha1.GameServerStateAllocated, gs.Status.State) + + return true, gs, errors.New("something went wrong") + }) + + updateQueue := c.allocationUpdateWorkers(1) + + go func() { + updateQueue <- r + }() + + r = <-r.request.response + + assert.True(t, updated) + assert.Error(t, r.err) + assert.Equal(t, gs1, r.gs) + agtesting.AssertNoEvent(t, m.FakeRecorder.Events) + + var cached *stablev1alpha1.GameServer + cached, ok = c.readyGameServers.Load(key) + assert.True(t, ok) + assert.Equal(t, gs1.ObjectMeta.Name, cached.ObjectMeta.Name) + }) +} + +func TestControllerListSortedReadyGameServers(t *testing.T) { + t.Parallel() + + gs1 := stablev1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, UID: "1"}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}} + gs2 := stablev1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, UID: "2"}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}} + gs3 := stablev1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, UID: "3"}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}} + gs4 := stablev1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, UID: "4"}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}} + + fixtures := map[string]struct { + list []stablev1alpha1.GameServer + test func(*testing.T, []*stablev1alpha1.GameServer) + }{ + "most allocated": { + // node1: 1 ready, 1 allocated, node2: 1 ready + list: []stablev1alpha1.GameServer{gs1, gs2, gs3}, + test: func(t *testing.T, list []*stablev1alpha1.GameServer) { + assert.Len(t, list, 2) + if !assert.Equal(t, []*stablev1alpha1.GameServer{&gs1, &gs2}, list) { + for _, gs := range list { + logrus.WithField("name", gs.Name).Info("game server") + } + } + }, + }, + "list ready": { + // node1: 1 ready, node2: 2 ready + list: []stablev1alpha1.GameServer{gs1, gs2, gs4}, + test: func(t *testing.T, list []*stablev1alpha1.GameServer) { + assert.Len(t, list, 3) + assert.Contains(t, []string{"gs2", "gs4"}, list[0].ObjectMeta.Name) + assert.Contains(t, []string{"gs2", "gs4"}, list[1].ObjectMeta.Name) + assert.Equal(t, &gs1, list[2]) + }, + }, + "lexicographical (node name)": { + list: []stablev1alpha1.GameServer{gs2, gs1}, + test: func(t *testing.T, list []*stablev1alpha1.GameServer) { + assert.Len(t, list, 2) + if !assert.Equal(t, []*stablev1alpha1.GameServer{&gs1, &gs2}, list) { + for _, gs := range list { + logrus.WithField("name", gs.Name).Info("game server") + } + } + }, + }, + } + + for k, v := range fixtures { + t.Run(k, func(t *testing.T) { + c, m := newFakeController() + + gsList := v.list + + m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, k8sruntime.Object, error) { + return true, &stablev1alpha1.GameServerList{Items: gsList}, nil + }) + + stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) + defer cancel() + + // This call initializes the cache + err := c.syncReadyGSServerCache() + assert.Nil(t, err) + + err = c.counter.Run(0, stop) + assert.Nil(t, err) + + list := c.listSortedReadyGameServers() + + v.test(t, list) + }) + } +} + func TestMultiClusterAllocationFromLocal(t *testing.T) { t.Parallel() t.Run("Handle allocation request locally", func(t *testing.T) { @@ -786,15 +802,15 @@ func TestMultiClusterAllocationFromLocal(t *testing.T) { }, nil }) - stop, cancel := agtesting.StartInformers(m, c.allocationPolicySynced, c.gameServerSynced) + stop, cancel := agtesting.StartInformers(m) defer cancel() - // This call initializes the cache - err := c.syncReadyGSServerCache() - assert.Nil(t, err) - - err = c.counter.Run(0, stop) - assert.Nil(t, err) + go c.Run(1, stop) // nolint: errcheck + // wait for it to be up and running + err := wait.PollImmediate(time.Second, 10*time.Second, func() (done bool, err error) { + return c.workerqueue.RunCount() == 1, nil + }) + assert.NoError(t, err) gsa := &v1alpha1.GameServerAllocation{ ObjectMeta: metav1.ObjectMeta{ @@ -832,15 +848,15 @@ func TestMultiClusterAllocationFromLocal(t *testing.T) { }, nil }) - stop, cancel := agtesting.StartInformers(m, c.allocationPolicySynced, c.gameServerSynced) + stop, cancel := agtesting.StartInformers(m) defer cancel() - // This call initializes the cache - err := c.syncReadyGSServerCache() - assert.Nil(t, err) - - err = c.counter.Run(0, stop) - assert.Nil(t, err) + go c.Run(1, stop) // nolint: errcheck + // wait for it to be up and running + err := wait.PollImmediate(time.Second, 10*time.Second, func() (done bool, err error) { + return c.workerqueue.RunCount() == 1, nil + }) + assert.NoError(t, err) gsa := &v1alpha1.GameServerAllocation{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/gameserverallocations/find.go b/pkg/gameserverallocations/find.go index d413044d60..4168c878e2 100644 --- a/pkg/gameserverallocations/find.go +++ b/pkg/gameserverallocations/find.go @@ -14,22 +14,103 @@ package gameserverallocations -import "agones.dev/agones/pkg/gameservers" - -// packedComparator prioritises Nodes with GameServers that are allocated, and then Nodes with the most -// Ready GameServers -- this will bin pack allocated game servers together. -func packedComparator(bestCount, currentCount gameservers.NodeCount) bool { - if currentCount.Allocated == bestCount.Allocated && currentCount.Ready > bestCount.Ready { - return true - } else if currentCount.Allocated > bestCount.Allocated { - return true +import ( + "math/rand" + + "agones.dev/agones/pkg/apis" + "agones.dev/agones/pkg/apis/allocation/v1alpha1" + stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// findGameServerForAllocation finds an optimal gameserver, given the +// set of preferred and required selectors on the GameServerAllocation. This also returns the index +// that the gameserver was found at in `list`, in case you want to remove it from the list +// Packed: will search list from start to finish +// Distributed: will search in a random order through the list +// It is assumed that all gameservers passed in, are Ready and not being deleted, and are sorted in Packed priority order +func findGameServerForAllocation(gsa *v1alpha1.GameServerAllocation, list []*stablev1alpha1.GameServer) (*stablev1alpha1.GameServer, int, error) { + type result struct { + gs *stablev1alpha1.GameServer + index int } - return false -} + requiredSelector, err := metav1.LabelSelectorAsSelector(&gsa.Spec.Required) + if err != nil { + return nil, -1, errors.Wrap(err, "could not convert GameServerAllocation selector") + } + + preferredSelector, err := gsa.Spec.PreferredSelectors() + if err != nil { + return nil, -1, errors.Wrap(err, "could not convert preferred selectors for GameServerAllocation") + } + + var required *result + preferred := make([]*result, len(preferredSelector)) + + var loop func(list []*stablev1alpha1.GameServer, f func(i int, gs *stablev1alpha1.GameServer)) + + // packed is forward looping, distributed is random looping + switch gsa.Spec.Scheduling { + case apis.Packed: + loop = func(list []*stablev1alpha1.GameServer, f func(i int, gs *stablev1alpha1.GameServer)) { + for i, gs := range list { + f(i, gs) + } + } + case apis.Distributed: + // randomised looping - make a list of indices, and then randomise them + // as we don't want to change the order of the gameserver slice + l := len(list) + indices := make([]int, l) + for i := 0; i < l; i++ { + indices[i] = i + } + rand.Shuffle(l, func(i, j int) { + indices[i], indices[j] = indices[j], indices[i] + }) + + loop = func(list []*stablev1alpha1.GameServer, f func(i int, gs *stablev1alpha1.GameServer)) { + for _, i := range indices { + f(i, list[i]) + } + } + default: + return nil, -1, errors.Errorf("scheduling strategy of '%s' is not supported", gsa.Spec.Scheduling) + } + + loop(list, func(i int, gs *stablev1alpha1.GameServer) { + // only search the same namespace + if gs.ObjectMeta.Namespace != gsa.ObjectMeta.Namespace { + return + } + + set := labels.Set(gs.ObjectMeta.Labels) + + // first look at preferred + for j, sel := range preferredSelector { + if preferred[j] == nil && sel.Matches(set) { + preferred[j] = &result{gs: gs, index: i} + } + } + + // then look at required + if required == nil && requiredSelector.Matches(set) { + required = &result{gs: gs, index: i} + } + }) + + for _, r := range preferred { + if r != nil { + return r.gs, r.index, nil + } + } + + if required == nil { + return nil, 0, ErrNoGameServerReady + } -// distributedComparator is the inverse of the packed comparator, -// looking to distribute allocated gameservers on as many nodes as possible. -func distributedComparator(bestCount, currentCount gameservers.NodeCount) bool { - return !packedComparator(bestCount, currentCount) + return required.gs, required.index, nil } diff --git a/pkg/gameserverallocations/find_test.go b/pkg/gameserverallocations/find_test.go new file mode 100644 index 0000000000..e8e2f29994 --- /dev/null +++ b/pkg/gameserverallocations/find_test.go @@ -0,0 +1,255 @@ +// Copyright 2019 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package gameserverallocations + +import ( + "testing" + + "agones.dev/agones/pkg/apis" + + "agones.dev/agones/pkg/apis/allocation/v1alpha1" + stablev1alpha1 "agones.dev/agones/pkg/apis/stable/v1alpha1" + agtesting "agones.dev/agones/pkg/testing" + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + k8stesting "k8s.io/client-go/testing" +) + +func TestFindGameServerForAllocationPacked(t *testing.T) { + t.Parallel() + + labels := map[string]string{"role": "gameserver"} + prefLabels := map[string]string{"role": "gameserver", "preferred": "true"} + + gsa := &v1alpha1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: v1alpha1.GameServerAllocationSpec{ + Required: metav1.LabelSelector{ + MatchLabels: labels, + }, + Scheduling: apis.Packed, + }, + } + + n := metav1.Now() + prefGsa := gsa.DeepCopy() + prefGsa.Spec.Preferred = append(prefGsa.Spec.Preferred, metav1.LabelSelector{ + MatchLabels: map[string]string{"preferred": "true"}, + }) + + fixtures := map[string]struct { + list []stablev1alpha1.GameServer + test func(*testing.T, []*stablev1alpha1.GameServer) + }{ + "required": { + list: []stablev1alpha1.GameServer{ + {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, Labels: labels, DeletionTimestamp: &n}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateError}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: "does-not-apply", Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + }, + test: func(t *testing.T, list []*stablev1alpha1.GameServer) { + assert.Len(t, list, 3) + + gs, index, err := findGameServerForAllocation(gsa, list) + assert.NoError(t, err) + if !assert.NotNil(t, gs) { + assert.FailNow(t, "gameserver should not be nil") + } + assert.Equal(t, "node1", gs.Status.NodeName) + assert.Equal(t, "gs1", gs.ObjectMeta.Name) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + + // mock that the first found game server is allocated + list = append(list[:index], list[index+1:]...) + assert.Equal(t, stablev1alpha1.GameServerStateReady, list[0].Status.State) + assert.Len(t, list, 2) + + gs, index, err = findGameServerForAllocation(gsa, list) + assert.NoError(t, err) + if !assert.NotNil(t, gs) { + assert.FailNow(t, "gameserver should not be nil") + } + assert.Equal(t, "node2", gs.Status.NodeName) + assert.Equal(t, "gs2", gs.ObjectMeta.Name) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + + list = nil + gs, _, err = findGameServerForAllocation(gsa, list) + assert.Error(t, err) + assert.Equal(t, ErrNoGameServerReady, err) + assert.Nil(t, gs) + }, + }, + "preferred": { + list: []stablev1alpha1.GameServer{ + {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, Labels: prefLabels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, Labels: prefLabels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, Labels: labels}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + }, + test: func(t *testing.T, list []*stablev1alpha1.GameServer) { + assert.Len(t, list, 6) + + gs, index, err := findGameServerForAllocation(prefGsa, list) + assert.NoError(t, err) + assert.Equal(t, "node1", gs.Status.NodeName) + assert.Equal(t, "gs1", gs.ObjectMeta.Name) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + + list = append(list[:index], list[index+1:]...) + gs, index, err = findGameServerForAllocation(prefGsa, list) + assert.NoError(t, err) + assert.Equal(t, "node2", gs.Status.NodeName) + assert.Equal(t, "gs4", gs.ObjectMeta.Name) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + + list = append(list[:index], list[index+1:]...) + gs, index, err = findGameServerForAllocation(prefGsa, list) + assert.NoError(t, err) + assert.Equal(t, "node1", gs.Status.NodeName) + assert.Contains(t, []string{"gs3", "gs5", "gs6"}, gs.ObjectMeta.Name) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + }, + }, + "allocation trap": { + list: []stablev1alpha1.GameServer{ + {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateAllocated}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs7", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs8", Labels: labels, Namespace: defaultNs}, Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + }, + test: func(t *testing.T, list []*stablev1alpha1.GameServer) { + assert.Len(t, list, 4) + + gs, index, err := findGameServerForAllocation(gsa, list) + assert.Nil(t, err) + assert.Equal(t, "node2", gs.Status.NodeName) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + }, + }, + } + + for k, v := range fixtures { + t.Run(k, func(t *testing.T) { + c, m := newFakeController() + + m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, &stablev1alpha1.GameServerList{Items: v.list}, nil + }) + + stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) + defer cancel() + + // This call initializes the cache + err := c.syncReadyGSServerCache() + assert.Nil(t, err) + + err = c.counter.Run(0, stop) + assert.Nil(t, err) + + list := c.listSortedReadyGameServers() + v.test(t, list) + }) + } +} + +func TestFindGameServerForAllocationDistributed(t *testing.T) { + t.Parallel() + + c, m := newFakeController() + labels := map[string]string{"role": "gameserver"} + + gsa := &v1alpha1.GameServerAllocation{ + ObjectMeta: metav1.ObjectMeta{Namespace: defaultNs}, + Spec: v1alpha1.GameServerAllocationSpec{ + Required: metav1.LabelSelector{ + MatchLabels: labels, + }, + Scheduling: apis.Distributed, + }, + } + + gsList := []stablev1alpha1.GameServer{ + {ObjectMeta: metav1.ObjectMeta{Name: "gs1", Namespace: defaultNs, Labels: labels}, + Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs2", Namespace: defaultNs, Labels: labels}, + Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs3", Namespace: defaultNs, Labels: labels}, + Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs4", Namespace: defaultNs, Labels: labels}, + Status: stablev1alpha1.GameServerStatus{NodeName: "node1", State: stablev1alpha1.GameServerStateError}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs5", Namespace: defaultNs, Labels: labels}, + Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs6", Namespace: defaultNs, Labels: labels}, + Status: stablev1alpha1.GameServerStatus{NodeName: "node2", State: stablev1alpha1.GameServerStateReady}}, + {ObjectMeta: metav1.ObjectMeta{Name: "gs7", Namespace: defaultNs, Labels: labels}, + Status: stablev1alpha1.GameServerStatus{NodeName: "node3", State: stablev1alpha1.GameServerStateReady}}, + } + + m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + return true, &stablev1alpha1.GameServerList{Items: gsList}, nil + }) + + stop, cancel := agtesting.StartInformers(m, c.gameServerSynced) + defer cancel() + + // This call initializes the cache + err := c.syncReadyGSServerCache() + assert.Nil(t, err) + + err = c.counter.Run(0, stop) + assert.Nil(t, err) + + list := c.listSortedReadyGameServers() + assert.Len(t, list, 6) + + gs, index, err := findGameServerForAllocation(gsa, list) + assert.NoError(t, err) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + + past := gs + // we should get a different result in 10 tries, so we can see we get some randomness. + for i := 0; i < 10; i++ { + gs, index, err = findGameServerForAllocation(gsa, list) + assert.NoError(t, err) + assert.Equal(t, gs, list[index]) + assert.Equal(t, stablev1alpha1.GameServerStateReady, gs.Status.State) + + if gs.ObjectMeta.Name != past.ObjectMeta.Name { + return + } + } + + assert.FailNow(t, "We should get a different gameserver by now") + +}