Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable cloneset deleting pvc when pod hanging #1113

Merged
merged 1 commit into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/cloneset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ type CloneSetScaleStrategy struct {
// The scale will fail if the number of unavailable pods were greater than this MaxUnavailable at scaling up.
// MaxUnavailable works only when scaling up.
MaxUnavailable *intstr.IntOrString `json:"maxUnavailable,omitempty"`

// DisablePVCReuse indicates whether to forbid reusing already existing
// PVCs when rebuilding a replacement pod. If true, stale PVCs of inactive
// pods are cleaned up instead of being reused.
DisablePVCReuse bool `json:"disablePVCReuse,omitempty"`
}

// CloneSetUpdateStrategy defines strategies for pods update.
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_clonesets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ spec:
description: ScaleStrategy indicates the ScaleStrategy that will be
employed to create and delete Pods in the CloneSet.
properties:
disablePVCReuse:
description: Indicate if cloneset will reuse already existed pvc
to rebuild a new pod
type: boolean
maxUnavailable:
anyOf:
- type: integer
Expand Down
4 changes: 4 additions & 0 deletions config/crd/bases/apps.kruise.io_uniteddeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,10 @@ spec:
that will be employed to create and delete Pods in the
CloneSet.
properties:
disablePVCReuse:
description: Indicate if cloneset will reuse already
existed pvc to rebuild a new pod
type: boolean
maxUnavailable:
anyOf:
- type: integer
Expand Down
152 changes: 146 additions & 6 deletions pkg/controller/cloneset/cloneset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,25 @@ func (r *ReconcileCloneSet) doReconcile(request reconcile.Request) (res reconcil
return reconcile.Result{RequeueAfter: expectations.ExpectationTimeout - unsatisfiedDuration}, nil
}

// list all active Pods and PVCs belongs to cs
filteredPods, filteredPVCs, err := r.getOwnedResource(instance)
// list active and inactive Pods belongs to cs
filteredPods, filterOutPods, err := r.getOwnedPods(instance)
if err != nil {
return reconcile.Result{}, err
}
filteredPVCs, err := r.getOwnedPVCs(instance)
if err != nil {
return reconcile.Result{}, err
}

// If the cloneset doesn't want to reuse PVCs, clean up
// the existing PVCs first. Then it looks like the pod
// was deleted by the controller, so a new pod can be created.
if instance.Spec.ScaleStrategy.DisablePVCReuse {
filteredPVCs, err = r.cleanupPVCs(instance, filteredPods, filterOutPods, filteredPVCs)
if err != nil {
return reconcile.Result{}, err
}
}

//release Pods ownerRef
filteredPods, err = r.claimPods(instance, filteredPods)
Expand Down Expand Up @@ -449,20 +463,29 @@ func (r *ReconcileCloneSet) getActiveRevisions(cs *appsv1alpha1.CloneSet, revisi
return currentRevision, updateRevision, collisionCount, nil
}

func (r *ReconcileCloneSet) getOwnedResource(cs *appsv1alpha1.CloneSet) ([]*v1.Pod, []*v1.PersistentVolumeClaim, error) {
func (r *ReconcileCloneSet) getOwnedPods(cs *appsv1alpha1.CloneSet) ([]*v1.Pod, []*v1.Pod, error) {
opts := &client.ListOptions{
Namespace: cs.Namespace,
FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}),
}

filteredPods, err := clonesetutils.GetActivePods(r.Client, opts)
filteredPods, filterOutPods, err := clonesetutils.GetActiveAndInactivePods(r.Client, opts)
if err != nil {
return nil, nil, err
}

return filteredPods, filterOutPods, nil
}

func (r *ReconcileCloneSet) getOwnedPVCs(cs *appsv1alpha1.CloneSet) ([]*v1.PersistentVolumeClaim, error) {
opts := &client.ListOptions{
Namespace: cs.Namespace,
FieldSelector: fields.SelectorFromSet(fields.Set{fieldindex.IndexNameForOwnerRefUID: string(cs.UID)}),
}

pvcList := v1.PersistentVolumeClaimList{}
if err := r.List(context.TODO(), &pvcList, opts, utilclient.DisableDeepCopy); err != nil {
return nil, nil, err
return nil, err
}
var filteredPVCs []*v1.PersistentVolumeClaim
for i, pvc := range pvcList.Items {
Expand All @@ -471,7 +494,7 @@ func (r *ReconcileCloneSet) getOwnedResource(cs *appsv1alpha1.CloneSet) ([]*v1.P
}
}

return filteredPods, filteredPVCs, nil
return filteredPVCs, nil
}

// truncatePodsToDelete truncates any non-live pod names in spec.scaleStrategy.podsToDelete.
Expand Down Expand Up @@ -571,3 +594,120 @@ func (r *ReconcileCloneSet) claimPods(instance *appsv1alpha1.CloneSet, pods []*v

return claimedPods, nil
}

