Skip to content

Commit

Permalink
Ports should always be allocated to a GameServer
Browse files Browse the repository at this point in the history
The allocator was written before we have the autoscaler, which has a
requirement to be able to start GameServers past the current number of nodes,
so that the autoscaler can find room for them automaticaly.

The previous port allocator would only allow port ranges for each current node
- but this PR will allow it to go past that, and dynamically add port ranges as
required.

This also removes the requirement to do a full `GameServer` port sync every time
a Node gets removed, and drops a somewhat lengthy locking process.
  • Loading branch information
markmandel committed Nov 13, 2018
1 parent e71de58 commit 21c7edc
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 258 deletions.
7 changes: 2 additions & 5 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,16 +365,13 @@ func (c *Controller) syncGameServerPortAllocationState(gs *v1alpha1.GameServer)
return gs, nil
}

gsCopy, err := c.portAllocator.Allocate(gs.DeepCopy())
if err != nil {
return gsCopy, errors.Wrapf(err, "error allocating port for GameServer %s", gsCopy.Name)
}
gsCopy := c.portAllocator.Allocate(gs.DeepCopy())

gsCopy.Status.State = v1alpha1.Creating
c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), "Port allocated")

c.logger.WithField("gs", gsCopy).Info("Syncing Port Allocation State")
gs, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy)
gs, err := c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy)
if err != nil {
// if the GameServer doesn't get updated with the port data, then put the port
// back in the pool, as it will get retried on the next pass
Expand Down
189 changes: 76 additions & 113 deletions pkg/gameservers/portallocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
package gameservers

import (
"sort"
"sync"

"agones.dev/agones/pkg/apis/stable"
"agones.dev/agones/pkg/apis/stable/v1alpha1"
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/util/runtime"
"agones.dev/agones/pkg/util/workerqueue"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
Expand All @@ -33,14 +32,6 @@ import (
"k8s.io/client-go/tools/cache"
)

// syncAllKey is the queue key to sync all the ports.
// the + symbol is deliberate, is it can't be used in a K8s
// naming scheme
const syncAllKey = cache.ExplicitKey("SYNC+ALL")

// ErrPortNotFound is returns when a port is unable to be allocated
var ErrPortNotFound = errors.New("Unable to allocate a port")

// A set of port allocations for a node
type portAllocation map[int32]bool

Expand All @@ -54,7 +45,6 @@ type PortAllocator struct {
mutex sync.RWMutex
portAllocations []portAllocation
gameServerRegistry map[types.UID]bool
nodeRegistry map[types.UID]bool
minPort int32
maxPort int32
gameServerSynced cache.InformerSynced
Expand All @@ -63,7 +53,6 @@ type PortAllocator struct {
nodeSynced cache.InformerSynced
nodeLister corelisterv1.NodeLister
nodeInformer cache.SharedIndexInformer
workerqueue *workerqueue.WorkerQueue
}

// NewPortAllocator returns a new dynamic port
Expand All @@ -82,7 +71,6 @@ func NewPortAllocator(minPort, maxPort int32,
minPort: minPort,
maxPort: maxPort,
gameServerRegistry: map[types.UID]bool{},
nodeRegistry: map[types.UID]bool{},
gameServerSynced: gameServers.Informer().HasSynced,
gameServerLister: gameServers.Lister(),
gameServerInformer: gameServers.Informer(),
Expand All @@ -91,29 +79,11 @@ func NewPortAllocator(minPort, maxPort int32,
nodeSynced: nodes.Informer().HasSynced,
}
pa.logger = runtime.NewLoggerWithType(pa)
pa.workerqueue = workerqueue.NewWorkerQueue(pa.syncPorts, pa.logger, stable.GroupName+".PortAllocator")

pa.gameServerInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: pa.syncDeleteGameServer,
})

pa.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
node := obj.(*corev1.Node)
pa.workerqueue.Enqueue(cache.ExplicitKey(node.Name))
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldNode := oldObj.(*corev1.Node)
newNode := newObj.(*corev1.Node)
if oldNode.Spec.Unschedulable != newNode.Spec.Unschedulable {
pa.workerqueue.Enqueue(syncAllKey)
}
},
DeleteFunc: func(_ interface{}) {
pa.workerqueue.Enqueue(syncAllKey)
},
})

pa.logger.WithField("minPort", minPort).WithField("maxPort", maxPort).Info("Starting")
return pa
}
Expand All @@ -132,29 +102,12 @@ func (pa *PortAllocator) Run(stop <-chan struct{}) error {
return errors.Wrap(err, "error performing initial sync")
}

