Skip to content

Commit

Permalink
CloneSet supports to calculate scale number excluding Pods in Prepari…
Browse files Browse the repository at this point in the history
…ngDelete

Signed-off-by: FillZpp <FillZpp.pub@gmail.com>
  • Loading branch information
FillZpp committed Jul 15, 2022
1 parent c29b0c1 commit 02c6cde
Show file tree
Hide file tree
Showing 5 changed files with 494 additions and 75 deletions.
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/cloneset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ const (

// DefaultCloneSetMaxUnavailable is the default value of maxUnavailable for CloneSet update strategy.
DefaultCloneSetMaxUnavailable = "20%"

// CloneSetScalingExcludePreparingDeleteKey is the label key that enables scalingExcludePreparingDelete
// only for this CloneSet, which means it will calculate scale number excluding Pods in PreparingDelete state.
CloneSetScalingExcludePreparingDeleteKey = "apps.kruise.io/cloneset-scaling-exclude-preparing-delete"
)

// CloneSetSpec defines the desired state of CloneSet
Expand Down
37 changes: 13 additions & 24 deletions pkg/controller/cloneset/sync/cloneset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,16 @@ func (r *realControl) Scale(
diffRes := calculateDiffsWithExpectation(updateCS, pods, currentRevision, updateRevision)
updatedPods, notUpdatedPods := clonesetutils.SplitPodsByRevision(pods, updateRevision)

if diffRes.scaleNum > 0 && diffRes.scaleNum > diffRes.scaleUpLimit {
if diffRes.scaleUpNum > diffRes.scaleUpLimit {
r.recorder.Event(updateCS, v1.EventTypeWarning, "ScaleUpLimited", fmt.Sprintf("scaleUp is limited because of scaleStrategy.maxUnavailable, limit: %d", diffRes.scaleUpLimit))
}

// 3. scale out
if diffRes.scaleNum > 0 && diffRes.scaleUpLimit > 0 {
if diffRes.scaleUpNum > 0 {
// total number of this creation
expectedCreations := diffRes.scaleUpLimit
// lack number of current version
expectedCurrentCreations := 0
if diffRes.scaleNumOldRevision > 0 {
expectedCurrentCreations = diffRes.scaleNumOldRevision
}
expectedCurrentCreations := diffRes.scaleUpNumOldRevision

klog.V(3).Infof("CloneSet %s begin to scale out %d pods including %d (current rev)",
controllerKey, expectedCreations, expectedCurrentCreations)
Expand Down Expand Up @@ -110,44 +107,36 @@ func (r *realControl) Scale(
// 5. specified delete
if podsToDelete := util.DiffPods(podsSpecifiedToDelete, podsInPreDelete); len(podsToDelete) > 0 {
newPodsToDelete, oldPodsToDelete := clonesetutils.SplitPodsByRevision(podsToDelete, updateRevision)
klog.V(3).Infof("CloneSet %s try to delete pods specified. Delete ready limit: %d. Pods: %v, %v.",
klog.V(3).Infof("CloneSet %s try to delete pods specified. Delete ready limit: %d. New Pods: %v, old Pods: %v.",
controllerKey, diffRes.deleteReadyLimit, util.GetPodNames(newPodsToDelete).List(), util.GetPodNames(oldPodsToDelete).List())

podsToDelete = make([]*v1.Pod, 0, len(podsToDelete))
for _, pod := range newPodsToDelete {
if !isPodReady(coreControl, pod) {
podsToDelete = append(podsToDelete, pod)
} else if diffRes.deleteReadyLimit > 0 {
podsToDelete = append(podsToDelete, pod)
diffRes.deleteReadyLimit--
}
}
for _, pod := range oldPodsToDelete {
podsCanDelete := make([]*v1.Pod, 0, len(podsToDelete))
for _, pod := range podsToDelete {
if !isPodReady(coreControl, pod) {
podsToDelete = append(podsToDelete, pod)
podsCanDelete = append(podsCanDelete, pod)
} else if diffRes.deleteReadyLimit > 0 {
podsToDelete = append(podsToDelete, pod)
podsCanDelete = append(podsCanDelete, pod)
diffRes.deleteReadyLimit--
}
}

if modified, err := r.deletePods(updateCS, podsToDelete, pvcs); err != nil || modified {
if modified, err := r.deletePods(updateCS, podsCanDelete, pvcs); err != nil || modified {
return modified, err
}
}

// 6. scale in
if diffRes.scaleNum < 0 {
if diffRes.scaleDownNum > 0 {
if numToDelete > 0 {
klog.V(3).Infof("CloneSet %s skip to scale in %d for %d to delete, including %d specified and %d preDelete",
controllerKey, diffRes.scaleNum, numToDelete, len(podsSpecifiedToDelete), len(podsInPreDelete))
controllerKey, diffRes.scaleDownNum, numToDelete, len(podsSpecifiedToDelete), len(podsInPreDelete))
return false, nil
}

klog.V(3).Infof("CloneSet %s begin to scale in %d pods including %d (current rev), delete ready limit: %d",
controllerKey, -diffRes.scaleNum, -diffRes.scaleNumOldRevision, diffRes.deleteReadyLimit)
controllerKey, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision, diffRes.deleteReadyLimit)

podsPreparingToDelete := r.choosePodsToDelete(updateCS, -diffRes.scaleNum, -diffRes.scaleNumOldRevision, notUpdatedPods, updatedPods)
podsPreparingToDelete := r.choosePodsToDelete(updateCS, diffRes.scaleDownNum, diffRes.scaleDownNumOldRevision, notUpdatedPods, updatedPods)
podsToDelete := make([]*v1.Pod, 0, len(podsPreparingToDelete))
for _, pod := range podsPreparingToDelete {
if !isPodReady(coreControl, pod) {
Expand Down
82 changes: 58 additions & 24 deletions pkg/controller/cloneset/sync/cloneset_sync_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sync

import (
"flag"
"math"
"reflect"

Expand All @@ -35,21 +36,35 @@ import (
"k8s.io/utils/integer"
)

// scalingExcludePreparingDelete is the controller-wide switch bound to the
// "cloneset-scaling-exclude-preparing-delete" command-line flag. When true, the
// CloneSet controller calculates the scale number excluding Pods that are in
// the PreparingDelete state. It defaults to false.
var scalingExcludePreparingDelete bool

// init registers the "cloneset-scaling-exclude-preparing-delete" boolean flag
// on the default command-line flag set and binds it to
// scalingExcludePreparingDelete.
func init() {
	const (
		name  = "cloneset-scaling-exclude-preparing-delete"
		usage = "If true, CloneSet Controller will calculate scale number excluding Pods in PreparingDelete state."
	)
	flag.BoolVar(&scalingExcludePreparingDelete, name, false, usage)
}

type expectationDiffs struct {
// scaleNum is the diff number that should scale
// '0' means no need to scale
// positive number means need to scale out
// negative number means need to scale in
scaleNum int
// scaleNumOldRevision is part of the scaleNum number
// it indicates the scale number of old revision Pods
scaleNumOldRevision int
// scaleUpNum is a non-negative integer, which indicates the number that should scale up.
scaleUpNum int
// scaleUpNumOldRevision is a non-negative integer, which indicates the number of old revision Pods that should scale up.
// It might be bigger than scaleUpNum, but controller will scale up at most scaleUpNum number of Pods.
scaleUpNumOldRevision int
// scaleDownNum is a non-negative integer, which indicates the number that should scale down.
scaleDownNum int
// scaleDownNumOldRevision is a non-negative integer, which indicates the number of old revision Pods that should scale down.
// It might be bigger than scaleDownNum, but controller will scale down at most scaleDownNum number of Pods.
scaleDownNumOldRevision int

// scaleUpLimit is the limit number of creating Pods when scaling up
// it is limited by scaleStrategy.maxUnavailable
scaleUpLimit int
// deleteReadyLimit is the limit number of ready Pods that can be deleted
// it is limited by UpdateStrategy.maxUnavailable
deleteReadyLimit int

// useSurge is the number that temporarily expect to be above the desired replicas
useSurge int
// useSurgeOldRevision is part of the useSurge number
Expand Down Expand Up @@ -93,20 +108,20 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu

var newRevisionCount, newRevisionActiveCount, oldRevisionCount, oldRevisionActiveCount int
var unavailableNewRevisionCount, unavailableOldRevisionCount int
var toDeleteNewRevisionCount, toDeleteOldRevisionCount, preDeletingCount int
var toDeleteNewRevisionCount, toDeleteOldRevisionCount, preDeletingNewRevisionCount, preDeletingOldRevisionCount int
defer func() {
if res.isEmpty() {
return
}
klog.V(1).Infof("Calculate diffs for CloneSet %s/%s, replicas=%d, partition=%d, maxSurge=%d, maxUnavailable=%d,"+
" allPods=%d, newRevisionPods=%d, newRevisionActivePods=%d, oldRevisionPods=%d, oldRevisionActivePods=%d,"+
" unavailableNewRevisionCount=%d, unavailableOldRevisionCount=%d,"+
" preDeletingCount=%d, toDeleteNewRevisionCount=%d, toDeleteOldRevisionCount=%d."+
" preDeletingNewRevisionCount=%d, preDeletingOldRevisionCount=%d, toDeleteNewRevisionCount=%d, toDeleteOldRevisionCount=%d."+
" Result: %+v",
cs.Namespace, cs.Name, replicas, partition, maxSurge, maxUnavailable,
len(pods), newRevisionCount, newRevisionActiveCount, oldRevisionCount, oldRevisionActiveCount,
unavailableNewRevisionCount, unavailableOldRevisionCount,
preDeletingCount, toDeleteNewRevisionCount, toDeleteOldRevisionCount,
preDeletingNewRevisionCount, preDeletingOldRevisionCount, toDeleteNewRevisionCount, toDeleteOldRevisionCount,
res)
}()

Expand All @@ -116,7 +131,7 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu

switch state := lifecycle.GetPodLifecycleState(p); state {
case appspub.LifecycleStatePreparingDelete:
preDeletingCount++
preDeletingNewRevisionCount++
default:
newRevisionActiveCount++

Expand All @@ -132,7 +147,7 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu

switch state := lifecycle.GetPodLifecycleState(p); state {
case appspub.LifecycleStatePreparingDelete:
preDeletingCount++
preDeletingOldRevisionCount++
default:
oldRevisionActiveCount++

Expand All @@ -147,7 +162,7 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu

updateOldDiff := oldRevisionActiveCount - partition
updateNewDiff := newRevisionActiveCount - (replicas - partition)
totalUnavailable := preDeletingCount + unavailableNewRevisionCount + unavailableOldRevisionCount
totalUnavailable := preDeletingNewRevisionCount + preDeletingOldRevisionCount + unavailableNewRevisionCount + unavailableOldRevisionCount
// If the currentRevision and updateRevision are consistent, Pods can only update to this revision
// If the CloneSetPartitionRollback is not enabled, Pods can only update to the new revision
if updateRevision == currentRevision || !utilfeature.DefaultFeatureGate.Enabled(features.CloneSetPartitionRollback) {
Expand All @@ -161,7 +176,7 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu
// Use surge for maxUnavailable not satisfied before scaling
var scaleSurge, scaleOldRevisionSurge int
if toDeleteCount := toDeleteNewRevisionCount + toDeleteOldRevisionCount; toDeleteCount > 0 {
scaleSurge = integer.IntMin(integer.IntMax((unavailableNewRevisionCount+unavailableOldRevisionCount+toDeleteCount+preDeletingCount)-maxUnavailable, 0), toDeleteCount)
scaleSurge = integer.IntMin(integer.IntMax((totalUnavailable+toDeleteCount)-maxUnavailable, 0), toDeleteCount)
if scaleSurge > toDeleteNewRevisionCount {
scaleOldRevisionSurge = scaleSurge - toDeleteNewRevisionCount
}
Expand Down Expand Up @@ -193,19 +208,34 @@ func calculateDiffsWithExpectation(cs *appsv1alpha1.CloneSet, pods []*v1.Pod, cu
}
}

res.scaleNum = replicas + res.useSurge - len(pods)
if res.scaleNum > 0 {
res.scaleNumOldRevision = integer.IntMax(partition+res.useSurgeOldRevision-oldRevisionCount, 0)
} else if res.scaleNum < 0 {
res.scaleNumOldRevision = integer.IntMin(partition+res.useSurgeOldRevision-oldRevisionCount, 0)
// prepare for scale calculation
scaleUpTotalCount := len(pods)
scaleDownTotalCount := len(pods) - toDeleteOldRevisionCount - toDeleteNewRevisionCount
scaleUpTotalOldCount := oldRevisionCount
scaleDownTotalOldCount := oldRevisionCount - toDeleteOldRevisionCount
if shouldScalingExcludePreparingDelete(cs) {
scaleUpTotalCount = scaleUpTotalCount - preDeletingOldRevisionCount - preDeletingNewRevisionCount
scaleDownTotalCount = scaleDownTotalCount - preDeletingOldRevisionCount - preDeletingNewRevisionCount
scaleUpTotalOldCount = scaleUpTotalOldCount - preDeletingOldRevisionCount
scaleDownTotalOldCount = scaleDownTotalOldCount - preDeletingOldRevisionCount
}
expectedTotalCount := replicas + res.useSurge
expectedTotalOldCount := partition + res.useSurgeOldRevision

// scale up
if num := expectedTotalCount - scaleUpTotalCount; num > 0 {
res.scaleUpNum = num
res.scaleUpNumOldRevision = integer.IntMax(expectedTotalOldCount-scaleUpTotalOldCount, 0)

if res.scaleNum > 0 {
res.scaleUpLimit = integer.IntMax(scaleMaxUnavailable-totalUnavailable, 0)
res.scaleUpLimit = integer.IntMin(res.scaleNum, res.scaleUpLimit)
res.scaleUpLimit = integer.IntMin(res.scaleUpNum, integer.IntMax(scaleMaxUnavailable-totalUnavailable, 0))
}

if toDeleteNewRevisionCount > 0 || toDeleteOldRevisionCount > 0 || res.scaleNum < 0 {
// scale down
if num := scaleDownTotalCount - expectedTotalCount; num > 0 {
res.scaleDownNum = num
res.scaleDownNumOldRevision = integer.IntMax(scaleDownTotalOldCount-expectedTotalOldCount, 0)
}
if toDeleteNewRevisionCount > 0 || toDeleteOldRevisionCount > 0 || res.scaleDownNum > 0 {
res.deleteReadyLimit = integer.IntMax(maxUnavailable+(len(pods)-replicas)-totalUnavailable, 0)
}

Expand Down Expand Up @@ -245,3 +275,7 @@ func IsPodAvailable(coreControl clonesetcore.Control, pod *v1.Pod, minReadySecon
}
return coreControl.IsPodUpdateReady(pod, minReadySeconds)
}

// shouldScalingExcludePreparingDelete reports whether the scale calculation for
// cs should exclude Pods in the PreparingDelete state. It returns true when the
// controller-wide scalingExcludePreparingDelete switch is on, or when cs carries
// the CloneSetScalingExcludePreparingDeleteKey label with the exact value "true".
func shouldScalingExcludePreparingDelete(cs *appsv1alpha1.CloneSet) bool {
	// The global switch takes effect without looking at the CloneSet at all.
	if scalingExcludePreparingDelete {
		return true
	}
	// Otherwise the behavior is opt-in per CloneSet through its label.
	labelValue := cs.Labels[appsv1alpha1.CloneSetScalingExcludePreparingDeleteKey]
	return labelValue == "true"
}
Loading

0 comments on commit 02c6cde

Please sign in to comment.