diff --git a/cmd/sdk-server/main.go b/cmd/sdk-server/main.go index 8627a1221b..9a54646dbd 100644 --- a/cmd/sdk-server/main.go +++ b/cmd/sdk-server/main.go @@ -122,13 +122,6 @@ func main() { if err != nil { logger.WithError(err).Fatal("Could not create in cluster config") } - // The SDK client only ever accesses small amounts of data (single object list / - // event updates), latency more than a couple of seconds is excessive. We need to - // keep a relatively tight timeout during initialization as well to allow the - // informer a chance to retry - the SDK won't reply to /healthz checks until the - // informer has synced once, and our liveness configuration only allows 9s before - // a positive /healthz. - config.Timeout = 3 * time.Second var kubeClient *kubernetes.Clientset kubeClient, err = kubernetes.NewForConfig(config) diff --git a/pkg/sdkserver/sdkserver.go b/pkg/sdkserver/sdkserver.go index b751b6033b..bdbfef3516 100644 --- a/pkg/sdkserver/sdkserver.go +++ b/pkg/sdkserver/sdkserver.go @@ -80,6 +80,7 @@ type SDKServer struct { gameServerGetter typedv1.GameServersGetter gameServerLister listersv1.GameServerLister gameServerSynced cache.InformerSynced + connected bool server *http.Server clock clock.Clock health agonesv1.Health @@ -190,6 +191,9 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf // Run processes the rate limited queue. // Will block until stop is closed func (s *SDKServer) Run(ctx context.Context) error { + if err := s.waitForConnection(ctx); err != nil { + return err + } s.informerFactory.Start(ctx.Done()) if !cache.WaitForCacheSync(ctx.Done(), s.gameServerSynced) { return errors.New("failed to wait for caches to sync") @@ -263,6 +267,38 @@ func (s *SDKServer) Run(ctx context.Context) error { return nil } +// waitForConnection attempts a GameServer GET every 3s until the client responds. 
+// This is a workaround for the informer hanging indefinitely on first LIST due
+// to a flaky network to the Kubernetes service endpoint.
+//
+// Every attempt is bounded by a 3s deadline and retries are paced by a 3s
+// ticker, so an endpoint that rejects requests immediately (connection refused,
+// DNS failure, authorization error) cannot turn the retry loop into a hot loop
+// that floods the log and the network.
+//
+// It returns nil as soon as one List call succeeds (or if an earlier call
+// already succeeded), and ctx.Err() if ctx is cancelled before that happens.
+func (s *SDKServer) waitForConnection(ctx context.Context) error {
+	// A previous call already proved the connection works; skip the round trip.
+	if s.connected {
+		return nil
+	}
+
+	// retryInterval is both the per-attempt deadline and the retry pacing.
+	const retryInterval = 3 * time.Second
+
+	getWithDeadline := func(ctx context.Context) error {
+		ctx, cancel := context.WithTimeout(ctx, retryInterval)
+		defer cancel()
+
+		// Specifically use gameServerGetter since it's the raw client (gameServerLister is the informer).
+		// We use List here to avoid needing permission to Get().
+		_, err := s.gameServerGetter.GameServers(s.namespace).List(ctx, metav1.ListOptions{
+			FieldSelector: fields.OneTermEqualSelector("metadata.name", s.gameServerName).String(),
+		})
+		return err
+	}
+
+	// The ticker starts before the first attempt: an attempt that consumed its
+	// whole deadline finds a tick already pending and retries straight away,
+	// while an attempt that failed fast waits for the next tick.
+	ticker := time.NewTicker(retryInterval)
+	defer ticker.Stop()
+
+	for try := 0; ctx.Err() == nil; try++ {
+		err := getWithDeadline(ctx)
+		if err == nil {
+			s.logger.WithField("try", try).Info("Kubernetes connection established")
+			s.connected = true
+			return nil
+		}
+		s.logger.WithField("try", try).WithError(err).Info("Kubernetes connection failed")
+
+		// Wait for the next retry slot, but give up promptly on cancellation.
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+	return ctx.Err()
+}
+
 // syncGameServer synchronises the GameServer with the requested operations.
 // The format of the key is {operation}. To prevent old operation data from
 // overwriting the new one, the operation data is persisted in SDKServer.
diff --git a/pkg/sdkserver/sdkserver_test.go b/pkg/sdkserver/sdkserver_test.go index d3b04cc602..7865424341 100644 --- a/pkg/sdkserver/sdkserver_test.go +++ b/pkg/sdkserver/sdkserver_test.go @@ -184,6 +184,10 @@ func TestSidecarRun(t *testing.T) { sc, err := NewSDKServer("test", "default", m.KubeClient, m.AgonesClient) stop := make(chan struct{}) defer close(stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + assert.NoError(t, sc.waitForConnection(ctx)) sc.informerFactory.Start(stop) assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced)) @@ -193,8 +197,6 @@ func TestSidecarRun(t *testing.T) { sc.clock = v.clock } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() wg := sync.WaitGroup{} wg.Add(1) @@ -459,16 +461,17 @@ func TestSidecarUnhealthyMessage(t *testing.T) { return true, gs, nil }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() stop := make(chan struct{}) defer close(stop) + assert.NoError(t, sc.waitForConnection(ctx)) sc.informerFactory.Start(stop) assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced)) sc.recorder = m.FakeRecorder - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() go func() { err := sc.Run(ctx) assert.Nil(t, err) @@ -878,6 +881,7 @@ func TestSDKServerReserveTimeoutOnRun(t *testing.T) { require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) + assert.NoError(t, sc.waitForConnection(ctx)) sc.informerFactory.Start(ctx.Done()) assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced)) @@ -933,6 +937,7 @@ func TestSDKServerReserveTimeout(t *testing.T) { assert.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) + assert.NoError(t, sc.waitForConnection(ctx)) sc.informerFactory.Start(ctx.Done()) assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced)) @@ -1061,6 +1066,7 @@ func TestSDKServerPlayerCapacity(t *testing.T) { return true, gs, nil }) + 
assert.NoError(t, sc.waitForConnection(ctx)) sc.informerFactory.Start(ctx.Done()) assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced)) @@ -1126,6 +1132,7 @@ func TestSDKServerPlayerConnectAndDisconnectWithoutPlayerTracking(t *testing.T) sc, err := defaultSidecar(m) require.NoError(t, err) + assert.NoError(t, sc.waitForConnection(ctx)) sc.informerFactory.Start(ctx.Done()) assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced)) @@ -1210,6 +1217,7 @@ func TestSDKServerPlayerConnectAndDisconnect(t *testing.T) { return true, gs, nil }) + assert.NoError(t, sc.waitForConnection(ctx)) sc.informerFactory.Start(ctx.Done()) assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced)) @@ -1376,6 +1384,7 @@ func TestSDKServerGracefulTerminationInterrupt(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) sdkCtx := sc.NewSDKServerContext(ctx) + assert.NoError(t, sc.waitForConnection(sdkCtx)) sc.informerFactory.Start(sdkCtx.Done()) assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced)) @@ -1444,6 +1453,7 @@ func TestSDKServerGracefulTerminationShutdown(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) sdkCtx := sc.NewSDKServerContext(ctx) + assert.NoError(t, sc.waitForConnection(sdkCtx)) sc.informerFactory.Start(sdkCtx.Done()) assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))