Update stateful_set_utils.go (#1136)
* Update stateful_set_utils.go

Fix: updating to a new version fails when all pods are unavailable

Signed-off-by: linsongzheng <linsongzheng@onething.net>

* Fix: one extra pod could be created

Signed-off-by: linsongzheng <linsongzheng@onething.net>

* add scaleStrategy e2e test

Signed-off-by: linsongzheng <linsongzheng@onething.net>

* add log

Signed-off-by: linsongzheng <linsongzheng@onething.net>

* add log

Signed-off-by: linsongzheng <linsongzheng@onething.net>

Signed-off-by: linsongzheng <linsongzheng@onething.net>
Co-authored-by: linsongzheng <linsongzheng@onething.net>
ivanszl and linsongzheng authored Jan 3, 2023
1 parent 7b23f68 commit 20b74ba
Showing 3 changed files with 107 additions and 4 deletions.
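
Before the file-by-file diff, a note on the behavior being changed: with Parallel pod management and spec.scaleStrategy.maxUnavailable set, the controller now breaks out of the scale loop once the unavailable budget is spent, so the rest of the reconcile pass (including rolling and in-place updates) still runs; the OrderedReady (monotonic) path keeps its early return. The following is a rough, self-contained simulation of that decision flow, not Kruise code; scalePass and its parameters are made up for illustration, and the numbers mirror the new e2e test (scale 3 -> 10 with maxUnavailable=2):

package main

import "fmt"

// scalePass is a toy model of one reconcile pass over the desired replicas.
// It only illustrates the return-vs-break decision this commit changes.
func scalePass(available, desired int, monotonic bool, maxUnavailable int) (created int) {
	budget := maxUnavailable
	for ordinal := 0; ordinal < desired; ordinal++ {
		if ordinal < available {
			continue // pod already exists and is available
		}
		created++ // "create" the missing pod; it starts out unavailable
		if monotonic {
			// OrderedReady: stop the whole pass and wait for this pod.
			return created
		}
		// Parallel + scaleStrategy.maxUnavailable: each newly created pod
		// consumes one unit of budget; once it is gone, stop scaling but
		// (unlike return) let the rest of the reconcile continue.
		budget--
		if budget <= 0 {
			break
		}
	}
	return created
}

func main() {
	// 3 available pods, desired 10: Parallel with maxUnavailable=2 creates 2
	// pods in one pass (5 total), matching ConfirmStatefulPodCount(5, ...) in
	// the e2e test below; the monotonic path creates 1 and returns.
	fmt.Println("parallel:", scalePass(3, 10, false, 2)) // 2
	fmt.Println("ordered:", scalePass(3, 10, true, 2))   // 1
}
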
31 changes: 28 additions & 3 deletions pkg/controller/statefulset/stateful_set_control.go
@@ -542,21 +542,35 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 				status.UpdatedReplicas++
 			}
 			// if the set does not allow bursting, return immediately
-			if monotonic || decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
+			if monotonic {
 				return &status, nil
+			} else if decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
+				klog.V(4).Infof(
+					"StatefulSet %s/%s Pod %s is Creating, and break pods scale",
+					set.Namespace,
+					set.Name,
+					replicas[i].Name)
+				break
 			}
 			// pod created, no more work possible for this round
 			continue
 		}
 		// If we find a Pod that is currently terminating, we must wait until graceful deletion
 		// completes before we continue to make progress.
-		if isTerminating(replicas[i]) && (monotonic || decreaseAndCheckMaxUnavailable(scaleMaxUnavailable)) {
+		if isTerminating(replicas[i]) && monotonic {
 			klog.V(4).Infof(
 				"StatefulSet %s/%s is waiting for Pod %s to Terminate",
 				set.Namespace,
 				set.Name,
 				replicas[i].Name)
 			return &status, nil
+		} else if isTerminating(replicas[i]) && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
+			klog.V(4).Infof(
+				"StatefulSet %s/%s Pod %s is Terminating, and break pods scale",
+				set.Namespace,
+				set.Name,
+				replicas[i].Name)
+			break
 		}
 		// Update InPlaceUpdateReady condition for pod
 		if res := ssc.inplaceControl.Refresh(replicas[i], nil); res.RefreshErr != nil {
@@ -571,7 +585,7 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 		// ordinal, are Running and Available.
 		if monotonic || scaleMaxUnavailable != nil {
 			isAvailable, waitTime := isRunningAndAvailable(replicas[i], minReadySeconds)
-			if !isAvailable && (monotonic || decreaseAndCheckMaxUnavailable(scaleMaxUnavailable)) {
+			if !isAvailable && monotonic {
 				if waitTime > 0 {
 					// make sure we check later
 					durationStore.Push(getStatefulSetKey(set), waitTime)
@@ -591,6 +605,17 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 						replicas[i].Name)
 				}
 				return &status, nil
+			} else if !isAvailable && decreaseAndCheckMaxUnavailable(scaleMaxUnavailable) {
+				klog.V(4).Infof(
+					"StatefulSet %s/%s Pod %s is unavailable, and break pods scale",
+					set.Namespace,
+					set.Name,
+					replicas[i].Name)
+				if waitTime > 0 {
+					// make sure we check later
+					durationStore.Push(getStatefulSetKey(set), waitTime)
+				}
+				break
 			}
 		}
 		// Enforce the StatefulSet invariants
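
The helper decreaseAndCheckMaxUnavailable that the hunks above call is not part of this commit and is not shown in the diff. Judging only from how it is used here, its contract appears to be "consume one unit of the remaining unavailable budget and report whether it is now exhausted"; the sketch below encodes that assumed contract and is not the actual Kruise implementation:

package main

import "fmt"

// Assumed contract only; the real helper lives elsewhere in
// pkg/controller/statefulset and may differ in signature and detail.
func decreaseAndCheckMaxUnavailable(maxUnavailable *int) bool {
	if maxUnavailable == nil {
		return false // no scaleStrategy.maxUnavailable configured: never break
	}
	*maxUnavailable--
	return *maxUnavailable <= 0
}

func main() {
	budget := 2
	fmt.Println(decreaseAndCheckMaxUnavailable(&budget)) // false: 1 unit left
	fmt.Println(decreaseAndCheckMaxUnavailable(&budget)) // true: exhausted -> break
	fmt.Println(decreaseAndCheckMaxUnavailable(nil))     // false: unlimited
}
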
2 changes: 1 addition & 1 deletion pkg/controller/statefulset/stateful_set_utils.go
@@ -70,7 +70,7 @@ func getParentName(pod *v1.Pod) string {
 	return parent
 }
 
-// getOrdinal gets pod's ordinal. If pod has no ordinal, -1 is returned.
+// getOrdinal gets pod's ordinal. If pod has no ordinal, -1 is returned.
 func getOrdinal(pod *v1.Pod) int {
 	_, ordinal := getParentNameAndOrdinal(pod)
 	return ordinal
78 changes: 78 additions & 0 deletions test/e2e/apps/statefulset.go
@@ -31,11 +31,13 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	klabels "k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
 	watchtools "k8s.io/client-go/tools/watch"
 	imageutils "k8s.io/kubernetes/test/utils/image"
+	"k8s.io/utils/pointer"
 
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
@@ -1031,6 +1033,82 @@ var _ = SIGDescribe("StatefulSet", func() {
 			}
 			framework.ExpectEqual(*(ss.Spec.Replicas), int32(2))
 		})
+
+		/*
+			Testname: StatefulSet, ScaleStrategy
+			Description: StatefulSet resource MUST support the MaxUnavailable ScaleStrategy for scaling.
+			It only affects when create new pod, terminating pod and unavailable pod at the Parallel PodManagementPolicy.
+		*/
+		framework.ConformanceIt("Should can update pods when the statefulset scale strategy is set", func() {
+			ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
+			maxUnavailable := intstr.FromInt(2)
+			ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 3, nil, nil, labels)
+			ss.Spec.Template.Spec.Containers[0].Name = "busybox"
+			ss.Spec.Template.Spec.Containers[0].Image = BusyboxImage
+			ss.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "3600"}
+			ss.Spec.PodManagementPolicy = apps.ParallelPodManagement
+			ss.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyAlways
+			ss.Spec.UpdateStrategy.RollingUpdate = &appsv1beta1.RollingUpdateStatefulSetStrategy{
+				MinReadySeconds: pointer.Int32(3),
+				PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
+			}
+			ss.Spec.ScaleStrategy = &appsv1beta1.StatefulSetScaleStrategy{MaxUnavailable: &maxUnavailable}
+			ss.Spec.Template.Spec.ReadinessGates = append(ss.Spec.Template.Spec.ReadinessGates, v1.PodReadinessGate{ConditionType: appspub.InPlaceUpdateReady})
+			sst := framework.NewStatefulSetTester(c, kc)
+			// sst.SetHTTPProbe(ss)
+			ss, err := kc.AppsV1beta1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
+			framework.ExpectNoError(err)
+			sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
+
+			ginkgo.By("Scaling up stateful set " + ssName + " to 10 replicas and check create new pod equal MaxUnavailable")
+			sst.UpdateReplicas(ss, 10)
+			sst.ConfirmStatefulPodCount(5, ss, time.Second, false)
+			sst.WaitForRunningAndReady(10, ss)
+
+			ginkgo.By("Confirming that stateful can update all pods to be unhealthy")
+			maxUnavailable = intstr.FromString("100%")
+			ss, err = framework.UpdateStatefulSetWithRetries(kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) {
+				update.Spec.ScaleStrategy.MaxUnavailable = &maxUnavailable
+				update.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable = &intstr.IntOrString{Type: intstr.String, StrVal: "100%"}
+				update.Spec.Template.Spec.Containers[0].Command = []string{}
+			})
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			sst.WaitForRunningAndNotReady(10, ss)
+			sst.WaitForStatusReadyReplicas(ss, 0)
+			ss = sst.WaitForStatus(ss)
+
+			ginkgo.By("Confirming that stateful can update all pods if any stateful pod is unhealthy")
+
+			ss, err = framework.UpdateStatefulSetWithRetries(kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) {
+				update.Spec.Template.Labels["test-update"] = "yes"
+				update.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "180"}
+			})
+			sst.WaitForRunningAndReady(10, ss)
+			sst.WaitForStatusReadyReplicas(ss, 10)
+			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			var pods *v1.PodList
+			sst.WaitForState(ss, func(set *appsv1beta1.StatefulSet, pl *v1.PodList) (bool, error) {
+				ss = set
+				pods = pl
+				sst.SortStatefulPods(pods)
+				for i := range pods.Items {
+					if pods.Items[i].Labels[apps.StatefulSetRevisionLabel] != set.Status.UpdateRevision {
+						framework.Logf("Waiting for Pod %s/%s to have revision %s update revision %s",
+							pods.Items[i].Namespace,
+							pods.Items[i].Name,
+							set.Status.UpdateRevision,
+							pods.Items[i].Labels[apps.StatefulSetRevisionLabel])
+						return false, nil
+					}
+				}
+				return true, nil
+			})
+
+			ginkgo.By("Confirming Pods were updated successful")
+			for i := range pods.Items {
+				gomega.Expect(pods.Items[i].Labels["test-update"]).To(gomega.Equal("yes"))
+			}
+		})
 	})
 
 	//ginkgo.Describe("Deploy clustered applications [Feature:StatefulSet] [Slow]", func() {
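For context on what the test above opts into: the scale strategy is configured entirely through the Advanced StatefulSet spec. A minimal sketch of the relevant fields, using the types the test itself references (the import path for the Kruise API package is an assumption; use whichever Kruise module your project consumes):

package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"

	// Assumed import path for the Advanced StatefulSet types (appsv1beta1 in the test).
	appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
)

func main() {
	maxUnavailable := intstr.FromInt(2)
	ss := &appsv1beta1.StatefulSet{}
	// scaleStrategy.maxUnavailable only takes effect with Parallel pod management.
	ss.Spec.PodManagementPolicy = apps.ParallelPodManagement
	ss.Spec.ScaleStrategy = &appsv1beta1.StatefulSetScaleStrategy{
		MaxUnavailable: &maxUnavailable,
	}
	fmt.Println("maxUnavailable:", ss.Spec.ScaleStrategy.MaxUnavailable.String())
}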
