From 49126becc2b2d0a6c7d472474aa56e12789c463d Mon Sep 17 00:00:00 2001 From: shaofan-hs <133250733+shaofan-hs@users.noreply.github.com> Date: Sat, 26 Aug 2023 09:31:48 +0800 Subject: [PATCH] fix, podopslifecycle webhook (#60) * fix, podopslifecycle webhook * fix * fix, comments --- .../collaset/collaset_controller_test.go | 6 +- .../collaset/synccontrol/sync_control.go | 11 +- .../poddeletion_controller_test.go | 4 +- .../podopslifecycle_controller.go | 129 ++++++++++++------ .../podopslifecycle_controller_test.go | 40 +----- pkg/controllers/podopslifecycle/predicate.go | 7 +- .../resourcecontext_controller_test.go | 4 +- pkg/controllers/ruleset/ruleset_controller.go | 2 +- pkg/controllers/utils/pod_utils.go | 45 ++++++ .../generic/pod/opslifecycle/mutating.go | 22 +-- .../generic/pod/opslifecycle/validating.go | 5 + .../generic/pod/opslifecycle/webhook.go | 63 +-------- .../generic/pod/opslifecycle/webhook_test.go | 22 +-- test/e2e/framework/util.go | 2 +- 14 files changed, 171 insertions(+), 191 deletions(-) diff --git a/pkg/controllers/collaset/collaset_controller_test.go b/pkg/controllers/collaset/collaset_controller_test.go index b40ab2f9..fa85257d 100644 --- a/pkg/controllers/collaset/collaset_controller_test.go +++ b/pkg/controllers/collaset/collaset_controller_test.go @@ -635,7 +635,7 @@ func expectedStatusReplicas(c client.Client, cls *appsv1alpha1.CollaSet, schedul } func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn func(cls *appsv1alpha1.CollaSet) bool) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { cls := &appsv1alpha1.CollaSet{} if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, cls); err != nil { return err @@ -650,7 +650,7 @@ func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn f } func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { pod := &corev1.Pod{} if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { return err @@ -665,7 +665,7 @@ func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(p } func updatePodStatusWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { pod := &corev1.Pod{} if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { return err diff --git a/pkg/controllers/collaset/synccontrol/sync_control.go b/pkg/controllers/collaset/synccontrol/sync_control.go index 2fc84e0a..e22f75fa 100644 --- a/pkg/controllers/collaset/synccontrol/sync_control.go +++ b/pkg/controllers/collaset/synccontrol/sync_control.go @@ -18,6 +18,7 @@ package synccontrol import ( "fmt" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -70,7 +71,7 @@ func (sc *RealSyncControl) SyncPods(instance *appsv1alpha1.CollaSet, updatedRevi // get owned IDs var ownedIDs map[int]*appsv1alpha1.ContextDetail - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { ownedIDs, err = podcontext.AllocateID(sc.client, instance, updatedRevision.Name, int(replicasRealValue(instance.Spec.Replicas))) return err }); err != nil { @@ -126,7 +127,7 @@ func (sc *RealSyncControl) SyncPods(instance *appsv1alpha1.CollaSet, updatedRevi if needUpdateContext { klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s when sync", instance.Namespace, instance.Name) - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { return podcontext.UpdateToPodContext(sc.client, instance, ownedIDs) }); err != nil { return false, nil, ownedIDs, fmt.Errorf("fail to update ResourceContext when reclaiming IDs: %s", err) @@ -248,7 +249,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll // mark these Pods to scalingIn if needUpdateContext { klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s when scaling in Pod", set.Namespace, set.Name) - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { return podcontext.UpdateToPodContext(sc.client, set, ownedIDs) }) @@ -303,7 +304,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll if needUpdatePodContext { klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s after scaling", set.Namespace, set.Name) - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { return podcontext.UpdateToPodContext(sc.client, set, ownedIDs) }); err != nil { return scaling, fmt.Errorf("fail to reset ResourceContext: %s", err) @@ -401,7 +402,7 @@ func (sc *RealSyncControl) Update(set *appsv1alpha1.CollaSet, podWrapers []*coll // 5. mark Pod to use updated revision before updating it. if needUpdateContext { klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s", set.Namespace, set.Name) - err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { return podcontext.UpdateToPodContext(sc.client, set, ownedIDs) }) diff --git a/pkg/controllers/poddeletion/poddeletion_controller_test.go b/pkg/controllers/poddeletion/poddeletion_controller_test.go index d901c4f9..f9fa445f 100644 --- a/pkg/controllers/poddeletion/poddeletion_controller_test.go +++ b/pkg/controllers/poddeletion/poddeletion_controller_test.go @@ -110,7 +110,7 @@ var _ = Describe("Pod Deletion controller", func() { }) func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { pod := &corev1.Pod{} if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { return err @@ -125,7 +125,7 @@ func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(p } func updatePodStatusWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { pod := &corev1.Pod{} if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { return err diff --git a/pkg/controllers/podopslifecycle/podopslifecycle_controller.go b/pkg/controllers/podopslifecycle/podopslifecycle_controller.go index b3515811..1ab21ddd 100644 --- a/pkg/controllers/podopslifecycle/podopslifecycle_controller.go +++ b/pkg/controllers/podopslifecycle/podopslifecycle_controller.go @@ -26,6 +26,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -116,6 +117,27 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc return reconcile.Result{}, err } + if !r.expectation.SatisfiedExpectations(key, pod.ResourceVersion) { + klog.Errorf("skip pod %s with no satisfied", key) + return reconcile.Result{}, nil + } + + idToLabelsMap, _, err := PodIDAndTypesMap(pod) + if err != nil { + return reconcile.Result{}, err + } + if len(idToLabelsMap) == 0 { + updated, err := r.addServiceAvailable(pod) + if updated { + return reconcile.Result{}, err + } + + updated, err = r.updateServiceReadiness(ctx, pod, true) + if updated { + return reconcile.Result{}, err + } + } + state, err := r.ruleSetManager.GetState(r.Client, pod) if err != nil { klog.Errorf("failed to get pod %s state: %s", key, err) @@ -123,38 +145,20 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc } var labels map[string]string - if state.InStageAndPassed() { switch state.Stage { case v1alpha1.PodOpsLifecyclePreTrafficOffStage: - labels, err = r.preTrafficOffStage(pod) + labels, err = r.preTrafficOffStage(pod, idToLabelsMap) case v1alpha1.PodOpsLifecyclePreTrafficOnStage: - labels, err = r.preTrafficOnStage(pod) + labels, err = r.preTrafficOnStage(pod, idToLabelsMap) } } - klog.Infof("pod %s in stage %q, labels: %v, error: %v", key, state.Stage, labels, err) if err != nil { return reconcile.Result{}, err } - if len(labels) > 0 { - expectation.ExpectUpdate(key, pod.ResourceVersion) - err = r.addLabels(ctx, pod, labels) - if err != nil { - expectation.DeleteExpectations(key) - } - return reconcile.Result{}, err - } - - if !r.expectation.SatisfiedExpectations(key, pod.ResourceVersion) { - klog.Errorf("skip pod %s with no satisfied", key) - return reconcile.Result{}, nil - } - - idToLabelsMap, _, err := PodIDAndTypesMap(pod) - if err != nil { - return reconcile.Result{}, err + return reconcile.Result{}, r.addLabels(ctx, pod, labels) } expected := map[string]bool{ @@ -176,9 +180,30 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc } } } + return reconcile.Result{}, nil +} - _, err = r.updateServiceReadiness(ctx, pod, true) - return reconcile.Result{}, err +func (r *ReconcilePodOpsLifecycle) addServiceAvailable(pod *corev1.Pod) (bool, error) { + if pod.Labels == nil { + return false, nil + } + if _, ok := pod.Labels[v1alpha1.PodServiceAvailableLabel]; ok { + return false, nil + } + + satisfied, _, err := controllerutils.SatisfyExpectedFinalizers(pod) // whether all expected finalizers are satisfied + if err != nil || !satisfied { + return false, err + } + + if !controllerutils.IsPodReady(pod) { + return false, nil + } + + labels := map[string]string{ + v1alpha1.PodServiceAvailableLabel: strconv.FormatInt(time.Now().Unix(), 10), + } + return true, r.addLabels(context.Background(), pod, labels) } func (r *ReconcilePodOpsLifecycle) updateServiceReadiness(ctx context.Context, pod *corev1.Pod, isReady bool) (bool, error) { @@ -189,9 +214,18 @@ func (r *ReconcilePodOpsLifecycle) updateServiceReadiness(ctx context.Context, p key := controllerKey(pod) r.expectation.ExpectUpdate(key, pod.ResourceVersion) + if err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + newPod := &corev1.Pod{} + err := r.Client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, newPod) + if err != nil { + return err + } + needUpdate, _ := r.setServiceReadiness(newPod, isReady) + if !needUpdate { + return nil + } - if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - return r.Client.Status().Update(ctx, pod) + return r.Client.Status().Update(ctx, newPod) }); err != nil { klog.Errorf("failed to update pod status %s: %s", key, err) r.expectation.DeleteExpectations(key) @@ -248,12 +282,7 @@ func (r *ReconcilePodOpsLifecycle) setServiceReadiness(pod *corev1.Pod, isReady return true, fmt.Sprintf("update service readiness gate to: %s", string(status)) } -func (r *ReconcilePodOpsLifecycle) preTrafficOffStage(pod *corev1.Pod) (labels map[string]string, err error) { - idToLabelsMap, _, err := PodIDAndTypesMap(pod) - if err != nil { - return nil, err - } - +func (r *ReconcilePodOpsLifecycle) preTrafficOffStage(pod *corev1.Pod, idToLabelsMap map[string]map[string]string) (labels map[string]string, err error) { labels = map[string]string{} currentTime := strconv.FormatInt(time.Now().Unix(), 10) for k, v := range idToLabelsMap { @@ -276,17 +305,12 @@ func (r *ReconcilePodOpsLifecycle) preTrafficOffStage(pod *corev1.Pod) (labels m return } -func (r *ReconcilePodOpsLifecycle) preTrafficOnStage(pod *corev1.Pod) (labels map[string]string, err error) { - idToLabelsMap, _, err := PodIDAndTypesMap(pod) - if err != nil { - return nil, err - } - +func (r *ReconcilePodOpsLifecycle) preTrafficOnStage(pod *corev1.Pod, idToLabelsMap map[string]map[string]string) (labels map[string]string, err error) { labels = map[string]string{} currentTime := strconv.FormatInt(time.Now().Unix(), 10) for k := range idToLabelsMap { key := fmt.Sprintf("%s/%s", v1alpha1.PodPostCheckedLabelPrefix, k) - if _, ok := pod.GetLabels()[key]; !ok { + if _, ok := pod.Labels[key]; !ok { labels[key] = currentTime // post-checked } } @@ -295,16 +319,31 @@ func (r *ReconcilePodOpsLifecycle) preTrafficOnStage(pod *corev1.Pod) (labels ma } func (r *ReconcilePodOpsLifecycle) addLabels(ctx context.Context, pod *corev1.Pod, labels map[string]string) error { - if pod.Labels == nil { - pod.Labels = map[string]string{} - } - for k, v := range labels { - pod.Labels[k] = v + if len(labels) == 0 { + return nil } - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { - return r.Client.Update(ctx, pod) + key := controllerKey(pod) + expectation.ExpectUpdate(key, pod.ResourceVersion) + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + newPod := &corev1.Pod{} + err := r.Client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, newPod) + if err != nil { + return err + } + if newPod.Labels == nil { + newPod.Labels = map[string]string{} + } + for k, v := range labels { + newPod.Labels[k] = v + } + return r.Client.Update(ctx, newPod) }) + if err != nil { + klog.Errorf("failed to update pod %s with labels: %v: %s", key, labels, err) + expectation.DeleteExpectations(key) + } + return err } func (r *ReconcilePodOpsLifecycle) initRuleSetManager() { diff --git a/pkg/controllers/podopslifecycle/podopslifecycle_controller_test.go b/pkg/controllers/podopslifecycle/podopslifecycle_controller_test.go index 6f7194db..0df27af7 100644 --- a/pkg/controllers/podopslifecycle/podopslifecycle_controller_test.go +++ b/pkg/controllers/podopslifecycle/podopslifecycle_controller_test.go @@ -110,9 +110,8 @@ var _ = Describe("podopslifecycle controller", func() { }, }, } - operationType = "restart" - id = "123" - time = "1402144848" + id = "123" + time = "1402144848" ) AfterEach(func() { @@ -156,41 +155,6 @@ var _ = Describe("podopslifecycle controller", func() { Expect(pod.Status.Conditions).To(HaveLen(1)) Expect(string(pod.Status.Conditions[0].Type)).To(Equal(v1alpha1.ReadinessGatePodServiceReady)) Expect(pod.Status.Conditions[0].Status).To(Equal(corev1.ConditionTrue)) - - podOpsLifecycle.ruleSetManager = &mockRuleSetManager{ - CheckState: &checker.CheckState{ - Stage: v1alpha1.PodOpsLifecyclePreTrafficOffStage, - States: []checker.State{ - { - Detail: &v1alpha1.Detail{ - Stage: v1alpha1.PodOpsLifecyclePreTrafficOffStage, - Passed: true, - }, - }, - }, - }, - } - - pod.ObjectMeta.Labels = map[string]string{ - v1alpha1.ControlledByPodOpsLifecycle: "true", - fmt.Sprintf("%s/%s", v1alpha1.PodOperateLabelPrefix, id): time, - fmt.Sprintf("%s/%s", v1alpha1.PodPreCheckLabelPrefix, id): time, - fmt.Sprintf("%s/%s", v1alpha1.PodOperationTypeLabelPrefix, id): operationType, - } - err = mgr.GetClient().Update(context.Background(), pod) - Expect(err).NotTo(HaveOccurred()) - - <-request - - pod = &corev1.Pod{} - err = mgr.GetAPIReader().Get(context.Background(), client.ObjectKey{ - Name: "test", - Namespace: "default", - }, pod) - Expect(err).NotTo(HaveOccurred()) - - Expect(pod.GetLabels()).To(HaveKey(fmt.Sprintf("%s/%s", v1alpha1.PodPreCheckedLabelPrefix, id))) - Expect(pod.GetLabels()).To(HaveKey(fmt.Sprintf("%s/%s", v1alpha1.PodOperationPermissionLabelPrefix, operationType))) }) It("create pod with label prepare", func() { diff --git a/pkg/controllers/podopslifecycle/predicate.go b/pkg/controllers/podopslifecycle/predicate.go index 194a43e2..3a2f55d7 100644 --- a/pkg/controllers/podopslifecycle/predicate.go +++ b/pkg/controllers/podopslifecycle/predicate.go @@ -18,7 +18,6 @@ package podopslifecycle import ( corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" "sigs.k8s.io/controller-runtime/pkg/event" ) @@ -51,11 +50,7 @@ func (pp *PodPredicate) Update(evt event.UpdateEvent) bool { if oldPod == nil && newPod == nil { return false } - if !pp.NeedOpsLifecycle(oldPod, newPod) { - return false - } - - return !equality.Semantic.DeepEqual(oldPod.ObjectMeta.Annotations, newPod.ObjectMeta.Annotations) || !equality.Semantic.DeepEqual(oldPod.ObjectMeta.Labels, newPod.ObjectMeta.Labels) + return pp.NeedOpsLifecycle(oldPod, newPod) } func (pp *PodPredicate) Generic(evt event.GenericEvent) bool { diff --git a/pkg/controllers/resourcecontext/resourcecontext_controller_test.go b/pkg/controllers/resourcecontext/resourcecontext_controller_test.go index f2c11f31..42bdc21d 100644 --- a/pkg/controllers/resourcecontext/resourcecontext_controller_test.go +++ b/pkg/controllers/resourcecontext/resourcecontext_controller_test.go @@ -274,7 +274,7 @@ func expectedStatusReplicas(c client.Client, cls *appsv1alpha1.CollaSet, schedul } func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn func(cls *appsv1alpha1.CollaSet) bool) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { cls := &appsv1alpha1.CollaSet{} if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, cls); err != nil { return err @@ -289,7 +289,7 @@ func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn f } func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { pod := &corev1.Pod{} if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { return err diff --git a/pkg/controllers/ruleset/ruleset_controller.go b/pkg/controllers/ruleset/ruleset_controller.go index e6454be0..569c1162 100644 --- a/pkg/controllers/ruleset/ruleset_controller.go +++ b/pkg/controllers/ruleset/ruleset_controller.go @@ -249,7 +249,7 @@ func (r *RuleSetReconciler) updatePodDetail(ctx context.Context, pod *corev1.Pod return nil } patch := client.RawPatch(types.MergePatchType, controllerutils.GetLabelAnnoPatchBytes(nil, nil, nil, map[string]string{detailAnno: newDetail})) - return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + return retry.RetryOnConflict(retry.DefaultRetry, func() error { return r.Patch(ctx, pod, patch) }) } diff --git a/pkg/controllers/utils/pod_utils.go b/pkg/controllers/utils/pod_utils.go index 18f326cd..e9405170 100644 --- a/pkg/controllers/utils/pod_utils.go +++ b/pkg/controllers/utils/pod_utils.go @@ -26,8 +26,10 @@ import ( apimachineryvalidation "k8s.io/apimachinery/pkg/api/validation" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" + "kusionstack.io/kafed/apis/apps/v1alpha1" appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1" revisionutils "kusionstack.io/kafed/pkg/controllers/utils/revision" ) @@ -275,3 +277,46 @@ func IsPodUpdatedRevision(pod *corev1.Pod, revision string) bool { return pod.Labels[appsv1.ControllerRevisionHashLabelKey] == revision } + +func SatisfyExpectedFinalizers(pod *corev1.Pod) (bool, []string, error) { + satisfied := true + var expectedFinalizers []string // expected finalizers that are not satisfied + + availableConditions, err := PodAvailableConditions(pod) + if err != nil { + return satisfied, expectedFinalizers, err + } + + if availableConditions != nil && len(availableConditions.ExpectedFinalizers) != 0 { + existFinalizers := sets.String{} + for _, finalizer := range pod.Finalizers { + existFinalizers.Insert(finalizer) + } + + for _, finalizer := range availableConditions.ExpectedFinalizers { + if !existFinalizers.Has(finalizer) { + satisfied = false + expectedFinalizers = append(expectedFinalizers, finalizer) + } + } + } + + return satisfied, expectedFinalizers, nil +} + +func PodAvailableConditions(pod *corev1.Pod) (*v1alpha1.PodAvailableConditions, error) { + if pod.Annotations == nil { + return nil, nil + } + + anno, ok := pod.Annotations[v1alpha1.PodAvailableConditionsAnnotation] + if !ok { + return nil, nil + } + + availableConditions := &v1alpha1.PodAvailableConditions{} + if err := json.Unmarshal([]byte(anno), availableConditions); err != nil { + return nil, err + } + return availableConditions, nil +} diff --git a/pkg/webhook/server/generic/pod/opslifecycle/mutating.go b/pkg/webhook/server/generic/pod/opslifecycle/mutating.go index 20579e0f..2d83bd00 100644 --- a/pkg/webhook/server/generic/pod/opslifecycle/mutating.go +++ b/pkg/webhook/server/generic/pod/opslifecycle/mutating.go @@ -27,6 +27,7 @@ import ( "kusionstack.io/kafed/apis/apps/v1alpha1" "kusionstack.io/kafed/pkg/controllers/podopslifecycle" + controllerutils "kusionstack.io/kafed/pkg/controllers/utils" "kusionstack.io/kafed/pkg/utils" ) @@ -46,10 +47,6 @@ func (lc *OpsLifecycle) Mutating(ctx context.Context, c client.Client, oldPod, n } numOfIDs := len(newIDToLabelsMap) - if numOfIDs == 0 { - return lc.podServiceAvailableLabel(newPod) - } - var operatingCount, operateCount, operatedCount, completeCount int var undoTypeToNumsMap = map[string]int{} for id, labels := range newIDToLabelsMap { @@ -79,15 +76,18 @@ func (lc *OpsLifecycle) Mutating(ctx context.Context, c client.Client, oldPod, n operatingCount++ if _, ok := labels[v1alpha1.PodPreCheckedLabelPrefix]; ok { // pre-checked - if _, ok := labels[v1alpha1.PodPrepareLabelPrefix]; !ok { + _, hasPrepare := labels[v1alpha1.PodPrepareLabelPrefix] + _, hasOperate := labels[v1alpha1.PodOperateLabelPrefix] + + if !hasPrepare && !hasOperate { delete(newPod.Labels, v1alpha1.PodServiceAvailableLabel) lc.addLabelWithTime(newPod, fmt.Sprintf("%s/%s", v1alpha1.PodPrepareLabelPrefix, id)) // prepare - } else if _, ok := labels[v1alpha1.PodOperateLabelPrefix]; !ok { + } else if !hasOperate { if ready, _ := lc.readyToUpgrade(newPod); ready { delete(newPod.Labels, fmt.Sprintf("%s/%s", v1alpha1.PodPrepareLabelPrefix, id)) - lc.addLabelWithTime(newPod, fmt.Sprintf("%s/%s", v1alpha1.PodOperateLabelPrefix, id)) // operate, controllers can begin to operate + lc.addLabelWithTime(newPod, fmt.Sprintf("%s/%s", v1alpha1.PodOperateLabelPrefix, id)) // operate } } } else { @@ -99,7 +99,7 @@ func (lc *OpsLifecycle) Mutating(ctx context.Context, c client.Client, oldPod, n if _, ok := labels[v1alpha1.PodPostCheckedLabelPrefix]; ok { // post-checked if _, ok := labels[v1alpha1.PodCompleteLabelPrefix]; !ok { - lc.addLabelWithTime(newPod, fmt.Sprintf("%s/%s", v1alpha1.PodCompleteLabelPrefix, id)) // complete, wait fo podopslifecycle controller adds readiness gate + lc.addLabelWithTime(newPod, fmt.Sprintf("%s/%s", v1alpha1.PodCompleteLabelPrefix, id)) // complete } } @@ -126,12 +126,12 @@ func (lc *OpsLifecycle) Mutating(ctx context.Context, c client.Client, oldPod, n } if completeCount == numOfIDs { // all operations are completed - err := lc.podServiceAvailableLabel(newPod) - if err != nil { + satisfied, expectedFinalizers, err := controllerutils.SatisfyExpectedFinalizers(newPod) // whether all expected finalizers are satisfied + if err != nil || !satisfied { + klog.Infof("pod: %s/%s, satisfied: %v, expectedFinalizer: %v, err: %v", newPod.Namespace, newPod.Name, satisfied, expectedFinalizers, err) return err } - // all operations are done and all expected finalizers are satisfied, then remove all unuseful labels, and add service available label for id := range newIDToLabelsMap { for _, v := range []string{v1alpha1.PodOperateLabelPrefix, v1alpha1.PodOperatedLabelPrefix, diff --git a/pkg/webhook/server/generic/pod/opslifecycle/validating.go b/pkg/webhook/server/generic/pod/opslifecycle/validating.go index 9d3a1497..8bf8fc1e 100644 --- a/pkg/webhook/server/generic/pod/opslifecycle/validating.go +++ b/pkg/webhook/server/generic/pod/opslifecycle/validating.go @@ -25,6 +25,7 @@ import ( corev1 "k8s.io/api/core/v1" "sigs.k8s.io/controller-runtime/pkg/client" + controllerutils "kusionstack.io/kafed/pkg/controllers/utils" "kusionstack.io/kafed/pkg/utils" ) @@ -33,6 +34,10 @@ func (lc *OpsLifecycle) Validating(ctx context.Context, c client.Client, oldPod, return nil } + if _, err := controllerutils.PodAvailableConditions(newPod); err != nil { + return err + } + expectedLabels := make(map[string]struct{}) foundLabels := make(map[string]struct{}) for label := range newPod.Labels { diff --git a/pkg/webhook/server/generic/pod/opslifecycle/webhook.go b/pkg/webhook/server/generic/pod/opslifecycle/webhook.go index 95576549..d7f5667c 100644 --- a/pkg/webhook/server/generic/pod/opslifecycle/webhook.go +++ b/pkg/webhook/server/generic/pod/opslifecycle/webhook.go @@ -23,8 +23,6 @@ import ( "time" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" "kusionstack.io/kafed/apis/apps/v1alpha1" controllerutils "kusionstack.io/kafed/pkg/controllers/utils" @@ -52,22 +50,19 @@ var ( ) type ReadyToUpgrade func(pod *corev1.Pod) (bool, []string) -type SatisfyExpectedFinalizers func(pod *corev1.Pod) (bool, []string, error) type TimeLabelValue func() string type IsPodReady func(pod *corev1.Pod) bool type OpsLifecycle struct { - readyToUpgrade ReadyToUpgrade // for testing - satisfyExpectedFinalizers SatisfyExpectedFinalizers - isPodReady IsPodReady - timeLabelValue TimeLabelValue + readyToUpgrade ReadyToUpgrade // for testing + isPodReady IsPodReady + timeLabelValue TimeLabelValue } func New() *OpsLifecycle { return &OpsLifecycle{ - readyToUpgrade: hasNoBlockingFinalizer, - satisfyExpectedFinalizers: satisfyExpectedFinalizers, - isPodReady: controllerutils.IsPodReady, + readyToUpgrade: hasNoBlockingFinalizer, + isPodReady: controllerutils.IsPodReady, timeLabelValue: func() string { return strconv.FormatInt(time.Now().Unix(), 10) }, @@ -85,28 +80,6 @@ func (lc *OpsLifecycle) addLabelWithTime(pod *corev1.Pod, key string) { pod.Labels[key] = lc.timeLabelValue() } -func (lc *OpsLifecycle) podServiceAvailableLabel(pod *corev1.Pod) error { - if pod.Labels == nil { - return nil - } - if _, ok := pod.Labels[v1alpha1.PodServiceAvailableLabel]; ok { - return nil - } - - satisfied, expectedFinalizer, err := lc.satisfyExpectedFinalizers(pod) // whether all expected finalizers are satisfied - if err != nil || !satisfied { - klog.Infof("pod: %s/%s, expected finalizers: %v, err: %v", pod.Namespace, pod.Name, expectedFinalizer, err) - return err - } - - if !lc.isPodReady(pod) { - return nil - } - - lc.addLabelWithTime(pod, v1alpha1.PodServiceAvailableLabel) - return nil -} - func addReadinessGates(pod *corev1.Pod, conditionType corev1.PodConditionType) { for _, v := range pod.Spec.ReadinessGates { if v.ConditionType == conditionType { @@ -118,32 +91,6 @@ func addReadinessGates(pod *corev1.Pod, conditionType corev1.PodConditionType) { }) } -func satisfyExpectedFinalizers(pod *corev1.Pod) (bool, []string, error) { - satisfied := true - var expectedFinalizer []string // expected finalizers that are not satisfied - - availableConditions, err := podAvailableConditions(pod) - if err != nil { - return satisfied, expectedFinalizer, err - } - - if availableConditions != nil && len(availableConditions.ExpectedFinalizers) != 0 { - existFinalizers := sets.String{} - for _, finalizer := range pod.Finalizers { - existFinalizers.Insert(finalizer) - } - - for _, finalizer := range availableConditions.ExpectedFinalizers { - if !existFinalizers.Has(finalizer) { - satisfied = false - expectedFinalizer = append(expectedFinalizer, finalizer) - } - } - } - - return satisfied, expectedFinalizer, nil -} - func hasNoBlockingFinalizer(pod *corev1.Pod) (bool, []string) { if pod == nil { return true, nil diff --git a/pkg/webhook/server/generic/pod/opslifecycle/webhook_test.go b/pkg/webhook/server/generic/pod/opslifecycle/webhook_test.go index 65429efd..994227e1 100644 --- a/pkg/webhook/server/generic/pod/opslifecycle/webhook_test.go +++ b/pkg/webhook/server/generic/pod/opslifecycle/webhook_test.go @@ -124,9 +124,8 @@ func TestMutating(t *testing.T) { newPodLabels map[string]string expectedLabels map[string]string - satisfyExpectedFinalizers SatisfyExpectedFinalizers - readyToUpgrade ReadyToUpgrade - isPodReady IsPodReady + readyToUpgrade ReadyToUpgrade + isPodReady IsPodReady }{ { notes: "pre-check", @@ -402,7 +401,6 @@ func TestMutating(t *testing.T) { fmt.Sprintf("%s/%s", v1alpha1.PodCompleteLabelPrefix, "123"): "1402144848", }, - satisfyExpectedFinalizers: satifyExpectedFinalizersReturnFalse, }, { @@ -424,9 +422,7 @@ func TestMutating(t *testing.T) { fmt.Sprintf("%s/%s", v1alpha1.PodCompleteLabelPrefix, "456"): "1402144848", }, - expectedLabels: map[string]string{ - v1alpha1.PodServiceAvailableLabel: "1402144848", - }, + expectedLabels: map[string]string{}, }, } @@ -458,9 +454,6 @@ func TestMutating(t *testing.T) { if v.readyToUpgrade != nil { opslifecycle.readyToUpgrade = v.readyToUpgrade } - if v.satisfyExpectedFinalizers != nil { - opslifecycle.satisfyExpectedFinalizers = v.satisfyExpectedFinalizers - } if v.isPodReady != nil { opslifecycle.isPodReady = v.isPodReady } @@ -489,7 +482,6 @@ func opsLifecycleDefaultFunc(opslifecycle *OpsLifecycle) { } opslifecycle.readyToUpgrade = readyToUpgradeReturnTrue - opslifecycle.satisfyExpectedFinalizers = satifyExpectedFinalizersReturnTrue opslifecycle.isPodReady = isPodReadyReturnTrue } @@ -501,14 +493,6 @@ func readyToUpgradeReturnFalse(pod *corev1.Pod) (bool, []string) { return false, nil } -func satifyExpectedFinalizersReturnTrue(pod *corev1.Pod) (bool, []string, error) { - return true, nil, nil -} - -func satifyExpectedFinalizersReturnFalse(pod *corev1.Pod) (bool, []string, error) { - return false, nil, nil -} - func isPodReadyReturnTrue(pod *corev1.Pod) bool { return true } diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go index 5773ff79..bc3d6352 100644 --- a/test/e2e/framework/util.go +++ b/test/e2e/framework/util.go @@ -214,7 +214,7 @@ func CreateTestingNS(baseName string, c clientset.Interface, labels map[string]s return nil, err } - err = retry.OnError(retry.DefaultBackoff, apierrors.IsNotFound, func() error { + err = retry.OnError(retry.DefaultRetry, apierrors.IsNotFound, func() error { time.Sleep(1 * time.Second) Logf("waiting get namespace for: %s", got.Name) getNs, err := c.CoreV1().Namespaces().Get(context.TODO(), got.Name, metav1.GetOptions{})