// cleanupPVCs reclaims PVCs whose instance ID no longer belongs to any
// active Pod of the CloneSet. When the inactive owner Pod still exists,
// the PVC's ownerReference is transferred to that Pod so that both are
// garbage-collected together; otherwise the PVC is deleted directly.
// It returns the PVCs that are still in use by active Pods.
func (r *ReconcileCloneSet) cleanupPVCs(
	cs *appsv1alpha1.CloneSet,
	activePods, filterOutPods []*v1.Pod,
	pvcs []*v1.PersistentVolumeClaim,
) ([]*v1.PersistentVolumeClaim, error) {
	// If a useless pvc's owner pod does not exist, the pvc can be deleted
	// directly, else update the pvc's ownerReference to the pod.
	toDeletePVCs, usedPVCs := filterPVCsByIfUsing(activePods, pvcs)
	if len(toDeletePVCs) == 0 {
		return usedPVCs, nil
	}

	// Fixed typo in log message: "mcloneset" -> "cloneset".
	klog.V(3).Infof("Begin to clean up cloneset %s/%s useless PVCs", cs.Namespace, cs.Name)
	toDeletePVCsMap := generatePodAndPVCsMap(toDeletePVCs)
	for _, pod := range filterOutPods {
		instanceID := clonesetutils.GetInstanceID(pod)
		if pvcs, ok := toDeletePVCsMap[instanceID]; ok {
			if err := r.updatePVCs(cs, pod, pvcs); err != nil {
				return nil, err
			}
			// Handled via ownerRef transfer; whatever remains in the map
			// after this loop will be deleted directly below.
			delete(toDeletePVCsMap, instanceID)
		}
	}

	for _, pvcs := range toDeletePVCsMap {
		// It's safe to delete a pvc for which no owner pod was found.
		if err := r.deletePVCs(cs, pvcs); err != nil {
			return nil, err
		}
	}
	return usedPVCs, nil
}

// updatePVCs transfers ownership of the given PVCs from the CloneSet to
// the given Pod, persisting each PVC that was actually modified.
// NotFound errors are tolerated since a PVC may already be gone.
func (r *ReconcileCloneSet) updatePVCs(cs *appsv1alpha1.CloneSet, pod *v1.Pod, pvcs []*v1.PersistentVolumeClaim) error {
	for _, pvc := range pvcs {
		if !updateClaimOwnerRefToPod(pvc, cs, pod) {
			// Nothing changed on this PVC; no write needed.
			continue
		}
		if err := r.updateOnePVC(cs, pvc); err != nil && !errors.IsNotFound(err) {
			return err
		}
	}
	return nil
}

// updateOnePVC writes the mutated PVC back to the apiserver, recording a
// warning event on the CloneSet when the update fails.
func (r *ReconcileCloneSet) updateOnePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) error {
	err := r.Client.Update(context.TODO(), pvc)
	if err != nil {
		r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedUpdate", "failed to update PVC %s: %v", pvc.Name, err)
	}
	return err
}
// deletePVCs deletes each of the given PVCs, tolerating PVCs that have
// already disappeared.
func (r *ReconcileCloneSet) deletePVCs(cs *appsv1alpha1.CloneSet, pvcs []*v1.PersistentVolumeClaim) error {
	for _, pvc := range pvcs {
		err := r.deleteOnePVC(cs, pvc)
		if err == nil || errors.IsNotFound(err) {
			continue
		}
		return err
	}
	return nil
}

// deleteOnePVC deletes a single PVC. A scale expectation is registered
// before the delete call and observed (rolled back) again if the call
// fails, together with a warning event on the CloneSet.
func (r *ReconcileCloneSet) deleteOnePVC(cs *appsv1alpha1.CloneSet, pvc *v1.PersistentVolumeClaim) error {
	key := clonesetutils.GetControllerKey(cs)
	clonesetutils.ScaleExpectations.ExpectScale(key, expectations.Delete, pvc.Name)
	err := r.Delete(context.TODO(), pvc)
	if err == nil {
		return nil
	}
	clonesetutils.ScaleExpectations.ObserveScale(key, expectations.Delete, pvc.Name)
	r.recorder.Eventf(cs, v1.EventTypeWarning, "FailedDelete", "failed to clean up PVC %s: %v", pvc.Name, err)
	return err
}

// updateClaimOwnerRefToPod moves the PVC's ownerReference from the
// CloneSet to the given Pod and reports whether the PVC was modified.
func updateClaimOwnerRefToPod(pvc *v1.PersistentVolumeClaim, cs *appsv1alpha1.CloneSet, pod *v1.Pod) bool {
	// Evaluate both mutations unconditionally: short-circuiting on the
	// removal result would skip adding the Pod ownerRef.
	removed := util.RemoveOwnerRef(pvc, cs)
	added := util.SetOwnerRef(pvc, pod, metav1.TypeMeta{Kind: "Pod", APIVersion: "v1"})
	return removed || added
}

