Skip to content

Commit

Permalink
enable cloneset deleting pvc when pod hanging (#1113)
Browse files Browse the repository at this point in the history
Signed-off-by: willise <willisesun@gmail.com>

Signed-off-by: willise <willisesun@gmail.com>
  • Loading branch information
willise authored Jan 3, 2023
1 parent 69512c6 commit 7b23f68
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 21 deletions.
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/cloneset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type CloneSetScaleStrategy struct {
// The scale will fail if the number of unavailable pods were greater than this MaxUnavailable at scaling up.
// MaxUnavailable works only when scaling up.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`

// Indicate if cloneset will reuse already existed pvc to
// rebuild a new pod
DisablePVCReuse bool `json:"disablePVCReuse,omitempty"`
}

// CloneSetUpdateStrategy defines strategies for pods update.
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_clonesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ spec:
description: ScaleStrategy indicates the ScaleStrategy that will be
employed to create and delete Pods in the CloneSet.
properties:
disablePVCReuse:
description: Indicate if cloneset will reuse already existed pvc
to rebuild a new pod
type: boolean
maxUnavailable:
anyOf:
- type: integer
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_uniteddeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,10 @@ spec:
that will be employed to create and delete Pods in the
CloneSet.
properties:
disablePVCReuse:
description: Indicate if cloneset will reuse already
existed pvc to rebuild a new pod
type: boolean
maxUnavailable:
anyOf:
- type: integer
Expand Down
152 changes: 146 additions & 6 deletions pkg/controller/cloneset/cloneset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,25 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}

// list all active Pods and PVCs belongs to cs
filteredPods, filteredPVCs, err := r.getOwnedResource(instance)
// list active and inactive Pods belongs to cs
filteredPods, filterOutPods, err := r.getOwnedPods(instance)
if err != nil {
return reconcile.Result{}, err
}
filteredPVCs, err := r.getOwnedPVCs(instance)
if err != nil {
return reconcile.Result{}, err
}

// If mcloneset doesn't want to reuse pvc, clean up
// the existing pvc first. Then it looks like the pod
// is deleted by controller, new pod can be created.
if instance.Spec.ScaleStrategy.DisablePVCReuse {
filteredPVCs, err = r.cleanupPVCs(instance, filteredPods, filterOutPods, filteredPVCs)
if err != nil {
return reconcile.Result{}, err
}
}

//release Pods ownerRef
filteredPods, err = r.claimPods(instance, filteredPods)
Expand Down Expand Up @@ -449,20 +463,29 @@ func (r *ReconcileCloneSet) getActiveRevisions(cs *appsv1alpha1.CloneSet, revisi
return currentRevision, updateRevision, collisionCount, nil
}

func (r *ReconcileCloneSet) getOwnedResource(cs *appsv1alpha1.CloneSet) ([]*v1.Pod, []*v1.PersistentVolumeClaim, error) {
func (r *ReconcileCloneSet) getOwnedPods(cs *appsv1alpha1.CloneSet) ([]*v1.Pod, []*v1.Pod, error) {
opts := &client.ListOptions{
Namespace: cs.Namespace,
FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}),
}

filteredPods, err := clonesetutils.GetActivePods(r.Client, opts)
filteredPods, filterOutPods, err := clonesetutils.GetActiveAndInactivePods(r.Client, opts)
if err != nil {
return nil, nil, err
}

return filteredPods, filterOutPods, nil
}

func (r *ReconcileCloneSet) getOwnedPVCs(cs *appsv1alpha1.CloneSet) ([]*v1.PersistentVolumeClaim, error) {
opts := &client.ListOptions{
Namespace: cs.Namespace,
FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}),
}

pvcList := v1.PersistentVolumeClaimList{}
if err := r.List(context.TODO(), &pvcList, opts, utilclient.DisableDeepCopy); err != nil {
return nil, nil, err
return nil, err
}
var filteredPVCs []*v1.PersistentVolumeClaim
for i, pvc := range pvcList.Items {
Expand All @@ -471,7 +494,7 @@ func (r *ReconcileCloneSet) getOwnedResource(cs *appsv1alpha1.CloneSet) ([]*v1.P
}
}

return filteredPods, filteredPVCs, nil
return filteredPVCs, nil
}

// truncatePodsToDelete truncates any non-live pod names in spec.scaleStrategy.podsToDelete.
Expand Down Expand Up @@ -571,3 +594,120 @@ func (r *ReconcileCloneSet) claimPods(instance *appsv1alpha1.CloneSet, pods []*v

return claimedPods, nil
}

func (r *ReconcileCloneSet) cleanupPVCs(
cs *appsv1alpha1.CloneSet,
activePods, filterOutPods []*v1.Pod,
pvcs []*v1.PersistentVolumeClaim,
) ([]*v1.PersistentVolumeClaim, error) {
// If useless pvc owner pod does not exist, the pvc can be deleted directly,
// else update pvc's ownerreference to pod.
toDeletePVCs, usedPVCs := filterPVCsByIfUsing(activePods, pvcs)
if len(toDeletePVCs) == 0 {
return usedPVCs, nil
}

klog.V(3).Infof("Begin to clean up mcloneset %s/%s useless PVCs", cs.Namespace, cs.Name)
toDeletePVCsMap := generatePodAndPVCsMap(toDeletePVCs)
for _, pod := range filterOutPods {
instanceID := clonesetutils.GetInstanceID(pod)
if pvcs, ok := toDeletePVCsMap[instanceID]; ok {
if err := r.updatePVCs(cs, pod, pvcs); err != nil {
return nil, err
}
// Left pvcs will be deleted directly.
delete(toDeletePVCsMap, instanceID)
}
}

for _, pvcs := range toDeletePVCsMap {
// It's safe to delete pvc that has no pod found.
if err := r.deletePVCs(cs, pvcs); err != nil {
return nil, err
}
}
return usedPVCs, nil
}

