Skip to content

Commit

Permalink
Fix Rolling Update with Allocated GameServers (#2420)
Browse files Browse the repository at this point in the history
* e2e test to reproduce issue#2397

Fix for bug!

Co-authored-by: Mark Mandel <markmandel@google.com>
  • Loading branch information
Wouter Verlaek and markmandel committed Mar 17, 2022
1 parent ed17358 commit e4060cc
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 97 deletions.
2 changes: 1 addition & 1 deletion pkg/fleets/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ func (c *Controller) rollingUpdateRestFixedOnReady(ctx context.Context, fleet *a

// Check if we are ready to scale down
allPodsCount := agonesv1.SumSpecReplicas(allGSS)
newGSSUnavailablePodCount := active.Spec.Replicas - active.Status.ReadyReplicas
newGSSUnavailablePodCount := active.Spec.Replicas - active.Status.ReadyReplicas - active.Status.AllocatedReplicas
maxScaledDown := allPodsCount - minAvailable - newGSSUnavailablePodCount

if maxScaledDown <= 0 {
Expand Down
256 changes: 160 additions & 96 deletions test/e2e/fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,123 +222,187 @@ func TestFleetScaleUpEditAndScaleDown(t *testing.T) {
func TestFleetRollingUpdate(t *testing.T) {
t.Parallel()
ctx := context.Background()
// Use scaleFleetPatch (true) or scaleFleetSubresource (false)
fixtures := []bool{true, false}
maxSurge := []string{"25%", "10%"}
fixtures := []struct {
// Use scaleFleetPatch (true) or scaleFleetSubresource (false)
usePatch bool
maxSurge string
// If true, create and cycle GS Allocations when triggering a rolling update:
// - Repeatedly allocate and shutdown GameServers (keeping ~50% of the Fleet in an Allocated state).
// - Once 50% of the Fleet is Allocated, trigger the rolling update.
// - Keep allocating/shutting down GameServers, to allocate in both the old and new GSSets.
// - Verify the rolling update completes.
// This simulates updating a Fleet that is live/in-use, and reproduces an issue where allocated GameServers
// causes a rolling update to get stuck and keep the old GameServerSet up.
cycleAllocations bool
}{
{
usePatch: true,
maxSurge: "25%",
cycleAllocations: false,
},
{
usePatch: true,
maxSurge: "10%",
cycleAllocations: false,
},
{
usePatch: false,
maxSurge: "25%",
cycleAllocations: false,
},
{
usePatch: false,
maxSurge: "10%",
cycleAllocations: false,
},
{
usePatch: true,
maxSurge: "25%",
cycleAllocations: true,
},
}
for i := range fixtures {
fixture := fixtures[i]
t.Run(fmt.Sprintf("Use fleet Patch %t %s cycle %t", fixture.usePatch, fixture.maxSurge, fixture.cycleAllocations), func(t *testing.T) {
t.Parallel()

for _, usePatch := range fixtures {
for _, maxSurgeParam := range maxSurge {
usePatch := usePatch
maxSurgeParam := maxSurgeParam
t.Run(fmt.Sprintf("Use fleet Patch %t %s", usePatch, maxSurgeParam), func(t *testing.T) {
t.Parallel()

client := framework.AgonesClient.AgonesV1()

flt := defaultFleet(framework.Namespace)
flt.ApplyDefaults()
flt.Spec.Replicas = 1
rollingUpdatePercent := intstr.FromString(maxSurgeParam)
flt.Spec.Strategy.RollingUpdate.MaxSurge = &rollingUpdatePercent
flt.Spec.Strategy.RollingUpdate.MaxUnavailable = &rollingUpdatePercent

flt, err := client.Fleets(framework.Namespace).Create(ctx, flt, metav1.CreateOptions{})
if assert.Nil(t, err) {
defer client.Fleets(framework.Namespace).Delete(ctx, flt.ObjectMeta.Name, metav1.DeleteOptions{}) // nolint:errcheck
}
client := framework.AgonesClient.AgonesV1()

assert.Equal(t, int32(1), flt.Spec.Replicas)
assert.Equal(t, maxSurgeParam, flt.Spec.Strategy.RollingUpdate.MaxSurge.StrVal)
assert.Equal(t, maxSurgeParam, flt.Spec.Strategy.RollingUpdate.MaxUnavailable.StrVal)
flt := defaultFleet(framework.Namespace)
flt.ApplyDefaults()
flt.Spec.Replicas = 1
rollingUpdatePercent := intstr.FromString(fixture.maxSurge)
flt.Spec.Strategy.RollingUpdate.MaxSurge = &rollingUpdatePercent
flt.Spec.Strategy.RollingUpdate.MaxUnavailable = &rollingUpdatePercent

framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas))
flt, err := client.Fleets(framework.Namespace).Create(ctx, flt, metav1.CreateOptions{})
require.NoError(t, err)
defer client.Fleets(framework.Namespace).Delete(ctx, flt.ObjectMeta.Name, metav1.DeleteOptions{}) // nolint:errcheck

// scale up
const targetScale = 8
if usePatch {
flt = scaleFleetPatch(ctx, t, flt, targetScale)
assert.Equal(t, int32(targetScale), flt.Spec.Replicas)
} else {
flt = scaleFleetSubresource(ctx, t, flt, targetScale)
}
assert.Equal(t, int32(1), flt.Spec.Replicas)
assert.Equal(t, fixture.maxSurge, flt.Spec.Strategy.RollingUpdate.MaxSurge.StrVal)
assert.Equal(t, fixture.maxSurge, flt.Spec.Strategy.RollingUpdate.MaxUnavailable.StrVal)

framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(targetScale))
framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(flt.Spec.Replicas))

flt, err = client.Fleets(framework.Namespace).Get(ctx, flt.ObjectMeta.GetName(), metav1.GetOptions{})
assert.NoError(t, err)
// scale up
const targetScale = 8
if fixture.usePatch {
flt = scaleFleetPatch(ctx, t, flt, targetScale)
assert.Equal(t, int32(targetScale), flt.Spec.Replicas)
} else {
flt = scaleFleetSubresource(ctx, t, flt, targetScale)
}

framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(targetScale))

flt, err = client.Fleets(framework.Namespace).Get(ctx, flt.ObjectMeta.GetName(), metav1.GetOptions{})
require.NoError(t, err)

cycleCtx, cancelCycle := context.WithCancel(ctx)
defer cancelCycle()
if fixture.cycleAllocations {
// Repeatedly cycle allocations to keep ~half of the GameServers Allocated. Repeatedly Allocate and
// delete such that both the old and new GSSet contain allocated GameServers.
const halfScale = targetScale / 2
const period = 3 * time.Second
go framework.CycleAllocations(cycleCtx, t, flt, period, period*halfScale)

// Wait for at least half of the fleet to have be cycled (either Allocated or shutting down)
// before updating the fleet.
err = framework.WaitForFleetCondition(t, flt, func(entry *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.ReadyReplicas < halfScale
})
}

// Change ContainerPort to trigger creating a new GSSet
// Change ContainerPort to trigger creating a new GSSet. Retry in case of a conflict.
fltName := flt.GetName()
require.Eventually(t, func() bool {
flt, err = client.Fleets(framework.Namespace).Get(ctx, fltName, metav1.GetOptions{})
require.NoError(t, err)
fltCopy := flt.DeepCopy()
fltCopy.Spec.Template.Spec.Ports[0].ContainerPort++
flt, err = client.Fleets(framework.Namespace).Update(ctx, fltCopy, metav1.UpdateOptions{})
assert.NoError(t, err)
return err == nil
}, time.Minute, time.Second)

selector := labels.SelectorFromSet(labels.Set{agonesv1.FleetNameLabel: flt.ObjectMeta.Name})
// New GSS was created
err = wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
gssList, err := framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}
return len(gssList.Items) == 2, nil
})
assert.NoError(t, err)
// Check that total number of gameservers in the system does not exceed the RollingUpdate
// parameters (creating no more than maxSurge, deleting maxUnavailable servers at a time)
// Wait for old GSSet to be deleted
err = wait.PollImmediate(1*time.Second, 5*time.Minute, func() (bool, error) {
list, err := framework.AgonesClient.AgonesV1().GameServers(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}
selector := labels.SelectorFromSet(labels.Set{agonesv1.FleetNameLabel: flt.ObjectMeta.Name})
// New GSS was created
err = wait.PollImmediate(1*time.Second, 30*time.Second, func() (bool, error) {
gssList, err := framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}
return len(gssList.Items) == 2, nil
})
assert.NoError(t, err)
// Check that total number of gameservers in the system does not exceed the RollingUpdate
// parameters (creating no more than maxSurge, deleting maxUnavailable servers at a time)
// Wait for old GSSet to be deleted
err = wait.PollImmediate(1*time.Second, 5*time.Minute, func() (bool, error) {
list, err := framework.AgonesClient.AgonesV1().GameServers(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}

maxSurge, err := intstr.GetValueFromIntOrPercent(flt.Spec.Strategy.RollingUpdate.MaxSurge, int(flt.Spec.Replicas), true)
assert.Nil(t, err)
maxSurge, err := intstr.GetValueFromIntOrPercent(flt.Spec.Strategy.RollingUpdate.MaxSurge, int(flt.Spec.Replicas), true)
assert.Nil(t, err)

roundUp := false
maxUnavailable, err := intstr.GetValueFromIntOrPercent(flt.Spec.Strategy.RollingUpdate.MaxUnavailable, int(flt.Spec.Replicas), roundUp)
roundUp := false
maxUnavailable, err := intstr.GetValueFromIntOrPercent(flt.Spec.Strategy.RollingUpdate.MaxUnavailable, int(flt.Spec.Replicas), roundUp)

if maxUnavailable == 0 {
maxUnavailable = 1
}
// This difference is inevitable, also could be seen with Deployments and ReplicaSets
shift := maxUnavailable
assert.Nil(t, err)
if maxUnavailable == 0 {
maxUnavailable = 1
}
// This difference is inevitable, also could be seen with Deployments and ReplicaSets
shift := maxUnavailable
assert.Nil(t, err)

expectedTotal := targetScale + maxSurge + maxUnavailable + shift
if len(list.Items) > expectedTotal {
err = fmt.Errorf("new replicas should be less than target + maxSurge + maxUnavailable + shift. Replicas: %d, Expected: %d", len(list.Items), expectedTotal)
}
if err != nil {
return false, err
}
gssList, err := framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
// Ignore any GameServers that are shutting down (resulting from Allocation cycling).
shuttingDown := 0
for _, gs := range list.Items {
if gs.IsBeingDeleted() {
shuttingDown++
}
return len(gssList.Items) == 1, nil
})
}
expectedTotal := targetScale + maxSurge + maxUnavailable + shift + shuttingDown
if len(list.Items) > expectedTotal {
err = fmt.Errorf("new replicas should be less than target + maxSurge + maxUnavailable + shift + shuttingDown. Replicas: %d, Expected: %d", len(list.Items), expectedTotal)
}
if err != nil {
return false, err
}
gssList, err := framework.AgonesClient.AgonesV1().GameServerSets(framework.Namespace).List(ctx,
metav1.ListOptions{LabelSelector: selector.String()})
if err != nil {
return false, err
}
return len(gssList.Items) == 1, nil
})

assert.NoError(t, err)
assert.NoError(t, err)

// scale down, with allocation
const scaleDownTarget = 1
if usePatch {
flt = scaleFleetPatch(ctx, t, flt, scaleDownTarget)
} else {
flt = scaleFleetSubresource(ctx, t, flt, scaleDownTarget)
}
// Stop cycling Allocations.
// The AssertFleetConditions below will wait until the Allocation cycling has
// fully stopped (when all Allocated GameServers are shut down).
cancelCycle()

// scale down, with allocation
const scaleDownTarget = 1
if fixture.usePatch {
flt = scaleFleetPatch(ctx, t, flt, scaleDownTarget)
} else {
flt = scaleFleetSubresource(ctx, t, flt, scaleDownTarget)
}

framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(1))
framework.AssertFleetCondition(t, flt, e2e.FleetReadyCount(1))

framework.AssertFleetCondition(t, flt, func(log *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.AllocatedReplicas == 0
})
framework.AssertFleetCondition(t, flt, func(log *logrus.Entry, fleet *agonesv1.Fleet) bool {
return fleet.Status.AllocatedReplicas == 0
})
}
})
}
}

