Skip to content

Commit

Permalink
enhancement (CollaSet): support pod deletion indication AND resourceC…
Browse files Browse the repository at this point in the history
…ontext reclaim controller
  • Loading branch information
wu8685 committed Aug 9, 2023
1 parent 76d8e86 commit 7b531a0
Show file tree
Hide file tree
Showing 16 changed files with 1,134 additions and 25 deletions.
2 changes: 2 additions & 0 deletions apis/apps/v1alpha1/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const (
PodScalingInLabelKey = "apps.kafed.kusionstack.io/scaling-in"

// --- End: Labels for CollaSet ---

PodDeletionIndicationLabelKey = "kafed.kusionstack.io/to-delete" // Users can use this label to indicate a pod to delete
)

var (
Expand Down
25 changes: 25 additions & 0 deletions pkg/controllers/add_poddeletion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright 2023 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"kusionstack.io/kafed/pkg/controllers/poddeletion"
)

func init() {
AddToManagerFuncs = append(AddToManagerFuncs, poddeletion.Add)
}
25 changes: 25 additions & 0 deletions pkg/controllers/add_resourcecontext.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright 2023 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"kusionstack.io/kafed/pkg/controllers/resourcecontext"
)

func init() {
AddToManagerFuncs = append(AddToManagerFuncs, resourcecontext.Add)
}
11 changes: 5 additions & 6 deletions pkg/controllers/collaset/collaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ func Add(mgr ctrl.Manager) error {
return AddToMgr(mgr, NewReconciler(mgr))
}

