diff --git a/pkg/gameservers/controller.go b/pkg/gameservers/controller.go index 9cae1be7d6..e1cc262f2c 100644 --- a/pkg/gameservers/controller.go +++ b/pkg/gameservers/controller.go @@ -116,7 +116,7 @@ func NewController( nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), nodeSynced: kubeInformerFactory.Core().V1().Nodes().Informer().HasSynced, portAllocator: NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory), - healthController: NewHealthController(kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), + healthController: NewHealthController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory), } c.baseLogger = runtime.NewLoggerWithType(c) @@ -309,7 +309,12 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error { } // Run the Health Controller - go c.healthController.Run(stop) + go func() { + err = c.healthController.Run(stop) + if err != nil { + c.baseLogger.WithError(err).Error("error running health controller") + } + }() // start work queues var wg sync.WaitGroup diff --git a/pkg/gameservers/health.go b/pkg/gameservers/health.go index 9bfb25eaa0..23afe2f93f 100644 --- a/pkg/gameservers/health.go +++ b/pkg/gameservers/health.go @@ -26,6 +26,7 @@ import ( "agones.dev/agones/pkg/util/logfields" "agones.dev/agones/pkg/util/runtime" "agones.dev/agones/pkg/util/workerqueue" + "github.com/heptiolabs/healthcheck" "github.com/pkg/errors" "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" @@ -47,6 +48,7 @@ type HealthController struct { baseLogger *logrus.Entry podSynced cache.InformerSynced podLister corelisterv1.PodLister + gameServerSynced cache.InformerSynced gameServerGetter getterv1alpha1.GameServersGetter gameServerLister listerv1alpha1.GameServerLister workerqueue *workerqueue.WorkerQueue @@ -54,19 +56,25 @@ type HealthController struct { } // NewHealthController returns a HealthController -func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned.Interface, kubeInformerFactory informers.SharedInformerFactory, +func NewHealthController(health healthcheck.Handler, + kubeClient kubernetes.Interface, + agonesClient versioned.Interface, + kubeInformerFactory informers.SharedInformerFactory, agonesInformerFactory externalversions.SharedInformerFactory) *HealthController { podInformer := kubeInformerFactory.Core().V1().Pods().Informer() + gameserverInformer := agonesInformerFactory.Stable().V1alpha1().GameServers() hc := &HealthController{ podSynced: podInformer.HasSynced, podLister: kubeInformerFactory.Core().V1().Pods().Lister(), + gameServerSynced: gameserverInformer.Informer().HasSynced, gameServerGetter: agonesClient.StableV1alpha1(), - gameServerLister: agonesInformerFactory.Stable().V1alpha1().GameServers().Lister(), + gameServerLister: gameserverInformer.Lister(), } hc.baseLogger = runtime.NewLoggerWithType(hc) hc.workerqueue = workerqueue.NewWorkerQueue(hc.syncGameServer, hc.baseLogger, logfields.GameServerKey, stable.GroupName+".HealthController") + health.AddLivenessCheck("gameserver-health-workerqueue", healthcheck.Check(hc.workerqueue.Healthy)) eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(hc.baseLogger.Infof) @@ -126,8 +134,15 @@ func (hc *HealthController) failedContainer(pod *corev1.Pod) bool { // Run processes the rate limited queue. // Will block until stop is closed -func (hc *HealthController) Run(stop <-chan struct{}) { +func (hc *HealthController) Run(stop <-chan struct{}) error { + hc.baseLogger.Info("Wait for cache sync") + if !cache.WaitForCacheSync(stop, hc.gameServerSynced, hc.podSynced) { + return errors.New("failed to wait for caches to sync") + } + hc.workerqueue.Run(1, stop) + + return nil } func (hc *HealthController) loggerForGameServerKey(key string) *logrus.Entry { diff --git a/pkg/gameservers/health_test.go b/pkg/gameservers/health_test.go index 9234b14ff9..b504a86255 100644 --- a/pkg/gameservers/health_test.go +++ b/pkg/gameservers/health_test.go @@ -20,10 +20,12 @@ import ( "agones.dev/agones/pkg/apis/stable/v1alpha1" agtesting "agones.dev/agones/pkg/testing" + "github.com/heptiolabs/healthcheck" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" k8stesting "k8s.io/client-go/testing" ) @@ -32,7 +34,7 @@ func TestHealthControllerFailedContainer(t *testing.T) { t.Parallel() m := agtesting.NewMocks() - hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) + hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: newSingleContainerSpec()} gs.ApplyDefaults() @@ -56,7 +58,7 @@ func TestHealthUnschedulableWithNoFreePorts(t *testing.T) { t.Parallel() m := agtesting.NewMocks() - hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) + hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: newSingleContainerSpec()} gs.ApplyDefaults() @@ -119,7 +121,7 @@ func TestHealthControllerSyncGameServer(t *testing.T) { for name, test := range fixtures { t.Run(name, func(t *testing.T) { m := agtesting.NewMocks() - hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) + hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) hc.recorder = m.FakeRecorder gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test"}, Spec: newSingleContainerSpec(), @@ -154,7 +156,7 @@ func TestHealthControllerSyncGameServer(t *testing.T) { func TestHealthControllerRun(t *testing.T) { m := agtesting.NewMocks() - hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) + hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory) hc.recorder = m.FakeRecorder gsWatch := watch.NewFake() @@ -184,17 +186,21 @@ func TestHealthControllerRun(t *testing.T) { stop, cancel := agtesting.StartInformers(m) defer cancel() - go hc.Run(stop) + gsWatch.Add(gs.DeepCopy()) + podWatch.Add(pod.DeepCopy()) - gsWatch.Add(gs) - podWatch.Add(pod) + go hc.Run(stop) // nolint: errcheck + err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) { + return hc.workerqueue.RunCount() == 1, nil + }) + assert.NoError(t, err) pod.Status.ContainerStatuses = []corev1.ContainerStatus{{Name: gs.Spec.Container, State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{}}}} // gate assert.True(t, hc.failedContainer(pod)) assert.False(t, hc.unschedulableWithNoFreePorts(pod)) - podWatch.Modify(pod) + podWatch.Modify(pod.DeepCopy()) select { case <-updated: @@ -213,7 +219,7 @@ func TestHealthControllerRun(t *testing.T) { assert.True(t, hc.unschedulableWithNoFreePorts(pod)) assert.False(t, hc.failedContainer(pod)) - podWatch.Modify(pod) + podWatch.Modify(pod.DeepCopy()) select { case <-updated: @@ -223,7 +229,7 @@ func TestHealthControllerRun(t *testing.T) { agtesting.AssertEventContains(t, m.FakeRecorder.Events, string(v1alpha1.GameServerStateUnhealthy)) - podWatch.Delete(pod) + podWatch.Delete(pod.DeepCopy()) select { case <-updated: case <-time.After(10 * time.Second):