diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go
index 8d2fe055d5..b82a9b9779 100644
--- a/pkg/gameservers/controller.go
+++ b/pkg/gameservers/controller.go
@@ -365,16 +365,13 @@ func (c *Controller) syncGameServerPortAllocationState(gs *v1alpha1.GameServer)
 		return gs, nil
 	}

-	gsCopy, err := c.portAllocator.Allocate(gs.DeepCopy())
-	if err != nil {
-		return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name)
-	}
+	gsCopy := c.portAllocator.Allocate(gs.DeepCopy())

 	gsCopy.Status.State = v1alpha1.Creating
 	c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated")

 	c.logger.WithField("gs", gsCopy).Info("Syncing Port Allocation State")
-	gs, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy)
+	gs, err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy)
 	if err != nil {
 		// if the GameServer doesn't get updated with the port data, then put the port
 		// back in the pool, as it will get retried on the next pass
diff --git a/pkg/gameservers/portallocator.go b/pkg/gameservers/portallocator.go
index 37cc6957d9..6926a32d72 100644
--- a/pkg/gameservers/portallocator.go
+++ b/pkg/gameservers/portallocator.go
@@ -15,14 +15,13 @@
 package gameservers

 import (
+	"sort"
 	"sync"

-	"agones.dev/agones/pkg/apis/stable"
 	"agones.dev/agones/pkg/apis/stable/v1alpha1"
 	"agones.dev/agones/pkg/client/informers/externalversions"
 	listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
 	"agones.dev/agones/pkg/util/runtime"
-	"agones.dev/agones/pkg/util/workerqueue"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
@@ -33,14 +32,6 @@ import (
 	"k8s.io/client-go/tools/cache"
 )

-// syncAllKey is the queue key to sync all the ports.
-// the + symbol is deliberate, is it can't be used in a K8s
-// naming scheme
-const syncAllKey = cache.ExplicitKey("SYNC+ALL")
-
-// ErrPortNotFound is returns when a port is unable to be allocated
-var ErrPortNotFound = errors.New("Unable to allocate a port")
-
 // A set of port allocations for a node
 type portAllocation map[int32]bool

@@ -54,7 +45,6 @@ type PortAllocator struct {
 	mutex              sync.RWMutex
 	portAllocations    []portAllocation
 	gameServerRegistry map[types.UID]bool
-	nodeRegistry       map[types.UID]bool
 	minPort            int32
 	maxPort            int32
 	gameServerSynced   cache.InformerSynced
@@ -63,7 +53,6 @@ type PortAllocator struct {
 	nodeSynced         cache.InformerSynced
 	nodeLister         corelisterv1.NodeLister
 	nodeInformer       cache.SharedIndexInformer
-	workerqueue        *workerqueue.WorkerQueue
 }

 // NewPortAllocator returns a new dynamic port
@@ -82,7 +71,6 @@ func NewPortAllocator(minPort, maxPort int32,
 		minPort:            minPort,
 		maxPort:            maxPort,
 		gameServerRegistry: map[types.UID]bool{},
-		nodeRegistry:       map[types.UID]bool{},
 		gameServerSynced:   gameServers.Informer().HasSynced,
 		gameServerLister:   gameServers.Lister(),
 		gameServerInformer: gameServers.Informer(),
@@ -91,29 +79,11 @@
 		nodeSynced:         nodes.Informer().HasSynced,
 	}
 	pa.logger = runtime.NewLoggerWithType(pa)
-	pa.workerqueue = workerqueue.NewWorkerQueue(pa.syncPorts, pa.logger, stable.GroupName+".PortAllocator")
 	pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		DeleteFunc: pa.syncDeleteGameServer,
 	})
-	pa.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc: func(obj interface{}) {
-			node := obj.(*corev1.Node)
-			pa.workerqueue.Enqueue(cache.ExplicitKey(node.Name))
-		},
-		UpdateFunc: func(oldObj, newObj interface{}) {
-			oldNode := oldObj.(*corev1.Node)
-			newNode := newObj.(*corev1.Node)
-			if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
-				pa.workerqueue.Enqueue(syncAllKey)
-			}
-		},
-		DeleteFunc: func(_ interface{}) {
-			pa.workerqueue.Enqueue(syncAllKey)
-		},
-	})
-
 	pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")
 	return pa
 }