// newReconciler returns a new reconcile.Reconciler
// NewReconciler returns a new reconcile.Reconciler
func NewReconciler(mgr ctrl.Manager) reconcile.Reconciler {
recorder := mgr.GetEventRecorderFor(controllerName)

revisionManager = revision.NewRevisionManager(mgr.GetClient(), mgr.GetScheme(), &revisionOwnerAdapter{})
podControl = podcontrol.NewRealPodControl(mgr.GetClient(), mgr.GetScheme())
syncControl = synccontrol.NewRealSyncControl(mgr.GetClient(), podControl, recorder)

utils.InitExpectations(mgr.GetClient())
collasetutils.InitExpectations(mgr.GetClient())

return &CollaSetReconciler{
Client: mgr.GetClient(),
Expand All @@ -91,7 +91,6 @@ func AddToMgr(mgr ctrl.Manager, r reconcile.Reconciler) error {
return err
}

// Watch for changes to RuleSet
err = c.Watch(&source.Kind{Type: &appsv1alpha1.CollaSet{}}, &handler.EnqueueRequestForObject{})
if err != nil {
return err
Expand Down Expand Up @@ -119,7 +118,7 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
if !errors.IsNotFound(err) {
klog.Error("fail to find CollaSet %s: %s", req, err)
utils.ActiveExpectations.Delete(req.Namespace, req.Name)
collasetutils.ActiveExpectations.Delete(req.Namespace, req.Name)
return reconcile.Result{}, err
}

Expand All @@ -128,7 +127,7 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

// if expectation not satisfied, shortcut this reconciling till informer cache is updated.
if satisfied, err := utils.ActiveExpectations.IsSatisfied(instance); err != nil {
if satisfied, err := collasetutils.ActiveExpectations.IsSatisfied(instance); err != nil {
return ctrl.Result{}, err
} else if !satisfied {
klog.Warningf("CollaSet %s is not satisfied to reconcile.", req)
Expand Down Expand Up @@ -260,7 +259,7 @@ func (r *CollaSetReconciler) updateStatus(ctx context.Context, instance *appsv1a

err := r.Status().Update(ctx, instance)
if err == nil {
if err := utils.ActiveExpectations.ExpectUpdate(instance, expectations.CollaSet, instance.Name, instance.ResourceVersion); err != nil {
if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.CollaSet, instance.Name, instance.ResourceVersion); err != nil {
return err
}
}
Expand Down
45 changes: 34 additions & 11 deletions pkg/controllers/collaset/podcontext/podcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@ package podcontext
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"kusionstack.io/kafed/pkg/controllers/collaset/utils"
"sort"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/controllers/collaset/utils"
"kusionstack.io/kafed/pkg/controllers/utils/expectations"
)

Expand All @@ -38,16 +39,15 @@ const (
func AllocateID(c client.Client, instance *appsv1alpha1.CollaSet, defaultRevision string, replicas int) (map[int]*appsv1alpha1.ContextDetail, error) {
contextName := getContextName(instance)
podContext := &appsv1alpha1.ResourceContext{}
notFound := false
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: instance.Namespace, Name: contextName}, podContext); err != nil {
if !errors.IsNotFound(err) {
return nil, fmt.Errorf("fail to find ResourceContext %s/%s for owner %s: %s", instance.Namespace, contextName, instance.Name, err)
}

notFound = true
podContext.Namespace = instance.Namespace
podContext.Name = contextName
if err := c.Create(context.TODO(), podContext); err != nil {
return nil, fmt.Errorf("fail to create ResourceContext %s/%s for owner %s after not found: %s", instance.Namespace, contextName, instance.Name, err)
}
}

// store all the IDs crossing Multiple workload
Expand Down Expand Up @@ -92,7 +92,11 @@ func AllocateID(c client.Client, instance *appsv1alpha1.CollaSet, defaultRevisio
ownedIDs[candidateID] = detail
}

return ownedIDs, doUpdateToPodContext(c, instance, ownedIDs, podContext, instance.Name)
if notFound {
return ownedIDs, doCreatePodContext(c, instance, ownedIDs)
}

return ownedIDs, doUpdatePodContext(c, instance, ownedIDs, podContext)
}

func UpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownedIDs map[int]*appsv1alpha1.ContextDetail) error {
Expand All @@ -103,17 +107,36 @@ func UpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownedI
return fmt.Errorf("fail to find ResourceContext %s/%s: %s", instance.Namespace, contextName, err)
}

podContext.Namespace = instance.Namespace
podContext.Name = contextName
if err := c.Create(context.TODO(), podContext); err != nil {
if err := doCreatePodContext(c, instance, ownedIDs); err != nil {
return fmt.Errorf("fail to create ResourceContext %s/%s after not found: %s", instance.Namespace, contextName, err)
}
}

return doUpdateToPodContext(c, instance, ownedIDs, podContext, instance.Name)
return doUpdatePodContext(c, instance, ownedIDs, podContext)
}

func doCreatePodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownerIDs map[int]*appsv1alpha1.ContextDetail) error {
contextName := getContextName(instance)
podContext := &appsv1alpha1.ResourceContext{
ObjectMeta: metav1.ObjectMeta{
Namespace: instance.Namespace,
Name: contextName,
},
Spec: appsv1alpha1.ResourceContextSpec{
Contexts: make([]appsv1alpha1.ContextDetail, len(ownerIDs)),
},
}

i := 0
for _, detail := range ownerIDs {
podContext.Spec.Contexts[i] = *detail
i++
}

return c.Create(context.TODO(), podContext)
}

func doUpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, ownedIDs map[int]*appsv1alpha1.ContextDetail, podContext *appsv1alpha1.ResourceContext, owner string) error {
func doUpdatePodContext(c client.Client, instance client.Object, ownedIDs map[int]*appsv1alpha1.ContextDetail, podContext *appsv1alpha1.ResourceContext) error {
// store all IDs crossing all workload
existingIDs := map[int]*appsv1alpha1.ContextDetail{}
for k, detail := range ownedIDs {
Expand All @@ -122,7 +145,7 @@ func doUpdateToPodContext(c client.Client, instance *appsv1alpha1.CollaSet, owne

for i := range podContext.Spec.Contexts {
detail := podContext.Spec.Contexts[i]
if detail.Contains(OwnerContextKey, owner) {
if detail.Contains(OwnerContextKey, instance.GetName()) {
continue
}

Expand Down
11 changes: 5 additions & 6 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package synccontrol

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -166,7 +165,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll

if pod, err := sc.podControl.CreatePod(newPod, updatedRevision); err == nil {
// add an expectation for this pod creation, before next reconciling
if err := utils.ActiveExpectations.ExpectCreate(set, expectations.Pod, pod.Name); err != nil {
if err := collasetutils.ActiveExpectations.ExpectCreate(set, expectations.Pod, pod.Name); err != nil {
return err
}
}
Expand Down Expand Up @@ -204,7 +203,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
return fmt.Errorf("fail to begin PodOpsLifecycle for Scaling in Pod %s/%s: %s", pod.Namespace, pod.Name, err)
} else if updated {
// add an expectation for this pod creation, before next reconciling
if err := utils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, pod.Name, pod.ResourceVersion); err != nil {
if err := collasetutils.ActiveExpectations.ExpectUpdate(set, expectations.Pod, pod.Name, pod.ResourceVersion); err != nil {
return err
}
}
Expand Down Expand Up @@ -258,7 +257,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
succCount, err = controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
pod := <-podCh
if err := sc.podControl.DeletePod(pod.Pod); err == nil {
if err := utils.ActiveExpectations.ExpectDelete(set, expectations.Pod, pod.Name); err != nil {
if err := collasetutils.ActiveExpectations.ExpectDelete(set, expectations.Pod, pod.Name); err != nil {
return err
}
}
Expand Down Expand Up @@ -345,7 +344,7 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := utils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
}
Expand Down Expand Up @@ -448,7 +447,7 @@ func (sc *RealSyncControl) Update(instance *appsv1alpha1.CollaSet, podWrapers []
return fmt.Errorf("fail to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := utils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
if err := collasetutils.ActiveExpectations.ExpectUpdate(instance, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
sc.recorder.Eventf(podInfo.Pod, corev1.EventTypeNormal, "UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
Expand Down
32 changes: 32 additions & 0 deletions pkg/controllers/poddeletion/expectation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright 2023 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poddeletion

import (
"sigs.k8s.io/controller-runtime/pkg/client"

"kusionstack.io/kafed/pkg/controllers/utils/expectations"
)

var (
// activeExpectations is used to check the cache in informer is updated, before reconciling.
activeExpectations *expectations.ActiveExpectations
)

func InitExpectations(c client.Client) {
activeExpectations = expectations.NewActiveExpectations(c)
}
57 changes: 57 additions & 0 deletions pkg/controllers/poddeletion/lifecycle_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
Copyright 2023 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package poddeletion

import (
"sigs.k8s.io/controller-runtime/pkg/client"

"kusionstack.io/kafed/pkg/controllers/utils/podopslifecycle"
)

var (
OpsLifecycleAdapter = &PodDeleteOpsLifecycleAdapter{}
)

// PodDeleteOpsLifecycleAdapter tells PodOpsLifecycle the Pod deletion ops info
type PodDeleteOpsLifecycleAdapter struct {
}

// GetID indicates ID of one PodOpsLifecycle
func (a *PodDeleteOpsLifecycleAdapter) GetID() string {
return "pod-delete"
}

// GetType indicates type for an Operator
func (a *PodDeleteOpsLifecycleAdapter) GetType() podopslifecycle.OperationType {
return podopslifecycle.OpsLifecycleTypeDelete
}

// AllowMultiType indicates whether multiple IDs which have the same Type are allowed
func (a *PodDeleteOpsLifecycleAdapter) AllowMultiType() bool {
return true
}

// WhenBegin will be executed when begin a lifecycle
func (a *PodDeleteOpsLifecycleAdapter) WhenBegin(pod client.Object) (bool, error) {
return false, nil
}

// WhenFinish will be executed when finish a lifecycle
func (a *PodDeleteOpsLifecycleAdapter) WhenFinish(pod client.Object) (bool, error) {

return false, nil
}
Loading

0 comments on commit 7b531a0

Please sign in to comment.