Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor AllocationCounter to GameServerCounter #639

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ func main() {

allocationMutex := &sync.Mutex{}

gsCounter := gameservers.NewPerNodeCounter(kubeInformerFactory, agonesInformerFactory)

gsController := gameservers.NewController(wh, health,
ctlConf.MinPort, ctlConf.MaxPort, ctlConf.SidecarImage, ctlConf.AlwaysPullSidecar,
ctlConf.SidecarCPURequest, ctlConf.SidecarCPULimit,
Expand All @@ -193,13 +195,13 @@ func main() {
fleetController := fleets.NewController(wh, health, kubeClient, extClient, agonesClient, agonesInformerFactory)
faController := fleetallocation.NewController(wh, allocationMutex,
kubeClient, extClient, agonesClient, agonesInformerFactory)
gasController := gameserverallocations.NewController(wh, health, kubeClient,
kubeInformerFactory, extClient, agonesClient, agonesInformerFactory, topNGSForAllocation)
gasController := gameserverallocations.NewController(wh, health, gsCounter, topNGSForAllocation,
kubeClient, extClient, agonesClient, agonesInformerFactory)
fasController := fleetautoscalers.NewController(wh, health,
kubeClient, extClient, agonesClient, agonesInformerFactory)

rs = append(rs,
httpsServer, gsController, gsSetController, fleetController, faController, fasController, gasController, server)
httpsServer, gsCounter, gsController, gsSetController, fleetController, faController, fasController, gasController, server)

stop := signals.NewStopChannel()

Expand Down
32 changes: 13 additions & 19 deletions pkg/gameserverallocations/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
getterv1alpha1 "agones.dev/agones/pkg/client/clientset/versioned/typed/stable/v1alpha1"
"agones.dev/agones/pkg/client/informers/externalversions"
listerv1alpha1 "agones.dev/agones/pkg/client/listers/stable/v1alpha1"
"agones.dev/agones/pkg/gameservers"
"agones.dev/agones/pkg/util/crd"
"agones.dev/agones/pkg/util/logfields"
"agones.dev/agones/pkg/util/runtime"
Expand All @@ -43,7 +44,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -61,8 +61,12 @@ var (

// Controller is a the GameServerAllocation controller
type Controller struct {
baseLogger *logrus.Entry
counter *AllocationCounter
baseLogger *logrus.Entry
counter *gameservers.PerNodeCounter
readyGameServers gameServerCacheEntry
// Instead of selecting the top one, controller selects a random one
// from the topNGameServerCount of Ready gameservers
topNGameServerCount int
crdGetter v1beta1.CustomResourceDefinitionInterface
gameServerSynced cache.InformerSynced
gameServerGetter getterv1alpha1.GameServersGetter
Expand All @@ -73,10 +77,6 @@ type Controller struct {
workerqueue *workerqueue.WorkerQueue
gsWorkerqueue *workerqueue.WorkerQueue
recorder record.EventRecorder
readyGameServers gameServerCacheEntry
// Instead of selecting the top one, controller selects a random one
// from the topNGameServerCount of Ready gameservers
topNGameServerCount int
}

// gameserver cache to keep the Ready state gameserver.
Expand Down Expand Up @@ -133,7 +133,7 @@ func (e *gameServerCacheEntry) Range(f func(key string, gs *v1alpha1.GameServer)
// findComparator is a comparator function specifically for the
// findReadyGameServerForAllocation method for determining
// scheduling strategy
type findComparator func(bestCount, currentCount NodeCount) bool
type findComparator func(bestCount, currentCount gameservers.NodeCount) bool

var allocationRetry = wait.Backoff{
Steps: 5,
Expand All @@ -145,24 +145,24 @@ var allocationRetry = wait.Backoff{
// NewController returns a controller for a GameServerAllocation
func NewController(wh *webhooks.WebHook,
health healthcheck.Handler,
counter *gameservers.PerNodeCounter,
topNGameServerCnt int,
kubeClient kubernetes.Interface,
kubeInformerFactory informers.SharedInformerFactory,
extClient extclientset.Interface,
agonesClient versioned.Interface,
agonesInformerFactory externalversions.SharedInformerFactory,
topNGameServerCnt int,
) *Controller {

agonesInformer := agonesInformerFactory.Stable().V1alpha1()
c := &Controller{
counter: NewAllocationCounter(kubeInformerFactory, agonesInformerFactory),
counter: counter,
topNGameServerCount: topNGameServerCnt,
crdGetter: extClient.ApiextensionsV1beta1().CustomResourceDefinitions(),
gameServerSynced: agonesInformer.GameServers().Informer().HasSynced,
gameServerGetter: agonesClient.StableV1alpha1(),
gameServerLister: agonesInformer.GameServers().Lister(),
gameServerAllocationSynced: agonesInformer.GameServerAllocations().Informer().HasSynced,
gameServerAllocationGetter: agonesClient.StableV1alpha1(),
topNGameServerCount: topNGameServerCnt,
}
c.baseLogger = runtime.NewLoggerWithType(c)
c.workerqueue = workerqueue.NewWorkerQueue(c.syncDelete, c.baseLogger, logfields.GameServerAllocationKey, stable.GroupName+".GameServerAllocationController")
Expand Down Expand Up @@ -217,13 +217,7 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
return err
}

err = c.counter.Run(stop)
if err != nil {
return err
}

c.stop = stop

c.baseLogger.Info("Wait for cache sync")
if !cache.WaitForCacheSync(stop, c.gameServerAllocationSynced, c.gameServerSynced) {
return errors.New("failed to wait for caches to sync")
Expand Down Expand Up @@ -444,7 +438,7 @@ func (c *Controller) syncDelete(key string) error {
// preferred selectors, as well as the passed in comparator
func (c *Controller) findReadyGameServerForAllocation(gsa *v1alpha1.GameServerAllocation, comparator findComparator) (*v1alpha1.GameServer, error) {
// track the best node count
var bestCount *NodeCount
var bestCount *gameservers.NodeCount
// the current GameServer from the node with the most GameServers (allocated, ready)
var bestGS *v1alpha1.GameServer

Expand Down
18 changes: 10 additions & 8 deletions pkg/gameserverallocations/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"agones.dev/agones/pkg/apis/stable/v1alpha1"
"agones.dev/agones/pkg/gameservers"
agtesting "agones.dev/agones/pkg/testing"
"agones.dev/agones/pkg/util/webhooks"
applypatch "github.com/evanphx/json-patch"
Expand Down Expand Up @@ -186,7 +187,7 @@ func TestControllerAllocate(t *testing.T) {
err := c.syncReadyGSServerCache()
assert.Nil(t, err)

err = c.counter.Run(stop)
err = c.counter.Run(0, stop)
assert.Nil(t, err)

gsa := v1alpha1.GameServerAllocation{ObjectMeta: metav1.ObjectMeta{Name: "gsa-1"},
Expand Down Expand Up @@ -263,7 +264,7 @@ func TestControllerAllocatePriority(t *testing.T) {
err := c.syncReadyGSServerCache()
assert.Nil(t, err)

err = c.counter.Run(stop)
err = c.counter.Run(0, stop)
assert.Nil(t, err)

gas := &v1alpha1.GameServerAllocation{ObjectMeta: metav1.ObjectMeta{Name: "fa-1"},
Expand Down Expand Up @@ -351,7 +352,7 @@ func TestControllerWithoutAllocateMutex(t *testing.T) {
err := c.syncReadyGSServerCache()
assert.Nil(t, err)

err = c.counter.Run(stop)
err = c.counter.Run(0, stop)
assert.Nil(t, err)

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -417,7 +418,7 @@ func TestControllerFindPackedReadyGameServer(t *testing.T) {
err := c.syncReadyGSServerCache()
assert.Nil(t, err)

err = c.counter.Run(stop)
err = c.counter.Run(0, stop)
assert.Nil(t, err)

gs, err := c.findReadyGameServerForAllocation(gsa, packedComparator)
Expand Down Expand Up @@ -479,7 +480,7 @@ func TestControllerFindPackedReadyGameServer(t *testing.T) {
err := c.syncReadyGSServerCache()
assert.Nil(t, err)

err = c.counter.Run(stop)
err = c.counter.Run(0, stop)
assert.Nil(t, err)

gs, err := c.findReadyGameServerForAllocation(prefGsa, packedComparator)
Expand Down Expand Up @@ -544,7 +545,7 @@ func TestControllerFindPackedReadyGameServer(t *testing.T) {
err := c.syncReadyGSServerCache()
assert.Nil(t, err)

err = c.counter.Run(stop)
err = c.counter.Run(0, stop)
assert.Nil(t, err)

gs, err := c.findReadyGameServerForAllocation(gsa, packedComparator)
Expand Down Expand Up @@ -601,7 +602,7 @@ func TestControllerFindDistributedReadyGameServer(t *testing.T) {
err := c.syncReadyGSServerCache()
assert.Nil(t, err)

err = c.counter.Run(stop)
err = c.counter.Run(0, stop)
assert.Nil(t, err)

gs, err := c.findReadyGameServerForAllocation(gsa, distributedComparator)
Expand Down Expand Up @@ -808,7 +809,8 @@ func defaultFixtures(gsLen int) (*v1alpha1.Fleet, *v1alpha1.GameServerSet, []v1a
func newFakeController() (*Controller, agtesting.Mocks) {
m := agtesting.NewMocks()
wh := webhooks.NewWebHook(http.NewServeMux())
c := NewController(wh, healthcheck.NewHandler(), m.KubeClient, m.KubeInformerFactory, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory, 1)
counter := gameservers.NewPerNodeCounter(m.KubeInformerFactory, m.AgonesInformerFactory)
c := NewController(wh, healthcheck.NewHandler(), counter, 1, m.KubeClient, m.ExtClient, m.AgonesClient, m.AgonesInformerFactory)
c.recorder = m.FakeRecorder
return c, m
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/gameserverallocations/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

package gameserverallocations

import "agones.dev/agones/pkg/gameservers"

// packedComparator prioritises Nodes with GameServers that are allocated, and then Nodes with the most
// Ready GameServers -- this will bin pack allocated game servers together.
func packedComparator(bestCount, currentCount NodeCount) bool {
if currentCount.allocated == bestCount.allocated && currentCount.ready > bestCount.ready {
func packedComparator(bestCount, currentCount gameservers.NodeCount) bool {
if currentCount.Allocated == bestCount.Allocated && currentCount.Ready > bestCount.Ready {
return true
} else if currentCount.allocated > bestCount.allocated {
} else if currentCount.Allocated > bestCount.Allocated {
return true
}

Expand All @@ -28,6 +30,6 @@ func packedComparator(bestCount, currentCount NodeCount) bool {

// distributedComparator is the inverse of the packed comparator,
// looking to distribute allocated gameservers on as many nodes as possible.
func distributedComparator(bestCount, currentCount NodeCount) bool {
func distributedComparator(bestCount, currentCount gameservers.NodeCount) bool {
return !packedComparator(bestCount, currentCount)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package gameserverallocations
package gameservers

import (
"sync"
Expand All @@ -29,11 +29,11 @@ import (
"k8s.io/client-go/tools/cache"
)

// AllocationCounter counts how many Allocated and
// PerNodeCounter counts how many Allocated and
// Ready GameServers currently exist on each node.
// This is useful for scheduling allocations on the
// right Nodes.
type AllocationCounter struct {
// This is useful for scheduling allocations, fleet management
// mostly under a Packed strategy
type PerNodeCounter struct {
logger *logrus.Entry
gameServerSynced cache.InformerSynced
gameServerLister listerv1alpha1.GameServerLister
Expand All @@ -44,19 +44,21 @@ type AllocationCounter struct {
// NodeCount is just a convenience data structure for
// keeping relevant GameServer counts about Nodes
type NodeCount struct {
ready int64
allocated int64
// Ready is ready count
Ready int64
// Allocated is allocated out
Allocated int64
}

// NewAllocationCounter returns a new AllocationCounter
func NewAllocationCounter(
// NewPerNodeCounter returns a new PerNodeCounter
func NewPerNodeCounter(
kubeInformerFactory informers.SharedInformerFactory,
agonesInformerFactory externalversions.SharedInformerFactory) *AllocationCounter {
agonesInformerFactory externalversions.SharedInformerFactory) *PerNodeCounter {

gameServers := agonesInformerFactory.Stable().V1alpha1().GameServers()
gsInformer := gameServers.Informer()

ac := &AllocationCounter{
ac := &PerNodeCounter{
gameServerSynced: gsInformer.HasSynced,
gameServerLister: gameServers.Lister(),
countMutex: sync.RWMutex{},
Expand Down Expand Up @@ -131,17 +133,18 @@ func NewAllocationCounter(
}

// Run sets up the current state GameServer counts across nodes
func (ac *AllocationCounter) Run(stop <-chan struct{}) error {
ac.countMutex.Lock()
defer ac.countMutex.Unlock()
// non blocking Run function.
func (pnc *PerNodeCounter) Run(_ int, stop <-chan struct{}) error {
pnc.countMutex.Lock()
defer pnc.countMutex.Unlock()

ac.logger.Info("Running")
pnc.logger.Info("Running")

if !cache.WaitForCacheSync(stop, ac.gameServerSynced) {
if !cache.WaitForCacheSync(stop, pnc.gameServerSynced) {
return errors.New("failed to wait for caches to sync")
}

gsList, err := ac.gameServerLister.List(labels.Everything())
gsList, err := pnc.gameServerLister.List(labels.Everything())
if err != nil {
return errors.Wrap(err, "error attempting to list all GameServers")
}
Expand All @@ -155,49 +158,49 @@ func (ac *AllocationCounter) Run(stop <-chan struct{}) error {

switch gs.Status.State {
case v1alpha1.GameServerStateReady:
counts[gs.Status.NodeName].ready++
counts[gs.Status.NodeName].Ready++
case v1alpha1.GameServerStateAllocated:
counts[gs.Status.NodeName].allocated++
counts[gs.Status.NodeName].Allocated++
}
}

ac.counts = counts
pnc.counts = counts
return nil
}

// Counts returns the NodeCount map in a thread safe way
func (ac *AllocationCounter) Counts() map[string]NodeCount {
ac.countMutex.RLock()
defer ac.countMutex.RUnlock()
func (pnc *PerNodeCounter) Counts() map[string]NodeCount {
pnc.countMutex.RLock()
defer pnc.countMutex.RUnlock()

result := make(map[string]NodeCount, len(ac.counts))
result := make(map[string]NodeCount, len(pnc.counts))

// return a copy, so it's thread safe
for k, v := range ac.counts {
for k, v := range pnc.counts {
result[k] = *v
}

return result
}

func (ac *AllocationCounter) inc(gs *v1alpha1.GameServer, ready, allocated int64) {
ac.countMutex.Lock()
defer ac.countMutex.Unlock()
func (pnc *PerNodeCounter) inc(gs *v1alpha1.GameServer, ready, allocated int64) {
pnc.countMutex.Lock()
defer pnc.countMutex.Unlock()

_, ok := ac.counts[gs.Status.NodeName]
_, ok := pnc.counts[gs.Status.NodeName]
if !ok {
ac.counts[gs.Status.NodeName] = &NodeCount{}
pnc.counts[gs.Status.NodeName] = &NodeCount{}
}

ac.counts[gs.Status.NodeName].allocated += allocated
ac.counts[gs.Status.NodeName].ready += ready
pnc.counts[gs.Status.NodeName].Allocated += allocated
pnc.counts[gs.Status.NodeName].Ready += ready

// just in case
if ac.counts[gs.Status.NodeName].allocated < 0 {
ac.counts[gs.Status.NodeName].allocated = 0
if pnc.counts[gs.Status.NodeName].Allocated < 0 {
pnc.counts[gs.Status.NodeName].Allocated = 0
}

if ac.counts[gs.Status.NodeName].ready < 0 {
ac.counts[gs.Status.NodeName].ready = 0
if pnc.counts[gs.Status.NodeName].Ready < 0 {
pnc.counts[gs.Status.NodeName].Ready = 0
}
}
Loading