From 7b23f687b123948c52e474d904a2185511e29bc7 Mon Sep 17 00:00:00 2001 From: Shuai Sun Date: Tue, 3 Jan 2023 12:12:24 +0800 Subject: [PATCH] enable cloneset deleting pvc when pod hanging (#1113) Signed-off-by: willise Signed-off-by: willise --- apis/apps/v1alpha1/cloneset_types.go | 4 + .../crd/bases/apps.kruise.io_clonesets.yaml | 4 + .../apps.kruise.io_uniteddeployments.yaml | 4 + .../cloneset/cloneset_controller.go | 152 +++++++++++++++++- .../cloneset/sync/cloneset_scale.go | 9 +- .../cloneset/utils/cloneset_utils.go | 59 +++++-- pkg/util/ownerref.go | 62 +++++++ pkg/util/ownerref_test.go | 151 +++++++++++++++++ 8 files changed, 424 insertions(+), 21 deletions(-) create mode 100644 pkg/util/ownerref.go create mode 100644 pkg/util/ownerref_test.go diff --git a/apis/apps/v1alpha1/cloneset_types.go b/apis/apps/v1alpha1/cloneset_types.go index 65a51f8d6c..7addcce94d 100644 --- a/apis/apps/v1alpha1/cloneset_types.go +++ b/apis/apps/v1alpha1/cloneset_types.go @@ -93,6 +93,10 @@ type CloneSetScaleStrategy struct { // The scale will fail if the number of unavailable pods were greater than this MaxUnavailable at scaling up. // MaxUnavailable works only when scaling up. MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` + + // DisablePVCReuse, when true, stops cloneset from reusing an already + // existing pvc to rebuild a new pod + DisablePVCReuse bool `json:"disablePVCReuse,omitempty"` } // CloneSetUpdateStrategy defines strategies for pods update. diff --git a/config/crd/bases/apps.kruise.io_clonesets.yaml b/config/crd/bases/apps.kruise.io_clonesets.yaml index 052eedf47a..eaf6f0ed93 100644 --- a/config/crd/bases/apps.kruise.io_clonesets.yaml +++ b/config/crd/bases/apps.kruise.io_clonesets.yaml @@ -172,6 +172,10 @@ spec: description: ScaleStrategy indicates the ScaleStrategy that will be employed to create and delete Pods in the CloneSet. 
properties: + disablePVCReuse: + description: DisablePVCReuse, when true, stops cloneset from reusing + an already existing pvc to rebuild a new pod + type: boolean maxUnavailable: anyOf: - type: integer diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml index bcc2366ee9..bae3a7edd8 100644 --- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml +++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml @@ -670,6 +670,10 @@ spec: that will be employed to create and delete Pods in the CloneSet. properties: + disablePVCReuse: + description: DisablePVCReuse, when true, stops cloneset from + reusing an already existing pvc to rebuild a new pod + type: boolean maxUnavailable: anyOf: - type: integer diff --git a/pkg/controller/cloneset/cloneset_controller.go b/pkg/controller/cloneset/cloneset_controller.go index 53b4ebd741..2464cb2fd0 100644 --- a/pkg/controller/cloneset/cloneset_controller.go +++ b/pkg/controller/cloneset/cloneset_controller.go @@ -232,11 +232,25 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil } - // list all active Pods and PVCs belongs to cs - filteredPods, filteredPVCs, err := r.getOwnedResource(instance) + // list active and inactive Pods belonging to cs + filteredPods, filterOutPods, err := r.getOwnedPods(instance) if err != nil { return reconcile.Result{}, err } + filteredPVCs, err := r.getOwnedPVCs(instance) + if err != nil { + return reconcile.Result{}, err + } + + // If cloneset doesn't want to reuse pvc, clean up + // the existing pvc first. Then it looks like the pod + // is deleted by controller, new pod can be created. 
+ if instance.Spec.ScaleStrategy.DisablePVCReuse { + filteredPVCs, err = r.cleanupPVCs(instance, filteredPods, filterOutPods, filteredPVCs) + if err != nil { + return reconcile.Result{}, err + } + } //release Pods ownerRef filteredPods, err = r.claimPods(instance, filteredPods) @@ -449,20 +463,29 @@ func (r *ReconcileCloneSet) getActiveRevisions(cs *appsv1alpha1.CloneSet, revisi return currentRevision, updateRevision, collisionCount, nil } -func (r *ReconcileCloneSet) getOwnedResource(cs *appsv1alpha1.CloneSet) ([]*v1.Pod, []*v1.PersistentVolumeClaim, error) { +func (r *ReconcileCloneSet) getOwnedPods(cs *appsv1alpha1.CloneSet) ([]*v1.Pod, []*v1.Pod, error) { opts := &client.ListOptions{ Namespace: cs.Namespace, FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}), } - filteredPods, err := clonesetutils.GetActivePods(r.Client, opts) + filteredPods, filterOutPods, err := clonesetutils.GetActiveAndInactivePods(r.Client, opts) if err != nil { return nil, nil, err } + return filteredPods, filterOutPods, nil +} + +func (r *ReconcileCloneSet) getOwnedPVCs(cs *appsv1alpha1.CloneSet) ([]*v1.PersistentVolumeClaim, error) { + opts := &client.ListOptions{ + Namespace: cs.Namespace, + FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}), + } + pvcList := v1.PersistentVolumeClaimList{} if err := r.List(context.TODO(), &pvcList, opts, utilclient.DisableDeepCopy); err != nil { - return nil, nil, err + return nil, err } var filteredPVCs []*v1.PersistentVolumeClaim for i, pvc := range pvcList.Items { @@ -471,7 +494,7 @@ func (r *ReconcileCloneSet) getOwnedResource(cs *appsv1alpha1.CloneSet) ([]*v1.P } } - return filteredPods, filteredPVCs, nil + return filteredPVCs, nil } // truncatePodsToDelete truncates any non-live pod names in spec.scaleStrategy.podsToDelete. 
@@ -571,3 +594,120 @@ func (r *ReconcileCloneSet) claimPods(instance *appsv1alpha1.CloneSet, pods []*v return claimedPods, nil } + +func (r *ReconcileCloneSet) cleanupPVCs( + cs *appsv1alpha1.CloneSet, + activePods, filterOutPods []*v1.Pod, + pvcs []*v1.PersistentVolumeClaim, +) ([]*v1.PersistentVolumeClaim, error) { + // If useless pvc owner pod does not exist, the pvc can be deleted directly, + // else update pvc's ownerreference to pod. + toDeletePVCs, usedPVCs := filterPVCsByIfUsing(activePods, pvcs) + if len(toDeletePVCs) == 0 { + return usedPVCs, nil + } + + klog.V(3).Infof("Begin to clean up mcloneset %s/%s useless PVCs", cs.Namespace, cs.Name) + toDeletePVCsMap := generatePodAndPVCsMap(toDeletePVCs) + for _, pod := range filterOutPods { + instanceID := clonesetutils.GetInstanceID(pod) + if pvcs, ok := toDeletePVCsMap[instanceID]; ok { + if err := r.updatePVCs(cs, pod, pvcs); err != nil { + return nil, err + } + // Left pvcs will be deleted directly. + delete(toDeletePVCsMap, instanceID) + } + } + + for _, pvcs := range toDeletePVCsMap { + // It's safe to delete pvc that has no pod found. 
+ if err := r.deletePVCs(cs, pvcs); err != nil { + return nil, err + } + } + return usedPVCs, nil +} + +func (r *ReconcileCloneSet) updatePVCs(cs *appsv1alpha1.CloneSet, pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim) error { + for i := range pvcs { + pvc := pvcs[i] + if updateClaimOwnerRefToPod(pvc, cs, pod) { + if err := r.updateOnePVC(cs, pvcs[i]); err != nil && !errors.IsNotFound(err) { + return err + } + } + } + return nil +} + +func (r *ReconcileCloneSet) updateOnePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) error { + if err := r.Client.Update(context.TODO(), pvc); err != nil { + r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdate", "failed to update PVC %s: %v", pvc.Name, err) + return err + } + return nil +} +func (r *ReconcileCloneSet) deletePVCs(cs *appsv1alpha1.CloneSet, pvcs []*v1.PersistentVolumeClaim) error { + for i := range pvcs { + if err := r.deleteOnePVC(cs, pvcs[i]); err != nil && !errors.IsNotFound(err) { + return err + } + } + return nil +} + +func (r *ReconcileCloneSet) deleteOnePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) error { + clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name) + if err := r.Delete(context.TODO(), pvc); err != nil { + clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name) + r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to clean up PVC %s: %v", pvc.Name, err) + return err + } + return nil +} + +func updateClaimOwnerRefToPod(pvc *v1.PersistentVolumeClaim, cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool { + needsUpdate := false + needsUpdate = util.RemoveOwnerRef(pvc, cs) + return util.SetOwnerRef(pvc, pod, metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}) || needsUpdate +} + +func filterPVCsByIfUsing(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (toDeletePVCs, usedPVCs []*v1.PersistentVolumeClaim) { + activeIDs := getInstanceIDsFromPods(pods) + for i 
:= range pvcs { + pvc := pvcs[i] + id := clonesetutils.GetInstanceID(pvc) + if id == "" { + continue + } + if !activeIDs.Has(id) { + toDeletePVCs = append(toDeletePVCs, pvc) + } else { + usedPVCs = append(usedPVCs, pvc) + } + } + return +} + +func generatePodAndPVCsMap(pvcs []*v1.PersistentVolumeClaim) map[string][]*v1.PersistentVolumeClaim { + // pod may have multiple pvcs + pvcsMap := map[string][]*v1.PersistentVolumeClaim{} + for i := range pvcs { + pvc := pvcs[i] + if id := clonesetutils.GetInstanceID(pvc); id != "" { + pvcsMap[id] = append(pvcsMap[id], pvc) + } + } + return pvcsMap +} + +func getInstanceIDsFromPods(pods []*v1.Pod) sets.String { + ins := sets.NewString() + for _, pod := range pods { + if id := clonesetutils.GetInstanceID(pod); id != "" { + ins.Insert(id) + } + } + return ins +} diff --git a/pkg/controller/cloneset/sync/cloneset_scale.go b/pkg/controller/cloneset/sync/cloneset_scale.go index 5751f58d53..edfdb9969c 100644 --- a/pkg/controller/cloneset/sync/cloneset_scale.go +++ b/pkg/controller/cloneset/sync/cloneset_scale.go @@ -23,6 +23,11 @@ import ( "sync" "sync/atomic" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + appspub "github.com/openkruise/kruise/apis/apps/pub" appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core" @@ -30,10 +35,6 @@ import ( "github.com/openkruise/kruise/pkg/util" "github.com/openkruise/kruise/pkg/util/expectations" "github.com/openkruise/kruise/pkg/util/lifecycle" - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/rand" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" ) const ( diff --git a/pkg/controller/cloneset/utils/cloneset_utils.go b/pkg/controller/cloneset/utils/cloneset_utils.go index 9b0cc0a332..ddac141c9e 100644 --- a/pkg/controller/cloneset/utils/cloneset_utils.go +++ b/pkg/controller/cloneset/utils/cloneset_utils.go @@ -22,12 +22,6 @@ 
import ( "strings" "sync" - appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" - "github.com/openkruise/kruise/pkg/features" - utilclient "github.com/openkruise/kruise/pkg/util/client" - "github.com/openkruise/kruise/pkg/util/expectations" - utilfeature "github.com/openkruise/kruise/pkg/util/feature" - "github.com/openkruise/kruise/pkg/util/requeueduration" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,6 +30,13 @@ import ( kubecontroller "k8s.io/kubernetes/pkg/controller" "k8s.io/utils/integer" "sigs.k8s.io/controller-runtime/pkg/client" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" + "github.com/openkruise/kruise/pkg/features" + utilclient "github.com/openkruise/kruise/pkg/util/client" + "github.com/openkruise/kruise/pkg/util/expectations" + utilfeature "github.com/openkruise/kruise/pkg/util/feature" + "github.com/openkruise/kruise/pkg/util/requeueduration" ) var ( @@ -93,22 +94,54 @@ func GetControllerKey(cs *appsv1alpha1.CloneSet) string { // GetActivePods returns all active pods in this namespace. 
func GetActivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) { - podList := &v1.PodList{} - if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil { + podList, err := GetAllPods(reader, opts) + if err != nil { return nil, err } // Ignore inactive pods var activePods []*v1.Pod - for i, pod := range podList.Items { + for i := range podList { + pod := podList[i] // Consider all rebuild pod as active pod, should not recreate - if kubecontroller.IsPodActive(&pod) { - activePods = append(activePods, &podList.Items[i]) + if kubecontroller.IsPodActive(pod) { + activePods = append(activePods, pod) } } return activePods, nil } +func GetActiveAndInactivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, []*v1.Pod, error) { + podList, err := GetAllPods(reader, opts) + if err != nil { + return nil, nil, err + } + var activePods, inactivePods []*v1.Pod + for i := range podList { + pod := podList[i] + if kubecontroller.IsPodActive(pod) { + activePods = append(activePods, pod) + } else { + inactivePods = append(inactivePods, pod) + } + } + return activePods, inactivePods, nil +} + +// GetAllPods returns all pods in this namespace. +func GetAllPods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) { + podList := &v1.PodList{} + if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil { + return nil, err + } + + var pods []*v1.Pod + for i := range podList.Items { + pods = append(pods, &podList.Items[i]) + } + return pods, nil +} + // NextRevision finds the next valid revision number based on revisions. If the length of revisions // is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method // assumes that revisions has been sorted by Revision. 
@@ -166,6 +199,10 @@ func UpdateStorage(cs *appsv1alpha1.CloneSet, pod *v1.Pod) { pod.Spec.Volumes = newVolumes } +func GetInstanceID(obj metav1.Object) string { + return obj.GetLabels()[appsv1alpha1.CloneSetInstanceID] +} + // GetPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The // returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined // by getPersistentVolumeClaimName. diff --git a/pkg/util/ownerref.go b/pkg/util/ownerref.go new file mode 100644 index 0000000000..181543b32d --- /dev/null +++ b/pkg/util/ownerref.go @@ -0,0 +1,62 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func HasOwnerRef(target, owner metav1.Object) bool { + ownerUID := owner.GetUID() + for _, ownerRef := range target.GetOwnerReferences() { + if ownerRef.UID == ownerUID { + return true + } + } + return false +} + +func RemoveOwnerRef(target, owner metav1.Object) bool { + if !HasOwnerRef(target, owner) { + return false + } + ownerUID := owner.GetUID() + oldRefs := target.GetOwnerReferences() + newRefs := make([]metav1.OwnerReference, len(oldRefs)-1) + skip := 0 + for i := range oldRefs { + if oldRefs[i].UID == ownerUID { + skip = -1 + } else { + newRefs[i+skip] = oldRefs[i] + } + } + target.SetOwnerReferences(newRefs) + return true +} + +func SetOwnerRef(target, owner metav1.Object, ownerType metav1.TypeMeta) bool { + if HasOwnerRef(target, owner) { + return false + } + ownerRefs := append( + target.GetOwnerReferences(), + *metav1.NewControllerRef(owner, ownerType.GroupVersionKind()), + ) + target.SetOwnerReferences(ownerRefs) + return true +} diff --git a/pkg/util/ownerref_test.go b/pkg/util/ownerref_test.go new file mode 100644 index 0000000000..b882c31ed4 --- /dev/null +++ b/pkg/util/ownerref_test.go @@ -0,0 +1,151 @@ +/* +Copyright 2022 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1" +) + +func TestHasOwnerRef(t *testing.T) { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "datadir-foo-id1", + Labels: map[string]string{ + appsv1alpha1.CloneSetInstanceID: "id1", + "foo": "bar", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: "foo", + UID: "test", + Controller: func() *bool { a := true; return &a }(), + BlockOwnerDeletion: func() *bool { a := true; return &a }(), + }, + }, + ResourceVersion: "1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("10"), + }, + }, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo", + UID: "test", + }, + } + + if !HasOwnerRef(pvc, pod) { + t.Fatalf("expect pvc %v has pod %v ownerref", pvc, pod) + } +} + +func TestRemoveOnwer(t *testing.T) { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "datadir-foo-id1", + Labels: map[string]string{ + appsv1alpha1.CloneSetInstanceID: "id1", + "foo": "bar", + }, + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: "foo", + UID: "test", + Controller: func() *bool { a := true; return &a }(), + BlockOwnerDeletion: func() *bool { a := true; return &a }(), + }, + }, + ResourceVersion: "1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("10"), + }, + }, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo", + UID: "test", + }, + } + + if !RemoveOwnerRef(pvc, pod) { + t.Fatalf("expect 
pvc %v to remove pod %v ownerref", pvc, pod) + } + + if HasOwnerRef(pvc, pod) { + t.Fatalf("expect pvc %v has no pod %v ownerref", pvc, pod) + } +} + +func TestSetOwnerRef(t *testing.T) { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "datadir-foo-id1", + Labels: map[string]string{ + appsv1alpha1.CloneSetInstanceID: "id1", + "foo": "bar", + }, + ResourceVersion: "1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceStorage: resource.MustParse("10"), + }, + }, + }, + } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "foo", + UID: "test", + }, + } + + if !SetOwnerRef(pvc, pod, metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}) { + t.Fatalf("expect pvc %v has no pod %v ownerref", pvc, pod) + } + + if !HasOwnerRef(pvc, pod) { + t.Fatalf("expect pvc %s has pod %v ownerref", pvc, pod) + } +}