func (r *ReconcileCloneSet) updatePVCs(cs *appsv1alpha1.CloneSet, pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim) error {
for i := range pvcs {
pvc := pvcs[i]
if updateClaimOwnerRefToPod(pvc, cs, pod) {
if err := r.updateOnePVC(cs, pvcs[i]); err != nil && !errors.IsNotFound(err) {
return err
}
}
}
return nil
}

func (r *ReconcileCloneSet) updateOnePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) error {
if err := r.Client.Update(context.TODO(), pvc); err != nil {
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdate", "failed to update PVC %s: %v", pvc.Name, err)
return err
}
return nil
}
func (r *ReconcileCloneSet) deletePVCs(cs *appsv1alpha1.CloneSet, pvcs []*v1.PersistentVolumeClaim) error {
for i := range pvcs {
if err := r.deleteOnePVC(cs, pvcs[i]); err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
}

func (r *ReconcileCloneSet) deleteOnePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) error {
clonesetutils.ScaleExpectations.ExpectScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
if err := r.Delete(context.TODO(), pvc); err != nil {
clonesetutils.ScaleExpectations.ObserveScale(clonesetutils.GetControllerKey(cs), expectations.Delete, pvc.Name)
r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to clean up PVC %s: %v", pvc.Name, err)
return err
}
return nil
}

func updateClaimOwnerRefToPod(pvc *v1.PersistentVolumeClaim, cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
needsUpdate := false
needsUpdate = util.RemoveOwnerRef(pvc, cs)
return util.SetOwnerRef(pvc, pod, metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"}) || needsUpdate
}

func filterPVCsByIfUsing(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (toDeletePVCs, usedPVCs []*v1.PersistentVolumeClaim) {
activeIDs := getInstanceIDsFromPods(pods)
for i := range pvcs {
pvc := pvcs[i]
id := clonesetutils.GetInstanceID(pvc)
if id == "" {
continue
}
if !activeIDs.Has(id) {
toDeletePVCs = append(toDeletePVCs, pvc)
} else {
usedPVCs = append(usedPVCs, pvc)
}
}
return
}

func generatePodAndPVCsMap(pvcs []*v1.PersistentVolumeClaim) map[string][]*v1.PersistentVolumeClaim {
// pod may have multiple pvcs
pvcsMap := map[string][]*v1.PersistentVolumeClaim{}
for i := range pvcs {
pvc := pvcs[i]
if id := clonesetutils.GetInstanceID(pvc); id != "" {
pvcsMap[id] = append(pvcsMap[id], pvc)
}
}
return pvcsMap
}

func getInstanceIDsFromPods(pods []*v1.Pod) sets.String {
ins := sets.NewString()
for _, pod := range pods {
if id := clonesetutils.GetInstanceID(pod); id != "" {
ins.Insert(id)
}
}
return ins
}
9 changes: 5 additions & 4 deletions pkg/controller/cloneset/sync/cloneset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@ import (
"sync"
"sync/atomic"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/lifecycle"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)

const (
Expand Down
59 changes: 48 additions & 11 deletions pkg/controller/cloneset/utils/cloneset_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
"strings"
"sync"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/requeueduration"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -36,6 +30,13 @@ import (
kubecontroller "k8s.io/kubernetes/pkg/controller"
"k8s.io/utils/integer"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/requeueduration"
)

var (
Expand Down Expand Up @@ -93,22 +94,54 @@ func GetControllerKey(cs *appsv1alpha1.CloneSet) string {

// GetActivePods returns all active pods in this namespace.
func GetActivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
podList := &v1.PodList{}
if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil {
podList, err := GetAllPods(reader, opts)
if err != nil {
return nil, err
}

// Ignore inactive pods
var activePods []*v1.Pod
for i, pod := range podList.Items {
for i := range podList {
pod := podList[i]
// Consider all rebuild pod as active pod, should not recreate
if kubecontroller.IsPodActive(&pod) {
activePods = append(activePods, &podList.Items[i])
if kubecontroller.IsPodActive(pod) {
activePods = append(activePods, pod)
}
}
return activePods, nil
}

func GetActiveAndInactivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, []*v1.Pod, error) {
podList, err := GetAllPods(reader, opts)
if err != nil {
return nil, nil, err
}
var activePods, inactivePods []*v1.Pod
for i := range podList {
pod := podList[i]
if kubecontroller.IsPodActive(pod) {
activePods = append(activePods, pod)
} else {
inactivePods = append(inactivePods, pod)
}
}
return activePods, inactivePods, nil
}

// GetAllPods returns all pods in this namespace.
func GetAllPods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
podList := &v1.PodList{}
if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil {
return nil, err
}

var pods []*v1.Pod
for i := range podList.Items {
pods = append(pods, &podList.Items[i])
}
return pods, nil
}

// NextRevision finds the next valid revision number based on revisions. If the length of revisions
// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
// assumes that revisions has been sorted by Revision.
Expand Down Expand Up @@ -166,6 +199,10 @@ func UpdateStorage(cs *appsv1alpha1.CloneSet, pod *v1.Pod) {
pod.Spec.Volumes = newVolumes
}

func GetInstanceID(obj metav1.Object) string {
return obj.GetLabels()[appsv1alpha1.CloneSetInstanceID]
}

// GetPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The
// returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined
// by getPersistentVolumeClaimName.
Expand Down
Loading

0 comments on commit 7b23f68

Please sign in to comment.