From a421b6e6381bc1c38d89b5a99b25c9c6cca2c638 Mon Sep 17 00:00:00 2001 From: berg Date: Tue, 28 Jun 2022 10:32:18 +0800 Subject: [PATCH] pub support pod unvailable label (#1004) --- apis/apps/pub/pod_unavailable_label.go | 40 +++++ .../v1alpha1/podunavailablebudget_types.go | 5 + pkg/control/pubcontrol/pub_control.go | 16 +- pkg/control/pubcontrol/pub_control_test.go | 129 +++++++++++++++++ pkg/control/pubcontrol/utils.go | 56 ++++--- pkg/control/pubcontrol/utils_test.go | 137 +++++++++++++++++- .../cloneset/sync/cloneset_update.go | 13 +- pkg/util/pods.go | 2 +- .../pod/validating/pod_unavailable_budget.go | 85 +++-------- test/e2e/policy/podunavailablebudget.go | 104 ++++++++++++- 10 files changed, 481 insertions(+), 106 deletions(-) create mode 100644 apis/apps/pub/pod_unavailable_label.go create mode 100644 pkg/control/pubcontrol/pub_control_test.go diff --git a/apis/apps/pub/pod_unavailable_label.go b/apis/apps/pub/pod_unavailable_label.go new file mode 100644 index 0000000000..697f939a42 --- /dev/null +++ b/apis/apps/pub/pod_unavailable_label.go @@ -0,0 +1,40 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pub + +import ( + "strings" +) + +const ( + // PubUnavailablePodLabelPrefix indicates if the pod has this label, both kruise workload and + // pub will determine that the pod is unavailable, even if pod.status.ready=true. + // Main users non-destructive offline and other scenarios + PubUnavailablePodLabelPrefix = "unavailable-pod.kruise.io/" +) + +func HasUnavailableLabel(labels map[string]string) bool { + if len(labels) == 0 { + return false + } + for key := range labels { + if strings.HasPrefix(key, PubUnavailablePodLabelPrefix) { + return true + } + } + return false +} diff --git a/apis/policy/v1alpha1/podunavailablebudget_types.go b/apis/policy/v1alpha1/podunavailablebudget_types.go index 54383203f2..c8dddf2de8 100644 --- a/apis/policy/v1alpha1/podunavailablebudget_types.go +++ b/apis/policy/v1alpha1/podunavailablebudget_types.go @@ -24,12 +24,17 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +type PubOperation string + const ( // PubProtectOperationAnnotation indicates the pub protected Operation[DELETE,UPDATE] // the following indicates the pub only protect DELETE,UPDATE Operation // annotations[kruise.io/pub-protect-operations]=DELETE,UPDATE // if the annotations do not exist, the default DELETE and UPDATE are protected PubProtectOperationAnnotation = "kruise.io/pub-protect-operations" + // pod webhook operation + PubUpdateOperation PubOperation = "UPDATE" + PubDeleteOperation PubOperation = "DELETE" ) // PodUnavailableBudgetSpec defines the desired state of PodUnavailableBudget diff --git a/pkg/control/pubcontrol/pub_control.go b/pkg/control/pubcontrol/pub_control.go index 56b9114412..1d529d1ab0 100644 --- a/pkg/control/pubcontrol/pub_control.go +++ b/pkg/control/pubcontrol/pub_control.go @@ -21,6 +21,7 @@ import ( "reflect" "strings" + appspub "github.com/openkruise/kruise/apis/apps/pub" policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1" "github.com/openkruise/kruise/pkg/control/sidecarcontrol" "github.com/openkruise/kruise/pkg/util" @@ -43,19 +44,24 @@ type commonControl struct { func (c *commonControl) IsPodReady(pod *corev1.Pod) bool { // 1. pod.Status.Phase == v1.PodRunning // 2. pod.condition PodReady == true - return util.IsRunningAndReady(pod) + if !util.IsRunningAndReady(pod) { + return false + } + + // unavailable label + return !appspub.HasUnavailableLabel(pod.Labels) } func (c *commonControl) IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool { - // kruise workload in-place situation - if newPod == nil || oldPod == nil { - return true - } // If pod.spec changed, pod will be in unavailable condition if !reflect.DeepEqual(oldPod.Spec, newPod.Spec) { klog.V(3).Infof("pod(%s/%s) specification changed, and maybe cause unavailability", newPod.Namespace, newPod.Name) return true } + // pod add unavailable label + if !appspub.HasUnavailableLabel(oldPod.Labels) && appspub.HasUnavailableLabel(newPod.Labels) { + return true + } // pod other changes will not cause unavailability situation, then return false return false } diff --git a/pkg/control/pubcontrol/pub_control_test.go b/pkg/control/pubcontrol/pub_control_test.go new file mode 100644 index 0000000000..70ab204ea3 --- /dev/null +++ b/pkg/control/pubcontrol/pub_control_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2021 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pubcontrol + +import ( + "fmt" + "testing" + + "github.com/openkruise/kruise/apis/apps/pub" + corev1 "k8s.io/api/core/v1" +) + +func TestIsPodUnavailableChanged(t *testing.T) { + cases := []struct { + name string + getOldPod func() *corev1.Pod + getNewPod func() *corev1.Pod + expect bool + }{ + { + name: "only annotations change", + getOldPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + return demo + }, + getNewPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + demo.Annotations["add"] = "annotations" + return demo + }, + expect: false, + }, + { + name: "add unvailable label", + getOldPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + return demo + }, + getNewPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + demo.Labels[fmt.Sprintf("%sdata", pub.PubUnavailablePodLabelPrefix)] = "true" + return demo + }, + expect: true, + }, + { + name: "image changed", + getOldPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + return demo + }, + getNewPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + demo.Spec.Containers[0].Image = "nginx:v2" + return demo + }, + expect: true, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + control := commonControl{} + is := control.IsPodUnavailableChanged(cs.getOldPod(), cs.getNewPod()) + if cs.expect != is { + t.Fatalf("IsPodUnavailableChanged failed") + } + }) + } +} + +func TestIsPodReady(t *testing.T) { + cases := []struct { + name string + getPod func() *corev1.Pod + expect bool + }{ + { + name: "pod ready", + getPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + return demo + }, + expect: true, + }, + { + name: "pod not ready", + getPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + demo.Status.Conditions[0].Status = corev1.ConditionFalse + return demo + }, + expect: false, + }, + { + name: "pod contains unavailable label", + getPod: func() *corev1.Pod { + demo := podDemo.DeepCopy() + demo.Labels[fmt.Sprintf("%sdata", pub.PubUnavailablePodLabelPrefix)] = "true" + return demo + }, + expect: false, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + control := commonControl{} + is := control.IsPodReady(cs.getPod()) + if cs.expect != is { + t.Fatalf("IsPodReady failed") + } + }) + } +} diff --git a/pkg/control/pubcontrol/utils.go b/pkg/control/pubcontrol/utils.go index 0e5bd3a97b..d1e4db99d0 100644 --- a/pkg/control/pubcontrol/utils.go +++ b/pkg/control/pubcontrol/utils.go @@ -19,6 +19,7 @@ package pubcontrol import ( "context" "fmt" + "strings" "time" policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" @@ -48,15 +50,9 @@ var ConflictRetry = wait.Backoff{ Jitter: 0.1, } -type Operation string - const ( - UpdateOperation = "UPDATE" - DeleteOperation = "DELETE" - // Marked pods will not be pub-protected, solving the scenario of force pod deletion PodPubNoProtectionAnnotation = "pub.kruise.io/no-protect" - // related-pub annotation in pod PodRelatedPubAnnotation = "kruise.io/related-pub" ) @@ -64,22 +60,37 @@ const ( // parameters: // 1. allowed(bool) indicates whether to allow this update operation // 2. err(error) -func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, pub *policyv1alpha1.PodUnavailableBudget, pod *corev1.Pod, operation Operation, dryRun bool) (allowed bool, reason string, err error) { - // If the pod is not ready, it doesn't count towards healthy and we should not decrement - if !control.IsPodReady(pod) { +func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, pod *corev1.Pod, operation policyv1alpha1.PubOperation, dryRun bool) (allowed bool, reason string, err error) { + klog.V(3).Infof("validating pod(%s/%s) operation(%s) for PodUnavailableBudget", pod.Namespace, pod.Name, operation) + // pods that contain annotations[pod.kruise.io/pub-no-protect]="true" will be ignore + // and will no longer check the pub quota + if pod.Annotations[PodPubNoProtectionAnnotation] == "true" { + klog.V(3).Infof("pod(%s/%s) contains annotations[%s]=true, then don't need check pub", pod.Namespace, pod.Name, PodPubNoProtectionAnnotation) + return true, "", nil + // If the pod is not ready, it doesn't count towards healthy and we should not decrement + } else if !control.IsPodReady(pod) { klog.V(3).Infof("pod(%s/%s) is not ready, then don't need check pub", pod.Namespace, pod.Name) return true, "", nil } - // pod is in pub.Status.DisruptedPods or pub.Status.UnavailablePods, then don't need check it - if isPodRecordedInPub(pod.Name, pub) { - klog.V(5).Infof("pod(%s/%s) already is recorded in pub(%s/%s)", pod.Namespace, pod.Name, pub.Namespace, pub.Name) + + // pub for pod + pub, err := control.GetPubForPod(pod) + if err != nil { + return false, "", err + // if there is no matching PodUnavailableBudget, just return true + } else if pub == nil { + return true, "", nil + } else if !isNeedPubProtection(pub, operation) { + klog.V(3).Infof("pod(%s/%s) operation(%s) is not in pub(%s) protection", pod.Namespace, pod.Name, pub.Name) + return true, "", nil + // pod is in pub.Status.DisruptedPods or pub.Status.UnavailablePods, then don't need check it + } else if isPodRecordedInPub(pod.Name, pub) { + klog.V(3).Infof("pod(%s/%s) already is recorded in pub(%s/%s)", pod.Namespace, pod.Name, pub.Namespace, pub.Name) return true, "", nil } - - // for debug + // check and decrement pub quota var conflictTimes int var costOfGet, costOfUpdate time.Duration - refresh := false var pubClone *policyv1alpha1.PodUnavailableBudget err = retry.RetryOnConflict(ConflictRetry, func() error { @@ -128,7 +139,7 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p // If this is a dry-run, we don't need to go any further than that. if dryRun { - klog.V(5).Infof("pod(%s) operation for pub(%s/%s) is a dry run", pod.Name, pubClone.Namespace, pubClone.Name) + klog.V(3).Infof("pod(%s) operation for pub(%s/%s) is a dry run", pod.Name, pubClone.Namespace, pubClone.Name) return nil } klog.V(3).Infof("pub(%s/%s) update status(disruptedPods:%d, unavailablePods:%d, expectedCount:%d, desiredAvailable:%d, currentAvailable:%d, unavailableAllowed:%d)", @@ -163,7 +174,7 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p return true, "", nil } -func checkAndDecrement(podName string, pub *policyv1alpha1.PodUnavailableBudget, operation Operation) error { +func checkAndDecrement(podName string, pub *policyv1alpha1.PodUnavailableBudget, operation policyv1alpha1.PubOperation) error { if pub.Status.UnavailableAllowed <= 0 { return errors.NewForbidden(policyv1alpha1.Resource("podunavailablebudget"), pub.Name, fmt.Errorf("pub unavailable allowed is negative")) } @@ -180,7 +191,7 @@ func checkAndDecrement(podName string, pub *policyv1alpha1.PodUnavailableBudget, pub.Status.UnavailablePods = make(map[string]metav1.Time) } - if operation == UpdateOperation { + if operation == policyv1alpha1.PubUpdateOperation { pub.Status.UnavailablePods[podName] = metav1.Time{Time: time.Now()} klog.V(3).Infof("pod(%s) is recorded in pub(%s/%s) UnavailablePods", podName, pub.Namespace, pub.Name) } else { @@ -212,3 +223,12 @@ func IsReferenceEqual(ref1, ref2 *policyv1alpha1.TargetReference) bool { } return gv1.Group == gv2.Group && ref1.Kind == ref2.Kind && ref1.Name == ref2.Name } + +func isNeedPubProtection(pub *policyv1alpha1.PodUnavailableBudget, operation policyv1alpha1.PubOperation) bool { + operationValue, ok := pub.Annotations[policyv1alpha1.PubProtectOperationAnnotation] + if !ok || operationValue == "" { + return true + } + operations := sets.NewString(strings.Split(operationValue, ",")...) + return operations.Has(string(operation)) +} diff --git a/pkg/control/pubcontrol/utils_test.go b/pkg/control/pubcontrol/utils_test.go index e18f295d3c..365bd9f457 100644 --- a/pkg/control/pubcontrol/utils_test.go +++ b/pkg/control/pubcontrol/utils_test.go @@ -17,8 +17,11 @@ limitations under the License. package pubcontrol import ( + "fmt" "testing" + "time" + appspub "github.com/openkruise/kruise/apis/apps/pub" policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1" apps "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -26,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" utilpointer "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -61,17 +65,20 @@ var ( }, }, Status: policyv1alpha1.PodUnavailableBudgetStatus{ - UnavailablePods: map[string]metav1.Time{}, - DisruptedPods: map[string]metav1.Time{}, + UnavailablePods: map[string]metav1.Time{}, + DisruptedPods: map[string]metav1.Time{}, + UnavailableAllowed: 0, }, } podDemo = &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: "test-pod", - Namespace: "default", - Labels: map[string]string{"app": "nginx", "pub-controller": "true"}, - Annotations: map[string]string{}, + Name: "test-pod", + Namespace: "default", + Labels: map[string]string{"app": "nginx", "pub-controller": "true"}, + Annotations: map[string]string{ + PodRelatedPubAnnotation: pubDemo.Name, + }, OwnerReferences: []metav1.OwnerReference{ { APIVersion: "apps/v1", @@ -162,6 +169,124 @@ var ( } ) +func TestPodUnavailableBudgetValidatePod(t *testing.T) { + cases := []struct { + name string + getPod func() *corev1.Pod + getPub func() *policyv1alpha1.PodUnavailableBudget + operation policyv1alpha1.PubOperation + expectAllow bool + expectPubStatus func() *policyv1alpha1.PodUnavailableBudgetStatus + }{ + { + name: "valid update pod, allow", + getPod: func() *corev1.Pod { + pod := podDemo.DeepCopy() + return pod + }, + getPub: func() *policyv1alpha1.PodUnavailableBudget { + pub := pubDemo.DeepCopy() + pub.Status.UnavailableAllowed = 1 + return pub + }, + operation: policyv1alpha1.PubUpdateOperation, + expectAllow: true, + expectPubStatus: func() *policyv1alpha1.PodUnavailableBudgetStatus { + pubStatus := pubDemo.Status.DeepCopy() + pubStatus.UnavailablePods[podDemo.Name] = metav1.Now() + pubStatus.UnavailableAllowed = 0 + return pubStatus + }, + }, + { + name: "valid update pod, reject", + getPod: func() *corev1.Pod { + pod := podDemo.DeepCopy() + return pod + }, + getPub: func() *policyv1alpha1.PodUnavailableBudget { + pub := pubDemo.DeepCopy() + return pub + }, + operation: policyv1alpha1.PubUpdateOperation, + expectAllow: false, + expectPubStatus: func() *policyv1alpha1.PodUnavailableBudgetStatus { + pubStatus := pubDemo.Status.DeepCopy() + return pubStatus + }, + }, + { + name: "valid update pod, pod deletion, ignore", + getPod: func() *corev1.Pod { + pod := podDemo.DeepCopy() + pod.DeletionTimestamp = &metav1.Time{Time: time.Now()} + return pod + }, + getPub: func() *policyv1alpha1.PodUnavailableBudget { + pub := pubDemo.DeepCopy() + return pub + }, + operation: policyv1alpha1.PubUpdateOperation, + expectAllow: true, + expectPubStatus: func() *policyv1alpha1.PodUnavailableBudgetStatus { + pubStatus := pubDemo.Status.DeepCopy() + return pubStatus + }, + }, + { + name: "valid update pod, pod not ready, ignore", + getPod: func() *corev1.Pod { + pod := podDemo.DeepCopy() + podReadyCondition := podutil.GetPodReadyCondition(pod.Status) + podReadyCondition.Status = corev1.ConditionFalse + return pod + }, + getPub: func() *policyv1alpha1.PodUnavailableBudget { + pub := pubDemo.DeepCopy() + return pub + }, + operation: policyv1alpha1.PubUpdateOperation, + expectAllow: true, + expectPubStatus: func() *policyv1alpha1.PodUnavailableBudgetStatus { + pubStatus := pubDemo.Status.DeepCopy() + return pubStatus + }, + }, + { + name: "valid update pod, pod unavailable labels, ignore", + getPod: func() *corev1.Pod { + pod := podDemo.DeepCopy() + pod.Labels[fmt.Sprintf("%sdata", appspub.PubUnavailablePodLabelPrefix)] = "true" + return pod + }, + getPub: func() *policyv1alpha1.PodUnavailableBudget { + pub := pubDemo.DeepCopy() + return pub + }, + operation: policyv1alpha1.PubUpdateOperation, + expectAllow: true, + expectPubStatus: func() *policyv1alpha1.PodUnavailableBudgetStatus { + pubStatus := pubDemo.Status.DeepCopy() + return pubStatus + }, + }, + } + + for _, cs := range cases { + t.Run(cs.name, func(t *testing.T) { + fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.getPub()).Build() + control := NewPubControl(fakeClient) + allow, _, err := PodUnavailableBudgetValidatePod(fakeClient, control, cs.getPod(), cs.operation, false) + if err != nil { + t.Fatalf("PodUnavailableBudgetValidatePod failed: %s", err.Error()) + } + if cs.expectAllow != allow { + t.Fatalf("PodUnavailableBudgetValidatePod failed") + } + }) + } +} + func TestGetPodUnavailableBudgetForPod(t *testing.T) { cases := []struct { name string diff --git a/pkg/controller/cloneset/sync/cloneset_update.go b/pkg/controller/cloneset/sync/cloneset_update.go index 750ad6a202..5a692437b2 100644 --- a/pkg/controller/cloneset/sync/cloneset_update.go +++ b/pkg/controller/cloneset/sync/cloneset_update.go @@ -129,21 +129,12 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet, // 5. limit max count of pods can update waitUpdateIndexes = limitUpdateIndexes(coreControl, cs.Spec.MinReadySeconds, diffRes, waitUpdateIndexes, pods, targetRevision.Name) - // Determine the pub before updating the pod - var pub *policyv1alpha1.PodUnavailableBudget - var err error - if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) && len(waitUpdateIndexes) > 0 { - pub, err = c.pubControl.GetPubForPod(pods[waitUpdateIndexes[0]]) - if err != nil { - return err - } - } // 6. update pods for _, idx := range waitUpdateIndexes { pod := pods[idx] // Determine the pub before updating the pod - if pub != nil { - allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(c.Client, c.pubControl, pub, pod, pubcontrol.UpdateOperation, false) + if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) { + allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(c.Client, c.pubControl, pod, policyv1alpha1.PubUpdateOperation, false) if err != nil { return err // pub check does not pass, try again in seconds diff --git a/pkg/util/pods.go b/pkg/util/pods.go index 6e625741e7..9c873ed59f 100644 --- a/pkg/util/pods.go +++ b/pkg/util/pods.go @@ -201,7 +201,7 @@ func GetPodVolume(pod *v1.Pod, volumeName string) *v1.Volume { } func IsRunningAndReady(pod *v1.Pod) bool { - return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) + return pod.Status.Phase == v1.PodRunning && podutil.IsPodReady(pod) && pod.DeletionTimestamp.IsZero() } func GetPodContainerImageIDs(pod *v1.Pod) map[string]string { diff --git a/pkg/webhook/pod/validating/pod_unavailable_budget.go b/pkg/webhook/pod/validating/pod_unavailable_budget.go index e88e36664b..149210b623 100644 --- a/pkg/webhook/pod/validating/pod_unavailable_budget.go +++ b/pkg/webhook/pod/validating/pod_unavailable_budget.go @@ -18,7 +18,6 @@ package validating import ( "context" - "strings" policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1" "github.com/openkruise/kruise/pkg/control/pubcontrol" @@ -26,7 +25,6 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/util/dryrun" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/apis/policy" @@ -46,8 +44,6 @@ var ( // 2. reason(string) // 3. err(error) func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context, req admission.Request) (bool, string, error) { - var newPod, oldPod *corev1.Pod - var dryRun bool // ignore kube-system, kube-public for _, namespace := range IgnoredNamespaces { if req.Namespace == namespace { @@ -55,23 +51,30 @@ func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context } } - klog.V(6).Infof("pub validate operation(%s) pod(%s/%s)", req.Operation, req.Namespace, req.Name) - newPod = &corev1.Pod{} + var checkPod *corev1.Pod + var dryRun bool + var operation policyv1alpha1.PubOperation switch req.AdmissionRequest.Operation { // filter out invalid Update operation, we only validate update Pod.MetaData, Pod.Spec case admissionv1.Update: + newPod := &corev1.Pod{} //decode new pod err := p.Decoder.Decode(req, newPod) if err != nil { return false, "", err } - oldPod = &corev1.Pod{} + oldPod := &corev1.Pod{} if err = p.Decoder.Decode( admission.Request{AdmissionRequest: admissionv1.AdmissionRequest{Object: req.AdmissionRequest.OldObject}}, oldPod); err != nil { return false, "", err } - + // the change will not cause pod unavailability, then pass + if !p.pubControl.IsPodUnavailableChanged(oldPod, newPod) { + klog.V(6).Infof("validate pod(%s/%s) changed can not cause unavailability, then don't need check pub", newPod.Namespace, newPod.Name) + return true, "", nil + } + checkPod = oldPod options := &metav1.UpdateOptions{} err = p.Decoder.DecodeRaw(req.Options, options) if err != nil { @@ -79,6 +82,7 @@ func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context } // if dry run dryRun = dryrun.IsDryRun(options.DryRun) + operation = policyv1alpha1.PubUpdateOperation // filter out invalid Delete operation, only validate delete pods resources case admissionv1.Delete: @@ -86,7 +90,8 @@ func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context klog.V(6).Infof("pod(%s/%s) AdmissionRequest operation(DELETE) subResource(%s), then admit", req.Namespace, req.Name, req.SubResource) return true, "", nil } - if err := p.Decoder.DecodeRaw(req.OldObject, newPod); err != nil { + checkPod = &corev1.Pod{} + if err := p.Decoder.DecodeRaw(req.OldObject, checkPod); err != nil { return false, "", err } deletion := &metav1.DeleteOptions{} @@ -96,6 +101,7 @@ func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context } // if dry run dryRun = dryrun.IsDryRun(deletion.DryRun) + operation = policyv1alpha1.PubDeleteOperation // filter out invalid Create operation, only validate create pod eviction subresource case admissionv1.Create: @@ -114,74 +120,25 @@ func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context if eviction.DeleteOptions != nil { dryRun = dryrun.IsDryRun(eviction.DeleteOptions.DryRun) } + checkPod = &corev1.Pod{} key := types.NamespacedName{ Namespace: req.AdmissionRequest.Namespace, Name: req.AdmissionRequest.Name, } - if err = p.Client.Get(ctx, key, newPod); err != nil { + if err = p.Client.Get(ctx, key, checkPod); err != nil { return false, "", err } + operation = policyv1alpha1.PubDeleteOperation } // Get the workload corresponding to the pod, if it has been deleted then it is not protected - if ref := metav1.GetControllerOf(newPod); ref != nil { - workload, err := p.finders.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, newPod.Namespace, ref.Name, ref.UID) + if ref := metav1.GetControllerOf(checkPod); ref != nil { + workload, err := p.finders.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, checkPod.Namespace, ref.Name, ref.UID) if err != nil { return false, "", err } else if workload == nil || !workload.Metadata.DeletionTimestamp.IsZero() { return true, "", nil } } - - // returns true for pod conditions that allow the operation for pod without checking PUB. - if newPod.Status.Phase == corev1.PodSucceeded || newPod.Status.Phase == corev1.PodFailed || - newPod.Status.Phase == corev1.PodPending || newPod.Status.Phase == "" || !newPod.ObjectMeta.DeletionTimestamp.IsZero() { - klog.V(3).Infof("pod(%s/%s) Status(%s) Deletion(%v), then admit", newPod.Namespace, newPod.Name, newPod.Status.Phase, !newPod.ObjectMeta.DeletionTimestamp.IsZero()) - return true, "", nil - } - - pub, err := p.pubControl.GetPubForPod(newPod) - if err != nil { - return false, "", err - // if there is no matching PodUnavailableBudget, just return true - } else if pub == nil { - return true, "", nil - } else if !isNeedPubProtection(pub, pubcontrol.Operation(req.Operation)) { - return true, "", nil - } - - klog.V(3).Infof("validating pod(%s/%s) operation(%s) for pub(%s/%s)", newPod.Namespace, newPod.Name, req.Operation, pub.Namespace, pub.Name) - // pods that contain annotations[pod.kruise.io/pub-no-protect]="true" will be ignore - // and will no longer check the pub quota - if newPod.Annotations[pubcontrol.PodPubNoProtectionAnnotation] == "true" { - klog.V(3).Infof("pod(%s/%s) contains annotations[%s], then don't need check pub", newPod.Namespace, newPod.Name, pubcontrol.PodPubNoProtectionAnnotation) - return true, "", nil - } - - // the change will not cause pod unavailability, then pass - if !p.pubControl.IsPodUnavailableChanged(oldPod, newPod) { - klog.V(3).Infof("validate pod(%s/%s) changed cannot cause unavailability, then don't need check pub", newPod.Namespace, newPod.Name) - return true, "", nil - } - - return pubcontrol.PodUnavailableBudgetValidatePod(p.Client, p.pubControl, pub, newPod, pubcontrol.Operation(req.Operation), dryRun) -} - -func isNeedPubProtection(pub *policyv1alpha1.PodUnavailableBudget, operation pubcontrol.Operation) bool { - operationValue, ok := pub.Annotations[policyv1alpha1.PubProtectOperationAnnotation] - if !ok { - return true - } - operations := sets.NewString() - for _, o := range strings.Split(operationValue, ",") { - operations.Insert(o) - } - // update operation - if operation == pubcontrol.UpdateOperation && operations.Has(pubcontrol.UpdateOperation) { - return true - // delete, eviction operation - } else if operation != pubcontrol.UpdateOperation && operations.Has(pubcontrol.DeleteOperation) { - return true - } - return false + return pubcontrol.PodUnavailableBudgetValidatePod(p.Client, p.pubControl, checkPod, operation, dryRun) } diff --git a/test/e2e/policy/podunavailablebudget.go b/test/e2e/policy/podunavailablebudget.go index 3fc952b293..7def2a9201 100644 --- a/test/e2e/policy/podunavailablebudget.go +++ b/test/e2e/policy/podunavailablebudget.go @@ -23,6 +23,7 @@ import ( "github.com/onsi/ginkgo" "github.com/onsi/gomega" + appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1" kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned" @@ -31,6 +32,7 @@ import ( corev1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/util/retry" @@ -346,12 +348,112 @@ var _ = SIGDescribe("PodUnavailableBudget", func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) newPods := make([]corev1.Pod, 0) for _, pod := range pods { - if !pod.DeletionTimestamp.IsZero() || pod.Spec.Containers[0].Image != NewWebserverImage { + if !pod.DeletionTimestamp.IsZero() { continue } + gomega.Expect(pod.Spec.Containers[0].Image).To(gomega.Equal(NewWebserverImage)) newPods = append(newPods, *pod) } gomega.Expect(newPods).To(gomega.HaveLen(2)) + + // add unavailable label + labelKey := fmt.Sprintf("%sdata", appspub.PubUnavailablePodLabelPrefix) + labelBody := fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, labelKey, "true") + _, err = c.CoreV1().Pods(ns).Patch(context.TODO(), newPods[0].Name, types.MergePatchType, []byte(labelBody), metav1.PatchOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + ginkgo.By(fmt.Sprintf("check PodUnavailableBudget(%s/%s) Status", pub.Namespace, pub.Name)) + expectStatus = &policyv1alpha1.PodUnavailableBudgetStatus{ + UnavailableAllowed: 0, + DesiredAvailable: 1, + CurrentAvailable: 1, + TotalReplicas: 2, + } + setPubStatus(expectStatus) + gomega.Eventually(func() *policyv1alpha1.PodUnavailableBudgetStatus { + pub, err = kc.PolicyV1alpha1().PodUnavailableBudgets(pub.Namespace).Get(context.TODO(), pub.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + nowStatus := &pub.Status + setPubStatus(nowStatus) + return nowStatus + }, 60*time.Second, time.Second).Should(gomega.Equal(expectStatus)) + + // update pod image, ignore + podIn1, err := c.CoreV1().Pods(ns).Get(context.TODO(), newPods[0].Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + podIn1.Spec.Containers[0].Image = WebserverImage + _, err = c.CoreV1().Pods(ns).Update(context.TODO(), podIn1, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // add unavailable label reject + _, err = c.CoreV1().Pods(ns).Patch(context.TODO(), newPods[1].Name, types.MergePatchType, []byte(labelBody), metav1.PatchOptions{}) + gomega.Expect(err).To(gomega.HaveOccurred()) + // update pod image, reject + podIn2, err := c.CoreV1().Pods(ns).Get(context.TODO(), newPods[1].Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + podIn2.Spec.Containers[0].Image = WebserverImage + _, err = c.CoreV1().Pods(ns).Update(context.TODO(), podIn2, metav1.UpdateOptions{}) + gomega.Expect(err).To(gomega.HaveOccurred()) + + // add pub protect operation DELETE + annotationBody := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%s"}}}`, policyv1alpha1.PubProtectOperationAnnotation, policyv1alpha1.PubDeleteOperation) + _, err = kc.PolicyV1alpha1().PodUnavailableBudgets(ns).Patch(context.TODO(), pub.Name, types.MergePatchType, []byte(annotationBody), metav1.PatchOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 3) + // update pod image, allow + podIn2, err = c.CoreV1().Pods(ns).Get(context.TODO(), newPods[1].Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + podIn2.Spec.Containers[0].Image = WebserverImage + _, err = c.CoreV1().Pods(ns).Update(context.TODO(), podIn2, metav1.UpdateOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + time.Sleep(time.Second * 3) + + // check pod image + pods, err = sidecarTester.GetSelectorPods(deployment.Namespace, deployment.Spec.Selector) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + for _, pod := range pods { + if !pod.DeletionTimestamp.IsZero() { + continue + } + gomega.Expect(pod.Spec.Containers[0].Image).To(gomega.Equal(WebserverImage)) + } + + ginkgo.By(fmt.Sprintf("check PodUnavailableBudget(%s/%s) Status", pub.Namespace, pub.Name)) + expectStatus = &policyv1alpha1.PodUnavailableBudgetStatus{ + UnavailableAllowed: 0, + DesiredAvailable: 1, + CurrentAvailable: 1, + TotalReplicas: 2, + } + setPubStatus(expectStatus) + gomega.Eventually(func() *policyv1alpha1.PodUnavailableBudgetStatus { + pub, err = kc.PolicyV1alpha1().PodUnavailableBudgets(pub.Namespace).Get(context.TODO(), pub.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + nowStatus := &pub.Status + setPubStatus(nowStatus) + return nowStatus + }, 60*time.Second, time.Second).Should(gomega.Equal(expectStatus)) + + // delete unavailable label + deleteLabelBody := fmt.Sprintf(`{"metadata":{"labels":{"%s":null}}}`, labelKey) + _, err = c.CoreV1().Pods(ns).Patch(context.TODO(), newPods[0].Name, types.StrategicMergePatchType, []byte(deleteLabelBody), metav1.PatchOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + ginkgo.By(fmt.Sprintf("check PodUnavailableBudget(%s/%s) Status", pub.Namespace, pub.Name)) + expectStatus = &policyv1alpha1.PodUnavailableBudgetStatus{ + UnavailableAllowed: 1, + DesiredAvailable: 1, + CurrentAvailable: 2, + TotalReplicas: 2, + } + setPubStatus(expectStatus) + gomega.Eventually(func() *policyv1alpha1.PodUnavailableBudgetStatus { + pub, err = kc.PolicyV1alpha1().PodUnavailableBudgets(pub.Namespace).Get(context.TODO(), pub.Name, metav1.GetOptions{}) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + nowStatus := &pub.Status + setPubStatus(nowStatus) + return nowStatus + }, 60*time.Second, time.Second).Should(gomega.Equal(expectStatus)) + ginkgo.By("PodUnavailableBudget targetReference pods, update failed image and block done") })