Rework game server health initial delay handling (#3072)
* Rework game server health initial delay handling

This is a redrive of #3046, which was reverted in #3068

Rework health check handling of InitialDelaySeconds. See
#2966 (comment):

* We remove any knowledge in the SDK of InitialDelaySeconds

* We remove the runHealth goroutine from main and shift this
responsibility to the /gshealthz handler (see the sketch after this list)

Along the way:

* I noted that the FailureThreshold doesn't need to be enforced on
both the kubelet and SDK sides, so in the injected liveness probe I
dropped it to 1. Previously we waited for more probes than necessary.
In practice this is not terribly relevant, since the SDK pushes the
GameServer to Unhealthy anyway.

* Close a race where enqueueState is called rapidly before the update can succeed

* Re-add Autopilot 1.26 to test matrix (removed in #3059)

* Close a consistency race in syncGameServerRequestReadyState:
if the SDK and controller win the race to update the Pod with the
GameServerReadyContainerIDAnnotation before the kubelet has had a chance
to add the running containers to the Pod status, the controller may update
the Pod with an empty annotation, which then confuses subsequent runs.

* Fixes TestPlayerConnectWithCapacityZero flakes

May fully fix #2445 as well
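To make the new shape concrete, here is a minimal, self-contained sketch of the flow these bullets describe. It is a toy, not the Agones implementation; all names and values are illustrative:

```go
package main

import (
	"log"
	"net/http"
	"sync"
	"time"
)

// healthState is a toy stand-in for the SDK server's bookkeeping: a timestamp
// touched by SDK Health() pings and a count of consecutive missed heartbeats.
type healthState struct {
	mu           sync.Mutex
	lastUpdated  time.Time
	failureCount int32
	period       time.Duration // stands in for Health.PeriodSeconds
	threshold    int32         // stands in for Health.FailureThreshold
}

// runHealth mirrors the new model: it is called from the /gshealthz handler,
// i.e. once per kubelet probe, never from a background goroutine.
func (h *healthState) runHealth() bool {
	h.mu.Lock()
	defer h.mu.Unlock()
	if time.Since(h.lastUpdated) > h.period {
		h.failureCount++ // one missed heartbeat window per probe
	}
	return h.failureCount < h.threshold
}

func main() {
	h := &healthState{lastUpdated: time.Now(), period: 5 * time.Second, threshold: 3}
	http.HandleFunc("/gshealthz", func(w http.ResponseWriter, _ *http.Request) {
		if h.runHealth() {
			_, _ = w.Write([]byte("ok"))
			return
		}
		// A single failing response suffices: the SDK side has already counted
		// FailureThreshold misses, and the injected probe's threshold is now 1.
		http.Error(w, "gameserver unhealthy", http.StatusInternalServerError)
	})
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```

The point of the design is that the kubelet alone owns InitialDelaySeconds and the probe cadence, so the SDK server no longer needs its own timer or any notion of start-up delay.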
zmerlynn committed Apr 6, 2023
1 parent 0ee2d76 commit 26647a0
Showing 6 changed files with 124 additions and 41 deletions.
4 changes: 0 additions & 4 deletions cloudbuild.yaml
@@ -312,10 +312,6 @@ steps:
for version in "${!versionsAndRegions[@]}"
do
region=${versionsAndRegions[$version]}
if [ $cloudProduct = gke-autopilot ] && [ $version = 1.26 ]
then
continue
fi
if [ $cloudProduct = generic ]
then
featureWithGate="CustomFasSyncInterval=false&SafeToEvict=false&SDKGracefulTermination=false&StateAllocationFilter=false&PlayerAllocationFilter=true&PlayerTracking=true&ResetMetricsOnDelete=true&PodHostname=true&SplitControllerAndExtensions=true&FleetAllocationOverflow=true&Example=true"
13 changes: 12 additions & 1 deletion pkg/gameservers/controller.go
@@ -705,9 +705,16 @@ func (c *Controller) addGameServerHealthCheck(gs *agonesv1.GameServer, pod *core
Port: intstr.FromInt(8080),
},
},
// The sidecar relies on kubelet to delay by InitialDelaySeconds after the
// container is started (after image pull, etc), and relies on the kubelet
// for PeriodSeconds heartbeats to evaluate health.
InitialDelaySeconds: gs.Spec.Health.InitialDelaySeconds,
PeriodSeconds: gs.Spec.Health.PeriodSeconds,
FailureThreshold: gs.Spec.Health.FailureThreshold,

// By the time /gshealthz returns unhealthy, the sidecar has already seen
// FailureThreshold failed health checks in a row, so on the kubelet side, one
// failure is sufficient to know the game server is unhealthy.
FailureThreshold: 1,
}
}
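As a back-of-the-envelope check on the comments above, here is the worst-case detection timeline under the new split, using hypothetical spec values (the real ones come from GameServer.Spec.Health):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values; the real ones come from GameServer.Spec.Health.
	initialDelay := 5 * time.Second // enforced by the kubelet alone now
	period := 5 * time.Second       // kubelet probe interval == heartbeat window
	failureThreshold := 3           // consecutive misses counted by the SDK

	// The SDK reports unhealthy once it has seen failureThreshold consecutive
	// failing probes; with the injected probe's failureThreshold at 1, the
	// kubelet acts on the very next failing response rather than waiting for
	// its own threshold on top.
	worstCase := initialDelay + time.Duration(failureThreshold)*period
	fmt.Println("worst-case detection after container start:", worstCase) // 20s
}
```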

@@ -860,6 +867,10 @@ func (c *Controller) syncGameServerRequestReadyState(ctx context.Context, gs *ag
break
}
}
// Verify that we found the game server container - we may have a stale cache where pod is missing ContainerStatuses.
if _, ok := gsCopy.ObjectMeta.Annotations[agonesv1.GameServerReadyContainerIDAnnotation]; !ok {
return nil, workerqueue.NewDebugError(fmt.Errorf("game server container for GameServer %s in namespace %s not present in pod status, try again", gsCopy.ObjectMeta.Name, gsCopy.ObjectMeta.Namespace))
}

// Also update the pod with the same annotation, so we can check if the Pod data is up-to-date, now and also in the HealthController.
// But if it is already set, then ignore it, since we only need to do this one time.
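The guard above follows a standard informer-cache pattern: a Pod read from the cache can lag the kubelet, so a missing ContainerStatus must mean requeue-and-retry, never annotate-with-empty. A reduced, self-contained model of that decision (toy types, not the controller's API):

```go
package main

import (
	"errors"
	"fmt"
)

// Toy model of the guard: the informer cache can hand the controller a Pod
// whose ContainerStatuses the kubelet has not populated yet, so "not found"
// must mean "requeue and retry", never "write an empty annotation".
type containerStatus struct {
	Name        string
	ContainerID string
	Running     bool
}

var errRetry = errors.New("game server container not present in pod status, try again")

func readyContainerID(statuses []containerStatus, container string) (string, error) {
	for _, cs := range statuses {
		if cs.Name == container && cs.Running {
			return cs.ContainerID, nil
		}
	}
	return "", errRetry // back off and retry instead of annotating with ""
}

func main() {
	// Stale cache: no statuses yet, so requeue rather than annotate.
	if _, err := readyContainerID(nil, "game-server"); err != nil {
		fmt.Println("requeue:", err)
	}
	// Cache caught up: the annotation gets the running container's ID.
	id, _ := readyContainerID([]containerStatus{
		{Name: "game-server", ContainerID: "containerd://abc123", Running: true},
	}, "game-server")
	fmt.Println("annotate:", id)
}
```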
37 changes: 35 additions & 2 deletions pkg/gameservers/controller_test.go
@@ -1061,7 +1061,7 @@ func TestControllerCreateGameServerPod(t *testing.T) {
assert.Equal(t, intstr.FromInt(8080), gsContainer.LivenessProbe.HTTPGet.Port)
assert.Equal(t, fixture.Spec.Health.InitialDelaySeconds, gsContainer.LivenessProbe.InitialDelaySeconds)
assert.Equal(t, fixture.Spec.Health.PeriodSeconds, gsContainer.LivenessProbe.PeriodSeconds)
assert.Equal(t, fixture.Spec.Health.FailureThreshold, gsContainer.LivenessProbe.FailureThreshold)
assert.Equal(t, int32(1), gsContainer.LivenessProbe.FailureThreshold)
assert.Len(t, gsContainer.VolumeMounts, 1)
assert.Equal(t, "/var/run/secrets/kubernetes.io/serviceaccount", gsContainer.VolumeMounts[0].MountPath)

@@ -1495,6 +1495,39 @@ func TestControllerSyncGameServerRequestReadyState(t *testing.T) {
assert.False(t, podUpdated, "Pod was updated")
})

t.Run("GameServer whose pod is missing ContainerStatuses, so should retry and not update", func(t *testing.T) {
c, m := newFakeController()

gsFixture := &agonesv1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"},
Spec: newSingleContainerSpec(), Status: agonesv1.GameServerStatus{State: agonesv1.GameServerStateRequestReady}}
gsFixture.ApplyDefaults()
gsFixture.Status.NodeName = nodeName
pod, err := gsFixture.Pod(agtesting.FakeAPIHooks{})
assert.Nil(t, err)
gsUpdated := false
podUpdated := false

m.KubeClient.AddReactor("list", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
return true, &corev1.PodList{Items: []corev1.Pod{*pod}}, nil
})
m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
gsUpdated = true
return true, nil, nil
})
m.KubeClient.AddReactor("update", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
podUpdated = true
return true, nil, nil
})

ctx, cancel := agtesting.StartInformers(m, c.podSynced)
defer cancel()

_, err = c.syncGameServerRequestReadyState(ctx, gsFixture)
assert.EqualError(t, err, "game server container for GameServer test in namespace default not present in pod status, try again")
assert.False(t, gsUpdated, "GameServer was updated")
assert.False(t, podUpdated, "Pod was updated")
})

t.Run("GameServer with non zero deletion datetime", func(t *testing.T) {
testWithNonZeroDeletionTimestamp(t, func(c *Controller, fixture *agonesv1.GameServer) (*agonesv1.GameServer, error) {
return c.syncGameServerRequestReadyState(context.Background(), fixture)
@@ -1730,7 +1763,7 @@ func TestControllerAddGameServerHealthCheck(t *testing.T) {
require.NotNil(t, probe)
assert.Equal(t, "/gshealthz", probe.HTTPGet.Path)
assert.Equal(t, intstr.IntOrString{IntVal: 8080}, probe.HTTPGet.Port)
assert.Equal(t, fixture.Spec.Health.FailureThreshold, probe.FailureThreshold)
assert.Equal(t, int32(1), probe.FailureThreshold)
assert.Equal(t, fixture.Spec.Health.InitialDelaySeconds, probe.InitialDelaySeconds)
assert.Equal(t, fixture.Spec.Health.PeriodSeconds, probe.PeriodSeconds)
}
38 changes: 18 additions & 20 deletions pkg/sdkserver/sdkserver.go
@@ -42,7 +42,6 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -164,7 +163,7 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
}
})
mux.HandleFunc("/gshealthz", func(w http.ResponseWriter, r *http.Request) {
if s.healthy() {
if s.runHealth() {
_, err := w.Write([]byte("ok"))
if err != nil {
s.logger.WithError(err).Error("could not send ok response on gshealthz")
@@ -188,12 +187,6 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
return s, nil
}

// initHealthLastUpdated adds the initial delay to now, then it will always be after `now`
// until the delay passes
func (s *SDKServer) initHealthLastUpdated(healthInitialDelay time.Duration) {
s.healthLastUpdated = s.clock.Now().UTC().Add(healthInitialDelay)
}

// Run processes the rate limited queue.
// Will block until stop is closed
func (s *SDKServer) Run(ctx context.Context) error {
@@ -229,7 +222,7 @@ func (s *SDKServer) Run(ctx context.Context) error {
s.health = gs.Spec.Health
s.logger.WithField("health", s.health).Debug("Setting health configuration")
s.healthTimeout = time.Duration(gs.Spec.Health.PeriodSeconds) * time.Second
s.initHealthLastUpdated(time.Duration(gs.Spec.Health.InitialDelaySeconds) * time.Second)
s.touchHealthLastUpdated()

if gs.Status.State == agonesv1.GameServerStateReserved && gs.Status.ReservedUntil != nil {
s.gsUpdateMutex.Lock()
@@ -240,7 +233,6 @@ func (s *SDKServer) Run(ctx context.Context) error {
// start health checking running
if !s.health.Disabled {
s.logger.Debug("Starting GameServer health checking")
go wait.Until(s.runHealth, s.healthTimeout, ctx.Done())
}

// populate player tracking values
@@ -431,7 +423,10 @@ func (s *SDKServer) updateAnnotations(ctx context.Context) error {
// workerqueue
func (s *SDKServer) enqueueState(state agonesv1.GameServerState) {
s.gsUpdateMutex.Lock()
s.gsState = state
// Update cached state, but prevent transitions out of `Unhealthy` by the SDK.
if s.gsState != agonesv1.GameServerStateUnhealthy {
s.gsState = state
}
s.gsUpdateMutex.Unlock()
s.workerqueue.Enqueue(cache.ExplicitKey(string(updateState)))
}
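The guard added to enqueueState above effectively turns the cached state into a latch: once the SDK has recorded Unhealthy, rapid follow-up calls (for example a racing RequestReady) can no longer overwrite it before the slower GameServer update lands. A toy model with illustrative names:

```go
package main

import (
	"fmt"
	"sync"
)

type gameServerState string

const (
	stateRequestReady gameServerState = "RequestReady"
	stateUnhealthy    gameServerState = "Unhealthy"
)

// stateLatch models the fixed enqueueState: calls can arrive faster than the
// GameServer update succeeds, so Unhealthy must be terminal on the SDK side.
type stateLatch struct {
	mu sync.Mutex
	s  gameServerState
}

func (l *stateLatch) enqueue(next gameServerState) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.s != stateUnhealthy { // never transition out of Unhealthy locally
		l.s = next
	}
	// (the real method also enqueues an update onto the workerqueue here)
}

func main() {
	l := &stateLatch{s: stateRequestReady}
	l.enqueue(stateUnhealthy)
	l.enqueue(stateRequestReady) // racing call before the update lands: blocked
	fmt.Println(l.s)             // Unhealthy
}
```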
@@ -814,15 +809,17 @@ func (s *SDKServer) sendGameServerUpdate(gs *agonesv1.GameServer) {
}
}

// runHealth actively checks the health, and if not
// healthy will push the Unhealthy state into the queue so
// it can be updated
func (s *SDKServer) runHealth() {
// runHealth checks the health as part of the /gshealthz hook, and if not
// healthy will push the Unhealthy state into the queue so it can be updated.
// Returns current health.
func (s *SDKServer) runHealth() bool {
s.checkHealth()
if !s.healthy() {
s.logger.WithField("gameServerName", s.gameServerName).Warn("GameServer has failed health check")
s.enqueueState(agonesv1.GameServerStateUnhealthy)
if s.healthy() {
return true
}
s.logger.WithField("gameServerName", s.gameServerName).Warn("GameServer has failed health check")
s.enqueueState(agonesv1.GameServerStateUnhealthy)
return false
}

// touchHealthLastUpdated sets the healthLastUpdated
@@ -838,10 +835,11 @@ func (s *SDKServer) touchHealthLastUpdated() {
// and if it is outside the timeout value, logger and
// count a failure
func (s *SDKServer) checkHealth() {
s.healthMutex.Lock()
defer s.healthMutex.Unlock()

timeout := s.healthLastUpdated.Add(s.healthTimeout)
if timeout.Before(s.clock.Now().UTC()) {
s.healthMutex.Lock()
defer s.healthMutex.Unlock()
s.healthFailureCount++
s.logger.WithField("failureCount", s.healthFailureCount).Warn("GameServer Health Check failed")
}
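Note the widened critical section in checkHealth above: the read of healthLastUpdated and the failure-count increment now happen under a single lock acquisition, so a concurrent touchHealthLastUpdated (triggered by a Health() ping) cannot interleave between them. A reduced illustration of the fixed shape, using toy types rather than the SDK server's:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// checker is a toy of the fixed locking: reading lastUpdated and bumping the
// failure count form one critical section relative to touch().
type checker struct {
	mu          sync.Mutex
	lastUpdated time.Time
	failures    int
	timeout     time.Duration
}

func (c *checker) touch() { // a Health() ping from the game server
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastUpdated = time.Now()
	c.failures = 0
}

func (c *checker) check() { // invoked per kubelet probe via runHealth
	c.mu.Lock() // the lock now covers the read AND the increment
	defer c.mu.Unlock()
	if time.Since(c.lastUpdated) > c.timeout {
		c.failures++
		fmt.Println("missed heartbeat, failures:", c.failures)
	}
}

func main() {
	c := &checker{lastUpdated: time.Now(), timeout: 10 * time.Millisecond}
	time.Sleep(20 * time.Millisecond)
	c.check() // missed heartbeat, failures: 1
	c.touch() // heartbeat arrives: count resets, window restarts
	c.check() // healthy again, no increment
}
```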
32 changes: 18 additions & 14 deletions pkg/sdkserver/sdkserver_test.go
@@ -80,8 +80,10 @@ func TestSidecarRun(t *testing.T) {
},
"unhealthy": {
f: func(sc *SDKServer, ctx context.Context) {
// we have a 1 second timeout
time.Sleep(2 * time.Second)
time.Sleep(1 * time.Second)
sc.runHealth() // normally invoked from /gshealthz handler
time.Sleep(2 * time.Second) // exceed 1s timeout
sc.runHealth() // normally invoked from /gshealthz handler
},
expected: expected{
state: agonesv1.GameServerStateUnhealthy,
@@ -475,6 +477,12 @@ func TestSidecarUnhealthyMessage(t *testing.T) {
// manually push through an unhealthy state change
sc.enqueueState(agonesv1.GameServerStateUnhealthy)
agtesting.AssertEventContains(t, m.FakeRecorder.Events, "Health check failure")

// try to push back to Ready, enqueueState should block it.
sc.enqueueState(agonesv1.GameServerStateRequestReady)
sc.gsUpdateMutex.Lock()
assert.Equal(t, agonesv1.GameServerStateUnhealthy, sc.gsState)
sc.gsUpdateMutex.Unlock()
}

func TestSidecarHealthy(t *testing.T) {
@@ -487,7 +495,7 @@
// manually set the values
sc.health = agonesv1.Health{FailureThreshold: 1}
sc.healthTimeout = 5 * time.Second
sc.initHealthLastUpdated(0 * time.Second)
sc.touchHealthLastUpdated()

now := time.Now().UTC()
fc := testclocks.NewFakeClock(now)
@@ -532,15 +540,12 @@
t.Run("initial delay", func(t *testing.T) {
sc.health.Disabled = false
fc.SetTime(time.Now().UTC())
sc.initHealthLastUpdated(0)
sc.healthFailureCount = 0
sc.checkHealth()
assert.True(t, sc.healthy())
sc.touchHealthLastUpdated()

sc.initHealthLastUpdated(10 * time.Second)
sc.checkHealth()
assert.True(t, sc.healthy())
fc.Step(9 * time.Second)
// initial delay is handled by kubelet, runHealth() isn't
// called until container starts.
fc.Step(10 * time.Second)
sc.touchHealthLastUpdated()
sc.checkHealth()
assert.True(t, sc.healthy())

@@ -553,8 +558,7 @@
sc.health.Disabled = false
sc.health.FailureThreshold = 3
fc.SetTime(time.Now().UTC())
sc.initHealthLastUpdated(0)
sc.healthFailureCount = 0
sc.touchHealthLastUpdated()

sc.checkHealth()
assert.True(t, sc.healthy())
@@ -622,8 +626,8 @@ func TestSidecarHTTPHealthCheck(t *testing.T) {

fc.Step(step)
time.Sleep(step)
testHTTPHealth(t, "http://localhost:8080/gshealthz", "", http.StatusInternalServerError) // force runHealth to run
assert.False(t, sc.healthy())
testHTTPHealth(t, "http://localhost:8080/gshealthz", "", http.StatusInternalServerError)
cancel()
wg.Wait() // wait for go routine test results.
}
41 changes: 41 additions & 0 deletions test/e2e/gameserver_test.go
@@ -1249,3 +1249,44 @@ func TestGracefulShutdown(t *testing.T) {
log.WithField("diff", diff).Info("Time difference")
require.Less(t, diff, 40)
}

func TestGameServerSlowStart(t *testing.T) {
t.Parallel()

// Inject an additional game server sidecar that forces a delayed start
// to the main game server container following the pattern at
// https://medium.com/@marko.luksa/delaying-application-start-until-sidecar-is-ready-2ec2d21a7b74
gs := framework.DefaultGameServer(framework.Namespace)
gs.Spec.Template.Spec.Containers = append(
[]corev1.Container{{
Name: "delay-game-server-start",
Image: "alpine:latest",
ImagePullPolicy: corev1.PullIfNotPresent,
Command: []string{"sleep", "3600"},
Lifecycle: &corev1.Lifecycle{
PostStart: &corev1.LifecycleHandler{
Exec: &corev1.ExecAction{
Command: []string{"sleep", "60"},
},
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("30m"),
corev1.ResourceMemory: resource.MustParse("64Mi"),
},
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("30m"),
corev1.ResourceMemory: resource.MustParse("64Mi"),
},
},
}},
gs.Spec.Template.Spec.Containers...)

// Validate that a game server whose primary container starts slowly (a full minute
// after the SDK starts) is capable of reaching Ready. Here we force the condition
// with a lifecycle hook, but it imitates a slow image pull, or other container
// start delays.
_, err := framework.CreateGameServerAndWaitUntilReady(t, framework.Namespace, gs)
assert.NoError(t, err)
}
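A note on the delaying-sidecar trick used above, per the linked article: the kubelet starts a Pod's containers in spec order and waits for each container's PostStart hook to complete before starting the next, so the 60-second hook holds back the game server container's start by a full minute, which is what makes it a faithful stand-in for a slow image pull or other container start delay.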