pa.workerqueue.Run(1, stop)
return nil
}

// syncPorts synchronises ports for the given key
func (pa *PortAllocator) syncPorts(key string) error {
if key == string(syncAllKey) {
return pa.syncAll()
}

// if we get a specific node name, we add some ports
node, err := pa.nodeLister.Get(key)
if err != nil {
return errors.Wrapf(err, "error retrieving node %s", key)
}
pa.syncAddNode(node)

return nil
}

// Allocate assigns a port to the GameServer and returns it.
// Return ErrPortNotFound if no port is allocatable
func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) (*v1alpha1.GameServer, error) {
func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) *v1alpha1.GameServer {
pa.mutex.Lock()
defer pa.mutex.Unlock()

Expand Down Expand Up @@ -182,26 +135,38 @@ func (pa *PortAllocator) Allocate(gs *v1alpha1.GameServer) (*v1alpha1.GameServer
return ports
}

amount := gs.CountPorts(v1alpha1.Dynamic)
allocations := findOpenPorts(amount)

if len(allocations) == amount {
pa.gameServerRegistry[gs.ObjectMeta.UID] = true

for i, p := range gs.Spec.Ports {
if p.PortPolicy == v1alpha1.Dynamic {
// pop off allocation
var a pn
a, allocations = allocations[0], allocations[1:]
a.pa[a.port] = true
gs.Spec.Ports[i].HostPort = a.port
// this allows us to do recursion, within the mutex lock
var allocate func(gs *v1alpha1.GameServer) *v1alpha1.GameServer
allocate = func(gs *v1alpha1.GameServer) *v1alpha1.GameServer {
amount := gs.CountPorts(v1alpha1.Dynamic)
allocations := findOpenPorts(amount)

if len(allocations) == amount {
pa.gameServerRegistry[gs.ObjectMeta.UID] = true

for i, p := range gs.Spec.Ports {
if p.PortPolicy == v1alpha1.Dynamic {
// pop off allocation
var a pn
a, allocations = allocations[0], allocations[1:]
a.pa[a.port] = true
gs.Spec.Ports[i].HostPort = a.port
}
}

return gs
}

return gs, nil
// if we get here, we ran out of ports. Add a node, and try again.
// this is important, because to autoscale scale up, we create GameServers that
// can't be scheduled on the current set of nodes, so we need to be sure
// there are always ports available to be allocated.
pa.portAllocations = append(pa.portAllocations, pa.newPortAllocation())

return allocate(gs)
}

return gs, ErrPortNotFound
return allocate(gs)
}

// DeAllocate marks the given port as no longer allocated
Expand All @@ -224,28 +189,6 @@ func (pa *PortAllocator) DeAllocate(gs *v1alpha1.GameServer) {
delete(pa.gameServerRegistry, gs.ObjectMeta.UID)
}

// syncAddNode adds another node port section
// to the available ports
func (pa *PortAllocator) syncAddNode(node *corev1.Node) {
// if we're already added this node, don't do it again
if _, ok := pa.nodeRegistry[node.ObjectMeta.UID]; ok {
pa.logger.WithField("node", node.ObjectMeta.Name).Info("Already added node to port allocations. Skipping")
return
}

pa.logger.WithField("node", node.ObjectMeta.Name).Info("Adding Node to port allocations")
pa.mutex.Lock()
defer pa.mutex.Unlock()

ports := portAllocation{}
for i := pa.minPort; i <= pa.maxPort; i++ {
ports[i] = false
}

pa.portAllocations = append(pa.portAllocations, ports)
pa.nodeRegistry[node.ObjectMeta.UID] = true
}

// syncDeleteGameServer when a GameServer Pod is deleted
// make the HostPort available
func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
Expand All @@ -260,7 +203,7 @@ func (pa *PortAllocator) syncDeleteGameServer(object interface{}) {
// and Terminating Pods values make sure those
// portAllocations are marked as taken.
// Locks the mutex while doing this.
// This is basically a stop the world Garbage Collection on port allocations.
// This is basically a stop the world Garbage Collection on port allocations, but it only happens on startup.
func (pa *PortAllocator) syncAll() error {
pa.mutex.Lock()
defer pa.mutex.Unlock()
Expand All @@ -272,9 +215,6 @@ func (pa *PortAllocator) syncAll() error {
return errors.Wrap(err, "error listing all nodes")
}

// setup blank port values
nodePorts, nodeRegistry := pa.nodePortAllocation(nodes)

gameservers, err := pa.gameServerLister.List(labels.Everything())
if err != nil {
return errors.Wrapf(err, "error listing all GameServers")
Expand All @@ -283,15 +223,7 @@ func (pa *PortAllocator) syncAll() error {
gsRegistry := map[types.UID]bool{}

// place to put GameServer port allocations that are not ready yet/after the ready state
nonReadyNodesPorts := pa.registerExistingGameServerPorts(gameservers, gsRegistry, nodePorts)

// this gives us back an ordered node list.
allocations := make([]portAllocation, len(nodePorts))
i := 0
for _, np := range nodePorts {
allocations[i] = np
i++
}
allocations, nonReadyNodesPorts := pa.registerExistingGameServerPorts(gameservers, nodes, gsRegistry)

// close off the port on the first node you find
// we actually don't mind what node it is, since we only care
Expand All @@ -303,14 +235,21 @@ func (pa *PortAllocator) syncAll() error {

pa.portAllocations = allocations
pa.gameServerRegistry = gsRegistry
pa.nodeRegistry = nodeRegistry

return nil
}

// registerExistingGameServerPorts registers the gameservers against gsRegistry and the ports against nodePorts.
// and returns an ordered list of portAllocations per cluster nodes, and an array of
// any GameServers allocated a port, but not yet assigned a Node will returned as an array of port values.
func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*v1alpha1.GameServer, gsRegistry map[types.UID]bool, nodePorts map[string]portAllocation) []int32 {
func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*v1alpha1.GameServer, nodes []*corev1.Node, gsRegistry map[types.UID]bool) ([]portAllocation, []int32) {
// setup blank port values
nodePortAllocation := pa.nodePortAllocation(nodes)
nodePortCount := make(map[string]int64, len(nodes))
for _, n := range nodes {
nodePortCount[n.ObjectMeta.Name] = 0
}

var nonReadyNodesPorts []int32

for _, gs := range gameservers {
Expand All @@ -319,37 +258,61 @@ func (pa *PortAllocator) registerExistingGameServerPorts(gameservers []*v1alpha1
gsRegistry[gs.ObjectMeta.UID] = true

// if the node doesn't exist, it's likely unscheduled
_, ok := nodePorts[gs.Status.NodeName]
_, ok := nodePortAllocation[gs.Status.NodeName]
if gs.Status.NodeName != "" && ok {
nodePorts[gs.Status.NodeName][p.HostPort] = true
nodePortAllocation[gs.Status.NodeName][p.HostPort] = true
nodePortCount[gs.Status.NodeName]++
} else if p.HostPort != 0 {
nonReadyNodesPorts = append(nonReadyNodesPorts, p.HostPort)
}
}
}
}
return nonReadyNodesPorts

// make a list of the keys
keys := make([]string, 0, len(nodePortAllocation))
for k := range nodePortAllocation {
keys = append(keys, k)
}

// sort, since this is how it would have originally been allocated across the
// ordered []portAllocation
sort.Slice(keys, func(i, j int) bool {
return nodePortCount[keys[i]] > nodePortCount[keys[j]]
})

// this gives us back an ordered node list
allocations := make([]portAllocation, len(nodePortAllocation))
for i, k := range keys {
allocations[i] = nodePortAllocation[k]

}

return allocations, nonReadyNodesPorts
}

// nodePortAllocation returns a map of port allocations all set to being available
// with a map key for each node, as well as the node registry record (since we're already looping)
func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) (map[string]portAllocation, map[types.UID]bool) {
func (pa *PortAllocator) nodePortAllocation(nodes []*corev1.Node) map[string]portAllocation {
nodePorts := map[string]portAllocation{}
nodeRegistry := map[types.UID]bool{}

for _, n := range nodes {
nodeRegistry[n.ObjectMeta.UID] = true

// ignore unschedulable nodes
if !n.Spec.Unschedulable {
nodePorts[n.Name] = portAllocation{}
for i := pa.minPort; i <= pa.maxPort; i++ {
nodePorts[n.Name][i] = false
}
nodePorts[n.Name] = pa.newPortAllocation()
}
}

return nodePorts, nodeRegistry
return nodePorts
}

func (pa *PortAllocator) newPortAllocation() portAllocation {
p := make(portAllocation, (pa.maxPort-pa.minPort)+1)
for i := pa.minPort; i <= pa.maxPort; i++ {
p[i] = false
}

return p
}

// setPortAllocation takes a port from an all
Expand Down
Loading

0 comments on commit 21c7edc

Please sign in to comment.