From 5c941c88eb8a35e4d5198c195fbd67709f5314cb Mon Sep 17 00:00:00 2001 From: "mingzhou.swx" Date: Tue, 20 Jun 2023 10:57:44 +0800 Subject: [PATCH] preparing update pod as update pod Signed-off-by: mingzhou.swx --- .../cloneset/sync/cloneset_scale.go | 4 +- .../cloneset/sync/cloneset_sync_utils.go | 5 +- .../cloneset/sync/cloneset_sync_utils_test.go | 24 +++ .../cloneset/utils/cloneset_utils.go | 12 ++ .../statefulset/stateful_set_utils.go | 3 +- .../statefulset/stateful_update_utils.go | 3 +- .../statefulset/stateful_update_utils_test.go | 32 ++++ pkg/features/kruise_features.go | 5 + pkg/util/revision/revision.go | 38 ++++ pkg/util/revision/revision_test.go | 177 ++++++++++++++++++ 10 files changed, 297 insertions(+), 6 deletions(-) create mode 100644 pkg/util/revision/revision.go create mode 100644 pkg/util/revision/revision_test.go diff --git a/pkg/controller/cloneset/sync/cloneset_scale.go b/pkg/controller/cloneset/sync/cloneset_scale.go index edfdb9969c..656313f5e5 100644 --- a/pkg/controller/cloneset/sync/cloneset_scale.go +++ b/pkg/controller/cloneset/sync/cloneset_scale.go @@ -69,7 +69,7 @@ func (r *realControl) Scale( // 2. calculate scale numbers diffRes := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision) - updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision) + updatedPods, notUpdatedPods := clonesetutils.GroupUpdateAndNotUpdatePods(pods, updateRevision) if diffRes.scaleUpNum > diffRes.scaleUpLimit { r.recorder.Event(updateCS, v1.EventTypeWarning, "ScaleUpLimited", fmt.Sprintf("scaleUp is limited because of scaleStrategy.maxUnavailable, limit: %d", diffRes.scaleUpLimit)) @@ -107,7 +107,7 @@ func (r *realControl) Scale( // 5. specified delete if podsToDelete := util.DiffPods(podsSpecifiedToDelete, podsInPreDelete); len(podsToDelete) > 0 { - newPodsToDelete, oldPodsToDelete := clonesetutils.SplitPodsByRevision(podsToDelete, updateRevision) + newPodsToDelete, oldPodsToDelete := clonesetutils.GroupUpdateAndNotUpdatePods(podsToDelete, updateRevision) klog.V(3).Infof("CloneSet %s try to delete pods specified. Delete ready limit: %d. New Pods: %v, old Pods: %v.", controllerKey, diffRes.deleteReadyLimit, util.GetPodNames(newPodsToDelete).List(), util.GetPodNames(oldPodsToDelete).List()) diff --git a/pkg/controller/cloneset/sync/cloneset_sync_utils.go b/pkg/controller/cloneset/sync/cloneset_sync_utils.go index d4839069e2..bee7372714 100644 --- a/pkg/controller/cloneset/sync/cloneset_sync_utils.go +++ b/pkg/controller/cloneset/sync/cloneset_sync_utils.go @@ -24,11 +24,11 @@ import ( appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core" - clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils" "github.com/openkruise/kruise/pkg/features" "github.com/openkruise/kruise/pkg/util" utilfeature "github.com/openkruise/kruise/pkg/util/feature" "github.com/openkruise/kruise/pkg/util/lifecycle" + "github.com/openkruise/kruise/pkg/util/revision" "github.com/openkruise/kruise/pkg/util/specifieddelete" v1 "k8s.io/api/core/v1" intstrutil "k8s.io/apimachinery/pkg/util/intstr" @@ -128,7 +128,8 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu }() for _, p := range pods { - if clonesetutils.EqualToRevisionHash("", p, updateRevision) { + if revision.IsPodUpdate(p, updateRevision) { + newRevisionCount++ switch state := lifecycle.GetPodLifecycleState(p); state { diff --git a/pkg/controller/cloneset/sync/cloneset_sync_utils_test.go b/pkg/controller/cloneset/sync/cloneset_sync_utils_test.go index df8a0f3214..b1514e5022 100644 --- a/pkg/controller/cloneset/sync/cloneset_sync_utils_test.go +++ b/pkg/controller/cloneset/sync/cloneset_sync_utils_test.go @@ -904,8 +904,32 @@ func TestCalculateDiffsWithExpectation(t *testing.T) { }, expectResult: expectationDiffs{}, }, + { + name: "[scalingWithPreparingUpdate=true] scaling up when a preparing pod is not updated, and expected-updated is 1", + set: createTestCloneSet(4, intstr.FromString("90%"), intstr.FromInt(1), intstr.FromInt(0)), + setLabels: map[string]string{appsv1alpha1.CloneSetScalingExcludePreparingDeleteKey: "true"}, + pods: []*v1.Pod{ + createTestPod(oldRevision, appspub.LifecycleStatePreparingUpdate, true, false), + createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false), + createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false), + }, + expectResult: expectationDiffs{scaleUpNum: 1, scaleUpLimit: 1, scaleUpNumOldRevision: 1}, + }, + { + name: "[scalingWithPreparingUpdate=true] scaling up when a preparing pod is not updated, and expected-updated is 2", + set: createTestCloneSet(4, intstr.FromInt(2), intstr.FromInt(1), intstr.FromInt(0)), + setLabels: map[string]string{appsv1alpha1.CloneSetScalingExcludePreparingDeleteKey: "true"}, + pods: []*v1.Pod{ + createTestPod(oldRevision, appspub.LifecycleStatePreparingUpdate, true, false), + createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false), + createTestPod(oldRevision, appspub.LifecycleStateNormal, true, false), + }, + expectResult: expectationDiffs{scaleUpNum: 1, scaleUpLimit: 1}, + }, } + defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreparingUpdateAsUpdate, true)() + for i := range cases { t.Run(cases[i].name, func(t *testing.T) { if cases[i].disableFeatureGate { diff --git a/pkg/controller/cloneset/utils/cloneset_utils.go b/pkg/controller/cloneset/utils/cloneset_utils.go index 5037f8d604..051f14c263 100644 --- a/pkg/controller/cloneset/utils/cloneset_utils.go +++ b/pkg/controller/cloneset/utils/cloneset_utils.go @@ -28,6 +28,7 @@ import ( "github.com/openkruise/kruise/pkg/util/expectations" utilfeature "github.com/openkruise/kruise/pkg/util/feature" "github.com/openkruise/kruise/pkg/util/requeueduration" + "github.com/openkruise/kruise/pkg/util/revision" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -142,6 +143,17 @@ func SplitPodsByRevision(pods []*v1.Pod, rev string) (matched, unmatched []*v1.P return } +func GroupUpdateAndNotUpdatePods(pods []*v1.Pod, updateRevision string) (update, notUpdate []*v1.Pod) { + for _, p := range pods { + if revision.IsPodUpdate(p, updateRevision) { + update = append(update, p) + } else { + notUpdate = append(notUpdate, p) + } + } + return +} + // UpdateStorage insert volumes generated by cs.Spec.VolumeClaimTemplates into Pod. func UpdateStorage(cs *appsv1alpha1.CloneSet, pod *v1.Pod) { currentVolumes := pod.Spec.Volumes diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go index d8ea3d97c8..d457a13c86 100644 --- a/pkg/controller/statefulset/stateful_set_utils.go +++ b/pkg/controller/statefulset/stateful_set_utils.go @@ -40,6 +40,7 @@ import ( appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" "github.com/openkruise/kruise/pkg/util/lifecycle" + "github.com/openkruise/kruise/pkg/util/revision" ) var patchCodec = scheme.Codecs.LegacyCodec(appsv1beta1.SchemeGroupVersion) @@ -486,7 +487,7 @@ func isCurrentRevisionNeeded(set *appsv1beta1.StatefulSet, updateRevision string if pod == nil || i == ordinal { continue } - if getPodRevision(pod) != updateRevision { + if !revision.IsPodUpdate(pod, updateRevision) { noUpdatedReplicas++ } } diff --git a/pkg/controller/statefulset/stateful_update_utils.go b/pkg/controller/statefulset/stateful_update_utils.go index ffa4d22d28..20fb2b3448 100644 --- a/pkg/controller/statefulset/stateful_update_utils.go +++ b/pkg/controller/statefulset/stateful_update_utils.go @@ -20,6 +20,7 @@ import ( v1 "k8s.io/api/core/v1" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + "github.com/openkruise/kruise/pkg/util/revision" "github.com/openkruise/kruise/pkg/util/updatesort" ) @@ -54,7 +55,7 @@ func sortPodsToUpdate(rollingUpdateStrategy *appsv1beta1.RollingUpdateStatefulSe } if isTerminating(replicas[target]) { updatedIdxs = append(updatedIdxs, target) - } else if getPodRevision(replicas[target]) == updateRevision { + } else if revision.IsPodUpdate(replicas[target], updateRevision) { updatedIdxs = append(updatedIdxs, target) } else { waitUpdateIdxs = append(waitUpdateIdxs, target) diff --git a/pkg/controller/statefulset/stateful_update_utils_test.go b/pkg/controller/statefulset/stateful_update_utils_test.go index 4594b67730..9ef953827f 100644 --- a/pkg/controller/statefulset/stateful_update_utils_test.go +++ b/pkg/controller/statefulset/stateful_update_utils_test.go @@ -26,6 +26,8 @@ import ( appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1" + "github.com/openkruise/kruise/pkg/features" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" ) func TestSortPodsToUpdate(t *testing.T) { @@ -125,8 +127,38 @@ func TestSortPodsToUpdate(t *testing.T) { }, expected: []int{8, 7, 11, 9}, }, + { + strategy: &appsv1beta1.RollingUpdateStatefulSetStrategy{ + UnorderedUpdate: &appsv1beta1.UnorderedUpdateStrategy{PriorityStrategy: &appspub.UpdatePriorityStrategy{ + WeightPriority: []appspub.UpdatePriorityWeightTerm{ + {Weight: 20, MatchSelector: metav1.LabelSelector{MatchLabels: map[string]string{"k": "v1"}}}, + {Weight: 10, MatchSelector: metav1.LabelSelector{MatchLabels: map[string]string{"k": "v2"}}}, + }, + }}, + Partition: func() *int32 { var i int32 = 6; return &i }(), + }, + updateRevision: "r1", + totalReplicas: 10, + replicas: []*v1.Pod{ + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0", appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate)}}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0", appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate)}}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}}, + nil, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r1"}}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r1"}}}, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}}, + nil, + {ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{apps.ControllerRevisionHashLabelKey: "r0"}}}, + }, + expected: []int{8, 7, 1, 0}, + }, } + defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreparingUpdateAsUpdate, true)() + for i, tc := range cases { res := sortPodsToUpdate(tc.strategy, tc.updateRevision, tc.totalReplicas, tc.replicas) if !reflect.DeepEqual(res, tc.expected) { diff --git a/pkg/features/kruise_features.go b/pkg/features/kruise_features.go index 493b7b9b23..89947701b5 100644 --- a/pkg/features/kruise_features.go +++ b/pkg/features/kruise_features.go @@ -101,6 +101,10 @@ const ( // CloneSetEventHandlerOptimization enable optimization for cloneset-controller to reduce the // queuing frequency cased by pod update. CloneSetEventHandlerOptimization featuregate.Feature = "CloneSetEventHandlerOptimization" + + // PreparingUpdateAsUpdate enable CloneSet/Advanced StatefulSet controller to regard preparing-update Pod + // as updated pod when calculating update/current revision when updating and scaling Pods. + PreparingUpdateAsUpdate featuregate.Feature = "PreparingUpdateAsUpdate" ) var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ @@ -124,6 +128,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ PodProbeMarkerGate: {Default: true, PreRelease: featuregate.Alpha}, PreDownloadImageForDaemonSetUpdate: {Default: false, PreRelease: featuregate.Alpha}, CloneSetEventHandlerOptimization: {Default: false, PreRelease: featuregate.Alpha}, + PreparingUpdateAsUpdate: {Default: false, PreRelease: featuregate.Alpha}, } func init() { diff --git a/pkg/util/revision/revision.go b/pkg/util/revision/revision.go new file mode 100644 index 0000000000..2c0ee7600e --- /dev/null +++ b/pkg/util/revision/revision.go @@ -0,0 +1,38 @@ +package revision + +import ( + "strings" + + appspub "github.com/openkruise/kruise/apis/apps/pub" + "github.com/openkruise/kruise/pkg/features" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + "github.com/openkruise/kruise/pkg/util/lifecycle" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" +) + +// IsPodUpdate return true when: +// - Pod controller-revision-hash equals to updateRevision; +// - Pod at preparing update state if PreparingUpdateAsUpdate feature-gated is enabled. +func IsPodUpdate(pod *v1.Pod, updateRevision string) bool { + if utilfeature.DefaultFeatureGate.Enabled(features.PreparingUpdateAsUpdate) && + lifecycle.GetPodLifecycleState(pod) == appspub.LifecycleStatePreparingUpdate { + return true + } + return equalToRevisionHash("", pod, updateRevision) +} + +func equalToRevisionHash(s string, pod *v1.Pod, hash string) bool { + objHash := pod.GetLabels()[apps.ControllerRevisionHashLabelKey] + if objHash == hash { + return true + } + return getShortHash(hash) == getShortHash(objHash) +} + +func getShortHash(hash string) string { + // This makes sure the real hash must be the last '-' substring of revision name + // vendor/k8s.io/kubernetes/pkg/controller/history/controller_history.go#82 + list := strings.Split(hash, "-") + return list[len(list)-1] +} diff --git a/pkg/util/revision/revision_test.go b/pkg/util/revision/revision_test.go new file mode 100644 index 0000000000..543e9edbfc --- /dev/null +++ b/pkg/util/revision/revision_test.go @@ -0,0 +1,177 @@ +package revision + +import ( + "github.com/openkruise/kruise/pkg/features" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + "testing" + + appspub "github.com/openkruise/kruise/apis/apps/pub" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestIsPodUpdate(t *testing.T) { + cases := []struct { + name string + hash string + pod *v1.Pod + updated bool + }{ + { + name: "normal state, long hash, updated", + hash: "app-name-new", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "app-name-new", + appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal), + }, + }, + }, + updated: true, + }, + { + name: "normal state, long and short hash, updated", + hash: "app-name-new", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "new", + appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal), + }, + }, + }, + updated: true, + }, + { + name: "normal state, short and long hash, updated", + hash: "new", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "app-name-new", + appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal), + }, + }, + }, + updated: true, + }, + { + name: "normal state, short hash, updated", + hash: "new", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "new", + appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal), + }, + }, + }, + updated: true, + }, + { + name: "normal state, long hash, not updated", + hash: "app-name-old", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "app-name-new", + appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal), + }, + }, + }, + updated: false, + }, + { + name: "normal state, short hash, not updated", + hash: "old", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "new", + appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal), + }, + }, + }, + updated: false, + }, + { + name: "preparing-update state, old revision, long hash, updated", + hash: "app-name-old", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "app-name-new", + appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate), + }, + }, + }, + updated: true, + }, + { + name: "preparing-update state, old revision, long and short hash, updated", + hash: "app-name-old", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "new", + appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate), + }, + }, + }, + updated: true, + }, + { + name: "preparing-update state, old revision, short and long hash, updated", + hash: "old", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "app-name-new", + appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate), + }, + }, + }, + updated: true, + }, + { + name: "preparing-update state, old revision, short hash, updated", + hash: "old", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "new", + appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate), + }, + }, + }, + updated: true, + }, + { + name: "preparing-update state, new revision, long hash, updated", + hash: "app-name-new", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "app-name-new", + appspub.LifecycleStateKey: string(appspub.LifecycleStatePreparingUpdate), + }, + }, + }, + updated: true, + }, + { + name: "preparing-update state, new vision short hash, updated", + hash: "new", + pod: &v1.Pod{ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + apps.ControllerRevisionHashLabelKey: "new", + appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal), + }, + }, + }, + updated: true, + }, + } + + defer utilfeature.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PreparingUpdateAsUpdate, true)() + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + got := IsPodUpdate(cs.pod, cs.hash) + if got != cs.updated { + t.Fatalf("expect %v, but got %v", cs.updated, got) + } + }) + } +}