@@ -132,29 +102,12 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
 		return errors.Wrap(err, "error performing initial sync")
 	}

-	pa.workerqueue.Run(1, stop)
-	return nil
-}
-
-// syncPorts synchronises ports for the given key
-func (pa *PortAllocator) syncPorts(key string) error {
-	if key == string(syncAllKey) {
-		return pa.syncAll()
-	}
-
-	// if we get a specific node name, we add some ports
-	node, err := pa.nodeLister.Get(key)
-	if err != nil {
-		return errors.Wrapf(err, "error retrieving node %s", key)
-	}
-	pa.syncAddNode(node)
-
 	return nil
 }

 // Allocate assigns a port to the GameServer and returns it.
-// Return ErrPortNotFound if no port is allocatable
-func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) (*v1alpha1.GameServer, error) {
+// If there are no available ports, a new node's worth of ports is added to the pool and allocation is retried.
+func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) *v1alpha1.GameServer {
 	pa.mutex.Lock()
 	defer pa.mutex.Unlock()

@@ -182,26 +135,38 @@ func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) (*v1alpha1.GameServer
 		return ports
 	}

-	amount := gs.CountPorts(v1alpha1.Dynamic)
-	allocations := findOpenPorts(amount)
-
-	if len(allocations) == amount {
-		pa.gameServerRegistry[gs.ObjectMeta.UID] = true
-
-		for i, p := range gs.Spec.Ports {
-			if p.PortPolicy == v1alpha1.Dynamic {
-				// pop off allocation
-				var a pn
-				a, allocations = allocations[0], allocations[1:]
-				a.pa[a.port] = true
-				gs.Spec.Ports[i].HostPort = a.port
+	// this allows us to do recursion, within the mutex lock
+	var allocate func(gs *v1alpha1.GameServer) *v1alpha1.GameServer
+	allocate = func(gs *v1alpha1.GameServer) *v1alpha1.GameServer {
+		amount := gs.CountPorts(v1alpha1.Dynamic)
+		allocations := findOpenPorts(amount)
+
+		if len(allocations) == amount {
+			pa.gameServerRegistry[gs.ObjectMeta.UID] = true
+
+			for i, p := range gs.Spec.Ports {
+				if p.PortPolicy == v1alpha1.Dynamic {
+					// pop off allocation
+					var a pn
+					a, allocations = allocations[0], allocations[1:]
+					a.pa[a.port] = true
+					gs.Spec.Ports[i].HostPort = a.port
+				}
 			}
+
+			return gs
 		}
-		return gs, nil
+		// if we get here, we ran out of ports. Add a node, and try again.
+		// this is important, because when scaling up, we create GameServers that
+		// can't be scheduled on the current set of nodes, so we need to be sure
+		// there are always ports available to be allocated.
+		pa.portAllocations = append(pa.portAllocations, pa.newPortAllocation())
+
+		return allocate(gs)
 	}

-	return gs, ErrPortNotFound
+	return allocate(gs)
 }

 // DeAllocate marks the given port as no longer allocated
@@ -224,28 +189,6 @@ func (pa *PortAllocator) DeAllocate(gs *v1alpha1.GameServer) {
 	delete(pa.gameServerRegistry, gs.ObjectMeta.UID)
 }

-// syncAddNode adds another node port section
-// to the available ports
-func (pa *PortAllocator) syncAddNode(node *corev1.Node) {
-	// if we're already added this node, don't do it again
-	if _, ok := pa.nodeRegistry[node.ObjectMeta.UID]; ok {
-		pa.logger.WithField("node", node.ObjectMeta.Name).Info("Already added node to port allocations. Skipping")
-		return
-	}
-
-	pa.logger.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations")
-	pa.mutex.Lock()
-	defer pa.mutex.Unlock()
-
-	ports := portAllocation{}
-	for i := pa.minPort; i <= pa.maxPort; i++ {
-		ports[i] = false
-	}
-
-	pa.portAllocations = append(pa.portAllocations, ports)
-	pa.nodeRegistry[node.ObjectMeta.UID] = true
-}
-
 // syncDeleteGameServer when a GameServer Pod is deleted
 // make the HostPort available
 func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
@@ -260,7 +203,7 @@
 // and Terminating Pods values make sure those
 // portAllocations are marked as taken.
 // Locks the mutex while doing this.
-// This is basically a stop the world Garbage Collection on port allocations.
+// This is basically a stop the world Garbage Collection on port allocations, but it only happens on startup.
 func (pa *PortAllocator) syncAll() error {
 	pa.mutex.Lock()
 	defer pa.mutex.Unlock()

@@ -272,9 +215,6 @@
 		return errors.Wrap(err, "error listing all nodes")
 	}

-	// setup blank port values
-	nodePorts, nodeRegistry := pa.nodePortAllocation(nodes)
-
 	gameservers, err := pa.gameServerLister.List(labels.Everything())
 	if err != nil {
 		return errors.Wrapf(err, "error listing all GameServers")
@@ -283,15 +223,7 @@
 	gsRegistry := map[types.UID]bool{}

 	// place to put GameServer port allocations that are not ready yet/after the ready state
-	nonReadyNodesPorts := pa.registerExistingGameServerPorts(gameservers, gsRegistry, nodePorts)
-
-	// this gives us back an ordered node list.
-	allocations := make([]portAllocation, len(nodePorts))
-	i := 0
-	for _, np := range nodePorts {
-		allocations[i] = np
-		i++
-	}
+	allocations, nonReadyNodesPorts := pa.registerExistingGameServerPorts(gameservers, nodes, gsRegistry)

 	// close off the port on the first node you find
 	// we actually don't mind what node it is, since we only care
@@ -303,14 +235,21 @@

 	pa.portAllocations = allocations
 	pa.gameServerRegistry = gsRegistry
-	pa.nodeRegistry = nodeRegistry

 	return nil
 }

 // registerExistingGameServerPorts registers the gameservers against gsRegistry and the ports against nodePorts.
+// It returns an ordered list of portAllocations, one per cluster node.
 // any GameServers allocated a port, but not yet assigned a Node will returned as an array of port values.
-func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*v1alpha1.GameServer, gsRegistry map[types.UID]bool, nodePorts map[string]portAllocation) []int32 {
+func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*v1alpha1.GameServer, nodes []*corev1.Node, gsRegistry map[types.UID]bool) ([]portAllocation, []int32) {
+	// setup blank port values
+	nodePortAllocation := pa.nodePortAllocation(nodes)
+	nodePortCount := make(map[string]int64, len(nodes))
+	for _, n := range nodes {
+		nodePortCount[n.ObjectMeta.Name] = 0
+	}
+
 	var nonReadyNodesPorts []int32

 	for _, gs := range gameservers {
@@ -319,37 +258,61 @@ func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*v1alpha1
 				gsRegistry[gs.ObjectMeta.UID] = true

 				// if the node doesn't exist, it's likely unscheduled
-				_, ok := nodePorts[gs.Status.NodeName]
+				_, ok := nodePortAllocation[gs.Status.NodeName]
 				if gs.Status.NodeName != "" && ok {
-					nodePorts[gs.Status.NodeName][p.HostPort] = true
+					nodePortAllocation[gs.Status.NodeName][p.HostPort] = true
+					nodePortCount[gs.Status.NodeName]++
 				} else if p.HostPort != 0 {
 					nonReadyNodesPorts = append(nonReadyNodesPorts, p.HostPort)
 				}
 			}
 		}
 	}
