Flaky: TestHealthControllerRun (#816)
Took another look at this rarely flaky test - I think I worked out some
potential causes.

Along the way, I realised we could also be doing better sync and
health checking, which should hopefully also help stop the test
flakiness.
markmandel committed Jun 11, 2019
1 parent 66ad81a commit e5229d5
Showing 3 changed files with 41 additions and 15 deletions.
pkg/gameservers/controller.go (7 additions, 2 deletions)
@@ -116,7 +116,7 @@ func NewController(
 		nodeLister:       kubeInformerFactory.Core().V1().Nodes().Lister(),
 		nodeSynced:       kubeInformerFactory.Core().V1().Nodes().Informer().HasSynced,
 		portAllocator:    NewPortAllocator(minPort, maxPort, kubeInformerFactory, agonesInformerFactory),
-		healthController: NewHealthController(kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory),
+		healthController: NewHealthController(health, kubeClient, agonesClient, kubeInformerFactory, agonesInformerFactory),
 	}

 	c.baseLogger = runtime.NewLoggerWithType(c)
@@ -309,7 +309,12 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
 	}

 	// Run the Health Controller
-	go c.healthController.Run(stop)
+	go func() {
+		err = c.healthController.Run(stop)
+		if err != nil {
+			c.baseLogger.WithError(err).Error("error running health controller")
+		}
+	}()

 	// start work queues
 	var wg sync.WaitGroup
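Because HealthController.Run now returns an error (see health.go below), the caller can no longer launch it with a bare go statement without dropping failures. A minimal, self-contained sketch of the wrap-and-log pattern this hunk uses; fakeController and the plain log package are stand-ins, not the Agones implementation:

```go
package main

import (
	"errors"
	"log"
	"time"
)

// fakeController is a hypothetical stand-in for the health controller:
// Run blocks until stop closes, and can now fail with an error.
type fakeController struct{}

func (fakeController) Run(stop <-chan struct{}) error {
	select {
	case <-stop:
		return nil
	case <-time.After(10 * time.Millisecond):
		return errors.New("failed to wait for caches to sync")
	}
}

func main() {
	stop := make(chan struct{})

	// The pattern from this hunk: run the controller concurrently, but
	// log any error it returns instead of discarding it.
	go func() {
		if err := (fakeController{}).Run(stop); err != nil {
			log.Printf("error running health controller: %v", err)
		}
	}()

	time.Sleep(50 * time.Millisecond) // give the goroutine time to report
	close(stop)
}
```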
pkg/gameservers/health.go (18 additions, 3 deletions)
@@ -26,6 +26,7 @@ import (
 	"agones.dev/agones/pkg/util/logfields"
 	"agones.dev/agones/pkg/util/runtime"
 	"agones.dev/agones/pkg/util/workerqueue"
+	"github.com/heptiolabs/healthcheck"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
@@ -47,26 +48,33 @@ type HealthController struct {
 	baseLogger       *logrus.Entry
 	podSynced        cache.InformerSynced
 	podLister        corelisterv1.PodLister
+	gameServerSynced cache.InformerSynced
 	gameServerGetter getterv1alpha1.GameServersGetter
 	gameServerLister listerv1alpha1.GameServerLister
 	workerqueue      *workerqueue.WorkerQueue
 	recorder         record.EventRecorder
 }

 // NewHealthController returns a HealthController
-func NewHealthController(kubeClient kubernetes.Interface, agonesClient versioned.Interface, kubeInformerFactory informers.SharedInformerFactory,
+func NewHealthController(health healthcheck.Handler,
+	kubeClient kubernetes.Interface,
+	agonesClient versioned.Interface,
+	kubeInformerFactory informers.SharedInformerFactory,
 	agonesInformerFactory externalversions.SharedInformerFactory) *HealthController {

 	podInformer := kubeInformerFactory.Core().V1().Pods().Informer()
+	gameserverInformer := agonesInformerFactory.Stable().V1alpha1().GameServers()
 	hc := &HealthController{
 		podSynced:        podInformer.HasSynced,
 		podLister:        kubeInformerFactory.Core().V1().Pods().Lister(),
+		gameServerSynced: gameserverInformer.Informer().HasSynced,
 		gameServerGetter: agonesClient.StableV1alpha1(),
-		gameServerLister: agonesInformerFactory.Stable().V1alpha1().GameServers().Lister(),
+		gameServerLister: gameserverInformer.Lister(),
 	}

 	hc.baseLogger = runtime.NewLoggerWithType(hc)
 	hc.workerqueue = workerqueue.NewWorkerQueue(hc.syncGameServer, hc.baseLogger, logfields.GameServerKey, stable.GroupName+".HealthController")
+	health.AddLivenessCheck("gameserver-health-workerqueue", healthcheck.Check(hc.workerqueue.Healthy))

 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(hc.baseLogger.Infof)
@@ -126,8 +134,15 @@ func (hc *HealthController) failedContainer(pod *corev1.Pod) bool {

 // Run processes the rate limited queue.
 // Will block until stop is closed
-func (hc *HealthController) Run(stop <-chan struct{}) {
+func (hc *HealthController) Run(stop <-chan struct{}) error {
+	hc.baseLogger.Info("Wait for cache sync")
+	if !cache.WaitForCacheSync(stop, hc.gameServerSynced, hc.podSynced) {
+		return errors.New("failed to wait for caches to sync")
+	}
+
 	hc.workerqueue.Run(1, stop)
+
+	return nil
 }

 func (hc *HealthController) loggerForGameServerKey(key string) *logrus.Entry {
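The health.go change combines two common idioms: block on client-go's cache.WaitForCacheSync before draining the work queue, and register the queue's health with a heptiolabs/healthcheck handler so a wedged controller fails its liveness probe. A rough, self-contained sketch of that wiring, assuming the real client-go and healthcheck APIs; myController, the check body, and the listen address are hypothetical stand-ins:

```go
package main

import (
	"errors"
	"net/http"

	"github.com/heptiolabs/healthcheck"
	"k8s.io/client-go/tools/cache"
)

// myController is a hypothetical stand-in for HealthController.
type myController struct {
	gameServerSynced cache.InformerSynced
	podSynced        cache.InformerSynced
}

// Run mirrors the commit's pattern: refuse to process until the informer
// caches have synced, and report failure as an error rather than silently
// working against empty caches (the suspected source of the flake).
func (c *myController) Run(stop <-chan struct{}) error {
	if !cache.WaitForCacheSync(stop, c.gameServerSynced, c.podSynced) {
		return errors.New("failed to wait for caches to sync")
	}
	// ... drain the rate-limited work queue here ...
	return nil
}

func main() {
	health := healthcheck.NewHandler()
	// AddLivenessCheck takes any func() error; the commit passes the
	// worker queue's Healthy method so a stalled queue fails /live.
	health.AddLivenessCheck("gameserver-health-workerqueue", func() error { return nil })
	// healthcheck.Handler is an http.Handler exposing /live and /ready.
	_ = http.ListenAndServe(":8086", health)
}
```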
pkg/gameservers/health_test.go (16 additions, 10 deletions)
@@ -20,10 +20,12 @@ import (

 	"agones.dev/agones/pkg/apis/stable/v1alpha1"
 	agtesting "agones.dev/agones/pkg/testing"
+	"github.com/heptiolabs/healthcheck"
 	"github.com/stretchr/testify/assert"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	k8stesting "k8s.io/client-go/testing"
 )
@@ -32,7 +34,7 @@ func TestHealthControllerFailedContainer(t *testing.T) {
 	t.Parallel()

 	m := agtesting.NewMocks()
-	hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)
+	hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)

 	gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: newSingleContainerSpec()}
 	gs.ApplyDefaults()
@@ -56,7 +58,7 @@ func TestHealthUnschedulableWithNoFreePorts(t *testing.T) {
 	t.Parallel()

 	m := agtesting.NewMocks()
-	hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)
+	hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)

 	gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test"}, Spec: newSingleContainerSpec()}
 	gs.ApplyDefaults()
@@ -119,7 +121,7 @@ func TestHealthControllerSyncGameServer(t *testing.T) {
 	for name, test := range fixtures {
 		t.Run(name, func(t *testing.T) {
 			m := agtesting.NewMocks()
-			hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)
+			hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)
			hc.recorder = m.FakeRecorder

 			gs := v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "test"}, Spec: newSingleContainerSpec(),
@@ -154,7 +156,7 @@

 func TestHealthControllerRun(t *testing.T) {
 	m := agtesting.NewMocks()
-	hc := NewHealthController(m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)
+	hc := NewHealthController(healthcheck.NewHandler(), m.KubeClient, m.AgonesClient, m.KubeInformerFactory, m.AgonesInformerFactory)
 	hc.recorder = m.FakeRecorder

 	gsWatch := watch.NewFake()
@@ -184,17 +186,21 @@ func TestHealthControllerRun(t *testing.T) {
 	stop, cancel := agtesting.StartInformers(m)
 	defer cancel()

-	go hc.Run(stop)
+	gsWatch.Add(gs.DeepCopy())
+	podWatch.Add(pod.DeepCopy())

-	gsWatch.Add(gs)
-	podWatch.Add(pod)
+	go hc.Run(stop) // nolint: errcheck
+	err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
+		return hc.workerqueue.RunCount() == 1, nil
+	})
+	assert.NoError(t, err)

 	pod.Status.ContainerStatuses = []corev1.ContainerStatus{{Name: gs.Spec.Container, State: corev1.ContainerState{Terminated: &corev1.ContainerStateTerminated{}}}}
 	// gate
 	assert.True(t, hc.failedContainer(pod))
 	assert.False(t, hc.unschedulableWithNoFreePorts(pod))

-	podWatch.Modify(pod)
+	podWatch.Modify(pod.DeepCopy())

 	select {
 	case <-updated:
@@ -213,7 +219,7 @@ func TestHealthControllerRun(t *testing.T) {
 	assert.True(t, hc.unschedulableWithNoFreePorts(pod))
 	assert.False(t, hc.failedContainer(pod))

-	podWatch.Modify(pod)
+	podWatch.Modify(pod.DeepCopy())

 	select {
 	case <-updated:
@@ -223,7 +229,7 @@ func TestHealthControllerRun(t *testing.T) {

 	agtesting.AssertEventContains(t, m.FakeRecorder.Events, string(v1alpha1.GameServerStateUnhealthy))

-	podWatch.Delete(pod)
+	podWatch.Delete(pod.DeepCopy())
 	select {
 	case <-updated:
 	case <-time.After(10 * time.Second):
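The key test change is the gate: rather than racing controller startup, the test polls until the worker queue has actually begun processing before it injects pod events. A minimal runnable sketch of the wait.PollImmediate idiom; the atomic counter is a hypothetical stand-in for workerqueue.RunCount():

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	var runs int64

	// Simulate a worker loop that bumps its run counter once it starts.
	go func() {
		time.Sleep(100 * time.Millisecond)
		atomic.AddInt64(&runs, 1)
	}()

	// Check immediately, then every second, for up to ten seconds: the
	// same call shape the test uses to wait for RunCount() == 1 before
	// feeding pod events to the controller.
	err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
		return atomic.LoadInt64(&runs) == 1, nil
	})
	fmt.Println("worker started; err =", err)
}
```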
