Skip to content

Commit

Permalink
enhancement (CollaSet): support ResourceContext after CollaSet is del… (
Browse files Browse the repository at this point in the history
#40)

* enhancement (CollaSet): support ResourceContext after CollaSet is deleted
  • Loading branch information
wu8685 committed Aug 21, 2023
1 parent 3d9f496 commit 93d00d0
Show file tree
Hide file tree
Showing 8 changed files with 126 additions and 410 deletions.
28 changes: 28 additions & 0 deletions pkg/controllers/collaset/collaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/controllers/collaset/podcontext"
"kusionstack.io/kafed/pkg/controllers/collaset/podcontrol"
"kusionstack.io/kafed/pkg/controllers/collaset/synccontrol"
"kusionstack.io/kafed/pkg/controllers/collaset/utils"
Expand All @@ -46,6 +48,8 @@ import (

const (
controllerName = "collaset-controller"

preReclaimFinalizer = "apps.kusionstack.io/pre-reclaim"
)

// CollaSetReconciler reconciles a CollaSet object
Expand Down Expand Up @@ -130,6 +134,21 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}

if instance.DeletionTimestamp != nil {
if controllerutil.ContainsFinalizer(instance, preReclaimFinalizer) {
// reclaim owner IDs in ResourceContext
if err := r.reclaimResourceContext(instance); err != nil {
return ctrl.Result{}, err
}
}

return ctrl.Result{}, nil
}

if !controllerutil.ContainsFinalizer(instance, preReclaimFinalizer) {
return ctrl.Result{}, controllerutils.AddFinalizer(context.TODO(), r, instance, preReclaimFinalizer)
}

currentRevision, updatedRevision, revisions, collisionCount, _, err := r.revisionManager.ConstructRevisions(instance, false)
if err != nil {
return ctrl.Result{}, fmt.Errorf("fail to construct revision for CollaSet %s/%s: %s", instance.Namespace, instance.Name, err)
Expand Down Expand Up @@ -248,3 +267,12 @@ func (r *CollaSetReconciler) updateStatus(ctx context.Context, instance *appsv1a

return err
}

func (r *CollaSetReconciler) reclaimResourceContext(cls *appsv1alpha1.CollaSet) error {
// clean the owner IDs from this CollaSet
if err := podcontext.UpdateToPodContext(r, cls, nil); err != nil {
return err
}

return controllerutils.RemoveFinalizer(context.TODO(), r, cls, preReclaimFinalizer)
}
4 changes: 4 additions & 0 deletions pkg/controllers/collaset/podcontext/podcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func UpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownedI
return fmt.Errorf("fail to find ResourceContext %s/%s: %s", instance.Namespace, contextName, err)
}

if len(ownedIDs) == 0 {
return nil
}

if err := doCreatePodContext(c, instance, ownedIDs); err != nil {
return fmt.Errorf("fail to create ResourceContext %s/%s after not found: %s", instance.Namespace, contextName, err)
}
Expand Down
65 changes: 62 additions & 3 deletions pkg/controllers/resourcecontext/resourcecontext_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ var (
var _ = Describe("ResourceContext controller", func() {

It("resource context reconcile", func() {
testcase := "test-reclaim"
testcase := "test-rc-reconcile"
Expect(createNamespace(c, testcase)).Should(BeNil())

cs := &appsv1alpha1.CollaSet{
Expand Down Expand Up @@ -169,7 +169,66 @@ var _ = Describe("ResourceContext controller", func() {

Eventually(func() error {
return c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, resourceContext)
}, 500*time.Second, 1*time.Second).ShouldNot(BeNil())
}, 5*time.Second, 1*time.Second).ShouldNot(BeNil())
})

It("resource context reclaim", func() {
testcase := "test-rc-reclaim"
Expect(createNamespace(c, testcase)).Should(BeNil())

cs := &appsv1alpha1.CollaSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: testcase,
Name: "foo",
},
Spec: appsv1alpha1.CollaSetSpec{
Replicas: int32Pointer(2),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "foo",
},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "foo",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "foo",
Image: "nginx:v1",
},
},
},
},
},
}

Expect(c.Create(context.TODO(), cs)).Should(BeNil())

podList := &corev1.PodList{}
Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
return len(podList.Items) == 2
}, 5*time.Second, 1*time.Second).Should(BeTrue())
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, cs)).Should(BeNil())
Expect(expectedStatusReplicas(c, cs, 0, 0, 0, 2, 2, 0, 0, 0)).Should(BeNil())

Expect(c.Delete(context.TODO(), cs)).Should(BeNil())
for _, pod := range podList.Items {
Expect(c.Delete(context.TODO(), &pod)).Should(BeNil())
}
Eventually(func() bool {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
return len(podList.Items) == 0
}, 5*time.Second, 1*time.Second).Should(BeTrue())

