Skip to content

Commit

Permalink
Revert #3070, wait on networking a different way (#3107)
Browse files Browse the repository at this point in the history
* Revert #3070, wait on networking a different way

The problem addressed by #3070 is that on an indeterminate basis, we
are seeing containers start without networking fully available. Once
networking seems to work, it works fine.

However, the fix in #3070 introduced a downside: heavy watch traffic,
because I didn't quite understand that it would also block the hanging
GET of the watch. See #3106.

Instead of timing out the whole client, let's use an initial-probe
approach and instead block on a successful GET (with a reasonable
timeout) before we try to start informers.

Along the way: fix nil pointer deref when TestPingHTTP fails

Fixes #3106
  • Loading branch information
zmerlynn authored Apr 18, 2023
1 parent a38ea13 commit e61be47
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 20 deletions.
7 changes: 0 additions & 7 deletions cmd/sdk-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,6 @@ func main() {
if err != nil {
logger.WithError(err).Fatal("Could not create in cluster config")
}
// The SDK client only ever accesses small amounts of data (single object list /
// event updates), latency more than a couple of seconds is excessive. We need to
// keep a relatively tight timeout during initialization as well to allow the
// informer a chance to retry - the SDK won't reply to /healthz checks until the
// informer has synced once, and our liveness configuration only allows 9s before
// a positive /healthz.
config.Timeout = 3 * time.Second

var kubeClient *kubernetes.Clientset
kubeClient, err = kubernetes.NewForConfig(config)
Expand Down
39 changes: 39 additions & 0 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
k8sv1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down Expand Up @@ -80,6 +81,7 @@ type SDKServer struct {
gameServerGetter typedv1.GameServersGetter
gameServerLister listersv1.GameServerLister
gameServerSynced cache.InformerSynced
connected bool
server *http.Server
clock clock.Clock
health agonesv1.Health
Expand Down Expand Up @@ -190,6 +192,9 @@ func NewSDKServer(gameServerName, namespace string, kubeClient kubernetes.Interf
// Run processes the rate limited queue.
// Will block until stop is closed
func (s *SDKServer) Run(ctx context.Context) error {
if err := s.waitForConnection(ctx); err != nil {
return err
}
s.informerFactory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), s.gameServerSynced) {
return errors.New("failed to wait for caches to sync")
Expand Down Expand Up @@ -263,6 +268,40 @@ func (s *SDKServer) Run(ctx context.Context) error {
return nil
}

// waitForConnection attempts a GameServer GET every 3s until the client responds.
// This is a workaround for the informer hanging indefinitely on first LIST due
// to a flaky network to the Kubernetes service endpoint.
func (s *SDKServer) waitForConnection(ctx context.Context) error {
// In normal operaiton, waitForConnection is called exactly once in Run().
// In unit tests, waitForConnection() can be called before Run() to ensure
// that connected is true when Run() is called, otherwise the List() below
// may race with a test that changes a mock. (Despite the fact that we drop
// the data on the ground, the Go race detector will pereceive a data race.)
if s.connected {
return nil
}

try := 0
return wait.PollImmediateInfiniteWithContext(ctx, 4*time.Second, func(ctx context.Context) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

// Specifically use gameServerGetter since it's the raw client (gameServerLister is the informer).
// We use List here to avoid needing permission to Get().
_, err := s.gameServerGetter.GameServers(s.namespace).List(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", s.gameServerName).String(),
})
if err != nil {
s.logger.WithField("try", try).WithError(err).Info("Connection to Kubernetes service failed")
try++
return false, nil
}
s.logger.WithField("try", try).Info("Connection to Kubernetes service established")
s.connected = true
return true, nil
})
}

// syncGameServer synchronises the GameServer with the requested operations.
// The format of the key is {operation}. To prevent old operation data from
// overwriting the new one, the operation data is persisted in SDKServer.
Expand Down
18 changes: 14 additions & 4 deletions pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func TestSidecarRun(t *testing.T) {
sc, err := NewSDKServer("test", "default", m.KubeClient, m.AgonesClient)
stop := make(chan struct{})
defer close(stop)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(stop)
assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced))

