From d6fb77a7bd4faece48759490fc186d610e9a8e90 Mon Sep 17 00:00:00 2001 From: Siyu Wang Date: Mon, 14 Mar 2022 11:01:47 +0800 Subject: [PATCH] Support preDelete lifecycle for Advanced DaemonSet (#923) Signed-off-by: FillZpp --- CONTRIBUTING.md | 2 +- Makefile | 2 +- apis/apps/v1alpha1/daemonset_types.go | 6 + apis/apps/v1alpha1/zz_generated.deepcopy.go | 5 + .../crd/bases/apps.kruise.io_daemonsets.yaml | 31 ++++ .../cloneset/sync/cloneset_update.go | 2 + .../daemonset/daemonset_controller.go | 150 +++++++++++------- .../daemonset/daemonset_controller_test.go | 17 +- pkg/controller/daemonset/daemonset_update.go | 30 +++- .../daemonset/daemonset_update_test.go | 2 +- pkg/controller/daemonset/daemonset_util.go | 9 ++ pkg/util/inplaceupdate/inplace_update.go | 25 +-- .../daemonset_validating_handler.go | 6 + test/e2e/apps/daemonset.go | 108 ++++++++++++- test/e2e/framework/daemonset_util.go | 26 ++- 15 files changed, 333 insertions(+), 88 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index dc879b4d5e..d12c9b6d31 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -88,7 +88,7 @@ We encourage contributors to follow the [PR template](./.github/PULL_REQUEST_TEM As a contributor, if you want to make any contribution to Kruise project, we should reach an agreement on the version of tools used in the development environment. Here are some dependents with specific version: -- Golang : v1.15+ (1.17 is best) +- Golang : v1.17+ - Kubernetes: v1.16+ ### Developing guide diff --git a/Makefile b/Makefile index 476766e2d7..a09c0c6b4c 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ all: build ##@ Development go_check: - @scripts/check_go_version "1.15.0" + @scripts/check_go_version "1.17.0" generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. @scripts/generate_client.sh diff --git a/apis/apps/v1alpha1/daemonset_types.go b/apis/apps/v1alpha1/daemonset_types.go index 10f70ed28c..5431bf108a 100644 --- a/apis/apps/v1alpha1/daemonset_types.go +++ b/apis/apps/v1alpha1/daemonset_types.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -156,6 +157,11 @@ type DaemonSetSpec struct { // Defaults to 10. // +optional RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"` + + // Lifecycle defines the lifecycle hooks for Pods pre-delete, in-place update. + // Currently, we only support pre-delete hook for Advanced DaemonSet. + // +optional + Lifecycle *appspub.Lifecycle `json:"lifecycle,omitempty"` } // DaemonSetStatus defines the observed state of DaemonSet diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 1ab3146903..bfefa85e51 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -899,6 +899,11 @@ func (in *DaemonSetSpec) DeepCopyInto(out *DaemonSetSpec) { *out = new(int32) **out = **in } + if in.Lifecycle != nil { + in, out := &in.Lifecycle, &out.Lifecycle + *out = new(pub.Lifecycle) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DaemonSetSpec. 
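For illustration only (not part of the patch), a minimal Go sketch of how the new spec.lifecycle field added above might be set on an Advanced DaemonSet. The object name, image, and the example.com/pre-delete-hook label key are placeholder assumptions; the Lifecycle/LifecycleHook types are the appspub types referenced by the patch.

package main

import (
	"fmt"

	appspub "github.com/openkruise/kruise/apis/apps/pub"
	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// newHookedDaemonSet sketches an Advanced DaemonSet that uses the new
// spec.lifecycle.preDelete hook: the controller will not delete a Pod while
// the hook label is still present on it. Names and label keys are placeholders.
func newHookedDaemonSet() *appsv1alpha1.DaemonSet {
	return &appsv1alpha1.DaemonSet{
		ObjectMeta: metav1.ObjectMeta{Name: "ds-demo", Namespace: "default"},
		Spec: appsv1alpha1.DaemonSetSpec{
			Selector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "ds-demo"}},
			Lifecycle: &appspub.Lifecycle{
				PreDelete: &appspub.LifecycleHook{
					LabelsHandler: map[string]string{"example.com/pre-delete-hook": "true"},
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					// The hook label must be carried by the Pods themselves.
					Labels: map[string]string{"app": "ds-demo", "example.com/pre-delete-hook": "true"},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{{Name: "main", Image: "nginx:1.21"}},
				},
			},
		},
	}
}

func main() {
	ds := newHookedDaemonSet()
	fmt.Printf("%s/%s\n", ds.Namespace, ds.Name)
}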
diff --git a/config/crd/bases/apps.kruise.io_daemonsets.yaml b/config/crd/bases/apps.kruise.io_daemonsets.yaml index 9b781c3cf0..f7301b9811 100644 --- a/config/crd/bases/apps.kruise.io_daemonsets.yaml +++ b/config/crd/bases/apps.kruise.io_daemonsets.yaml @@ -76,6 +76,37 @@ spec: description: BurstReplicas is a rate limiter for booting pods on a lot of pods. The default value is 250 x-kubernetes-int-or-string: true + lifecycle: + description: Lifecycle defines the lifecycle hooks for Pods pre-delete, + in-place update. Currently, we only support pre-delete hook for + Advanced DaemonSet. + properties: + inPlaceUpdate: + description: InPlaceUpdate is the hook before Pod to update and + after Pod has been updated. + properties: + finalizersHandler: + items: + type: string + type: array + labelsHandler: + additionalProperties: + type: string + type: object + type: object + preDelete: + description: PreDelete is the hook before Pod to be deleted. + properties: + finalizersHandler: + items: + type: string + type: array + labelsHandler: + additionalProperties: + type: string + type: object + type: object + type: object minReadySeconds: description: The minimum number of seconds for which a newly created DaemonSet pod should be ready without any of its container crashing, diff --git a/pkg/controller/cloneset/sync/cloneset_update.go b/pkg/controller/cloneset/sync/cloneset_update.go index c01c0a8983..41171c3cc6 100644 --- a/pkg/controller/cloneset/sync/cloneset_update.go +++ b/pkg/controller/cloneset/sync/cloneset_update.go @@ -37,6 +37,7 @@ import ( "github.com/openkruise/kruise/pkg/util/updatesort" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" ) @@ -255,6 +256,7 @@ func (c *realControl) updatePod(cs *appsv1alpha1.CloneSet, coreControl clonesetc if res.InPlaceUpdate { if res.UpdateErr == nil { c.recorder.Eventf(cs, v1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place(revision %v)", pod.Name, updateRevision.Name) + clonesetutils.ResourceVersionExpectations.Expect(&metav1.ObjectMeta{UID: pod.UID, ResourceVersion: res.NewResourceVersion}) clonesetutils.UpdateExpectations.ExpectUpdated(clonesetutils.GetControllerKey(cs), updateRevision.Name, pod) return res.DelayDuration, nil } diff --git a/pkg/controller/daemonset/daemonset_controller.go b/pkg/controller/daemonset/daemonset_controller.go index d476249b3e..7e8083c37f 100644 --- a/pkg/controller/daemonset/daemonset_controller.go +++ b/pkg/controller/daemonset/daemonset_controller.go @@ -69,6 +69,7 @@ import ( utildiscovery "github.com/openkruise/kruise/pkg/util/discovery" kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations" "github.com/openkruise/kruise/pkg/util/inplaceupdate" + "github.com/openkruise/kruise/pkg/util/lifecycle" "github.com/openkruise/kruise/pkg/util/ratelimiter" "github.com/openkruise/kruise/pkg/util/requeueduration" "github.com/openkruise/kruise/pkg/util/revisionadapter" @@ -176,15 +177,16 @@ func newReconciler(mgr manager.Manager) (reconcile.Reconciler, error) { crControl: kubecontroller.RealControllerRevisionControl{ KubeClient: genericClient.KubeClient, }, - expectations: kubecontroller.NewControllerExpectations(), - updateExpectations: kruiseExpectations.NewUpdateExpectations(revisionadapter.NewDefaultImpl()), - dsLister: dsLister, - historyLister: historyLister, - podLister: podLister, - nodeLister: nodeLister, - failedPodsBackoff: failedPodsBackoff, - inplaceControl: inplaceupdate.New(cli, revisionAdapter), - 
revisionAdapter: revisionAdapter, + lifecycleControl: lifecycle.New(cli), + expectations: kubecontroller.NewControllerExpectations(), + resourceVersionExpectations: kruiseExpectations.NewResourceVersionExpectation(), + dsLister: dsLister, + historyLister: historyLister, + podLister: podLister, + nodeLister: nodeLister, + failedPodsBackoff: failedPodsBackoff, + inplaceControl: inplaceupdate.New(cli, revisionAdapter), + revisionAdapter: revisionAdapter, } return dsc, err } @@ -213,7 +215,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { newDS := e.ObjectNew.(*appsv1alpha1.DaemonSet) if oldDS.UID != newDS.UID { dsc.expectations.DeleteExpectations(keyFunc(oldDS)) - dsc.updateExpectations.DeleteExpectations(keyFunc(oldDS)) } klog.V(4).Infof("Updating DaemonSet %s/%s", newDS.Namespace, newDS.Name) return true @@ -222,7 +223,6 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error { ds := e.Object.(*appsv1alpha1.DaemonSet) klog.V(4).Infof("Deleting DaemonSet %s/%s", ds.Namespace, ds.Name) dsc.expectations.DeleteExpectations(keyFunc(ds)) - dsc.updateExpectations.DeleteExpectations(keyFunc(ds)) return true }, }) @@ -252,16 +252,17 @@ var _ reconcile.Reconciler = &ReconcileDaemonSet{} // ReconcileDaemonSet reconciles a DaemonSet object type ReconcileDaemonSet struct { - kubeClient clientset.Interface - kruiseClient kruiseclientset.Interface - eventRecorder record.EventRecorder - podControl kubecontroller.PodControlInterface - crControl kubecontroller.ControllerRevisionControlInterface + kubeClient clientset.Interface + kruiseClient kruiseclientset.Interface + eventRecorder record.EventRecorder + podControl kubecontroller.PodControlInterface + crControl kubecontroller.ControllerRevisionControlInterface + lifecycleControl lifecycle.Interface // A TTLCache of pod creates/deletes each ds expects to see expectations kubecontroller.ControllerExpectationsInterface - // A cache of pod revisions for in-place update - updateExpectations kruiseExpectations.UpdateExpectations + // A cache of pod resourceVersion expectations + resourceVersionExpectations kruiseExpectations.ResourceVersionExpectation // dsLister can list/get daemonsets from the shared informer's store dsLister kruiseappslisters.DaemonSetLister @@ -343,7 +344,6 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error { if errors.IsNotFound(err) { klog.V(4).Infof("DaemonSet has been deleted %s", dsKey) dsc.expectations.DeleteExpectations(dsKey) - dsc.updateExpectations.DeleteExpectations(dsKey) return nil } return fmt.Errorf("unable to retrieve DaemonSet %s from store: %v", dsKey, err) @@ -374,7 +374,7 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error { } hash := cur.Labels[apps.DefaultDaemonSetUniqueLabelKey] - if !dsc.expectations.SatisfiedExpectations(dsKey) { + if !dsc.expectations.SatisfiedExpectations(dsKey) || !dsc.hasPodExpectationsSatisfied(ds) { return dsc.updateDaemonSetStatus(ds, nodeList, hash, false) } @@ -382,18 +382,19 @@ func (dsc *ReconcileDaemonSet) syncDaemonSet(request reconcile.Request) error { if err != nil { return err } + // return and wait next reconcile if expectation changed to unsatisfied - if !dsc.expectations.SatisfiedExpectations(dsKey) { + if !dsc.expectations.SatisfiedExpectations(dsKey) || !dsc.hasPodExpectationsSatisfied(ds) { return dsc.updateDaemonSetStatus(ds, nodeList, hash, false) } - updateSatisfied, err := dsc.refreshUpdateStatesAndGetSatisfied(ds, hash) - if err != nil { + if err := dsc.refreshUpdateStates(ds); err != nil {
return err } + // Process rolling updates if we're ready. For all kinds of update should not be executed if the update // expectation is not satisfied. - if updateSatisfied && !isDaemonSetPaused(ds) { + if !isDaemonSetPaused(ds) { switch ds.Spec.UpdateStrategy.Type { case appsv1alpha1.OnDeleteDaemonSetStrategyType: case appsv1alpha1.RollingUpdateDaemonSetStrategyType: @@ -611,6 +612,14 @@ func (dsc *ReconcileDaemonSet) manage(ds *appsv1alpha1.DaemonSet, nodeList []*co // syncNodes deletes given pods and creates new daemon set pods on the given nodes // returns slice with erros if any func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelete, nodesNeedingDaemonPods []string, hash string) error { + if ds.Spec.Lifecycle != nil && ds.Spec.Lifecycle.PreDelete != nil { + var err error + podsToDelete, err = dsc.syncWithPreparingDelete(ds, podsToDelete) + if err != nil { + return err + } + } + dsKey := keyFunc(ds) createDiff := len(nodesNeedingDaemonPods) deleteDiff := len(podsToDelete) @@ -737,6 +746,28 @@ func (dsc *ReconcileDaemonSet) syncNodes(ds *appsv1alpha1.DaemonSet, podsToDelet return utilerrors.NewAggregate(errors) } +func (dsc *ReconcileDaemonSet) syncWithPreparingDelete(ds *appsv1alpha1.DaemonSet, podsToDelete []string) (podsCanDelete []string, err error) { + for _, podName := range podsToDelete { + pod, err := dsc.podLister.Pods(ds.Namespace).Get(podName) + if errors.IsNotFound(err) { + continue + } else if err != nil { + return nil, err + } + if !lifecycle.IsPodHooked(ds.Spec.Lifecycle.PreDelete, pod) { + podsCanDelete = append(podsCanDelete, podName) + continue + } + if updated, gotPod, err := dsc.lifecycleControl.UpdatePodLifecycle(pod, appspub.LifecycleStatePreparingDelete); err != nil { + return nil, err + } else if updated { + klog.V(3).Infof("DaemonSet %s/%s has marked Pod %s as PreparingDelete", ds.Namespace, ds.Name, podName) + dsc.resourceVersionExpectations.Expect(gotPod) + } + } + return +} + // podsShouldBeOnNode figures out the DaemonSet pods to be created and deleted on the given node: // - nodesNeedingDaemonPods: the pods need to start on the node // - podsToDelete: the Pods need to be deleted on the node @@ -785,6 +816,9 @@ func (dsc *ReconcileDaemonSet) podsShouldBeOnNode( // Emit an event so that it's discoverable to users. 
dsc.eventRecorder.Eventf(ds, corev1.EventTypeWarning, FailedDaemonPodReason, msg) podsToDelete = append(podsToDelete, pod.Name) + } else if isPodPreDeleting(pod) { + klog.V(3).Infof("Found daemon pod %s/%s on node %s is in PreparingDelete state, will try to kill it", pod.Namespace, pod.Name, node.Name) + podsToDelete = append(podsToDelete, pod.Name) } else { daemonPodsRunning = append(daemonPodsRunning, pod) } @@ -949,46 +983,52 @@ func (dsc *ReconcileDaemonSet) cleanupHistory(ds *appsv1alpha1.DaemonSet, old [] return nil } -func (dsc *ReconcileDaemonSet) refreshUpdateStatesAndGetSatisfied(ds *appsv1alpha1.DaemonSet, hash string) (bool, error) { +func (dsc *ReconcileDaemonSet) refreshUpdateStates(ds *appsv1alpha1.DaemonSet) error { dsKey := keyFunc(ds) - opts := &inplaceupdate.UpdateOptions{} - opts = inplaceupdate.SetOptionsDefaults(opts) - - nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) + pods, err := dsc.getDaemonPods(ds) if err != nil { - return false, fmt.Errorf("couldn't get node to daemon pod mapping for DaemonSet %q: %v", ds.Name, err) + return err } - for _, pods := range nodeToDaemonPods { - for _, pod := range pods { - dsc.updateExpectations.ObserveUpdated(dsKey, hash, pod) + opts := &inplaceupdate.UpdateOptions{} + opts = inplaceupdate.SetOptionsDefaults(opts) + + for _, pod := range pods { + if dsc.inplaceControl == nil { + continue + } + res := dsc.inplaceControl.Refresh(pod, opts) + if res.RefreshErr != nil { + klog.Errorf("DaemonSet %s/%s failed to update pod %s condition for inplace: %v", ds.Namespace, ds.Name, pod.Name, res.RefreshErr) + return res.RefreshErr + } + if res.DelayDuration != 0 { + durationStore.Push(dsKey, res.DelayDuration) } } - for _, pods := range nodeToDaemonPods { - for _, pod := range pods { - if dsc.inplaceControl == nil { - continue - } - res := dsc.inplaceControl.Refresh(pod, opts) - if res.RefreshErr != nil { - klog.Errorf("DaemonSet %s/%s failed to update pod %s condition for inplace: %v", ds.Namespace, ds.Name, pod.Name, res.RefreshErr) - return false, res.RefreshErr - } - if res.DelayDuration != 0 { - durationStore.Push(dsKey, res.DelayDuration) - } - } + return nil +} + +func (dsc *ReconcileDaemonSet) hasPodExpectationsSatisfied(ds *appsv1alpha1.DaemonSet) bool { + dsKey := keyFunc(ds) + pods, err := dsc.getDaemonPods(ds) + if err != nil { + klog.Errorf("Failed to get pods for DaemonSet") + return false } - updateSatisfied, unsatisfiedDuration, updateDirtyPods := dsc.updateExpectations.SatisfiedExpectations(dsKey, hash) - if !updateSatisfied { - if unsatisfiedDuration >= kruiseExpectations.ExpectationTimeout { - klog.Warningf("Expectation unsatisfied overtime for %v, updateDirtyPods=%v, timeout=%v", dsKey, updateDirtyPods, unsatisfiedDuration) - } else { - klog.V(5).Infof("Not satisfied update for %v, updateDirtyPods=%v", dsKey, updateDirtyPods) - durationStore.Push(dsKey, kruiseExpectations.ExpectationTimeout-unsatisfiedDuration) + for _, pod := range pods { + dsc.resourceVersionExpectations.Observe(pod) + if isSatisfied, unsatisfiedDuration := dsc.resourceVersionExpectations.IsSatisfied(pod); !isSatisfied { + if unsatisfiedDuration >= kruiseExpectations.ExpectationTimeout { + klog.Errorf("Expectation unsatisfied resourceVersion overtime for %v, wait for pod %v updating, timeout=%v", dsKey, pod.Name, unsatisfiedDuration) + } else { + klog.V(5).Infof("Not satisfied resourceVersion for %v, wait for pod %v updating", dsKey, pod.Name) + durationStore.Push(dsKey, kruiseExpectations.ExpectationTimeout-unsatisfiedDuration) + } + return false 
} } - return updateSatisfied, nil + return true } diff --git a/pkg/controller/daemonset/daemonset_controller_test.go b/pkg/controller/daemonset/daemonset_controller_test.go index cdadaa58b5..b088b700cf 100644 --- a/pkg/controller/daemonset/daemonset_controller_test.go +++ b/pkg/controller/daemonset/daemonset_controller_test.go @@ -32,7 +32,7 @@ import ( kruiseinformers "github.com/openkruise/kruise/pkg/client/informers/externalversions" kruiseappsinformers "github.com/openkruise/kruise/pkg/client/informers/externalversions/apps/v1alpha1" kruiseExpectations "github.com/openkruise/kruise/pkg/util/expectations" - "github.com/openkruise/kruise/pkg/util/revisionadapter" + "github.com/openkruise/kruise/pkg/util/lifecycle" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -255,13 +255,14 @@ func NewDaemonSetController( crControl: kubecontroller.RealControllerRevisionControl{ KubeClient: kubeClient, }, - expectations: kubecontroller.NewControllerExpectations(), - updateExpectations: kruiseExpectations.NewUpdateExpectations(revisionadapter.NewDefaultImpl()), - dsLister: dsInformer.Lister(), - historyLister: revInformer.Lister(), - podLister: podInformer.Lister(), - nodeLister: nodeInformer.Lister(), - failedPodsBackoff: failedPodsBackoff, + lifecycleControl: lifecycle.NewForInformer(podInformer), + expectations: kubecontroller.NewControllerExpectations(), + resourceVersionExpectations: kruiseExpectations.NewResourceVersionExpectation(), + dsLister: dsInformer.Lister(), + historyLister: revInformer.Lister(), + podLister: podInformer.Lister(), + nodeLister: nodeInformer.Lister(), + failedPodsBackoff: failedPodsBackoff, } } diff --git a/pkg/controller/daemonset/daemonset_update.go b/pkg/controller/daemonset/daemonset_update.go index 43eedd9224..c078a3adbd 100644 --- a/pkg/controller/daemonset/daemonset_update.go +++ b/pkg/controller/daemonset/daemonset_update.go @@ -81,10 +81,10 @@ func (dsc *ReconcileDaemonSet) rollingUpdate(ds *appsv1alpha1.DaemonSet, nodeLis continue } switch { - case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil: + case isPodNilOrPreDeleting(oldPod) && isPodNilOrPreDeleting(newPod), !isPodNilOrPreDeleting(oldPod) && !isPodNilOrPreDeleting(newPod): // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable numUnavailable++ - klog.V(5).Infof("DaemonSet %s/%s find no pods on node %s ", ds.Namespace, ds.Name, nodeName) + klog.V(5).Infof("DaemonSet %s/%s find no pods (or pre-deleting) on node %s ", ds.Namespace, ds.Name, nodeName) case newPod != nil: // this pod is up to date, check its availability if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) { @@ -92,6 +92,11 @@ func (dsc *ReconcileDaemonSet) rollingUpdate(ds *appsv1alpha1.DaemonSet, nodeLis numUnavailable++ klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is new and unavailable", ds.Namespace, ds.Name, newPod.Name, nodeName) } + if isPodPreDeleting(newPod) { + // a pre-deleting new pod is counted against maxUnavailable + numUnavailable++ + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is pre-deleting", ds.Namespace, ds.Name, newPod.Name, nodeName) + } default: // this pod is old, it is an update candidate switch { @@ -166,7 +171,7 @@ func (dsc *ReconcileDaemonSet) rollingUpdate(ds *appsv1alpha1.DaemonSet, nodeLis continue } switch { - case oldPod == nil: + case isPodNilOrPreDeleting(oldPod): // we don't need to do anything to this node, the manage loop will handle it case newPod 
== nil: // this is a surge candidate @@ -197,6 +202,10 @@ func (dsc *ReconcileDaemonSet) rollingUpdate(ds *appsv1alpha1.DaemonSet, nodeLis numSurge++ continue } + if isPodPreDeleting(newPod) { + numSurge++ + continue + } // we're available, delete the old pod klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name) oldPodsToDelete = append(oldPodsToDelete, oldPod.Name) @@ -309,16 +318,21 @@ func (dsc *ReconcileDaemonSet) filterDaemonPodsNodeToUpdate(ds *appsv1alpha1.Dae sort.Strings(allNodeNames) var updated []string + var updating []string var selected []string var rest []string for i := len(allNodeNames) - 1; i >= 0; i-- { nodeName := allNodeNames[i] newPod, oldPod, ok := findUpdatedPodsOnNode(ds, nodeToDaemonPods[nodeName], hash) - if !ok || newPod != nil || oldPod == nil { + if !ok || newPod != nil { updated = append(updated, nodeName) continue } + if isPodNilOrPreDeleting(oldPod) { + updating = append(updating, nodeName) + continue + } if selector != nil { node, err := dsc.nodeLister.Get(nodeName) @@ -334,11 +348,11 @@ func (dsc *ReconcileDaemonSet) filterDaemonPodsNodeToUpdate(ds *appsv1alpha1.Dae rest = append(rest, nodeName) } - var sorted []string + sorted := append(updated, updating...) if selector != nil { - sorted = append(updated, selected...) + sorted = append(sorted, selected...) } else { - sorted = append(updated, rest...) + sorted = append(sorted, rest...) } if maxUpdate := len(allNodeNames) - int(partition); maxUpdate <= 0 { return nil, nil @@ -408,7 +422,7 @@ func (dsc *ReconcileDaemonSet) inPlaceUpdatePods(ds *appsv1alpha1.DaemonSet, pod if res.InPlaceUpdate { if res.UpdateErr == nil { dsc.eventRecorder.Eventf(ds, corev1.EventTypeNormal, "SuccessfulUpdatePodInPlace", "successfully update pod %s in-place", pod.Name) - dsc.updateExpectations.ExpectUpdated(keyFunc(ds), curRevision.Labels[apps.DefaultDaemonSetUniqueLabelKey], pod) + dsc.resourceVersionExpectations.Expect(&metav1.ObjectMeta{UID: pod.UID, ResourceVersion: res.NewResourceVersion}) return } diff --git a/pkg/controller/daemonset/daemonset_update_test.go b/pkg/controller/daemonset/daemonset_update_test.go index 4e0e96196c..323a9f3b29 100644 --- a/pkg/controller/daemonset/daemonset_update_test.go +++ b/pkg/controller/daemonset/daemonset_update_test.go @@ -853,7 +853,7 @@ func TestFilterDaemonPodsNodeToUpdate(t *testing.T) { {ObjectMeta: metav1.ObjectMeta{Name: "n3", Labels: map[string]string{"node-type": "canary"}}}, {ObjectMeta: metav1.ObjectMeta{Name: "n4"}}, }, - expectNodes: []string{"n3", "n2", "n1"}, + expectNodes: []string{"n2", "n3", "n1"}, }, } diff --git a/pkg/controller/daemonset/daemonset_util.go b/pkg/controller/daemonset/daemonset_util.go index 7f73d71c3c..c225a130be 100644 --- a/pkg/controller/daemonset/daemonset_util.go +++ b/pkg/controller/daemonset/daemonset_util.go @@ -23,6 +23,7 @@ import ( appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" "github.com/openkruise/kruise/pkg/util/inplaceupdate" + "github.com/openkruise/kruise/pkg/util/lifecycle" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -326,3 +327,11 @@ func NodeShouldUpdateBySelector(node *corev1.Node, ds *appsv1alpha1.DaemonSet) b return false } } + +func isPodPreDeleting(pod *corev1.Pod) bool { + return pod != nil && lifecycle.GetPodLifecycleState(pod) == appspub.LifecycleStatePreparingDelete +} + +func isPodNilOrPreDeleting(pod *corev1.Pod) bool { + return pod == nil || 
isPodPreDeleting(pod) +} diff --git a/pkg/util/inplaceupdate/inplace_update.go b/pkg/util/inplaceupdate/inplace_update.go index 9284a1143a..1f60a941c9 100644 --- a/pkg/util/inplaceupdate/inplace_update.go +++ b/pkg/util/inplaceupdate/inplace_update.go @@ -46,9 +46,10 @@ type RefreshResult struct { } type UpdateResult struct { - InPlaceUpdate bool - UpdateErr error - DelayDuration time.Duration + InPlaceUpdate bool + UpdateErr error + DelayDuration time.Duration + NewResourceVersion string } type UpdateOptions struct { @@ -244,7 +245,8 @@ func (c *realControl) Update(pod *v1.Pod, oldRevision, newRevision *apps.Control } // 3. update container images - if err := c.updatePodInPlace(pod, spec, opts); err != nil { + newResourceVersion, err := c.updatePodInPlace(pod, spec, opts) + if err != nil { return UpdateResult{InPlaceUpdate: true, UpdateErr: err} } @@ -252,11 +254,12 @@ func (c *realControl) Update(pod *v1.Pod, oldRevision, newRevision *apps.Control if opts.GracePeriodSeconds > 0 { delayDuration = time.Second * time.Duration(opts.GracePeriodSeconds) } - return UpdateResult{InPlaceUpdate: true, DelayDuration: delayDuration} + return UpdateResult{InPlaceUpdate: true, DelayDuration: delayDuration, NewResourceVersion: newResourceVersion} } -func (c *realControl) updatePodInPlace(pod *v1.Pod, spec *UpdateSpec, opts *UpdateOptions) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { +func (c *realControl) updatePodInPlace(pod *v1.Pod, spec *UpdateSpec, opts *UpdateOptions) (string, error) { + var newResourceVersion string + retryErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error { clone, err := c.podAdapter.GetPod(pod.Namespace, pod.Name) if err != nil { return err @@ -299,9 +302,13 @@ func (c *realControl) updatePodInPlace(pod *v1.Pod, spec *UpdateSpec, opts *Upda clone.Annotations[appspub.InPlaceUpdateGraceKey] = string(inPlaceUpdateSpecJSON) } - _, err = c.podAdapter.UpdatePod(clone) - return err + newPod, updateErr := c.podAdapter.UpdatePod(clone) + if updateErr == nil { + newResourceVersion = newPod.ResourceVersion + } + return updateErr }) + return newResourceVersion, retryErr } // GetTemplateFromRevision returns the pod template parsed from ControllerRevision. diff --git a/pkg/webhook/daemonset/validating/daemonset_validating_handler.go b/pkg/webhook/daemonset/validating/daemonset_validating_handler.go index 5e22d653a2..e659178494 100644 --- a/pkg/webhook/daemonset/validating/daemonset_validating_handler.go +++ b/pkg/webhook/daemonset/validating/daemonset_validating_handler.go @@ -107,6 +107,12 @@ func validateDaemonSetSpec(spec *appsv1alpha1.DaemonSetSpec, fldPath *field.Path // zero is a valid RevisionHistoryLimit allErrs = append(allErrs, corevalidation.ValidateNonnegativeField(int64(*spec.RevisionHistoryLimit), fldPath.Child("revisionHistoryLimit"))...) 
} + + if spec.Lifecycle != nil { + if spec.Lifecycle.InPlaceUpdate != nil { + allErrs = append(allErrs, field.Forbidden(fldPath.Child("lifecycle", "inPlaceUpdate"), "inPlaceUpdate hook is not supported yet")) + } + } return allErrs } diff --git a/test/e2e/apps/daemonset.go b/test/e2e/apps/daemonset.go index 6075bdc924..4cd20729c7 100644 --- a/test/e2e/apps/daemonset.go +++ b/test/e2e/apps/daemonset.go @@ -5,18 +5,22 @@ import ( "fmt" "time" + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" + "github.com/openkruise/kruise/pkg/util/lifecycle" "github.com/openkruise/kruise/test/e2e/framework" - - "github.com/onsi/ginkgo" - "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" + daemonutil "k8s.io/kubernetes/pkg/controller/daemon/util" ) var _ = SIGDescribe("DaemonSet", func() { @@ -305,5 +309,103 @@ var _ = SIGDescribe("DaemonSet", func() { gomega.Expect(tester.CheckPodHasNotRecreate(oldPodList.Items, newPodList.Items)).Should(gomega.Equal(tc.expectRecreate)) } }) + + framework.ConformanceIt("should upgrade one by one in steps if there is a pre-delete hook", func() { + label := map[string]string{framework.DaemonSetNameLabel: dsName} + hookKey := "my-pre-delete" + + ginkgo.By(fmt.Sprintf("Creating DaemonSet %q with pre-delete hook", dsName)) + maxUnavailable := intstr.IntOrString{IntVal: int32(1)} + ads := tester.NewDaemonSet(dsName, label, WebserverImage, appsv1alpha1.DaemonSetUpdateStrategy{ + Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{ + Type: appsv1alpha1.InplaceRollingUpdateType, + MaxUnavailable: &maxUnavailable, + }, + }) + ads.Spec.Lifecycle = &appspub.Lifecycle{PreDelete: &appspub.LifecycleHook{LabelsHandler: map[string]string{hookKey: "true"}}} + ads.Spec.Template.Labels = map[string]string{framework.DaemonSetNameLabel: dsName, hookKey: "true"} + ads.Spec.Template.Spec.Containers[0].Resources = v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("100m"), + }, + } + ds, err := tester.CreateDaemonSet(ads) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Check that daemon pods launch on every node of the cluster") + err = wait.PollImmediate(framework.DaemonSetRetryPeriod, framework.DaemonSetRetryTimeout, tester.CheckRunningOnAllNodes(ds)) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error waiting for daemon pod to start") + + err = tester.CheckDaemonStatus(dsName) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + oldPodList, err := tester.ListDaemonPods(label) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By("Update daemonset resources") + err = tester.UpdateDaemonSet(ds.Name, func(ads *appsv1alpha1.DaemonSet) { + ads.Spec.Template.Spec.Containers[0].Resources = v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceCPU: resource.MustParse("120m"), + }, + } + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "failed to update daemonset") + + var newHash string + gomega.Eventually(func() int64 { + ads, err = tester.GetDaemonSet(dsName) +
gomega.Expect(err).NotTo(gomega.HaveOccurred()) + newHash = ads.Status.DaemonSetHash + return ads.Status.ObservedGeneration + }, time.Second*30, time.Second*3).Should(gomega.Equal(int64(2))) + + ginkgo.By("There should be one pod in PreparingDelete and no pod should have been deleted") + var preDeletingPod *v1.Pod + var podList *v1.PodList + gomega.Eventually(func() int { + podList, err = tester.ListDaemonPods(label) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + var preDeletingCount int + for i := range podList.Items { + pod := &podList.Items[i] + if lifecycle.GetPodLifecycleState(pod) == appspub.LifecycleStatePreparingDelete { + preDeletingCount++ + preDeletingPod = pod + } + } + return preDeletingCount + }, time.Second*30, time.Second*3).Should(gomega.Equal(1)) + + gomega.Expect(tester.SortPodNames(podList)).To(gomega.Equal(tester.SortPodNames(oldPodList))) + + ginkgo.By("Remove the hook label and wait for the pod to be recreated") + patch := fmt.Sprintf(`{"metadata":{"labels":{"%s":null}}}`, hookKey) + _, err = tester.PatchPod(preDeletingPod.Name, types.StrategicMergePatchType, []byte(patch)) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + var getErr error + gomega.Eventually(func() bool { + _, getErr = tester.GetPod(preDeletingPod.Name) + return errors.IsNotFound(getErr) + }, time.Second*60, time.Second).Should(gomega.Equal(true), fmt.Sprintf("get error %v", getErr)) + + gomega.Eventually(func() int { + podList, err = tester.ListDaemonPods(label) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + var newVersionCount int + for i := range podList.Items { + pod := &podList.Items[i] + if daemonutil.IsPodUpdated(pod, newHash, nil) { + newVersionCount++ + } else if lifecycle.GetPodLifecycleState(pod) == appspub.LifecycleStatePreparingDelete { + preDeletingPod = pod + } + } + return newVersionCount + }, time.Second*60, time.Second).Should(gomega.Equal(1)) + + }) }) }) diff --git a/test/e2e/framework/daemonset_util.go b/test/e2e/framework/daemonset_util.go index 3bc4c24438..ec4f5ec2ab 100644 --- a/test/e2e/framework/daemonset_util.go +++ b/test/e2e/framework/daemonset_util.go @@ -17,10 +17,12 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" podutil "k8s.io/kubernetes/pkg/api/v1/pod" + utilpointer "k8s.io/utils/pointer" ) const ( @@ -72,8 +74,9 @@ func (t *DaemonSetTester) NewDaemonSet(name string, label map[string]string, ima Command: []string{"/bin/sh", "-c", "sleep 10000000"}, }, }, - HostNetwork: true, - Tolerations: []v1.Toleration{{Operator: v1.TolerationOpExists}}, + HostNetwork: true, + Tolerations: []v1.Toleration{{Operator: v1.TolerationOpExists}}, + TerminationGracePeriodSeconds: utilpointer.Int64(3), }, }, UpdateStrategy: updateStrategy, @@ -397,3 +400,22 @@ func (t *DaemonSetTester) CheckPodHasNotRecreate(oldPods, newPods []v1.Pod) bool return true } + +func (t *DaemonSetTester) GetPod(name string) (*v1.Pod, error) { + return t.c.CoreV1().Pods(t.ns).Get(context.TODO(), name, metav1.GetOptions{}) +} + +func (t *DaemonSetTester) PatchPod(name string, patchType types.PatchType, patch []byte) (*v1.Pod, error) { + return t.c.CoreV1().Pods(t.ns).Patch(context.TODO(), name, patchType, patch, metav1.PatchOptions{}) +} + +func (t *DaemonSetTester) SortPodNames(podList *v1.PodList) []string { + names := sets.NewString() + if podList == nil { + return names.List() + } + for i := range
podList.Items { + names.Insert(podList.Items[i].Name) + } + return names.List() +}
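For illustration only (not part of the patch), a minimal Go sketch of what an external hook consumer might do with the new preDelete hook: find daemon Pods that the controller has moved into PreparingDelete, run its own cleanup, then remove the hook label with the same kind of strategic-merge patch the e2e test uses, so the controller can go on to delete and recreate the Pod. The client wiring, namespace/selector parameters, and the example.com/pre-delete-hook label key are assumptions.

package hookconsumer

import (
	"context"
	"fmt"

	appspub "github.com/openkruise/kruise/apis/apps/pub"
	"github.com/openkruise/kruise/pkg/util/lifecycle"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// hookLabel is a placeholder for the label key registered in spec.lifecycle.preDelete.labelsHandler.
const hookLabel = "example.com/pre-delete-hook"

// ReleasePreDeletingPods lists daemon Pods matching the given selector, and for each Pod
// that the Advanced DaemonSet controller has marked PreparingDelete, removes the preDelete
// hook label so the controller is allowed to actually delete (and then recreate) it.
func ReleasePreDeletingPods(ctx context.Context, c kubernetes.Interface, namespace, selector string) error {
	pods, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return err
	}
	for i := range pods.Items {
		pod := &pods.Items[i]
		if lifecycle.GetPodLifecycleState(pod) != appspub.LifecycleStatePreparingDelete {
			continue
		}
		// ... run any node-local cleanup for this Pod here before releasing the hook ...
		patch := fmt.Sprintf(`{"metadata":{"labels":{"%s":null}}}`, hookLabel)
		if _, err := c.CoreV1().Pods(namespace).Patch(ctx, pod.Name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}); err != nil {
			return err
		}
	}
	return nil
}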