diff --git a/apis/apps/v1alpha1/well_known_labels.go b/apis/apps/v1alpha1/well_known_labels.go index f82d6cbb..9d185b72 100644 --- a/apis/apps/v1alpha1/well_known_labels.go +++ b/apis/apps/v1alpha1/well_known_labels.go @@ -42,6 +42,8 @@ const ( PodScalingInLabelKey = "apps.kafed.kusionstack.io/scaling-in" // --- End: Labels for CollaSet --- + + PodDeletionIndicationLabelKey = "kafed.kusionstack.io/to-delete" // Users can use this label to indicate a pod to delete ) var ( diff --git a/pkg/controllers/add_poddeletion.go b/pkg/controllers/add_poddeletion.go new file mode 100644 index 00000000..dbe8a461 --- /dev/null +++ b/pkg/controllers/add_poddeletion.go @@ -0,0 +1,25 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "kusionstack.io/kafed/pkg/controllers/poddeletion" +) + +func init() { + AddToManagerFuncs = append(AddToManagerFuncs, poddeletion.Add) +} diff --git a/pkg/controllers/add_resourcecontext.go b/pkg/controllers/add_resourcecontext.go new file mode 100644 index 00000000..b2c90553 --- /dev/null +++ b/pkg/controllers/add_resourcecontext.go @@ -0,0 +1,25 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "kusionstack.io/kafed/pkg/controllers/resourcecontext" +) + +func init() { + AddToManagerFuncs = append(AddToManagerFuncs, resourcecontext.Add) +} diff --git a/pkg/controllers/collaset/collaset_controller.go b/pkg/controllers/collaset/collaset_controller.go index 3d383d45..f64208c7 100644 --- a/pkg/controllers/collaset/collaset_controller.go +++ b/pkg/controllers/collaset/collaset_controller.go @@ -65,7 +65,7 @@ func Add(mgr ctrl.Manager) error { return AddToMgr(mgr, NewReconciler(mgr)) } -// newReconciler returns a new reconcile.Reconciler +// NewReconciler returns a new reconcile.Reconciler func NewReconciler(mgr ctrl.Manager) reconcile.Reconciler { recorder := mgr.GetEventRecorderFor(controllerName) @@ -73,7 +73,7 @@ func NewReconciler(mgr ctrl.Manager) reconcile.Reconciler { podControl = podcontrol.NewRealPodControl(mgr.GetClient(), mgr.GetScheme()) syncControl = synccontrol.NewRealSyncControl(mgr.GetClient(), podControl, recorder) - utils.InitExpectations(mgr.GetClient()) + collasetutils.InitExpectations(mgr.GetClient()) return &CollaSetReconciler{ Client: mgr.GetClient(), @@ -91,7 +91,6 @@ func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error { return err } - // Watch for changes to RuleSet err = c.Watch(&source.Kind{Type: &appsv1alpha1.CollaSet{}}, &handler.EnqueueRequestForObject{}) if err != nil { return err @@ -119,7 +118,7 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if err := r.Get(ctx, req.NamespacedName, instance); err != nil { if !errors.IsNotFound(err) { klog.Error("fail to find CollaSet %s: %s", req, err) - utils.ActiveExpectations.Delete(req.Namespace, req.Name) + collasetutils.ActiveExpectations.Delete(req.Namespace, req.Name) return reconcile.Result{}, err } @@ -128,7 +127,7 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } // if expectation not satisfied, shortcut this reconciling till informer cache is updated. - if satisfied, err := utils.ActiveExpectations.IsSatisfied(instance); err != nil { + if satisfied, err := collasetutils.ActiveExpectations.IsSatisfied(instance); err != nil { return ctrl.Result{}, err } else if !satisfied { klog.Warningf("CollaSet %s is not satisfied to reconcile.", req) @@ -260,7 +259,7 @@ func (r *CollaSetReconciler) updateStatus(ctx context.Context, instance *appsv1a err := r.Status().Update(ctx, instance) if err == nil { - if err := utils.ActiveExpectations.ExpectUpdate(instance, expectations.CollaSet, instance.Name, instance.ResourceVersion); err != nil { + if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.CollaSet, instance.Name, instance.ResourceVersion); err != nil { return err } } diff --git a/pkg/controllers/collaset/podcontext/podcontext.go b/pkg/controllers/collaset/podcontext/podcontext.go index fb987fff..76f1b519 100644 --- a/pkg/controllers/collaset/podcontext/podcontext.go +++ b/pkg/controllers/collaset/podcontext/podcontext.go @@ -19,6 +19,8 @@ package podcontext import ( "context" "fmt" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "kusionstack.io/kafed/pkg/controllers/collaset/utils" "sort" "k8s.io/apimachinery/pkg/api/errors" @@ -26,7 +28,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1" - "kusionstack.io/kafed/pkg/controllers/collaset/utils" "kusionstack.io/kafed/pkg/controllers/utils/expectations" ) @@ -38,16 +39,15 @@ const ( func AllocateID(c client.Client, instance *appsv1alpha1.CollaSet, defaultRevision string, replicas int) (map[int]*appsv1alpha1.ContextDetail, error) { contextName := getContextName(instance) podContext := &appsv1alpha1.ResourceContext{} + notFound := false if err := c.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: contextName}, podContext); err != nil { if !errors.IsNotFound(err) { return nil, fmt.Errorf("fail to find ResourceContext %s/%s for owner %s: %s", instance.Namespace, contextName, instance.Name, err) } + notFound = true podContext.Namespace = instance.Namespace podContext.Name = contextName - if err := c.Create(context.TODO(), podContext); err != nil { - return nil, fmt.Errorf("fail to create ResourceContext %s/%s for owner %s after not found: %s", instance.Namespace, contextName, instance.Name, err) - } } // store all the IDs crossing Multiple workload @@ -92,7 +92,11 @@ func AllocateID(c client.Client, instance *appsv1alpha1.CollaSet, defaultRevisio ownedIDs[candidateID] = detail } - return ownedIDs, doUpdateToPodContext(c, instance, ownedIDs, podContext, instance.Name) + if notFound { + return ownedIDs, doCreatePodContext(c, instance, ownedIDs) + } + + return ownedIDs, doUpdatePodContext(c, instance, ownedIDs, podContext) } func UpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownedIDs map[int]*appsv1alpha1.ContextDetail) error { @@ -103,17 +107,36 @@ func UpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownedI return fmt.Errorf("fail to find ResourceContext %s/%s: %s", instance.Namespace, contextName, err) } - podContext.Namespace = instance.Namespace - podContext.Name = contextName - if err := c.Create(context.TODO(), podContext); err != nil { + if err := doCreatePodContext(c, instance, ownedIDs); err != nil { return fmt.Errorf("fail to create ResourceContext %s/%s after not found: %s", instance.Namespace, contextName, err) } } - return doUpdateToPodContext(c, instance, ownedIDs, podContext, instance.Name) + return doUpdatePodContext(c, instance, ownedIDs, podContext) +} + +func doCreatePodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownerIDs map[int]*appsv1alpha1.ContextDetail) error { + contextName := getContextName(instance) + podContext := &appsv1alpha1.ResourceContext{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: instance.Namespace, + Name: contextName, + }, + Spec: appsv1alpha1.ResourceContextSpec{ + Contexts: make([]appsv1alpha1.ContextDetail, len(ownerIDs)), + }, + } + + i := 0 + for _, detail := range ownerIDs { + podContext.Spec.Contexts[i] = *detail + i++ + } + + return c.Create(context.TODO(), podContext) } -func doUpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownedIDs map[int]*appsv1alpha1.ContextDetail, podContext *appsv1alpha1.ResourceContext, owner string) error { +func doUpdatePodContext(c client.Client, instance client.Object, ownedIDs map[int]*appsv1alpha1.ContextDetail, podContext *appsv1alpha1.ResourceContext) error { // store all IDs crossing all workload existingIDs := map[int]*appsv1alpha1.ContextDetail{} for k, detail := range ownedIDs { @@ -122,7 +145,7 @@ func doUpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, owne for i := range podContext.Spec.Contexts { detail := podContext.Spec.Contexts[i] - if detail.Contains(OwnerContextKey, owner) { + if detail.Contains(OwnerContextKey, instance.GetName()) { continue } diff --git a/pkg/controllers/collaset/synccontrol/sync_control.go b/pkg/controllers/collaset/synccontrol/sync_control.go index 7dea7b51..db793c2b 100644 --- a/pkg/controllers/collaset/synccontrol/sync_control.go +++ b/pkg/controllers/collaset/synccontrol/sync_control.go @@ -18,7 +18,6 @@ package synccontrol import ( "fmt" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -166,7 +165,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll if pod, err := sc.podControl.CreatePod(newPod, updatedRevision); err == nil { // add an expectation for this pod creation, before next reconciling - if err := utils.ActiveExpectations.ExpectCreate(set, expectations.Pod, pod.Name); err != nil { + if err := collasetutils.ActiveExpectations.ExpectCreate(set, expectations.Pod, pod.Name); err != nil { return err } } @@ -204,7 +203,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll return fmt.Errorf("fail to begin PodOpsLifecycle for Scaling in Pod %s/%s: %s", pod.Namespace, pod.Name, err) } else if updated { // add an expectation for this pod creation, before next reconciling - if err := utils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, pod.Name, pod.ResourceVersion); err != nil { + if err := collasetutils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, pod.Name, pod.ResourceVersion); err != nil { return err } } @@ -258,7 +257,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll succCount, err = controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error { pod := <-podCh if err := sc.podControl.DeletePod(pod.Pod); err == nil { - if err := utils.ActiveExpectations.ExpectDelete(set, expectations.Pod, pod.Name); err != nil { + if err := collasetutils.ActiveExpectations.ExpectDelete(set, expectations.Pod, pod.Name); err != nil { return err } } @@ -345,7 +344,7 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers [] return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) } else if updated { // add an expectation for this pod update, before next reconciling - if err := utils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { + if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { return err } } @@ -448,7 +447,7 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers [] return fmt.Errorf("fail to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err) } else if updated { // add an expectation for this pod update, before next reconciling - if err := utils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { + if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil { return err } sc.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name) diff --git a/pkg/controllers/poddeletion/expectation.go b/pkg/controllers/poddeletion/expectation.go new file mode 100644 index 00000000..587af089 --- /dev/null +++ b/pkg/controllers/poddeletion/expectation.go @@ -0,0 +1,32 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poddeletion + +import ( + "sigs.k8s.io/controller-runtime/pkg/client" + + "kusionstack.io/kafed/pkg/controllers/utils/expectations" +) + +var ( + // activeExpectations is used to check the cache in informer is updated, before reconciling. + activeExpectations *expectations.ActiveExpectations +) + +func InitExpectations(c client.Client) { + activeExpectations = expectations.NewActiveExpectations(c) +} diff --git a/pkg/controllers/poddeletion/lifecycle_adapter.go b/pkg/controllers/poddeletion/lifecycle_adapter.go new file mode 100644 index 00000000..36d6f6ae --- /dev/null +++ b/pkg/controllers/poddeletion/lifecycle_adapter.go @@ -0,0 +1,57 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poddeletion + +import ( + "sigs.k8s.io/controller-runtime/pkg/client" + + "kusionstack.io/kafed/pkg/controllers/utils/podopslifecycle" +) + +var ( + OpsLifecycleAdapter = &PodDeleteOpsLifecycleAdapter{} +) + +// PodDeleteOpsLifecycleAdapter tells PodOpsLifecycle the Pod deletion ops info +type PodDeleteOpsLifecycleAdapter struct { +} + +// GetID indicates ID of one PodOpsLifecycle +func (a *PodDeleteOpsLifecycleAdapter) GetID() string { + return "pod-delete" +} + +// GetType indicates type for an Operator +func (a *PodDeleteOpsLifecycleAdapter) GetType() podopslifecycle.OperationType { + return podopslifecycle.OpsLifecycleTypeDelete +} + +// AllowMultiType indicates whether multiple IDs which have the same Type are allowed +func (a *PodDeleteOpsLifecycleAdapter) AllowMultiType() bool { + return true +} + +// WhenBegin will be executed when begin a lifecycle +func (a *PodDeleteOpsLifecycleAdapter) WhenBegin(pod client.Object) (bool, error) { + return false, nil +} + +// WhenFinish will be executed when finish a lifecycle +func (a *PodDeleteOpsLifecycleAdapter) WhenFinish(pod client.Object) (bool, error) { + + return false, nil +} diff --git a/pkg/controllers/poddeletion/poddeletion_controller.go b/pkg/controllers/poddeletion/poddeletion_controller.go new file mode 100644 index 00000000..3b576f80 --- /dev/null +++ b/pkg/controllers/poddeletion/poddeletion_controller.go @@ -0,0 +1,135 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poddeletion + +import ( + "context" + "fmt" + corev1 "k8s.io/api/core/v1" + "kusionstack.io/kafed/pkg/controllers/utils/expectations" + "kusionstack.io/kafed/pkg/controllers/utils/podopslifecycle" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "kusionstack.io/kafed/pkg/controllers/collaset/utils" +) + +const ( + controllerName = "poddeletion-controller" +) + +// PodDeletionReconciler reconciles and reclaims a Pod object +type PodDeletionReconciler struct { + client.Client + + recorder record.EventRecorder +} + +func Add(mgr ctrl.Manager) error { + return AddToMgr(mgr, NewReconciler(mgr)) +} + +// NewReconciler returns a new reconcile.Reconciler +func NewReconciler(mgr ctrl.Manager) reconcile.Reconciler { + recorder := mgr.GetEventRecorderFor(controllerName) + + InitExpectations(mgr.GetClient()) + + return &PodDeletionReconciler{ + Client: mgr.GetClient(), + recorder: recorder, + } +} + +func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error { + // Create a new controller + c, err := controller.New(controllerName, mgr, controller.Options{ + MaxConcurrentReconciles: 5, + Reconciler: r, + }) + if err != nil { + return err + } + + err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, &PredicateDeletionIndicatedPod{}) + if err != nil { + return err + } + + return nil +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +func (r *PodDeletionReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + instance := &corev1.Pod{} + if err := r.Get(ctx, req.NamespacedName, instance); err != nil { + if !errors.IsNotFound(err) { + klog.Error("fail to find Pod %s: %s", req, err) + utils.ActiveExpectations.Delete(req.Namespace, req.Name) + return reconcile.Result{}, err + } + + klog.Infof("Pod %s is deleted", req) + return ctrl.Result{}, nil + } + + // if expectation not satisfied, shortcut this reconciling till informer cache is updated. + if satisfied, err := activeExpectations.IsSatisfied(instance); err != nil { + return ctrl.Result{}, err + } else if !satisfied { + klog.Warningf("Pod %s is not satisfied to reconcile.", req) + return ctrl.Result{}, nil + } + + if instance.DeletionTimestamp != nil { + return ctrl.Result{}, nil + } + + // if Pod is not begin a deletion PodOpsLifecycle, trigger it + if !podopslifecycle.IsDuringOps(OpsLifecycleAdapter, instance) { + if updated, err := podopslifecycle.Begin(r, OpsLifecycleAdapter, instance); err != nil { + return ctrl.Result{}, fmt.Errorf("fail to begin PodOpsLifecycle to delete Pod %s: %s", req, err) + } else if updated { + if err := activeExpectations.ExpectUpdate(instance, expectations.Pod, instance.Name, instance.ResourceVersion); err != nil { + return ctrl.Result{}, fmt.Errorf("fail to expect Pod updated after beginning PodOpsLifecycle to delete Pod %s: %s", req, err) + } + } + } + + // if Pod is allow to operate, delete it + if podopslifecycle.AllowOps(OpsLifecycleAdapter, instance) { + klog.Infof("try to delete Pod %s with deletion indication", req) + if err := r.Delete(context.TODO(), instance); err != nil { + return ctrl.Result{}, fmt.Errorf("fail to delete Pod %s with deletion indication: %s", req, err) + } else { + if err := activeExpectations.ExpectDelete(instance, expectations.Pod, instance.Name); err != nil { + return ctrl.Result{}, fmt.Errorf("fail to expect Pod %s deleted: %s", req, err) + } + } + } + + return ctrl.Result{}, nil +} diff --git a/pkg/controllers/poddeletion/poddeletion_controller_test.go b/pkg/controllers/poddeletion/poddeletion_controller_test.go new file mode 100644 index 00000000..bffa6fe6 --- /dev/null +++ b/pkg/controllers/poddeletion/poddeletion_controller_test.go @@ -0,0 +1,227 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poddeletion + +import ( + "context" + "fmt" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1" + "net/url" + "os" + "path/filepath" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "strings" + "testing" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "kusionstack.io/kafed/apis" + "kusionstack.io/kafed/pkg/utils/inject" +) + +var ( + env *envtest.Environment + mgr manager.Manager + request chan reconcile.Request + + ctx context.Context + cancel context.CancelFunc + c client.Client +) + +var _ = Describe("Pod Deletion controller", func() { + + It("deletion reconcile", func() { + testcase := "delete-pod" + Expect(createNamespace(c, testcase)).Should(BeNil()) + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testcase, + Name: "test", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "nginx:v1", + }, + }, + }, + } + + Expect(c.Create(context.TODO(), pod)).Should(BeNil()) + Eventually(func() error { + return c.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, pod) + }, 5*time.Second, 1*time.Second).Should(BeNil()) + + Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool { + pod.Labels = map[string]string{ + appsv1alpha1.PodDeletionIndicationLabelKey: "true", + } + return true + })).Should(BeNil()) + + time.After(3 * time.Second) + Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, pod)).Should(BeNil()) + + // allow Pod to update + Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool { + labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, OpsLifecycleAdapter.GetID()) + if pod.Labels == nil { + pod.Labels = map[string]string{} + } + pod.Labels[labelOperate] = "true" + return true + })).Should(BeNil()) + + // pod should be deleted + Eventually(func() error { + return c.Get(context.TODO(), types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, pod) + }, 5*time.Second, 1*time.Second).ShouldNot(BeNil()) + }) +}) + +func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + pod := &corev1.Pod{} + if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { + return err + } + + if !updateFn(pod) { + return nil + } + + return c.Update(context.TODO(), pod) + }) +} + +func updatePodStatusWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + pod := &corev1.Pod{} + if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { + return err + } + + if !updateFn(pod) { + return nil + } + + return c.Status().Update(context.TODO(), pod) + }) +} + +func testReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) { + requests := make(chan reconcile.Request, 5) + fn := reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + result, err := inner.Reconcile(ctx, req) + if _, done := ctx.Deadline(); !done && len(requests) == 0 { + requests <- req + } + return result, err + }) + return fn, requests +} + +func TestPodDeletionController(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "PodDeletionController Test Suite") +} + +var _ = BeforeSuite(func() { + By("bootstrapping test environment") + + ctx, cancel = context.WithCancel(context.TODO()) + logf.SetLogger(zap.New(zap.WriteTo(os.Stdout), zap.UseDevMode(true))) + + env = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + } + env.ControlPlane.GetAPIServer().URL = &url.URL{ + Host: "127.0.0.1:10001", + } + config, err := env.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(config).NotTo(BeNil()) + + mgr, err = manager.New(config, manager.Options{ + MetricsBindAddress: "0", + NewCache: inject.NewCacheWithFieldIndex, + }) + Expect(err).NotTo(HaveOccurred()) + + scheme := mgr.GetScheme() + err = appsv1.SchemeBuilder.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + err = apis.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + c = mgr.GetClient() + + var r reconcile.Reconciler + r, request = testReconcile(NewReconciler(mgr)) + err = AddToMgr(mgr, r) + Expect(err).NotTo(HaveOccurred()) + + go func() { + err = mgr.Start(ctx) + Expect(err).NotTo(HaveOccurred()) + }() +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + + cancel() + + err := env.Stop() + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = AfterEach(func() { + nsList := &corev1.NamespaceList{} + Expect(mgr.GetClient().List(context.Background(), nsList)).Should(BeNil()) + + for i := range nsList.Items { + if strings.HasPrefix(nsList.Items[i].Name, "test-") { + mgr.GetClient().Delete(context.TODO(), &nsList.Items[i]) + } + } +}) + +func createNamespace(c client.Client, namespaceName string) error { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespaceName, + }, + } + + return c.Create(context.TODO(), ns) +} diff --git a/pkg/controllers/poddeletion/predict.go b/pkg/controllers/poddeletion/predict.go new file mode 100644 index 00000000..46883474 --- /dev/null +++ b/pkg/controllers/poddeletion/predict.go @@ -0,0 +1,59 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package poddeletion + +import ( + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + + appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1" +) + +type PredicateDeletionIndicatedPod struct { +} + +// Create returns true if the Create event should be processed +func (p *PredicateDeletionIndicatedPod) Create(e event.CreateEvent) bool { + return hasTerminatingLabel(e.Object) +} + +// Delete returns true if the Delete event should be processed +func (p *PredicateDeletionIndicatedPod) Delete(e event.DeleteEvent) bool { + return hasTerminatingLabel(e.Object) +} + +// Update returns true if the Update event should be processed +func (p *PredicateDeletionIndicatedPod) Update(e event.UpdateEvent) bool { + return hasTerminatingLabel(e.ObjectNew) +} + +// Generic returns true if the Generic event should be processed +func (p *PredicateDeletionIndicatedPod) Generic(e event.GenericEvent) bool { + return hasTerminatingLabel(e.Object) +} + +func hasTerminatingLabel(pod client.Object) bool { + if pod.GetLabels() == nil { + return false + } + + if _, exist := pod.GetLabels()[appsv1alpha1.PodDeletionIndicationLabelKey]; exist { + return true + } + + return false +} diff --git a/pkg/controllers/resourcecontext/expectation.go b/pkg/controllers/resourcecontext/expectation.go new file mode 100644 index 00000000..4a25fd3c --- /dev/null +++ b/pkg/controllers/resourcecontext/expectation.go @@ -0,0 +1,52 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcecontext + +import ( + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + + "kusionstack.io/kafed/pkg/controllers/utils/expectations" +) + +var ( + // activeExpectations is used to check the cache in informer is updated, before reconciling. + activeExpectations *expectations.ActiveExpectations +) + +func InitExpectations(c client.Client) { + activeExpectations = expectations.NewActiveExpectations(c) +} + +type ExpectationEventHandler struct { +} + +// Create is called in response to an create event - e.g. Pod Creation. +func (h *ExpectationEventHandler) Create(event.CreateEvent, workqueue.RateLimitingInterface) {} + +// Update is called in response to an update event - e.g. Pod Updated. +func (h *ExpectationEventHandler) Update(event.UpdateEvent, workqueue.RateLimitingInterface) {} + +// Delete is called in response to a delete event - e.g. Pod Deleted. +func (h *ExpectationEventHandler) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) { + activeExpectations.Delete(e.Object.GetNamespace(), e.Object.GetName()) +} + +// Generic is called in response to an event of an unknown type or a synthetic event triggered as a cron or +// external trigger request - e.g. reconcile Autoscaling, or a Webhook. +func (h *ExpectationEventHandler) Generic(event.GenericEvent, workqueue.RateLimitingInterface) {} diff --git a/pkg/controllers/resourcecontext/resourcecontext_controller.go b/pkg/controllers/resourcecontext/resourcecontext_controller.go new file mode 100644 index 00000000..3dba712d --- /dev/null +++ b/pkg/controllers/resourcecontext/resourcecontext_controller.go @@ -0,0 +1,125 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcecontext + +import ( + "context" + "kusionstack.io/kafed/pkg/controllers/utils/expectations" + + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1" + "kusionstack.io/kafed/pkg/controllers/collaset/utils" +) + +const ( + controllerName = "resourcecontext-controller" +) + +// ResourceContextReconciler reconciles and reclaims a ResourceContext object +type ResourceContextReconciler struct { + client.Client + + recorder record.EventRecorder +} + +func Add(mgr ctrl.Manager) error { + return AddToMgr(mgr, NewReconciler(mgr)) +} + +// NewReconciler returns a new reconcile.Reconciler +func NewReconciler(mgr ctrl.Manager) reconcile.Reconciler { + recorder := mgr.GetEventRecorderFor(controllerName) + + InitExpectations(mgr.GetClient()) + + return &ResourceContextReconciler{ + Client: mgr.GetClient(), + recorder: recorder, + } +} + +func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error { + // Create a new controller + c, err := controller.New(controllerName, mgr, controller.Options{ + MaxConcurrentReconciles: 5, + Reconciler: r, + }) + if err != nil { + return err + } + + err = c.Watch(&source.Kind{Type: &appsv1alpha1.ResourceContext{}}, &handler.EnqueueRequestForObject{}) + if err != nil { + return err + } + + // Watch for changes to maintain expectation + err = c.Watch(&source.Kind{Type: &appsv1alpha1.ResourceContext{}}, &ExpectationEventHandler{}) + if err != nil { + return err + } + + return nil +} + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +func (r *ResourceContextReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + instance := &appsv1alpha1.ResourceContext{} + if err := r.Get(ctx, req.NamespacedName, instance); err != nil { + if !errors.IsNotFound(err) { + klog.Error("fail to find ResourceContext %s: %s", req, err) + utils.ActiveExpectations.Delete(req.Namespace, req.Name) + return reconcile.Result{}, err + } + + klog.Infof("ResourceContext %s is deleted", req) + return ctrl.Result{}, nil + } + + // if expectation not satisfied, shortcut this reconciling till informer cache is updated. + if satisfied, err := activeExpectations.IsSatisfied(instance); err != nil { + return ctrl.Result{}, err + } else if !satisfied { + klog.Warningf("ResourceContext %s is not satisfied to reconcile.", req) + return ctrl.Result{}, nil + } + + // if ResourceContext is empty, delete it + if len(instance.Spec.Contexts) == 0 { + klog.Infof("try to delete ResourceContext %s as empty", req) + if err := r.Delete(context.TODO(), instance); err != nil { + klog.Error("fail to delete ResourceContext %s: %s", req, err) + return ctrl.Result{}, err + } + if err := activeExpectations.ExpectDelete(instance, expectations.ResourceContext, instance.Name); err != nil { + klog.Error("fail to expect deletion after deleting ResourceContext %s: %s", req, err) + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} diff --git a/pkg/controllers/resourcecontext/resourcecontext_controller_test.go b/pkg/controllers/resourcecontext/resourcecontext_controller_test.go new file mode 100644 index 00000000..ede4498c --- /dev/null +++ b/pkg/controllers/resourcecontext/resourcecontext_controller_test.go @@ -0,0 +1,347 @@ +/* +Copyright 2023 The KusionStack Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcecontext + +import ( + "context" + "fmt" + "k8s.io/apimachinery/pkg/util/sets" + "kusionstack.io/kafed/pkg/controllers/collaset" + collasetutils "kusionstack.io/kafed/pkg/controllers/collaset/utils" + "kusionstack.io/kafed/pkg/controllers/poddeletion" + "net/url" + "os" + "path/filepath" + "strings" + "testing" + "time" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/retry" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/envtest" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "kusionstack.io/kafed/apis" + appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1" + "kusionstack.io/kafed/pkg/utils/inject" +) + +var ( + env *envtest.Environment + mgr manager.Manager + request chan reconcile.Request + + ctx context.Context + cancel context.CancelFunc + c client.Client +) + +var _ = Describe("ResourceContext controller", func() { + + It("resource context reconcile", func() { + testcase := "test-reclaim" + Expect(createNamespace(c, testcase)).Should(BeNil()) + + cs := &appsv1alpha1.CollaSet{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testcase, + Name: "foo", + }, + Spec: appsv1alpha1.CollaSetSpec{ + Replicas: 2, + Selector: metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "foo", + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "foo", + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "foo", + Image: "nginx:v1", + }, + }, + }, + }, + }, + } + + Expect(c.Create(context.TODO(), cs)).Should(BeNil()) + + podList := &corev1.PodList{} + Eventually(func() bool { + Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil()) + return len(podList.Items) == 2 + }, 5*time.Second, 1*time.Second).Should(BeTrue()) + Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, cs)).Should(BeNil()) + Expect(expectedStatusReplicas(c, cs, 0, 0, 0, 2, 2, 0, 0, 0)).Should(BeNil()) + + names := sets.NewString() + for _, pod := range podList.Items { + names.Insert(pod.Name) + // mark Pod to delete + Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool { + pod.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] = "true" + return true + })).Should(BeNil()) + // allow Pod to delete + Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool { + labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, poddeletion.OpsLifecycleAdapter.GetID()) + pod.Labels[labelOperate] = "true" + return true + })).Should(BeNil()) + } + + // wait for all original Pods deleted + Eventually(func() bool { + Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil()) + if len(podList.Items) != 2 { + return false + } + + for _, pod := range podList.Items { + if pod.DeletionTimestamp != nil { + // still in terminating + return false + } + } + + for _, pod := range podList.Items { + if names.Has(pod.Name) { + return false + } + } + + return true + }, 5*time.Second, 1*time.Second).Should(BeTrue()) + + resourceContext := &appsv1alpha1.ResourceContext{} + Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, resourceContext)).Should(BeNil()) + + Expect(updateCollaSetWithRetry(c, cs.Namespace, cs.Name, func(cls *appsv1alpha1.CollaSet) bool { + cls.Spec.Replicas = 0 + return true + })).Should(BeNil()) + + for _, pod := range podList.Items { + // allow Pod to scale in + Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool { + labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.ScaleInOpsLifecycleAdapter.GetID()) + pod.Labels[labelOperate] = "true" + return true + })).Should(BeNil()) + } + + Eventually(func() error { + return expectedStatusReplicas(c, cs, 0, 0, 0, 0, 0, 0, 0, 0) + }, 5*time.Second, 1*time.Second).Should(BeNil()) + + Eventually(func() error { + return c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, resourceContext) + }, 500*time.Second, 1*time.Second).ShouldNot(BeNil()) + }) +}) + +func expectedStatusReplicas(c client.Client, cls *appsv1alpha1.CollaSet, scheduledReplicas, readyReplicas, availableReplicas, replicas, updatedReplicas, operatingReplicas, + updatedReadyReplicas, updatedAvailableReplicas int32) error { + if err := c.Get(context.TODO(), types.NamespacedName{Namespace: cls.Namespace, Name: cls.Name}, cls); err != nil { + return err + } + + if cls.Status.ScheduledReplicas != scheduledReplicas { + return fmt.Errorf("scheduledReplicas got %d, expected %d", cls.Status.ScheduledReplicas, scheduledReplicas) + } + + if cls.Status.ReadyReplicas != readyReplicas { + return fmt.Errorf("readyReplicas got %d, expected %d", cls.Status.ReadyReplicas, readyReplicas) + } + + if cls.Status.AvailableReplicas != availableReplicas { + return fmt.Errorf("availableReplicas got %d, expected %d", cls.Status.AvailableReplicas, availableReplicas) + } + + if cls.Status.Replicas != replicas { + return fmt.Errorf("replicas got %d, expected %d", cls.Status.Replicas, replicas) + } + + if cls.Status.UpdatedReplicas != updatedReplicas { + return fmt.Errorf("updatedReplicas got %d, expected %d", cls.Status.UpdatedReplicas, updatedReplicas) + } + + if cls.Status.OperatingReplicas != operatingReplicas { + return fmt.Errorf("operatingReplicas got %d, expected %d", cls.Status.OperatingReplicas, operatingReplicas) + } + + if cls.Status.UpdatedReadyReplicas != updatedReadyReplicas { + return fmt.Errorf("updatedReadyReplicas got %d, expected %d", cls.Status.UpdatedReadyReplicas, updatedReadyReplicas) + } + + if cls.Status.UpdatedAvailableReplicas != updatedAvailableReplicas { + return fmt.Errorf("updatedAvailableReplicas got %d, expected %d", cls.Status.UpdatedAvailableReplicas, updatedAvailableReplicas) + } + + return nil +} + +func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn func(cls *appsv1alpha1.CollaSet) bool) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + cls := &appsv1alpha1.CollaSet{} + if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, cls); err != nil { + return err + } + + if !updateFn(cls) { + return nil + } + + return c.Update(context.TODO(), cls) + }) +} + +func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error { + return retry.RetryOnConflict(retry.DefaultBackoff, func() error { + pod := &corev1.Pod{} + if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil { + return err + } + + if !updateFn(pod) { + return nil + } + + return c.Update(context.TODO(), pod) + }) +} + +func testReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan reconcile.Request) { + requests := make(chan reconcile.Request, 5) + fn := reconcile.Func(func(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + result, err := inner.Reconcile(ctx, req) + if _, done := ctx.Deadline(); !done && len(requests) == 0 { + requests <- req + } + return result, err + }) + return fn, requests +} + +func TestResourceContextController(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "ResourceContext Test Suite") +} + +var _ = BeforeSuite(func() { + By("bootstrapping test environment") + + ctx, cancel = context.WithCancel(context.TODO()) + logf.SetLogger(zap.New(zap.WriteTo(os.Stdout), zap.UseDevMode(true))) + + env = &envtest.Environment{ + CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crd", "bases")}, + } + env.ControlPlane.GetAPIServer().URL = &url.URL{ + Host: "127.0.0.1:10001", + } + config, err := env.Start() + Expect(err).NotTo(HaveOccurred()) + Expect(config).NotTo(BeNil()) + + mgr, err = manager.New(config, manager.Options{ + MetricsBindAddress: "0", + NewCache: inject.NewCacheWithFieldIndex, + }) + Expect(err).NotTo(HaveOccurred()) + + scheme := mgr.GetScheme() + err = appsv1.SchemeBuilder.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + err = apis.AddToScheme(scheme) + Expect(err).NotTo(HaveOccurred()) + + c = mgr.GetClient() + + var r reconcile.Reconciler + r, request = testReconcile(NewReconciler(mgr)) + err = AddToMgr(mgr, r) + Expect(err).NotTo(HaveOccurred()) + + r, request = testReconcile(collaset.NewReconciler(mgr)) + err = collaset.AddToMgr(mgr, r) + Expect(err).NotTo(HaveOccurred()) + + r, request = testReconcile(poddeletion.NewReconciler(mgr)) + err = poddeletion.AddToMgr(mgr, r) + Expect(err).NotTo(HaveOccurred()) + + go func() { + err = mgr.Start(ctx) + Expect(err).NotTo(HaveOccurred()) + }() +}) + +var _ = AfterSuite(func() { + By("tearing down the test environment") + + cancel() + + err := env.Stop() + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = AfterEach(func() { + csList := &appsv1alpha1.CollaSetList{} + Expect(mgr.GetClient().List(context.Background(), csList)).Should(BeNil()) + + for i := range csList.Items { + Expect(mgr.GetClient().Delete(context.TODO(), &csList.Items[i])).Should(BeNil()) + } + + nsList := &corev1.NamespaceList{} + Expect(mgr.GetClient().List(context.Background(), nsList)).Should(BeNil()) + + for i := range nsList.Items { + if strings.HasPrefix(nsList.Items[i].Name, "test-") { + mgr.GetClient().Delete(context.TODO(), &nsList.Items[i]) + } + } +}) + +func createNamespace(c client.Client, namespaceName string) error { + ns := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: namespaceName, + }, + } + + return c.Create(context.TODO(), ns) +} diff --git a/pkg/controllers/utils/podopslifecycle/adapter.go b/pkg/controllers/utils/podopslifecycle/adapter.go index 17c03f4f..5e94ab71 100644 --- a/pkg/controllers/utils/podopslifecycle/adapter.go +++ b/pkg/controllers/utils/podopslifecycle/adapter.go @@ -25,6 +25,7 @@ type OperationType string var ( OpsLifecycleTypeUpdate OperationType = "update" OpsLifecycleTypeScaleIn OperationType = "scaleIn" + OpsLifecycleTypeDelete OperationType = "delete" ) // LifecycleAdapter helps CRD Operators to easily access PodOpsLifecycle diff --git a/pkg/controllers/utils/podopslifecycle/utils.go b/pkg/controllers/utils/podopslifecycle/utils.go index 42649b83..eed59fe2 100644 --- a/pkg/controllers/utils/podopslifecycle/utils.go +++ b/pkg/controllers/utils/podopslifecycle/utils.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "strings" + "time" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" @@ -126,7 +127,7 @@ func Finish(c client.Client, adapter LifecycleAdapter, obj client.Object) (updat func checkOperatingID(adapter LifecycleAdapter, obj client.Object) (val string, ok bool) { labelID := fmt.Sprintf("%s/%s", v1alpha1.PodOperatingLabelPrefix, adapter.GetID()) val, ok = obj.GetLabels()[labelID] - return + return adapter.GetID(), ok } func checkOperationType(adapter LifecycleAdapter, obj client.Object) (val OperationType, ok bool) { @@ -147,7 +148,7 @@ func checkOperate(adapter LifecycleAdapter, obj client.Object) (val string, ok b func setOperatingID(adapter LifecycleAdapter, obj client.Object) (val string, ok bool) { labelID := fmt.Sprintf("%s/%s", v1alpha1.PodOperatingLabelPrefix, adapter.GetID()) - obj.GetLabels()[labelID] = adapter.GetID() + obj.GetLabels()[labelID] = fmt.Sprintf("%d", time.Now().UnixNano()) return }