From f414b2bf32404fc6ca93fc07f3a8e99249583d99 Mon Sep 17 00:00:00 2001 From: Mark Mandel Date: Thu, 9 Aug 2018 11:33:32 -0700 Subject: [PATCH] Fix bug: Disabled health checking not implemented This commit fixes actually two bugs: 1) While we documented that you could disabled health checking we never actually implemented it! This fixes that. Thankfully, the work that has been done to retrieve `GameServer` details through the SDK makes this relatively easy. 2) SDK Server sidecar never had the necessary RBAC permissions to send events to the `GameServer` CRD. This is now fixed as well. --- cmd/sdk-server/main.go | 53 +++----- .../agones/templates/serviceaccounts/sdk.yaml | 3 + install/yaml/install.yaml | 3 + pkg/gameservers/sdkserver.go | 90 +++++++------- pkg/gameservers/sdkserver_test.go | 116 ++++++++++++------ 5 files changed, 146 insertions(+), 119 deletions(-) diff --git a/cmd/sdk-server/main.go b/cmd/sdk-server/main.go index 4ffa12566e..ea522fa744 100644 --- a/cmd/sdk-server/main.go +++ b/cmd/sdk-server/main.go @@ -20,7 +20,6 @@ import ( "net" "net/http" "strings" - "time" "agones.dev/agones/pkg" "agones.dev/agones/pkg/client/clientset/versioned" @@ -46,12 +45,8 @@ const ( podNamespaceEnv = "POD_NAMESPACE" // Flags (that can also be env vars) - localFlag = "local" - addressFlag = "address" - healthDisabledFlag = "health-disabled" - healthTimeoutFlag = "health-timeout" - healthInitialDelayFlag = "health-initial-delay" - healthFailureThresholdFlag = "health-failure-threshold" + localFlag = "local" + addressFlag = "address" ) var ( @@ -107,14 +102,18 @@ func main() { } var s *gameservers.SDKServer - s, err = gameservers.NewSDKServer(viper.GetString(gameServerNameEnv), viper.GetString(podNamespaceEnv), - ctlConf.HealthDisabled, ctlConf.HealthTimeout, ctlConf.HealthFailureThreshold, - ctlConf.HealthInitialDelay, kubeClient, agonesClient) + s, err = gameservers.NewSDKServer(viper.GetString(gameServerNameEnv), + viper.GetString(podNamespaceEnv), kubeClient, agonesClient) if err != nil { logger.WithError(err).Fatalf("Could not start sidecar") } - go s.Run(ctx.Done()) + go func() { + err := s.Run(ctx.Done()) + if err != nil { + logger.WithError(err).Fatalf("Could not run sidecar") + } + }() sdk.RegisterSDKServer(grpcServer, s) } @@ -159,49 +158,25 @@ func runGateway(ctx context.Context, grpcEndpoint string, mux *gwruntime.ServeMu func parseEnvFlags() config { viper.SetDefault(localFlag, false) viper.SetDefault(addressFlag, "localhost") - viper.SetDefault(healthDisabledFlag, false) - viper.SetDefault(healthTimeoutFlag, 5) - viper.SetDefault(healthInitialDelayFlag, 5) - viper.SetDefault(healthFailureThresholdFlag, 3) pflag.Bool(localFlag, viper.GetBool(localFlag), "Set this, or LOCAL env, to 'true' to run this binary in local development mode. Defaults to 'false'") pflag.String(addressFlag, viper.GetString(addressFlag), "The Address to bind the server grpcPort to. Defaults to 'localhost") - pflag.Bool(healthDisabledFlag, viper.GetBool(healthDisabledFlag), - "Set this, or HEALTH_ENABLED env, to 'true' to enable health checking on the GameServer. Defaults to 'true'") - pflag.Int64(healthTimeoutFlag, viper.GetInt64(healthTimeoutFlag), - "Set this or HEALTH_TIMEOUT env to the number of seconds that the health check times out at. Defaults to 5") - pflag.Int64(healthInitialDelayFlag, viper.GetInt64(healthInitialDelayFlag), - "Set this or HEALTH_INITIAL_DELAY env to the number of seconds that the health will wait before starting. Defaults to 5") - pflag.Int64(healthFailureThresholdFlag, viper.GetInt64(healthFailureThresholdFlag), - "Set this or HEALTH_FAILURE_THRESHOLD env to the number of times the health check needs to fail to be deemed unhealthy. Defaults to 3") pflag.Parse() viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) runtime.Must(viper.BindEnv(localFlag)) runtime.Must(viper.BindEnv(gameServerNameEnv)) runtime.Must(viper.BindEnv(podNamespaceEnv)) - runtime.Must(viper.BindEnv(healthDisabledFlag)) - runtime.Must(viper.BindEnv(healthTimeoutFlag)) - runtime.Must(viper.BindEnv(healthInitialDelayFlag)) - runtime.Must(viper.BindEnv(healthFailureThresholdFlag)) runtime.Must(viper.BindPFlags(pflag.CommandLine)) return config{ - IsLocal: viper.GetBool(localFlag), - Address: viper.GetString(addressFlag), - HealthDisabled: viper.GetBool(healthDisabledFlag), - HealthTimeout: time.Duration(viper.GetInt64(healthTimeoutFlag)) * time.Second, - HealthInitialDelay: time.Duration(viper.GetInt64(healthInitialDelayFlag)) * time.Second, - HealthFailureThreshold: viper.GetInt64(healthFailureThresholdFlag), + IsLocal: viper.GetBool(localFlag), + Address: viper.GetString(addressFlag), } } // config is all the configuration for this program type config struct { - Address string - IsLocal bool - HealthDisabled bool - HealthTimeout time.Duration - HealthInitialDelay time.Duration - HealthFailureThreshold int64 + Address string + IsLocal bool } diff --git a/install/helm/agones/templates/serviceaccounts/sdk.yaml b/install/helm/agones/templates/serviceaccounts/sdk.yaml index 2116c28609..68737c2c5a 100644 --- a/install/helm/agones/templates/serviceaccounts/sdk.yaml +++ b/install/helm/agones/templates/serviceaccounts/sdk.yaml @@ -36,6 +36,9 @@ metadata: release: {{ .Release.Name }} heritage: {{ .Release.Service }} rules: +- apiGroups: [""] + resources: ["events"] + verbs: ["create"] - apiGroups: ["stable.agones.dev"] resources: ["gameservers"] verbs: ["list", "update", "watch"] diff --git a/install/yaml/install.yaml b/install/yaml/install.yaml index 231af08135..0423400b8d 100644 --- a/install/yaml/install.yaml +++ b/install/yaml/install.yaml @@ -111,6 +111,9 @@ metadata: release: agones-manual heritage: Tiller rules: +- apiGroups: [""] + resources: ["events"] + verbs: ["create"] - apiGroups: ["stable.agones.dev"] resources: ["gameservers"] verbs: ["list", "update", "watch"] diff --git a/pkg/gameservers/sdkserver.go b/pkg/gameservers/sdkserver.go index b7446e63c7..a482ad338a 100644 --- a/pkg/gameservers/sdkserver.go +++ b/pkg/gameservers/sdkserver.go @@ -50,33 +50,30 @@ var _ sdk.SDKServer = &SDKServer{} // SDKServer is a gRPC server, that is meant to be a sidecar // for a GameServer that will update the game server status on SDK requests type SDKServer struct { - logger *logrus.Entry - gameServerName string - namespace string - informerFactory externalversions.SharedInformerFactory - gameServerGetter typedv1alpha1.GameServersGetter - gameServerLister v1alpha1.GameServerLister - gameServerSynced cache.InformerSynced - server *http.Server - clock clock.Clock - healthDisabled bool - healthTimeout time.Duration - healthFailureThreshold int64 - healthMutex sync.RWMutex - healthLastUpdated time.Time - healthFailureCount int64 - workerqueue *workerqueue.WorkerQueue - streamMutex sync.RWMutex - connectedStreams []sdk.SDK_WatchGameServerServer - stop <-chan struct{} - recorder record.EventRecorder + logger *logrus.Entry + gameServerName string + namespace string + informerFactory externalversions.SharedInformerFactory + gameServerGetter typedv1alpha1.GameServersGetter + gameServerLister v1alpha1.GameServerLister + gameServerSynced cache.InformerSynced + server *http.Server + clock clock.Clock + health stablev1alpha1.Health + healthTimeout time.Duration + healthMutex sync.RWMutex + healthLastUpdated time.Time + healthFailureCount int32 + workerqueue *workerqueue.WorkerQueue + streamMutex sync.RWMutex + connectedStreams []sdk.SDK_WatchGameServerServer + stop <-chan struct{} + recorder record.EventRecorder } // NewSDKServer creates a SDKServer that sets up an // InClusterConfig for Kubernetes -func NewSDKServer(gameServerName, namespace string, - healthDisabled bool, healthTimeout time.Duration, healthFailureThreshold int64, healthInitialDelay time.Duration, - kubeClient kubernetes.Interface, +func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interface, agonesClient versioned.Interface) (*SDKServer, error) { mux := http.NewServeMux() @@ -97,13 +94,10 @@ func NewSDKServer(gameServerName, namespace string, Addr: ":8080", Handler: mux, }, - clock: clock.RealClock{}, - healthDisabled: healthDisabled, - healthFailureThreshold: healthFailureThreshold, - healthTimeout: healthTimeout, - healthMutex: sync.RWMutex{}, - healthFailureCount: 0, - streamMutex: sync.RWMutex{}, + clock: clock.RealClock{}, + healthMutex: sync.RWMutex{}, + healthFailureCount: 0, + streamMutex: sync.RWMutex{}, } s.informerFactory = factory @@ -140,7 +134,6 @@ func NewSDKServer(gameServerName, namespace string, } }) - s.initHealthLastUpdated(healthInitialDelay) s.workerqueue = workerqueue.NewWorkerQueue( func(key string) error { return s.updateState(stablev1alpha1.State(key)) @@ -161,7 +154,28 @@ func (s *SDKServer) initHealthLastUpdated(healthInitialDelay time.Duration) { // Run processes the rate limited queue. // Will block until stop is closed -func (s *SDKServer) Run(stop <-chan struct{}) { +func (s *SDKServer) Run(stop <-chan struct{}) error { + s.informerFactory.Start(stop) + cache.WaitForCacheSync(stop, s.gameServerSynced) + + gs, err := s.gameServerLister.GameServers(s.namespace).Get(s.gameServerName) + if err != nil { + return errors.Wrapf(err, "error retrieving gameserver %s/%s", s.namespace, s.gameServerName) + } + + // grab configuration details + s.health = gs.Spec.Health + s.logger.WithField("health", s.health).Info("setting health configuration") + s.healthTimeout = time.Duration(gs.Spec.Health.PeriodSeconds) * time.Second + s.initHealthLastUpdated(time.Duration(gs.Spec.Health.InitialDelaySeconds) * time.Second) + + // start health checking running + if !s.health.Disabled { + s.logger.Info("Starting GameServer health checking") + go wait.Until(s.runHealth, s.healthTimeout, stop) + } + + // then start the http endpoints s.logger.Info("Starting SDKServer http health check...") go func() { if err := s.server.ListenAndServe(); err != nil { @@ -175,16 +189,10 @@ func (s *SDKServer) Run(stop <-chan struct{}) { }() defer s.server.Close() // nolint: errcheck - if !s.healthDisabled { - s.logger.Info("Starting GameServer health checking") - go wait.Until(s.runHealth, s.healthTimeout, stop) - } - - s.informerFactory.Start(stop) - cache.WaitForCacheSync(stop, s.gameServerSynced) // need this for streaming gRPC commands s.stop = stop s.workerqueue.Run(1, stop) + return nil } // updateState sets the GameServer Status's state to the state @@ -371,11 +379,11 @@ func (s *SDKServer) checkHealth() { // currently healthy or not based on the configured // failure count vs failure threshold func (s *SDKServer) healthy() bool { - if s.healthDisabled { + if s.health.Disabled { return true } s.healthMutex.RLock() defer s.healthMutex.RUnlock() - return s.healthFailureCount < s.healthFailureThreshold + return s.healthFailureCount < s.health.FailureThreshold } diff --git a/pkg/gameservers/sdkserver_test.go b/pkg/gameservers/sdkserver_test.go index 1e8104af00..98782d5a1d 100644 --- a/pkg/gameservers/sdkserver_test.go +++ b/pkg/gameservers/sdkserver_test.go @@ -15,10 +15,9 @@ package gameservers import ( - "net/http" + "net/http" "sync" "testing" - "time" "agones.dev/agones/pkg/apis/stable/v1alpha1" @@ -34,7 +33,7 @@ import ( "k8s.io/apimachinery/pkg/watch" k8stesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" -) + ) func TestSidecarRun(t *testing.T) { t.Parallel() @@ -74,10 +73,14 @@ func TestSidecarRun(t *testing.T) { m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { gs := v1alpha1.GameServer{ ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default"}, + Spec: v1alpha1.GameServerSpec{ + Health: v1alpha1.Health{Disabled: false, FailureThreshold: 1, PeriodSeconds: 1, InitialDelaySeconds: 0}, + }, Status: v1alpha1.GameServerStatus{ State: v1alpha1.Starting, }, } + gs.ApplyDefaults() return true, &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{gs}}, nil }) m.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { @@ -90,27 +93,35 @@ func TestSidecarRun(t *testing.T) { return true, gs, nil }) - sc, err := NewSDKServer("test", "default", - false, time.Second, 1, 0, m.KubeClient, m.AgonesClient) + sc, err := NewSDKServer("test", "default", m.KubeClient, m.AgonesClient) assert.Nil(t, err) sc.recorder = m.FakeRecorder ctx, cancel := context.WithCancel(context.Background()) defer cancel() - - go sc.Run(ctx.Done()) + wg := sync.WaitGroup{} + wg.Add(1) + + go func() { + err := sc.Run(ctx.Done()) + assert.Nil(t, err) + wg.Done() + }() v.f(sc, ctx) - timeout := time.After(10 * time.Second) select { case <-done: - case <-timeout: + case <-time.After(10 * time.Second): assert.Fail(t, "Timeout on Run") } + logrus.Info("attempting to find event recording") for _, str := range v.recordings { agtesting.AssertEventContains(t, m.FakeRecorder.Events, str) } + + cancel() + wg.Wait() }) } } @@ -157,7 +168,7 @@ func TestSidecarHealthLastUpdated(t *testing.T) { sc, err := defaultSidecar(m) assert.Nil(t, err) - sc.healthDisabled = false + sc.health = v1alpha1.Health{Disabled: false} fc := clock.NewFakeClock(now) sc.clock = fc @@ -203,6 +214,11 @@ func TestSidecarHealthy(t *testing.T) { m := agtesting.NewMocks() sc, err := defaultSidecar(m) + // manually set the values + sc.health = v1alpha1.Health{FailureThreshold: 1} + sc.healthTimeout = 5 * time.Second + sc.initHealthLastUpdated(0 * time.Second) + assert.Nil(t, err) now := time.Now().UTC() @@ -233,7 +249,7 @@ func TestSidecarHealthy(t *testing.T) { for k, v := range fixtures { t.Run(k, func(t *testing.T) { logrus.WithField("test", k).Infof("Test Running") - sc.healthDisabled = v.disabled + sc.health.Disabled = v.disabled fc.SetTime(time.Now().UTC()) stream.msgs <- &sdk.Empty{} err = waitForMessage(sc) @@ -246,7 +262,7 @@ func TestSidecarHealthy(t *testing.T) { } t.Run("initial delay", func(t *testing.T) { - sc.healthDisabled = false + sc.health.Disabled = false fc.SetTime(time.Now().UTC()) sc.initHealthLastUpdated(0) sc.healthFailureCount = 0 @@ -266,15 +282,15 @@ func TestSidecarHealthy(t *testing.T) { }) t.Run("health failure threshold", func(t *testing.T) { - sc.healthDisabled = false - sc.healthFailureThreshold = 3 + sc.health.Disabled = false + sc.health.FailureThreshold = 3 fc.SetTime(time.Now().UTC()) sc.initHealthLastUpdated(0) sc.healthFailureCount = 0 sc.checkHealth() assert.True(t, sc.healthy()) - assert.Equal(t, int64(0), sc.healthFailureCount) + assert.Equal(t, int32(0), sc.healthFailureCount) fc.Step(10 * time.Second) sc.checkHealth() @@ -297,27 +313,50 @@ func TestSidecarHealthy(t *testing.T) { func TestSidecarHTTPHealthCheck(t *testing.T) { m := agtesting.NewMocks() - sc, err := NewSDKServer("test", "default", - false, 1*time.Second, 1, 0, m.KubeClient, m.AgonesClient) + sc, err := NewSDKServer("test", "default", m.KubeClient, m.AgonesClient) assert.Nil(t, err) now := time.Now().Add(time.Hour).UTC() fc := clock.NewFakeClock(now) // now we control time - so slow machines won't fail anymore sc.clock = fc - sc.healthLastUpdated = now - sc.healthFailureCount = 0 + + m.AgonesClient.AddReactor("list", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) { + gs := v1alpha1.GameServer{ + ObjectMeta: metav1.ObjectMeta{Name: sc.gameServerName, Namespace: sc.namespace}, + Spec: v1alpha1.GameServerSpec{ + Health: v1alpha1.Health{Disabled: false, FailureThreshold: 1, PeriodSeconds: 1, InitialDelaySeconds: 0}, + }, + } + + return true, &v1alpha1.GameServerList{Items: []v1alpha1.GameServer{gs}}, nil + }) ctx, cancel := context.WithCancel(context.Background()) defer cancel() + wg := sync.WaitGroup{} + wg.Add(1) - go sc.Run(ctx.Done()) + step := 2 * time.Second + + go func() { + err := sc.Run(ctx.Done()) + assert.Nil(t, err) + // gate + assert.Equal(t, 1*time.Second, sc.healthTimeout) + wg.Done() + }() testHTTPHealth(t, "http://localhost:8080/healthz", "ok", http.StatusOK) testHTTPHealth(t, "http://localhost:8080/gshealthz", "ok", http.StatusOK) - step := 2 * time.Second + + assert.Equal(t, now, sc.healthLastUpdated) + fc.Step(step) time.Sleep(step) + assert.False(t, sc.healthy()) testHTTPHealth(t, "http://localhost:8080/gshealthz", "", http.StatusInternalServerError) + cancel() + wg.Wait() // wait for go routine test results. } func TestSDKServerConvert(t *testing.T) { @@ -469,21 +508,6 @@ func TestSDKServerSendGameServerUpdate(t *testing.T) { assert.Equal(t, fixture.ObjectMeta.Name, sdkGS.ObjectMeta.Name) } -func waitConnectedStreamCount(sc *SDKServer, count int) error { - return wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) { - sc.streamMutex.RLock() - defer sc.streamMutex.RUnlock() - return len(sc.connectedStreams) == count, nil - }) -} - -func asyncWatchGameServer(t *testing.T, sc *SDKServer, stream *gameServerMockStream) { - go func() { - err := sc.WatchGameServer(&sdk.Empty{}, stream) - assert.Nil(t, err) - }() -} - func TestSDKServerUpdateEventHandler(t *testing.T) { t.Parallel() m := agtesting.NewMocks() @@ -523,9 +547,8 @@ func TestSDKServerUpdateEventHandler(t *testing.T) { assert.Equal(t, fixture.ObjectMeta.Name, sdkGS.ObjectMeta.Name) } -func defaultSidecar(mocks agtesting.Mocks) (*SDKServer, error) { - return NewSDKServer("test", "default", - true, 5*time.Second, 1, 0, mocks.KubeClient, mocks.AgonesClient) +func defaultSidecar(m agtesting.Mocks) (*SDKServer, error) { + return NewSDKServer("test", "default", m.KubeClient, m.AgonesClient) } func waitForMessage(sc *SDKServer) error { @@ -535,3 +558,18 @@ func waitForMessage(sc *SDKServer) error { return sc.clock.Now().UTC() == sc.healthLastUpdated, nil }) } + +func waitConnectedStreamCount(sc *SDKServer, count int) error { + return wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) { + sc.streamMutex.RLock() + defer sc.streamMutex.RUnlock() + return len(sc.connectedStreams) == count, nil + }) +} + +func asyncWatchGameServer(t *testing.T, sc *SDKServer, stream *gameServerMockStream) { + go func() { + err := sc.WatchGameServer(&sdk.Empty{}, stream) + assert.Nil(t, err) + }() +}