Expand Down
27 changes: 27 additions & 0 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,33 @@ func (f *Framework) WaitForGameServerState(t *testing.T, gs *agonesv1.GameServer
state, gs.Namespace, gs.Name)
}

// CycleAllocations repeatedly Allocates a GameServer in the Fleet (if one is available), once every specified period.
// Each Allocated GameServer gets deleted allocDuration after it was Allocated.
// GameServers will continue to be Allocated until a message is passed to the done channel.
func (f *Framework) CycleAllocations(ctx context.Context, t *testing.T, flt *agonesv1.Fleet, period time.Duration, allocDuration time.Duration) {
err := wait.PollImmediateUntil(period, func() (bool, error) {
gsa := GetAllocation(flt)
gsa, err := f.AgonesClient.AllocationV1().GameServerAllocations(flt.Namespace).Create(context.Background(), gsa, metav1.CreateOptions{})
if err != nil || gsa.Status.State != allocationv1.GameServerAllocationAllocated {
// Ignore error. Could be that the buffer was empty, will try again next cycle.
return false, nil
}

// Deallocate after allocDuration.
go func(gsa *allocationv1.GameServerAllocation) {
time.Sleep(allocDuration)
err := f.AgonesClient.AgonesV1().GameServers(gsa.Namespace).Delete(context.Background(), gsa.Status.GameServerName, metav1.DeleteOptions{})
require.NoError(t, err)
}(gsa)

return false, nil
}, ctx.Done())
// Ignore wait timeout error, will always be returned when the context is cancelled at the end of the test.
if err != wait.ErrWaitTimeout {
require.NoError(t, err)
}
}

// AssertFleetCondition waits for the Fleet to be in a specific condition or fails the test if the condition can't be met in 5 minutes.
func (f *Framework) AssertFleetCondition(t *testing.T, flt *agonesv1.Fleet, condition func(*logrus.Entry, *agonesv1.Fleet) bool) {
err := f.WaitForFleetCondition(t, flt, condition)
Expand Down

0 comments on commit e4060cc

Please sign in to comment.