// filterPVCsByIfUsing partitions the PVCs by whether their instance ID
// matches an active Pod: usedPVCs are still referenced, toDeletePVCs are
// not. PVCs without an instance-id label appear in neither result.
func filterPVCsByIfUsing(pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) (toDeletePVCs, usedPVCs []*v1.PersistentVolumeClaim) {
	liveIDs := getInstanceIDsFromPods(pods)
	for _, pvc := range pvcs {
		switch id := clonesetutils.GetInstanceID(pvc); {
		case id == "":
			// No instance-id label: nothing to match against, skip it.
		case liveIDs.Has(id):
			usedPVCs = append(usedPVCs, pvc)
		default:
			toDeletePVCs = append(toDeletePVCs, pvc)
		}
	}
	return toDeletePVCs, usedPVCs
}

// generatePodAndPVCsMap groups the PVCs by their instance-id label; a
// single pod instance may own several PVCs. PVCs without the label are
// skipped.
func generatePodAndPVCsMap(pvcs []*v1.PersistentVolumeClaim) map[string][]*v1.PersistentVolumeClaim {
	grouped := make(map[string][]*v1.PersistentVolumeClaim, len(pvcs))
	for _, pvc := range pvcs {
		id := clonesetutils.GetInstanceID(pvc)
		if id == "" {
			continue
		}
		grouped[id] = append(grouped[id], pvc)
	}
	return grouped
}

// getInstanceIDsFromPods collects the set of non-empty instance IDs
// carried by the given pods' labels.
func getInstanceIDsFromPods(pods []*v1.Pod) sets.String {
	ids := sets.NewString()
	for i := range pods {
		if id := clonesetutils.GetInstanceID(pods[i]); id != "" {
			ids.Insert(id)
		}
	}
	return ids
}
9 changes: 5 additions & 4 deletions pkg/controller/cloneset/sync/cloneset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,18 @@ import (
"sync"
"sync/atomic"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/expectations"
"github.com/openkruise/kruise/pkg/util/lifecycle"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)

const (
Expand Down
59 changes: 48 additions & 11 deletions pkg/controller/cloneset/utils/cloneset_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ import (
"strings"
"sync"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/requeueduration"
apps "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -36,6 +30,13 @@ import (
kubecontroller "k8s.io/kubernetes/pkg/controller"
"k8s.io/utils/integer"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/features"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/expectations"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/requeueduration"
)

var (
Expand Down Expand Up @@ -93,22 +94,54 @@ func GetControllerKey(cs *appsv1alpha1.CloneSet) string {

// GetActivePods returns all active pods in this namespace.
func GetActivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
podList := &v1.PodList{}
if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil {
podList, err := GetAllPods(reader, opts)
if err != nil {
return nil, err
}

// Ignore inactive pods
var activePods []*v1.Pod
for i, pod := range podList.Items {
for i := range podList {
pod := podList[i]
// Consider all rebuild pod as active pod, should not recreate
if kubecontroller.IsPodActive(&pod) {
activePods = append(activePods, &podList.Items[i])
if kubecontroller.IsPodActive(pod) {
activePods = append(activePods, pod)
}
}
return activePods, nil
}

// GetActiveAndInactivePods lists all pods matching opts and splits them
// into active and inactive pods, as judged by kubecontroller.IsPodActive.
func GetActiveAndInactivePods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, []*v1.Pod, error) {
	allPods, err := GetAllPods(reader, opts)
	if err != nil {
		return nil, nil, err
	}
	var activePods, inactivePods []*v1.Pod
	for _, pod := range allPods {
		if kubecontroller.IsPodActive(pod) {
			activePods = append(activePods, pod)
		} else {
			inactivePods = append(inactivePods, pod)
		}
	}
	return activePods, inactivePods, nil
}

// GetAllPods returns all pods matching opts in this namespace, active or
// not, as a slice of pointers into the listed items.
func GetAllPods(reader client.Reader, opts *client.ListOptions) ([]*v1.Pod, error) {
	podList := &v1.PodList{}
	if err := reader.List(context.TODO(), podList, opts, utilclient.DisableDeepCopy); err != nil {
		return nil, err
	}

	// Pre-size the result: exactly one pointer per listed pod.
	pods := make([]*v1.Pod, 0, len(podList.Items))
	for i := range podList.Items {
		pods = append(pods, &podList.Items[i])
	}
	return pods, nil
}

// NextRevision finds the next valid revision number based on revisions. If the length of revisions
// is 0 this is 1. Otherwise, it is 1 greater than the largest revision's Revision. This method
// assumes that revisions has been sorted by Revision.
Expand Down Expand Up @@ -166,6 +199,10 @@ func UpdateStorage(cs *appsv1alpha1.CloneSet, pod *v1.Pod) {
pod.Spec.Volumes = newVolumes
}

// GetInstanceID returns the CloneSet instance ID recorded in the object's
// labels, or "" when the label is absent.
func GetInstanceID(obj metav1.Object) string {
	return obj.GetLabels()[appsv1alpha1.CloneSetInstanceID]
}

// GetPersistentVolumeClaims gets a map of PersistentVolumeClaims to their template names, as defined in set. The
// returned PersistentVolumeClaims are each constructed with a the name specific to the Pod. This name is determined
// by getPersistentVolumeClaimName.
Expand Down
Loading