resourceContext := &appsv1alpha1.ResourceContext{}
Eventually(func() error {
return c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, resourceContext)
}, 5*time.Second, 1*time.Second).ShouldNot(BeNil())
})
})

Expand Down Expand Up @@ -324,7 +383,7 @@ var _ = AfterEach(func() {
Expect(mgr.GetClient().List(context.Background(), csList)).Should(BeNil())

for i := range csList.Items {
Expect(mgr.GetClient().Delete(context.TODO(), &csList.Items[i])).Should(BeNil())
mgr.GetClient().Delete(context.TODO(), &csList.Items[i])
}

nsList := &corev1.NamespaceList{}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/ruleset/register/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package register
import (
corev1 "k8s.io/api/core/v1"

"kusionstack.io/kafed/pkg/utils"
"kusionstack.io/kafed/pkg/controllers/utils"
)

var (
Expand Down
14 changes: 11 additions & 3 deletions pkg/controllers/ruleset/ruleset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -45,7 +46,7 @@ import (
"kusionstack.io/kafed/pkg/controllers/ruleset/processor"
"kusionstack.io/kafed/pkg/controllers/ruleset/register"
rulesetutils "kusionstack.io/kafed/pkg/controllers/ruleset/utils"
"kusionstack.io/kafed/pkg/utils"
"kusionstack.io/kafed/pkg/controllers/utils"
)

const (
Expand Down Expand Up @@ -136,13 +137,20 @@ func (r *RuleSetReconciler) Reconcile(ctx context.Context, request reconcile.Req
if err := r.cleanUpRuleSetPods(ctx, ruleSet); err != nil {
return reconcile.Result{}, err
}

if !controllerutil.ContainsFinalizer(ruleSet, cleanUpFinalizer) {
return reconcile.Result{}, nil
}

return reconcile.Result{}, utils.RemoveFinalizer(ctx, r.Client, ruleSet, cleanUpFinalizer)
}
msg := fmt.Sprintf("can not delete ruleset: there are some pods waiting for process by ruleset %s/%s. Please terminate pods first or label ruleset kafed.kusionstack.io/terminating=true to force delete it", ruleSet.Namespace, ruleSet.Name)
result.RequeueAfter = 5 * time.Second
r.recorder.Event(ruleSet, corev1.EventTypeWarning, "BlockProtection", msg)
} else if err := utils.AddFinalizer(ctx, r.Client, ruleSet, cleanUpFinalizer); err != nil {
return result, fmt.Errorf("fail to add finalizer on RuleSet %s: %s", request, err)
} else if !controllerutil.ContainsFinalizer(ruleSet, cleanUpFinalizer) {
if err := utils.AddFinalizer(ctx, r.Client, ruleSet, cleanUpFinalizer); err != nil {
return result, fmt.Errorf("fail to add finalizer on RuleSet %s: %s", request, err)
}
}

selectedPodNames := sets.String{}
Expand Down
16 changes: 10 additions & 6 deletions pkg/utils/finalizer.go → pkg/controllers/utils/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ func ObjectKey(obj client.Object) string {
}

func RemoveFinalizer(ctx context.Context, c client.Client, obj client.Object, finalizer string) error {
if !controllerutil.ContainsFinalizer(obj, finalizer) {
return nil
}
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
controllerutil.RemoveFinalizer(obj, finalizer)
var updateErr error
Expand All @@ -50,9 +47,6 @@ func RemoveFinalizer(ctx context.Context, c client.Client, obj client.Object, fi
}

func AddFinalizer(ctx context.Context, c client.Client, obj client.Object, finalizer string) error {
if controllerutil.ContainsFinalizer(obj, finalizer) {
return nil
}
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
controllerutil.AddFinalizer(obj, finalizer)
var updateErr error
Expand All @@ -65,3 +59,13 @@ func AddFinalizer(ctx context.Context, c client.Client, obj client.Object, final
return updateErr
})
}

func ContainsFinalizer(obj client.Object, finalizer string) bool {
for _, f := range obj.GetFinalizers() {
if f == finalizer {
return true
}
}

return false
}
10 changes: 10 additions & 0 deletions pkg/controllers/utils/pod_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ func IsPodReady(pod *corev1.Pod) bool {
return IsPodReadyConditionTrue(pod.Status)
}

// IsPodTerminal returns true if a pod is terminal, all containers are stopped and cannot ever regress.
func IsPodTerminal(pod *corev1.Pod) bool {
return IsPodPhaseTerminal(pod.Status.Phase)
}

// IsPodPhaseTerminal returns true if the pod's phase is terminal.
func IsPodPhaseTerminal(phase corev1.PodPhase) bool {
return phase == corev1.PodFailed || phase == corev1.PodSucceeded
}

// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
func IsPodReadyConditionTrue(status corev1.PodStatus) bool {
condition := GetPodReadyCondition(status)
Expand Down
Loading

0 comments on commit 93d00d0

Please sign in to comment.