-	return nonReadyNodesPorts
+
+	// make a list of the keys
+	keys := make([]string, 0, len(nodePortAllocation))
+	for k := range nodePortAllocation {
+		keys = append(keys, k)
+	}
+
+	// sort, since this is how it would have originally been allocated across the
+	// ordered []portAllocation
+	sort.Slice(keys, func(i, j int) bool {
+		return nodePortCount[keys[i]] > nodePortCount[keys[j]]
+	})
+
+	// this gives us back an ordered node list
+	allocations := make([]portAllocation, len(nodePortAllocation))
+	for i, k := range keys {
+		allocations[i] = nodePortAllocation[k]
+
+	}
+
+	return allocations, nonReadyNodesPorts
 }

 // nodePortAllocation returns a map of port allocations all set to being available
-// with a map key for each node, as well as the node registry record (since we're already looping)
+// with a map key for each node
-func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) (map[string]portAllocation, map[types.UID]bool) {
+func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation {
 	nodePorts := map[string]portAllocation{}
-	nodeRegistry := map[types.UID]bool{}

 	for _, n := range nodes {
-		nodeRegistry[n.ObjectMeta.UID] = true
-
 		// ignore unschedulable nodes
 		if !n.Spec.Unschedulable {
-			nodePorts[n.Name] = portAllocation{}
-			for i := pa.minPort; i <= pa.maxPort; i++ {
-				nodePorts[n.Name][i] = false
-			}
+			nodePorts[n.Name] = pa.newPortAllocation()
 		}
 	}

-	return nodePorts, nodeRegistry
+	return nodePorts
+}
+
+func (pa *PortAllocator) newPortAllocation() portAllocation {
+	p := make(portAllocation, (pa.maxPort-pa.minPort)+1)
+	for i := pa.minPort; i <= pa.maxPort; i++ {
+		p[i] = false
+	}
+
+	return p
 }

 // setPortAllocation takes a port from an all
diff --git a/pkg/gameservers/portallocator_test.go b/pkg/gameservers/portallocator_test.go
index e8c24bcca8..2886e988d1 100644
--- a/pkg/gameservers/portallocator_test.go
+++ b/pkg/gameservers/portallocator_test.go
@@ -19,7 +19,6 @@ import (
 	"strconv"
 	"sync"
 	"testing"
-	"time"

 	"agones.dev/agones/pkg/apis/stable/v1alpha1"
 	agtesting "agones.dev/agones/pkg/testing"
@@ -54,6 +53,7 @@ func TestPortAllocatorAllocate(t *testing.T) {
 	defer cancel()

 	// Make sure the add's don't corrupt the sync
+	// (no longer an issue, but leave this here for posterity)
 	nodeWatch.Add(&n1)
 	nodeWatch.Add(&n2)
 	assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
@@ -62,11 +62,11 @@
 	assert.Nil(t, err)

 	// single port dynamic
-	_, err = pa.Allocate(fixture.DeepCopy())
+	pa.Allocate(fixture.DeepCopy())
 	assert.Nil(t, err)
 	assert.Equal(t, 1, countTotalAllocatedPorts(pa))

-	_, err = pa.Allocate(fixture.DeepCopy())
+	pa.Allocate(fixture.DeepCopy())
 	assert.Nil(t, err)
 	assert.Equal(t, 2, countTotalAllocatedPorts(pa))

@@ -74,7 +74,7 @@
 	copy := fixture.DeepCopy()
 	copy.Spec.Ports = append(copy.Spec.Ports, v1alpha1.GameServerPort{Name: "another", ContainerPort: 6666, PortPolicy: v1alpha1.Dynamic})
 	assert.Len(t, copy.Spec.Ports, 2)
-	_, err = pa.Allocate(copy.DeepCopy())
+	pa.Allocate(copy.DeepCopy())
 	assert.Nil(t, err)
 	assert.Equal(t, 4, countTotalAllocatedPorts(pa))

@@ -82,7 +82,7 @@
 	copy = copy.DeepCopy()
 	copy.Spec.Ports = append(copy.Spec.Ports, v1alpha1.GameServerPort{Name: "another", ContainerPort: 6666, PortPolicy: v1alpha1.Dynamic})
 	assert.Len(t, copy.Spec.Ports, 3)
-	_, err = pa.Allocate(copy)
+	pa.Allocate(copy)
 	assert.Nil(t, err)
 	assert.Equal(t, 7, countTotalAllocatedPorts(pa))

@@ -91,7 +91,7 @@
 	expected := int32(9999)
 	copy.Spec.Ports = append(copy.Spec.Ports, v1alpha1.GameServerPort{Name: "another", ContainerPort: 6666, HostPort: expected, PortPolicy: v1alpha1.Static})
 	assert.Len(t, copy.Spec.Ports, 4)
-	_, err = pa.Allocate(copy)
+	pa.Allocate(copy)
 	assert.Nil(t, err)
 	assert.Equal(t, 10, countTotalAllocatedPorts(pa))
 	assert.Equal(t, v1alpha1.Static, copy.Spec.Ports[3].PortPolicy)
@@ -120,15 +120,16 @@
 			// ports between 10 and 20
 			for i := 10; i <= 20; i++ {
 				var p int32
-				gs, err := pa.Allocate(fixture.DeepCopy())
+				gs := pa.Allocate(fixture.DeepCopy())
 				assert.True(t, 10 <= gs.Spec.Ports[0].HostPort && gs.Spec.Ports[0].HostPort <= 20, "%v is not between 10 and 20", p)
 				assert.Nil(t, err)
 			}
 		}

-		// now we should have none left
-		_, err = pa.Allocate(fixture.DeepCopy())
-		assert.Equal(t, ErrPortNotFound, err)
+		assert.Len(t, pa.portAllocations, 2)
+		gs := pa.Allocate(fixture.DeepCopy())
+		assert.NotEmpty(t, gs.Spec.Ports[0].HostPort)
+		assert.Len(t, pa.portAllocations, 3)
 	})

 	t.Run("ports are all allocated with multiple ports per GameServers", func(t *testing.T) {
@@ -159,7 +160,8 @@
 		for i := 10; i <= 14; i++ {
 			copy := morePortFixture.DeepCopy()
 			copy.ObjectMeta.UID = types.UID(strconv.Itoa(x) + ":" + strconv.Itoa(i))
-			gs, err := pa.Allocate(copy)
+			gs := pa.Allocate(copy)
+			assert.NotEmpty(t, gs.Spec.Ports[0].HostPort)
 			logrus.WithField("uid", copy.ObjectMeta.UID).WithField("ports", gs.Spec.Ports).WithError(err).Info("Allocated Port")
 			assert.Nil(t, err)
 			for _, p := range gs.Spec.Ports {
@@ -171,9 +173,10 @@
 		}
 		logrus.WithField("allocated", countTotalAllocatedPorts(pa)).WithField("count", len(pa.portAllocations[0])+len(pa.portAllocations[1])).Info("How many allocated")

-		// now we should have none left
-		_, err = pa.Allocate(fixture.DeepCopy())
-		assert.Equal(t, ErrPortNotFound, err)
+		assert.Len(t, pa.portAllocations, 2)
+		gs := pa.Allocate(fixture.DeepCopy())
+		assert.NotEmpty(t, gs.Spec.Ports[0].HostPort)
+		assert.Len(t, pa.portAllocations, 3)
 	})

 	t.Run("ports are unique in a node", func(t *testing.T) {
@@ -191,7 +194,7 @@
 		assert.Nil(t, err)
 		var ports []int32
 		for i := 10; i <= 20; i++ {
-			gs, err := pa.Allocate(fixture.DeepCopy())
+			gs := pa.Allocate(fixture.DeepCopy())
 			assert.Nil(t, err)
 			assert.NotContains(t, ports, gs.Spec.Ports[0].HostPort)
 			ports = append(ports, gs.Spec.Ports[0].HostPort)
@@ -202,7 +205,7 @@
 func TestPortAllocatorMultithreadAllocate(t *testing.T) {
 	fixture := dynamicGameServerFixture()
 	m := agtesting.NewMocks()
-	pa := NewPortAllocator(10, 110, m.KubeInformationFactory, m.AgonesInformerFactory)
+	pa := NewPortAllocator(10, 20, m.KubeInformationFactory, m.AgonesInformerFactory)

 	m.KubeClient.AddReactor("list", "nodes", func(action k8stesting.Action) (bool, runtime.Object, error) {
 		nl := &corev1.NodeList{Items: []corev1.Node{n1, n2}}
@@ -214,12 +217,14 @@
 	assert.Nil(t, err)

 	wg := sync.WaitGroup{}
-	for i := 0; i < 3; i++ {
+	// allocate more ports than the pre-allocated nodes can hold, to make sure
+	// it works for dynamic node addition
+	for i := 0; i < 5; i++ {
 		wg.Add(1)
 		go func(i int) {
 			for x := 0; x < 10; x++ {
 				logrus.WithField("x", x).WithField("i", i).Info("allocating!")
-				gs, err := pa.Allocate(fixture.DeepCopy())
+				gs := pa.Allocate(fixture.DeepCopy())
 				for _, p := range gs.Spec.Ports {
 					assert.NotEmpty(t, p.HostPort)
 				}
@@ -252,7 +257,7 @@
 	assert.NotEmpty(t, fixture.Spec.Ports)

 	for i := 0; i <= 100; i++ {
-		gs, err := pa.Allocate(fixture.DeepCopy())
+		gs := pa.Allocate(fixture.DeepCopy())
 		assert.Nil(t, err)
 		port := gs.Spec.Ports[0]
 		assert.True(t, 10 <= port.HostPort && port.HostPort <= 20)
@@ -412,125 +417,6 @@ func TestPortAllocatorSyncDeleteGameServer(t *testing.T) {
 	pa.mutex.RUnlock()
 }

-func TestPortAllocatorNodeEvents(t *testing.T) {
-	fixture := dynamicGameServerFixture()
-	m := agtesting.NewMocks()
-	pa := NewPortAllocator(10, 20, m.KubeInformationFactory, m.AgonesInformerFactory)
-	nodeWatch := watch.NewFake()
-	gsWatch := watch.NewFake()
-	m.KubeClient.AddWatchReactor("nodes", k8stesting.DefaultWatchReactor(nodeWatch, nil))
-	m.AgonesClient.AddWatchReactor("gameservers", k8stesting.DefaultWatchReactor(gsWatch, nil))
-
-	received := make(chan string, 10)
-	defer close(received)
-
-	f := pa.workerqueue.SyncHandler
-	pa.workerqueue.SyncHandler = func(s string) error {
-		err := f(s)
-		assert.Nil(t, err, "sync handler failed")
-		received <- s
-		return nil
-	}
-
-	stop, cancel := agtesting.StartInformers(m)
-	defer cancel()
-
-	// Make sure the add's don't corrupt the sync
-	nodeWatch.Add(&n1)
-	nodeWatch.Add(&n2)
-
-	go func() {
-		err := pa.Run(stop)
-		assert.Nil(t, err)
-	}()
-
-	testReceived := func(expected, failMsg string) {
-		select {
-		case key := <-received:
-			assert.Equal(t, expected, key)
-		case <-time.After(3 * time.Second):
-			assert.FailNow(t, failMsg, "expected: %s", expected)
-		}
-	}
-
-	testReceived(n1.ObjectMeta.Name, "add node 1")
-	testReceived(n2.ObjectMeta.Name, "add node 2")
-
-	// add a game server
-	gs, err := pa.Allocate(fixture.DeepCopy())
-	port := gs.Spec.Ports[0].HostPort
-
-	assert.Nil(t, err)
-	gsWatch.Add(gs)
-
-	pa.mutex.RLock()
-	assert.Len(t, pa.portAllocations, 2)
-	assert.Equal(t, 1, countAllocatedPorts(pa, port))
-	pa.mutex.RUnlock()
-
-	// add the n3 node
-	logrus.Info("adding n3")
-	nodeWatch.Add(&n3)
-	assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
-	testReceived(n3.ObjectMeta.Name, "add node 3")
-
-	pa.mutex.RLock()
-	assert.Len(t, pa.portAllocations, 3)
-	assert.Equal(t, 1, countAllocatedPorts(pa, port))
-	pa.mutex.RUnlock()
-
-	// mark the node as unscheduled
-	logrus.Info("unscheduling n3")
-	copy := n3.DeepCopy()
-	copy.Spec.Unschedulable = true
-	assert.True(t, copy.Spec.Unschedulable)
-	nodeWatch.Modify(copy)
-	assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
-	testReceived(string(syncAllKey), "unscheduled node 3")
-
-	pa.mutex.RLock()
-	assert.Len(t, pa.portAllocations, 2)
-	assert.Equal(t, 1, countAllocatedPorts(pa, port))
-	pa.mutex.RUnlock()
-
-	// schedule the n3 node again
-	logrus.Info("scheduling n3")
-	copy = n3.DeepCopy()
-	copy.Spec.Unschedulable = false
-	nodeWatch.Modify(copy)
-	assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
-	testReceived(string(syncAllKey), "scheduled node 3")
-
-	pa.mutex.RLock()
-	assert.Len(t, pa.portAllocations, 3)
-	assert.Equal(t, 1, countAllocatedPorts(pa, port))
-	pa.mutex.RUnlock()
-
-	// delete the n3 node
-	logrus.Info("deleting n3")
-	nodeWatch.Delete(n3.DeepCopy())
-	assert.True(t, cache.WaitForCacheSync(stop, pa.nodeSynced))
-	testReceived(string(syncAllKey), "deleting node 3")
-
-	pa.mutex.RLock()
-	assert.Len(t, pa.portAllocations, 2)
-	assert.Equal(t, 1, countAllocatedPorts(pa, port))
-	pa.mutex.RUnlock()
-
-	// add the n1 node again, it shouldn't do anything
-	nodeWatch.Add(&n1)
-	select {
-	case <-received:
-		assert.FailNow(t, "adding back n1: event should not happen")
-	case <-time.After(time.Second):
-	}
-
-	pa.mutex.RLock()
-	assert.Len(t, pa.portAllocations, 2)
-	assert.Equal(t, 1, countAllocatedPorts(pa, port))
-	pa.mutex.RUnlock()
-}
-
 func TestNodePortAllocation(t *testing.T) {
 	t.Parallel()

@@ -541,8 +427,7 @@
 		nl := &corev1.NodeList{Items: nodes}
 		return true, nl, nil
 	})
-	result, registry := pa.nodePortAllocation([]*corev1.Node{&n1, &n2, &n3})
-	assert.Len(t, registry, 3)
+	result := pa.nodePortAllocation([]*corev1.Node{&n1, &n2, &n3})
 	assert.Len(t, result, 3)
 	for _, n := range nodes {
 		ports, ok := result[n.ObjectMeta.Name]
@@ -570,6 +455,43 @@ func TestTakePortAllocation(t *testing.T) {
 	}
 }

+func TestPortAllocatorRegisterExistingGameServerPorts(t *testing.T) {
+	t.Parallel()
+	m := agtesting.NewMocks()
+	pa := NewPortAllocator(10, 13, m.KubeInformationFactory, m.AgonesInformerFactory)
+
+	gs1 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs1", UID: "1"},
+		Spec: v1alpha1.GameServerSpec{
+			Ports: []v1alpha1.GameServerPort{{PortPolicy: v1alpha1.Dynamic, HostPort: 10}},
+		},
+		Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Ports: []v1alpha1.GameServerStatusPort{{Port: 10}}, NodeName: n1.ObjectMeta.Name}}
+
+	gs2 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs2", UID: "2"},
+		Spec: v1alpha1.GameServerSpec{
+			Ports: []v1alpha1.GameServerPort{{PortPolicy: v1alpha1.Dynamic, HostPort: 11}},
+		},
+		Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Ports: []v1alpha1.GameServerStatusPort{{Port: 11}}, NodeName: n2.ObjectMeta.Name}}
+
+	gs3 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs3", UID: "3"},
+		Spec: v1alpha1.GameServerSpec{
+			Ports: []v1alpha1.GameServerPort{{PortPolicy: v1alpha1.Dynamic, HostPort: 12}},
+		},
+		Status: v1alpha1.GameServerStatus{State: v1alpha1.Ready, Ports: []v1alpha1.GameServerStatusPort{{Port: 12}}, NodeName: n1.ObjectMeta.Name}}
+
+	gs4 := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gs4", UID: "4"},
+		Spec: v1alpha1.GameServerSpec{
+			Ports: []v1alpha1.GameServerPort{{PortPolicy: v1alpha1.Dynamic, HostPort: 13}},
+		},
+		Status: v1alpha1.GameServerStatus{State: v1alpha1.PortAllocation, Ports: []v1alpha1.GameServerStatusPort{{Port: 13}}}}
+
+	allocations, nonReadyNodesPorts := pa.registerExistingGameServerPorts([]*v1alpha1.GameServer{gs1, gs2, gs3, gs4}, []*corev1.Node{&n1, &n2, &n3}, map[types.UID]bool{})
+
+	assert.Equal(t, []int32{13}, nonReadyNodesPorts)
+	assert.Equal(t, portAllocation{10: true, 11: false, 12: true, 13: false}, allocations[0])
+	assert.Equal(t, portAllocation{10: false, 11: true, 12: false, 13: false}, allocations[1])
+	assert.Equal(t, portAllocation{10: false, 11: false, 12: false, 13: false}, allocations[2])
+}
+
 func dynamicGameServerFixture() *v1alpha1.GameServer {
 	return &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default", UID: "1234"},
 		Spec: v1alpha1.GameServerSpec{