Expand All @@ -193,8 +197,6 @@ func TestSidecarRun(t *testing.T) {
sc.clock = v.clock
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := sync.WaitGroup{}
wg.Add(1)

Expand Down Expand Up @@ -459,16 +461,17 @@ func TestSidecarUnhealthyMessage(t *testing.T) {
return true, gs, nil
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stop := make(chan struct{})
defer close(stop)

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(stop)
assert.True(t, cache.WaitForCacheSync(stop, sc.gameServerSynced))

sc.recorder = m.FakeRecorder

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
err := sc.Run(ctx)
assert.Nil(t, err)
Expand Down Expand Up @@ -878,6 +881,7 @@ func TestSDKServerReserveTimeoutOnRun(t *testing.T) {
require.NoError(t, err)

ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

Expand Down Expand Up @@ -933,6 +937,7 @@ func TestSDKServerReserveTimeout(t *testing.T) {

assert.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

Expand Down Expand Up @@ -1061,6 +1066,7 @@ func TestSDKServerPlayerCapacity(t *testing.T) {
return true, gs, nil
})

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

Expand Down Expand Up @@ -1126,6 +1132,7 @@ func TestSDKServerPlayerConnectAndDisconnectWithoutPlayerTracking(t *testing.T)
sc, err := defaultSidecar(m)
require.NoError(t, err)

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

Expand Down Expand Up @@ -1210,6 +1217,7 @@ func TestSDKServerPlayerConnectAndDisconnect(t *testing.T) {
return true, gs, nil
})

assert.NoError(t, sc.waitForConnection(ctx))
sc.informerFactory.Start(ctx.Done())
assert.True(t, cache.WaitForCacheSync(ctx.Done(), sc.gameServerSynced))

Expand Down Expand Up @@ -1376,6 +1384,7 @@ func TestSDKServerGracefulTerminationInterrupt(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
sdkCtx := sc.NewSDKServerContext(ctx)
assert.NoError(t, sc.waitForConnection(sdkCtx))
sc.informerFactory.Start(sdkCtx.Done())
assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))

Expand Down Expand Up @@ -1444,6 +1453,7 @@ func TestSDKServerGracefulTerminationShutdown(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
sdkCtx := sc.NewSDKServerContext(ctx)
assert.NoError(t, sc.waitForConnection(sdkCtx))
sc.informerFactory.Start(sdkCtx.Done())
assert.True(t, cache.WaitForCacheSync(sdkCtx.Done(), sc.gameServerSynced))

Expand Down
19 changes: 10 additions & 9 deletions test/e2e/ping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
Expand All @@ -35,25 +36,25 @@ func TestPingHTTP(t *testing.T) {

kubeCore := framework.KubeClient.CoreV1()
svc, err := kubeCore.Services("agones-system").Get(ctx, "agones-ping-http-service", metav1.GetOptions{})
assert.Nil(t, err)
require.NoError(t, err)

ip, err := externalIP(t, kubeCore, svc)
assert.Nil(t, err)
require.NoError(t, err)

port := svc.Spec.Ports[0]
// gate
assert.Equal(t, "http", port.Name)
assert.Equal(t, corev1.ProtocolTCP, port.Protocol)
p, err := externalPort(svc, port)
assert.Nil(t, err)
require.NoError(t, err)

response, err := http.Get(fmt.Sprintf("http://%s:%d", ip, p))
assert.Nil(t, err)
require.NoError(t, err)
defer response.Body.Close() // nolint: errcheck

assert.Equal(t, http.StatusOK, response.StatusCode)
body, err := io.ReadAll(response.Body)
assert.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, []byte("ok"), body)
}

Expand All @@ -75,21 +76,21 @@ func TestPingUDP(t *testing.T) {

kubeCore := framework.KubeClient.CoreV1()
svc, err := kubeCore.Services("agones-system").Get(ctx, "agones-ping-udp-service", metav1.GetOptions{})
assert.Nil(t, err)
require.NoError(t, err)

externalIP, err := externalIP(t, kubeCore, svc)
assert.Nil(t, err)
require.NoError(t, err)

port := svc.Spec.Ports[0]
// gate
assert.Equal(t, "udp", port.Name)
assert.Equal(t, corev1.ProtocolUDP, port.Protocol)
p, err := externalPort(svc, port)
assert.Nil(t, err)
require.NoError(t, err)

expected := "hello"
reply, err := framework.SendUDP(t, fmt.Sprintf("%s:%d", externalIP, p), expected)
assert.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, expected, reply)
}

Expand Down

0 comments on commit e61be47

Please sign in to comment.