Revert #3070, wait on networking a different way
The problem addressed by #3070 is that, on an indeterminate basis, we
see containers start before networking is fully available. Once
networking starts working, it works fine.

However, the fix in #3070 introduced a downside: heavy watch traffic,
because I didn't realize the client-wide timeout would also cut off the
hanging GET of the watch. See #3106.

Instead of timing out the whole client, let's take an initial-probe
approach: block on a successful GET (with a reasonable timeout) before
we try to start the informers.

Fixes #3106
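
To illustrate the distinction, here is a rough sketch (not the committed code; the package name, probeGameServer, waitUntilReachable, and the clientset wiring are illustrative assumptions) of why a per-request deadline avoids the watch problem that a client-wide rest.Config.Timeout caused: the deadline lives only on the probe's context, so the shared client later handed to the informers keeps its default, unbounded timeout.

// Sketch only: a bounded initial probe, after which the untouched client is
// free for the informers' long-lived watch. Names are illustrative.
package sketch

import (
	"context"
	"time"

	"agones.dev/agones/pkg/client/clientset/versioned"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
)

// probeGameServer issues one LIST for a single GameServer with a 3s deadline.
// The deadline is on the request context, not on the client, so watches opened
// later from the same client are unaffected.
func probeGameServer(ctx context.Context, c versioned.Interface, namespace, name string) error {
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()
	_, err := c.AgonesV1().GameServers(namespace).List(ctx, metav1.ListOptions{
		FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
	})
	return err
}

// waitUntilReachable retries the probe until it succeeds or ctx is cancelled,
// the same shape as waitForConnection in the diff below.
func waitUntilReachable(ctx context.Context, c versioned.Interface, namespace, name string) error {
	for ctx.Err() == nil {
		if err := probeGameServer(ctx, c, namespace, name); err == nil {
			return nil
		}
	}
	return ctx.Err()
}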
zmerlynn committed Apr 18, 2023
1 parent 18d1e8f commit e90ce40
Showing 3 changed files with 50 additions and 11 deletions.
7 changes: 0 additions & 7 deletions cmd/sdk-server/main.go
@@ -122,13 +122,6 @@ func main() {
if err != nil {
logger.WithError(err).Fatal("Could not create in cluster config")
}
// The SDK client only ever accesses small amounts of data (single object list /
// event updates), latency more than a couple of seconds is excessive. We need to
// keep a relatively tight timeout during initialization as well to allow the
// informer a chance to retry - the SDK won't reply to /healthz checks until the
// informer has synced once, and our liveness configuration only allows 9s before
// a positive /healthz.
config.Timeout = 3 * time.Second

var kubeClient *kubernetes.Clientset
kubeClient, err = kubernetes.NewForConfig(config)
36 changes: 36 additions & 0 deletions pkg/sdkserver/sdkserver.go
@@ -80,6 +80,7 @@ type SDKServer struct {
gameServerGetter typedv1.GameServersGetter
gameServerLister listersv1.GameServerLister
gameServerSynced cache.InformerSynced
connected bool
server *http.Server
clock clock.Clock
health agonesv1.Health
@@ -190,6 +191,9 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
// Run processes the rate limited queue.
// Will block until stop is closed
func (s *SDKServer) Run(ctx context.Context) error {
if err := s.waitForConnection(ctx); err != nil {
return err
}
s.informerFactory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), s.gameServerSynced) {
return errors.New("failed to wait for caches to sync")
@@ -263,6 +267,38 @@ func (s *SDKServer) Run(ctx context.Context) error {
return nil
}

// waitForConnection attempts a GameServer GET every 3s until the client responds.
// This is a workaround for the informer hanging indefinitely on first LIST due
// to a flaky network to the Kubernetes service endpoint.
func (s *SDKServer) waitForConnection(ctx context.Context) error {
if s.connected {
return nil
}

getWithDeadline := func(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

// Specifically use gameServerGetter since it's the raw client (gameServerLister is the informer).
// We use List here to avoid needing permission to Get().
_, err := s.gameServerGetter.GameServers(s.namespace).List(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", s.gameServerName).String(),
})
return err
}

for try := 0; ctx.Err() == nil; try++ {
err := getWithDeadline(ctx)
if err == nil {
s.logger.WithField("try", try).Info("Kubernetes connection established")
s.connected = true
return nil
}
s.logger.WithField("try", try).WithError(err).Info("Kubernetes connection failed")
}
return ctx.Err()
}

// syncGameServer synchronises the GameServer with the requested operations.
// The format of the key is {operation}. To prevent old operation data from
// overwriting the new one, the operation data is persisted in SDKServer.
18 changes: 14 additions & 4 deletions pkg/sdkserver/sdkserver_test.go
@@ -184,6 +184,10 @@ func TestSidecarRun(t *testing.T) {
sc, err := NewSDKServer("test", "default", m.KubeClient, m.AgonesClient)
stop := make(chan struct{})
defer close(stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(stop)
assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced))

@@ -193,8 +197,6 @@ func TestSidecarRun(t *testing.T) {
sc.clock = v.clock
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
wg.Add(1)

@@ -459,16 +461,17 @@ func TestSidecarUnhealthyMessage(t *testing.T) {
return true, gs, nil
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stop := make(chan struct{})
defer close(stop)

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(stop)
assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced))

sc.recorder = m.FakeRecorder

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
err := sc.Run(ctx)
assert.Nil(t, err)
@@ -878,6 +881,7 @@ func TestSDKServerReserveTimeoutOnRun(t *testing.T) {
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

@@ -933,6 +937,7 @@ func TestSDKServerReserveTimeout(t *testing.T) {

assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

@@ -1061,6 +1066,7 @@ func TestSDKServerPlayerCapacity(t *testing.T) {
return true, gs, nil
})

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

@@ -1126,6 +1132,7 @@ func TestSDKServerPlayerConnectAndDisconnectWithoutPlayerTracking(t *testing.T)
sc, err := defaultSidecar(m)
require.NoError(t, err)

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

@@ -1210,6 +1217,7 @@ func TestSDKServerPlayerConnectAndDisconnect(t *testing.T) {
return true, gs, nil
})

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

@@ -1376,6 +1384,7 @@ func TestSDKServerGracefulTerminationInterrupt(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
sdkCtx := sc.NewSDKServerContext(ctx)
assert.NoError(t, sc.waitForConnection(sdkCtx))
sc.informerFactory.Start(sdkCtx.Done())
assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))

@@ -1444,6 +1453,7 @@ func TestSDKServerGracefulTerminationShutdown(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
sdkCtx := sc.NewSDKServerContext(ctx)
assert.NoError(t, sc.waitForConnection(sdkCtx))
sc.informerFactory.Start(sdkCtx.Done())
assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))
