Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup - no longer need to list Pods for GameServers #747

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 28 additions & 47 deletions pkg/gameservers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,6 @@ import (
"k8s.io/client-go/util/workqueue"
)

var (
// errPodNotFound is the sentinel error returned by gameServerPod when no Pod
// controlled by the given GameServer is found.
errPodNotFound = errors.New("A Pod for this GameServer Was Not Found")
)

// Controller is the main GameServer CRD controller
type Controller struct {
baseLogger *logrus.Entry
Expand Down Expand Up @@ -302,7 +298,7 @@ func (c *Controller) Run(workers int, stop <-chan struct{}) error {
}

c.baseLogger.Info("Wait for cache sync")
if !cache.WaitForCacheSync(stop, c.gameServerSynced) {
if !cache.WaitForCacheSync(stop, c.gameServerSynced, c.podSynced) {
return errors.New("failed to wait for caches to sync")
}

Expand Down Expand Up @@ -389,20 +385,24 @@ func (c *Controller) syncGameServerDeletionTimestamp(gs *v1alpha1.GameServer) (*
}

c.loggerForGameServer(gs).Info("Syncing with Deletion Timestamp")
pods, err := c.listGameServerPods(gs)
if err != nil {

pod, err := c.gameServerPod(gs)
if err != nil && !k8serrors.IsNotFound(err) {
return gs, err
}

if len(pods) > 0 {
c.loggerForGameServer(gs).WithField("pods", pods).Info("Found pods, deleting")
for _, p := range pods {
err = c.podGetter.Pods(p.ObjectMeta.Namespace).Delete(p.ObjectMeta.Name, nil)
_, isDev := gs.GetDevAddress()
if pod != nil && !isDev {
// only need to do this once
if pod.ObjectMeta.DeletionTimestamp.IsZero() {
err = c.podGetter.Pods(pod.ObjectMeta.Namespace).Delete(pod.ObjectMeta.Name, nil)
if err != nil {
return gs, errors.Wrapf(err, "error deleting pod for GameServer %s, %s", gs.ObjectMeta.Name, p.ObjectMeta.Name)
return gs, errors.Wrapf(err, "error deleting pod for GameServer %s, %s", gs.ObjectMeta.Name, pod.ObjectMeta.Name)
}
c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), fmt.Sprintf("Deleting Pod %s", p.ObjectMeta.Name))
c.recorder.Event(gs, corev1.EventTypeNormal, string(gs.Status.State), fmt.Sprintf("Deleting Pod %s", pod.ObjectMeta.Name))
}

// but no removing finalizers until it's truly gone
return gs, nil
}

Expand Down Expand Up @@ -456,18 +456,18 @@ func (c *Controller) syncGameServerCreatingState(gs *v1alpha1.GameServer) (*v1al
c.loggerForGameServer(gs).Info("Syncing Create State")

// Maybe something went wrong, and the pod was created, but the state was never moved to Starting, so let's check
ret, err := c.listGameServerPods(gs)
if err != nil {
return nil, err
}

if len(ret) == 0 {
_, err := c.gameServerPod(gs)
if k8serrors.IsNotFound(err) {
gs, err = c.createGameServerPod(gs)
if err != nil || gs.Status.State == v1alpha1.GameServerStateError {
return gs, err
}
}

if err != nil {
return nil, errors.WithStack(err)
}

gsCopy := gs.DeepCopy()
gsCopy.Status.State = v1alpha1.GameServerStateStarting
gs, err = c.gameServerGetter.GameServers(gs.ObjectMeta.Namespace).Update(gsCopy)
Expand Down Expand Up @@ -703,7 +703,7 @@ func (c *Controller) syncGameServerRequestReadyState(gs *v1alpha1.GameServer) (*
if gs.Status.NodeName == "" {
addressPopulated = true
pod, err := c.gameServerPod(gs)
// errPodNotFound should never happen, and if it does -- something bad happened,
// NotFound should never happen, and if it does -- something bad happened,
// so go into workerqueue backoff.
if err != nil {
return nil, err
Expand Down Expand Up @@ -766,38 +766,19 @@ func (c *Controller) gameServerPod(gs *v1alpha1.GameServer) (*corev1.Pod, error)
if _, isDev := gs.GetDevAddress(); isDev {
return &corev1.Pod{}, nil
}
pods, err := c.listGameServerPods(gs)
if err != nil {
return nil, err
}
len := len(pods)
if len == 0 {
return nil, errPodNotFound
}
if len > 1 {
return nil, errors.Errorf("Found %d pods for Game Server %s", len, gs.ObjectMeta.Name)
}
return pods[0], nil
}

// listGameServerPods returns all the Pods that the GameServer created.
// This should only ever be one.
func (c *Controller) listGameServerPods(gs *v1alpha1.GameServer) ([]*corev1.Pod, error) {
pods, err := c.podLister.List(labels.SelectorFromSet(labels.Set{v1alpha1.GameServerPodLabel: gs.ObjectMeta.Name}))
if err != nil {
return pods, errors.Wrapf(err, "error checking if pod exists for GameServer %s", gs.Name)
pod, err := c.podLister.Pods(gs.ObjectMeta.Namespace).Get(gs.ObjectMeta.Name)

// if not found, propagate this error up, so we can use it in checks
if k8serrors.IsNotFound(err) {
return nil, err
}

// There is a small chance that the GameServer name is not unique, and a Pod for a previous
// GameServer has yet to terminate, so check its controller, just to be sure.
var result []*corev1.Pod
for _, p := range pods {
if metav1.IsControlledBy(p, gs) {
result = append(result, p)
}
if !metav1.IsControlledBy(pod, gs) {
return nil, k8serrors.NewNotFound(corev1.Resource("pod"), gs.ObjectMeta.Name)
}

return result, nil
return pod, errors.Wrapf(err, "error retrieving pod for GameServer %s", gs.ObjectMeta.Name)
}

// address returns the IP that the given Pod is being run on
Expand Down
149 changes: 103 additions & 46 deletions pkg/gameservers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ func TestControllerSyncGameServerDeletionTimestamp(t *testing.T) {
fixture.ApplyDefaults()
pod, err := fixture.Pod()
assert.Nil(t, err)
pod.ObjectMeta.Name = pod.ObjectMeta.GenerateName

deleted := false
mocks.KubeClient.AddReactor("list", "pods", func(action k8stesting.Action) (bool, runtime.Object, error) {
Expand All @@ -471,7 +470,7 @@ func TestControllerSyncGameServerDeletionTimestamp(t *testing.T) {
defer cancel()

result, err := c.syncGameServerDeletionTimestamp(fixture)
assert.Nil(t, err)
assert.NoError(t, err)
assert.True(t, deleted, "pod should be deleted")
assert.Equal(t, fixture, result)
assert.Equal(t, fmt.Sprintf("%s %s %s", corev1.EventTypeNormal,
Expand Down Expand Up @@ -505,6 +504,37 @@ func TestControllerSyncGameServerDeletionTimestamp(t *testing.T) {
assert.Equal(t, fixture.ObjectMeta.Name, result.ObjectMeta.Name)
assert.Empty(t, result.ObjectMeta.Finalizers)
})

t.Run("Local development GameServer", func(t *testing.T) {
c, mocks := newFakeController()
now := metav1.Now()
fixture := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "test", Namespace: "default",
Annotations: map[string]string{v1alpha1.DevAddressAnnotation: "1.1.1.1"},
DeletionTimestamp: &now},
Spec: newSingleContainerSpec()}
fixture.ApplyDefaults()

updated := false
mocks.AgonesClient.AddReactor("update", "gameservers", func(action k8stesting.Action) (bool, runtime.Object, error) {
updated = true

ua := action.(k8stesting.UpdateAction)
gs := ua.GetObject().(*v1alpha1.GameServer)
assert.Equal(t, fixture.ObjectMeta.Name, gs.ObjectMeta.Name)
assert.Empty(t, gs.ObjectMeta.Finalizers)

return true, gs, nil
})

_, cancel := agtesting.StartInformers(mocks, c.gameServerSynced)
defer cancel()

result, err := c.syncGameServerDeletionTimestamp(fixture)
assert.Nil(t, err)
assert.True(t, updated, "gameserver should be updated, to remove the finaliser")
assert.Equal(t, fixture.ObjectMeta.Name, result.ObjectMeta.Name)
assert.Empty(t, result.ObjectMeta.Finalizers)
})
}

func TestControllerSyncGameServerPortAllocationState(t *testing.T) {
Expand Down Expand Up @@ -1082,54 +1112,81 @@ func TestControllerAddress(t *testing.T) {
func TestControllerGameServerPod(t *testing.T) {
t.Parallel()

c, mocks := newFakeController()
fakeWatch := watch.NewFake()
mocks.KubeClient.AddWatchReactor("pods", k8stesting.DefaultWatchReactor(fakeWatch, nil))
gs := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gameserver", UID: "1234"}, Spec: newSingleContainerSpec()}
gs.ApplyDefaults()
pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{v1alpha1.GameServerPodLabel: gs.ObjectMeta.Name}}}
setup := func() (*Controller, *v1alpha1.GameServer, *watch.FakeWatcher, <-chan struct{}, context.CancelFunc) {
c, mocks := newFakeController()
fakeWatch := watch.NewFake()
mocks.KubeClient.AddWatchReactor("pods", k8stesting.DefaultWatchReactor(fakeWatch, nil))
stop, cancel := agtesting.StartInformers(mocks, c.gameServerSynced)
gs := &v1alpha1.GameServer{ObjectMeta: metav1.ObjectMeta{Name: "gameserver",
Namespace: defaultNs, UID: "1234"}, Spec: newSingleContainerSpec()}
gs.ApplyDefaults()
return c, gs, fakeWatch, stop, cancel
}

stop, cancel := agtesting.StartInformers(mocks, c.gameServerSynced)
defer cancel()
t.Run("no pod exists", func(t *testing.T) {
c, gs, _, stop, cancel := setup()
defer cancel()

cache.WaitForCacheSync(stop, c.gameServerSynced)
_, err := c.gameServerPod(gs)
assert.Error(t, err)
assert.True(t, k8serrors.IsNotFound(err))
})

_, err := c.gameServerPod(gs)
assert.Equal(t, errPodNotFound, err)
t.Run("a pod exists", func(t *testing.T) {
c, gs, fakeWatch, stop, cancel := setup()

// not owned
fakeWatch.Add(pod.DeepCopy())
cache.WaitForCacheSync(stop, c.gameServerSynced)
_, err = c.gameServerPod(gs)
assert.Equal(t, errPodNotFound, err)
defer cancel()
pod, err := gs.Pod()
assert.Nil(t, err)

// owned
ownedPod, err := gs.Pod()
assert.Nil(t, err)
ownedPod.ObjectMeta.Name = "owned1"
fakeWatch.Add(ownedPod)
cache.WaitForCacheSync(stop, c.gameServerSynced)
// should be fine
pod2, err := c.gameServerPod(gs)
assert.Nil(t, err)
assert.Equal(t, ownedPod, pod2)

// add another non-owned pod
p2 := pod.DeepCopy()
p2.ObjectMeta.Name = "pod2"
fakeWatch.Add(p2)
cache.WaitForCacheSync(stop, c.gameServerSynced)
// should still be fine
pod2, err = c.gameServerPod(gs)
assert.Nil(t, err)
assert.Equal(t, ownedPod, pod2)

// now add another owned pod
p3 := ownedPod.DeepCopy()
p3.ObjectMeta.Name = "pod3"
fakeWatch.Add(p3)
cache.WaitForCacheSync(stop, c.gameServerSynced)
// should error out
_, err = c.gameServerPod(gs)
assert.NotNil(t, err)
fakeWatch.Add(pod.DeepCopy())
cache.WaitForCacheSync(stop, c.gameServerSynced)
pod2, err := c.gameServerPod(gs)
assert.NoError(t, err)
assert.Equal(t, pod, pod2)

fakeWatch.Delete(pod.DeepCopy())
cache.WaitForCacheSync(stop, c.gameServerSynced)
_, err = c.gameServerPod(gs)
assert.Error(t, err)
assert.True(t, k8serrors.IsNotFound(err))
})

t.Run("a pod exists, but isn't owned by the gameserver", func(t *testing.T) {
c, gs, fakeWatch, stop, cancel := setup()
defer cancel()

pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: gs.ObjectMeta.Name, Labels: map[string]string{v1alpha1.GameServerPodLabel: gs.ObjectMeta.Name, "owned": "false"}}}
fakeWatch.Add(pod.DeepCopy())

// gate
cache.WaitForCacheSync(stop, c.podSynced)
pod, err := c.podGetter.Pods(defaultNs).Get(pod.ObjectMeta.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.NotNil(t, pod)

_, err = c.gameServerPod(gs)
assert.Error(t, err)
assert.True(t, k8serrors.IsNotFound(err))
})

t.Run("dev gameserver pod", func(t *testing.T) {
c, _ := newFakeController()

gs := &v1alpha1.GameServer{
ObjectMeta: metav1.ObjectMeta{Name: "gameserver", Namespace: defaultNs,
Annotations: map[string]string{
v1alpha1.DevAddressAnnotation: "1.1.1.1",
},
UID: "1234"},

Spec: newSingleContainerSpec()}

pod, err := c.gameServerPod(gs)
assert.NoError(t, err)
assert.Empty(t, pod.ObjectMeta.Name)
})
}

func TestControllerAddGameServerHealthCheck(t *testing.T) {
Expand Down
1 change: 0 additions & 1 deletion pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ func (s *SDKServer) Allocate(context.Context, *sdk.Empty) (*sdk.Empty, error) {

// if a contention, and we are under the timeout period.
if k8serrors.IsConflict(err) {
// TODO: write a test for this
if s.clock.Since(now) > defaultTimeout {
return true, errors.New("Allocation request timed out")
}
Expand Down
21 changes: 18 additions & 3 deletions test/e2e/gameserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
e2eframework "agones.dev/agones/test/e2e/framework"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestUnhealthyGameServersWithoutFreePorts(t *testing.T) {

_, err = gameServers.Get(newGs.Name, metav1.GetOptions{})
assert.NotNil(t, err)
assert.True(t, errors.IsNotFound(err))
assert.True(t, k8serrors.IsNotFound(err))
}

func TestGameServerUnhealthyAfterDeletingPod(t *testing.T) {
Expand Down Expand Up @@ -240,9 +240,24 @@ func TestDevelopmentGameServerLifecycle(t *testing.T) {
if err != nil {
t.Fatalf("Could not get a GameServer ready: %v", err)
}
defer framework.AgonesClient.StableV1alpha1().GameServers(defaultNs).Delete(readyGs.ObjectMeta.Name, nil) // nolint: errcheck

assert.Equal(t, readyGs.Status.State, v1alpha1.GameServerStateReady)

//confirm delete works, because if the finalisers don't get removed, this won't work.
err = framework.AgonesClient.StableV1alpha1().GameServers(defaultNs).Delete(readyGs.ObjectMeta.Name, nil)
assert.NoError(t, err)

err = wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
_, err = framework.AgonesClient.StableV1alpha1().GameServers(defaultNs).Get(readyGs.ObjectMeta.Name, metav1.GetOptions{})

if k8serrors.IsNotFound(err) {
return true, nil
}

return false, err
})

assert.NoError(t, err)
}

func TestGameServerSelfAllocate(t *testing.T) {
Expand Down