Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize inplace update and support pod-template-hash label for cloneset #931

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 13 additions & 31 deletions pkg/controller/cloneset/cloneset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
} else {
klog.Errorf("Failed syncing CloneSet %s: %v", request, retErr)
}
// clean the duration store
_ = clonesetutils.DurationStore.Pop(request.String())
}()

// Fetch the CloneSet instance
Expand All @@ -202,7 +204,6 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
// For additional cleanup logic use finalizers.
klog.V(3).Infof("CloneSet %s has been deleted.", request)
clonesetutils.ScaleExpectations.DeleteExpectations(request.String())
clonesetutils.UpdateExpectations.DeleteExpectations(request.String())
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
Expand Down Expand Up @@ -256,20 +257,6 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
return reconcile.Result{}, err
}

// Refresh update expectations
for _, pod := range filteredPods {
clonesetutils.UpdateExpectations.ObserveUpdated(request.String(), updateRevision.Name, pod)
}
// If update expectations have not satisfied yet, just skip this reconcile.
if updateSatisfied, unsatisfiedDuration, updateDirtyPods := clonesetutils.UpdateExpectations.SatisfiedExpectations(request.String(), updateRevision.Name); !updateSatisfied {
if unsatisfiedDuration >= expectations.ExpectationTimeout {
klog.Warningf("Expectation unsatisfied overtime for %v, updateDirtyPods=%v, timeout=%v", request.String(), updateDirtyPods, unsatisfiedDuration)
return reconcile.Result{}, nil
}
klog.V(4).Infof("Not satisfied update for %v, updateDirtyPods=%v", request.String(), updateDirtyPods)
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}

// If resourceVersion expectations have not been satisfied yet, just skip this reconcile
clonesetutils.ResourceVersionExpectations.Observe(updateRevision)
if isSatisfied, unsatisfiedDuration := clonesetutils.ResourceVersionExpectations.IsSatisfied(updateRevision); !isSatisfied {
Expand Down Expand Up @@ -331,7 +318,7 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
}

// scale and update pods
delayDuration, syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
syncErr := r.syncCloneSet(instance, &newStatus, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)

// update new status
if err = r.statusUpdater.UpdateCloneSetStatus(instance, &newStatus, filteredPods); err != nil {
Expand All @@ -347,32 +334,28 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
}

if syncErr == nil && instance.Spec.MinReadySeconds > 0 && newStatus.AvailableReplicas != newStatus.ReadyReplicas {
minReadyDuration := time.Second * time.Duration(instance.Spec.MinReadySeconds)
if delayDuration == 0 || minReadyDuration < delayDuration {
delayDuration = minReadyDuration
}
clonesetutils.DurationStore.Push(request.String(), time.Second*time.Duration(instance.Spec.MinReadySeconds))
}
return reconcile.Result{RequeueAfter: delayDuration}, syncErr
return reconcile.Result{RequeueAfter: clonesetutils.DurationStore.Pop(request.String())}, syncErr
}

func (r *ReconcileCloneSet) syncCloneSet(
instance *appsv1alpha1.CloneSet, newStatus *appsv1alpha1.CloneSetStatus,
currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
filteredPods []*v1.Pod, filteredPVCs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
var delayDuration time.Duration
) error {
if instance.DeletionTimestamp != nil {
return delayDuration, nil
return nil
}

// get the current and update revisions of the set.
currentSet, err := r.revisionControl.ApplyRevision(instance, currentRevision)
if err != nil {
return delayDuration, err
return err
}
updateSet, err := r.revisionControl.ApplyRevision(instance, updateRevision)
if err != nil {
return delayDuration, err
return err
}

var scaling bool
Expand All @@ -390,24 +373,23 @@ func (r *ReconcileCloneSet) syncCloneSet(
err = podsScaleErr
}
if scaling {
return delayDuration, podsScaleErr
return podsScaleErr
}

delayDuration, podsUpdateErr = r.syncControl.Update(updateSet, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
podsUpdateErr = r.syncControl.Update(updateSet, currentRevision, updateRevision, revisions, filteredPods, filteredPVCs)
if podsUpdateErr != nil {
newStatus.Conditions = append(newStatus.Conditions, appsv1alpha1.CloneSetCondition{
Type: appsv1alpha1.CloneSetConditionFailedUpdate,
Status: v1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: podsUpdateErr.Error(),
})
// If there is a delay duration, there is no need to return the error to the caller
if err == nil && delayDuration <= 0 {
if err == nil {
err = podsUpdateErr
}
}

return delayDuration, err
return err
}

func (r *ReconcileCloneSet) getActiveRevisions(cs *appsv1alpha1.CloneSet, revisions []*apps.ControllerRevision) (
Expand Down
1 change: 0 additions & 1 deletion pkg/controller/cloneset/cloneset_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ func (e *podEventHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimiting

klog.V(4).Infof("Pod %s/%s deleted, owner: %s", pod.Namespace, pod.Name, req.Name)
clonesetutils.ScaleExpectations.ObserveScale(req.String(), expectations.Delete, pod.Name)
clonesetutils.UpdateExpectations.DeleteObject(req.String(), pod)
q.Add(*req)
}

Expand Down
4 changes: 1 addition & 3 deletions pkg/controller/cloneset/sync/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package sync

import (
"time"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
Expand All @@ -41,7 +39,7 @@ type Interface interface {
Update(cs *appsv1alpha1.CloneSet,
currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error)
) error
}

type realControl struct {
Expand Down
27 changes: 15 additions & 12 deletions pkg/controller/cloneset/sync/cloneset_scale_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func TestCreatePods(t *testing.T) {
Name: "foo-id1",
GenerateName: "foo-",
Labels: map[string]string{
appsv1alpha1.CloneSetInstanceID: "id1",
apps.ControllerRevisionHashLabelKey: "revision_abc",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
appsv1alpha1.CloneSetInstanceID: "id1",
apps.ControllerRevisionHashLabelKey: "revision_abc",
apps.DefaultDeploymentUniqueLabelKey: "revision_abc",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down Expand Up @@ -136,10 +137,11 @@ func TestCreatePods(t *testing.T) {
Name: "foo-id3",
GenerateName: "foo-",
Labels: map[string]string{
appsv1alpha1.CloneSetInstanceID: "id3",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
appsv1alpha1.CloneSetInstanceID: "id3",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
apps.DefaultDeploymentUniqueLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down Expand Up @@ -193,10 +195,11 @@ func TestCreatePods(t *testing.T) {
Name: "foo-id4",
GenerateName: "foo-",
Labels: map[string]string{
appsv1alpha1.CloneSetInstanceID: "id4",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
appsv1alpha1.CloneSetInstanceID: "id4",
apps.ControllerRevisionHashLabelKey: "revision_xyz",
apps.DefaultDeploymentUniqueLabelKey: "revision_xyz",
"foo": "bar",
appspub.LifecycleStateKey: string(appspub.LifecycleStateNormal),
},
OwnerReferences: []metav1.OwnerReference{
{
Expand Down
58 changes: 41 additions & 17 deletions pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sync

import (
"context"
"fmt"
"sort"
"time"
Expand All @@ -32,48 +33,54 @@ import (
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/requeueduration"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
"github.com/openkruise/kruise/pkg/util/updatesort"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
currentRevision, updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim,
) (time.Duration, error) {
) error {

requeueDuration := requeueduration.Duration{}
key := clonesetutils.GetControllerKey(cs)
coreControl := clonesetcore.New(cs)

// 1. refresh states for all pods
var modified bool
for _, pod := range pods {
patched, duration, err := c.refreshPodState(cs, coreControl, pod)
patchedState, duration, err := c.refreshPodState(cs, coreControl, pod)
if err != nil {
return 0, err
return err
} else if duration > 0 {
requeueDuration.Update(duration)
clonesetutils.DurationStore.Push(key, duration)
}
// fix the pod-template-hash label for old pods before v1.1
patchedHash, err := c.fixPodTemplateHashLabel(cs, pod)
if err != nil {
return err
}
if patched {
if patchedState || patchedHash {
modified = true
}
}
if modified {
return requeueDuration.Get(), nil
return nil
}

if cs.Spec.UpdateStrategy.Paused {
return requeueDuration.Get(), nil
return nil
}

// 2. calculate update diff and the revision to update
diffRes := calculateDiffsWithExpectation(cs, pods, currentRevision.Name, updateRevision.Name)
if diffRes.updateNum == 0 {
return requeueDuration.Get(), nil
return nil
}

// 3. find all matched pods can update
Expand Down Expand Up @@ -128,7 +135,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) && len(waitUpdateIndexes) > 0 {
pub, err = pubcontrol.GetPodUnavailableBudgetForPod(c.Client, c.controllerFinder, pods[waitUpdateIndexes[0]])
if err != nil {
return requeueDuration.Get(), err
return err
}
}
// 6. update pods
Expand All @@ -138,22 +145,23 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
if pub != nil {
allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(c.Client, pod, pubcontrol.NewPubControl(pub, c.controllerFinder, c.Client), pubcontrol.UpdateOperation, false)
if err != nil {
return requeueDuration.Get(), err
return err
// pub check does not pass, try again in seconds
} else if !allowed {
return time.Second, nil
clonesetutils.DurationStore.Push(key, time.Second)
return nil
}
}
duration, err := c.updatePod(cs, coreControl, targetRevision, revisions, pod, pvcs)
if duration > 0 {
requeueDuration.Update(duration)
clonesetutils.DurationStore.Push(key, duration)
}
if err != nil {
return requeueDuration.Get(), err
return err
}
}

return requeueDuration.Get(), nil
return nil
}

func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control, pod *v1.Pod) (bool, time.Duration, error) {
Expand Down Expand Up @@ -198,6 +206,23 @@ func (c *realControl) refreshPodState(cs *appsv1alpha1.CloneSet, coreControl clo
return false, res.DelayDuration, nil
}

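// fixPodTemplateHashLabel fixes the pod-template-hash label for old pods created before v1.1.
// If the pod has no apps.DefaultDeploymentUniqueLabelKey label, it patches the pod to add one whose value is
// clonesetutils.GetShortHash applied to the pod's apps.ControllerRevisionHashLabelKey label.
// It returns true only when a patch was actually sent and succeeded.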
func (c *realControl) fixPodTemplateHashLabel(cs *appsv1alpha1.CloneSet, pod *v1.Pod) (bool, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment explaining the history and the reason why we need to "fix" the template hash. It seems that previously we only had the controller-revision-hash label on the pod.

if _, exists := pod.Labels[apps.DefaultDeploymentUniqueLabelKey]; exists {
return false, nil
}
patch := []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`,
apps.DefaultDeploymentUniqueLabelKey,
clonesetutils.GetShortHash(pod.Labels[apps.ControllerRevisionHashLabelKey])))
pod = pod.DeepCopy()
if err := c.Patch(context.TODO(), pod, client.RawPatch(types.StrategicMergePatchType, patch)); err != nil {
klog.Warningf("CloneSet %s/%s failed to fix pod-template-hash to Pod %s: %v", cs.Namespace, cs.Name, pod.Name, err)
return false, err
}
clonesetutils.ResourceVersionExpectations.Expect(pod)
return true, nil
}

func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetcore.Control,
updateRevision *apps.ControllerRevision, revisions []*apps.ControllerRevision,
pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim,
Expand Down Expand Up @@ -257,7 +282,6 @@ func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetc
if res.UpdateErr == nil {
c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place(revision %v)", pod.Name, updateRevision.Name)
clonesetutils.ResourceVersionExpectations.Expect(&metav1.ObjectMeta{UID: pod.UID, ResourceVersion: res.NewResourceVersion})
clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod)
return res.DelayDuration, nil
}

Expand Down
Loading