From 516cda994bc13eb38e7535ba653f4502bac873b3 Mon Sep 17 00:00:00 2001 From: sunshuai09 Date: Tue, 1 Nov 2022 19:24:30 +0800 Subject: [PATCH] enable cloneset deleting pvc when pod hanging --- apis/apps/v1alpha1/cloneset_types.go | 4 + .../crd/bases/apps.kruise.io_clonesets.yaml | 4 + .../apps.kruise.io_uniteddeployments.yaml | 4 + .../cloneset/sync/cloneset_scale.go | 185 ++++++++++++++++++ .../cloneset/utils/cloneset_utils.go | 59 +++++- 5 files changed, 250 insertions(+), 6 deletions(-) diff --git a/apis/apps/v1alpha1/cloneset_types.go b/apis/apps/v1alpha1/cloneset_types.go index 4d95dd224e..7ccfcec652 100644 --- a/apis/apps/v1alpha1/cloneset_types.go +++ b/apis/apps/v1alpha1/cloneset_types.go @@ -93,6 +93,10 @@ type CloneSetScaleStrategy struct { // The scale will fail if the number of unavailable pods were greater than this MaxUnavailable at scaling up. // MaxUnavailable works only when scaling up. MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"` + + // Indicate if cloneset will not reuse already existing pvc to + // rebuild a new pod + DisablePVCReuse bool `json:"disablePVCReuse,omitempty"` } // CloneSetUpdateStrategy defines strategies for pods update. diff --git a/config/crd/bases/apps.kruise.io_clonesets.yaml b/config/crd/bases/apps.kruise.io_clonesets.yaml index da2fb66cdc..00d0151b88 100644 --- a/config/crd/bases/apps.kruise.io_clonesets.yaml +++ b/config/crd/bases/apps.kruise.io_clonesets.yaml @@ -149,6 +149,10 @@ spec: description: ScaleStrategy indicates the ScaleStrategy that will be employed to create and delete Pods in the CloneSet. 
properties: + disablePVCReuse: + description: Indicate if cloneset will not reuse already existing + pvc to rebuild a new pod + type: boolean maxUnavailable: anyOf: - type: integer diff --git a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml index a274937b18..ef037cfc73 100644 --- a/config/crd/bases/apps.kruise.io_uniteddeployments.yaml +++ b/config/crd/bases/apps.kruise.io_uniteddeployments.yaml @@ -617,6 +617,10 @@ spec: that will be employed to create and delete Pods in the CloneSet. properties: + disablePVCReuse: + description: Indicate if cloneset will not reuse + already existing pvc to rebuild a new pod + type: boolean maxUnavailable: anyOf: - type: integer diff --git a/pkg/controller/cloneset/sync/cloneset_scale.go b/pkg/controller/cloneset/sync/cloneset_scale.go index 69ca12c685..8a0447110e 100644 --- a/pkg/controller/cloneset/sync/cloneset_scale.go +++ b/pkg/controller/cloneset/sync/cloneset_scale.go @@ -29,11 +29,16 @@ import ( clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils" "github.com/openkruise/kruise/pkg/util" "github.com/openkruise/kruise/pkg/util/expectations" + "github.com/openkruise/kruise/pkg/util/fieldindex" "github.com/openkruise/kruise/pkg/util/lifecycle" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -60,6 +65,17 @@ func (r *realControl) Scale( return false, nil } + // If cloneset doesn't want to reuse pvc, clean up + // the existing pvc first. Then it looks like the pod + // is deleted by controller, new pod can be created. 
+	if updateCS.Spec.ScaleStrategy.DisablePVCReuse {
+		uselessPVCs, _ := splitPVCsByInstaceIDs(pods, pvcs)
+		if len(uselessPVCs) > 0 {
+			klog.V(3).Infof("Begin to clean up cloneset %s useless PVCs", controllerKey)
+			return r.cleanupPVCs(updateCS, uselessPVCs)
+		}
+	}
+
 	// 1. manage pods to delete and in preDelete
 	podsSpecifiedToDelete, podsInPreDelete, numToDelete := getPlannedDeletedPods(updateCS, pods)
 	if modified, err := r.managePreparingDelete(updateCS, pods, podsInPreDelete, numToDelete); err != nil || modified {
@@ -403,3 +419,194 @@ func (r *realControl) choosePodsToDelete(cs *appsv1alpha1.CloneSet, totalDiff in
 
 	return podsToDelete
 }
+
+// getInstanceIDsFromPods collects the CloneSetInstanceID label value of every
+// given pod into a set; a pod without that label contributes the empty string.
+func getInstanceIDsFromPods(pods []*v1.Pod) sets.String {
+	ins := sets.NewString()
+	for _, pod := range pods {
+		ins.Insert(pod.Labels[appsv1alpha1.CloneSetInstanceID])
+	}
+	return ins
+}
+
+// splitPVCsByInstaceIDs partitions pvcs by their CloneSetInstanceID label: PVCs
+// whose label value matches one of the given pods are returned as using, all
+// others as useless. Both results are collected through maps keyed by PVC UID,
+// so entries sharing a UID collapse into one and the order is unspecified.
+func splitPVCsByInstaceIDs(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (useless, using []*v1.PersistentVolumeClaim) {
+	activeIds := getInstanceIDsFromPods(pods)
+
+	uselessMap := map[types.UID]*v1.PersistentVolumeClaim{}
+	usingMap := map[types.UID]*v1.PersistentVolumeClaim{}
+	for _, pvc := range pvcs {
+		if activeIds.Has(pvc.Labels[appsv1alpha1.CloneSetInstanceID]) {
+			usingMap[pvc.UID] = pvc
+		} else {
+			uselessMap[pvc.UID] = pvc
+		}
+	}
+
+	for _, p := range uselessMap {
+		useless = append(useless, p)
+	}
+	for _, p := range usingMap {
+		using = append(using, p)
+	}
+	return useless, using
+}
+
+// cleanupPVCs releases the given PVCs. A PVC related to a terminating pod among
+// the cloneset's inactive pods has its cloneset owner reference replaced by one
+// to that pod; every other PVC is deleted. It returns true when at least one
+// PVC was updated or deleted, plus the first error encountered, if any.
+func (r *realControl) cleanupPVCs(cs *appsv1alpha1.CloneSet, uselessPVCs []*v1.PersistentVolumeClaim) (bool, error) {
+	var modified bool
+
+	pods, err := getInactivePods(r.Client, cs)
+	if err != nil {
+		klog.Errorf("Could not get cloneset %s owned inactive pods", clonesetutils.GetControllerKey(cs))
+		return modified, err
+	}
+
+	// There are two scenarios to clean up the pvc:
+	// 1. If pvc belongs to pod in terminating state, set ownerReference of the
+	// pod to the pvc instead of the cloneset.
+	// 2. if pvc belongs to already cleaned up pods or in Succeeded/Failed state,
+	// just delete the pvc.
+	for _, pvc := range uselessPVCs {
+		isTerminating := false
+
+		for _, pod := range pods {
+			if clonesetutils.IsPVCAndPodRelated(pvc, pod) {
+				if clonesetutils.IsPodTerminating(pod) {
+					isTerminating = true
+
+					if updateClaimOwnerRefToPod(pvc, cs, pod) {
+						if _, err = r.updatePVC(cs, pvc); err != nil {
+							return modified, err
+						}
+						modified = true
+					}
+				}
+				// One pvc can only belongs to one pod, so it's ok to skip left pods.
+				break
+			}
+		}
+
+		if !isTerminating {
+			// It's safe to delete pvc that has no pod found or pod is in
+			// Succeeded/Failed state.
+			if _, err = r.deletePVC(cs, pvc); err != nil {
+				return modified, err
+			}
+			modified = true
+		}
+	}
+	return modified, nil
+}
+
+// removeOwnerRef drops every owner reference whose UID equals owner's UID from
+// target and reports whether target's owner references were changed.
+func removeOwnerRef(target, owner metav1.Object) bool {
+	ownerUID := owner.GetUID()
+	oldRefs := target.GetOwnerReferences()
+	// Filter instead of index-shifting so repeated references cannot leave a
+	// zero-valued entry behind.
+	newRefs := make([]metav1.OwnerReference, 0, len(oldRefs))
+	for i := range oldRefs {
+		if oldRefs[i].UID != ownerUID {
+			newRefs = append(newRefs, oldRefs[i])
+		}
+	}
+	if len(newRefs) == len(oldRefs) {
+		return false
+	}
+	target.SetOwnerReferences(newRefs)
+	return true
+}
+
+// hasOwnerRef reports whether target carries an owner reference whose UID
+// equals owner's UID.
+func hasOwnerRef(target, owner metav1.Object) bool {
+	ownerUID := owner.GetUID()
+	for _, ownerRef := range target.GetOwnerReferences() {
+		if ownerRef.UID == ownerUID {
+			return true
+		}
+	}
+	return false
+}
+
+// setOwnerRef appends an owner reference to owner, typed by ownerType, to
+// target unless target already references owner's UID. It reports whether
+// target's owner references were changed.
+func setOwnerRef(target, owner metav1.Object, ownerType *metav1.TypeMeta) bool {
+	if hasOwnerRef(target, owner) {
+		return false
+	}
+	ownerRefs := append(
+		target.GetOwnerReferences(),
+		metav1.OwnerReference{
+			APIVersion: ownerType.APIVersion,
+			Kind:       ownerType.Kind,
+			Name:       owner.GetName(),
+			UID:        owner.GetUID(),
+		})
+	target.SetOwnerReferences(ownerRefs)
+	return true
+}
+
+// getInactivePods lists the pods in the cloneset's namespace selected through
+// the owner-reference UID field index and returns the ones that
+// clonesetutils.GetInactivePods keeps.
+func getInactivePods(reader client.Reader, cs *appsv1alpha1.CloneSet) ([]*v1.Pod, error) {
+	opts := &client.ListOptions{
+		Namespace:     cs.Namespace,
+		FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}),
+	}
+	// Including pods in terminating state
+	return clonesetutils.GetInactivePods(reader, opts)
+}
+
+// updatePVC writes pvc back through the client. It returns true on success; on
+// failure it records a FailedUpdate warning event on the cloneset and returns
+// the error.
+func (r *realControl) updatePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) (bool, error) {
+	if err := r.Client.Update(context.TODO(), pvc); err != nil {
+		r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdate", "failed to update PVC %s: %v", pvc.Name, err)
+		return false, err
+	}
+	return true, nil
+}
+
+// deletePVC deletes pvc and returns true on success. A Delete scale expectation
+// for the PVC name is registered before the call; if the delete fails it is
+// observed again and a FailedDelete warning event is recorded on the cloneset.
+func (r *realControl) deletePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) (bool, error) {
+	clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
+	if err := r.Delete(context.TODO(), pvc); err != nil {
+		clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
+		r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to clean up PVC %s: %v", pvc.Name, err)
+		return false, err
+	}
+	return true, nil
+}
+
+// updateClaimOwnerRefToPod replaces the cloneset owner reference on pvc with
+// one to pod and reports whether pvc's owner references were changed.
+func updateClaimOwnerRefToPod(pvc *v1.PersistentVolumeClaim, cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
+	// Default the type information on a copy so the pod object is not mutated.
+	// The kind must be "Pod" for the owner reference to name the pod's type.
+	podMeta := pod.TypeMeta
+	if podMeta.APIVersion == "" {
+		podMeta.APIVersion = "v1"
+	}
+	if podMeta.Kind == "" {
+		podMeta.Kind = "Pod"
+	}
+
+	removed := removeOwnerRef(pvc, cs)
+	added := setOwnerRef(pvc, pod, &podMeta)
+	return removed || added
+}
diff --git a/pkg/controller/cloneset/utils/cloneset_utils.go b/pkg/controller/cloneset/utils/cloneset_utils.go
index 9b0cc0a332..27e8b40fe7 100644
--- a/pkg/controller/cloneset/utils/cloneset_utils.go
+++ b/pkg/controller/cloneset/utils/cloneset_utils.go
@@ -24,7 +24,6 @@ import (
 
 	appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
 	"github.com/openkruise/kruise/pkg/features"
-	
utilclient "github.com/openkruise/kruise/pkg/util/client"
 	"github.com/openkruise/kruise/pkg/util/expectations"
 	utilfeature "github.com/openkruise/kruise/pkg/util/feature"
 	"github.com/openkruise/kruise/pkg/util/requeueduration"
@@ -93,22 +92,53 @@ func GetControllerKey(cs *appsv1alpha1.CloneSet) string {
 
 // GetActivePods returns all active pods in this namespace.
 func GetActivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
-	podList := &v1.PodList{}
-	if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil {
+	podList, err := GetAllPods(reader, opts)
+	if err != nil {
 		return nil, err
 	}
 
 	// Ignore inactive pods
 	var activePods []*v1.Pod
-	for i, pod := range podList.Items {
+	for _, pod := range podList {
 		// Consider all rebuild pod as active pod, should not recreate
-		if kubecontroller.IsPodActive(&pod) {
-			activePods = append(activePods, &podList.Items[i])
+		if kubecontroller.IsPodActive(pod) {
+			activePods = append(activePods, pod)
 		}
 	}
 	return activePods, nil
 }
 
+// GetInactivePods returns the pods matched by opts that kubecontroller.IsPodActive reports as inactive.
+func GetInactivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
+	podList, err := GetAllPods(reader, opts)
+	if err != nil {
+		return nil, err
+	}
+
+	var inactivePods []*v1.Pod
+	for _, pod := range podList {
+		// Keep only the pods that kubecontroller.IsPodActive rejects
+		if !kubecontroller.IsPodActive(pod) {
+			inactivePods = append(inactivePods, pod)
+		}
+	}
+	return inactivePods, nil
+}
+
+// GetAllPods lists the pods matched by opts and returns a pointer to each listed item.
+func GetAllPods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
+	podList := &v1.PodList{}
+	if err := reader.List(context.TODO(), podList, opts); err != nil {
+		return nil, err
+	}
+
+	pods := make([]*v1.Pod, 0, len(podList.Items))
+	for i := range podList.Items {
+		pods = append(pods, &podList.Items[i])
+	}
+	return pods, nil
+}
+
 // NextRevision finds the next valid revision number based on revisions.
If the length of revisions
 // is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
 // assumes that revisions has been sorted by Revision.
@@ -166,6 +196,11 @@ func UpdateStorage(cs *appsv1alpha1.CloneSet, pod *v1.Pod) {
 	pod.Spec.Volumes = newVolumes
 }
 
+// IsPodTerminating reports whether pod's DeletionTimestamp is set.
+func IsPodTerminating(pod *v1.Pod) bool {
+	return pod.DeletionTimestamp != nil
+}
+
 // GetPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The
 // returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined
 // by getPersistentVolumeClaimName.
@@ -233,3 +268,10 @@ func DoItSlowly(count int, initialBatchSize int, fn func() error) (int, error) {
 	}
 	return successes, nil
 }
+
+// IsPVCAndPodRelated reports whether pvc and pod carry the same non-empty
+// CloneSetInstanceID label value.
+func IsPVCAndPodRelated(pvc *v1.PersistentVolumeClaim, pod *v1.Pod) bool {
+	pvcIns := pvc.Labels[appsv1alpha1.CloneSetInstanceID]
+	return pvcIns != "" && pvcIns == pod.Labels[appsv1alpha1.CloneSetInstanceID]
+}