Skip to content

Commit

Permalink
enhancement: 1,delete ruleset event channel 2, sync ruleset stage det… (
Browse files Browse the repository at this point in the history
#53)

* enhancement: 1,delete ruleset event channel 2, sync ruleset stage detail on pod

* fix ut

* support ruleset IndexField

* fix: comments and ruleset ut

* revert ruleset eventhandler

* move ruleSet index
  • Loading branch information
Eikykun committed Aug 24, 2023
1 parent e4959c0 commit b4cceb9
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 301 deletions.
2 changes: 1 addition & 1 deletion apis/apps/v1alpha1/well_known_annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ const (

// RuleSet Annotation
const (
AnnotationRuleSets = "ruleset.kusionstack.io/rulesets"
AnnotationPodSkipRuleConditions = "ruleset.kusionstack.io/skip-rule-conditions"
AnnotationRuleSetDetailPrefix = "detail.ruleset.kusionstack.io"
)
7 changes: 0 additions & 7 deletions pkg/controllers/podopslifecycle/podopslifecycle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ func AddToMgr(mgr manager.Manager, r reconcile.Reconciler) error {
if err != nil {
return err
}

sourceChannel := ruleset.RegisterListenChan(context.Background())
err = c.Watch(sourceChannel, &handler.EnqueueRequestForObject{})
if err != nil {
return err
}

return nil
}

Expand Down
28 changes: 12 additions & 16 deletions pkg/controllers/ruleset/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,12 @@ import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/fields"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/controllers/ruleset/register"
"kusionstack.io/kafed/pkg/controllers/ruleset/utils"
rulesetutils "kusionstack.io/kafed/pkg/controllers/ruleset/utils"
)

type Check interface {
Expand All @@ -43,35 +42,32 @@ type checker struct {
}

// GetState get item current check state from all related ruleSets
func (c *checker) GetState(client client.Client, item client.Object) (CheckState, error) {
func (c *checker) GetState(cl client.Client, item client.Object) (CheckState, error) {

result := CheckState{}
rulesetNames := utils.GetRuleSets(item)
for _, name := range rulesetNames {
rs := &appsv1alpha1.RuleSet{}
if err := client.Get(context.TODO(), types.NamespacedName{Namespace: item.GetNamespace(), Name: name}, rs); err != nil {
if errors.IsNotFound(err) {
continue
}
return result, err
}
ruleSetList := &appsv1alpha1.RuleSetList{}
if err := cl.List(context.TODO(), ruleSetList, &client.ListOptions{FieldSelector: fields.OneTermEqualSelector(rulesetutils.FieldIndexRuleSet, item.GetName())}); err != nil {
return result, err
}
for i := range ruleSetList.Items {
rs := &ruleSetList.Items[i]
findStatus := false
for i, detail := range rs.Status.Details {
if detail.Name != item.GetName() {
continue
}
findStatus = true
if !detail.Passed {
result.Message += CollectInfo(name, rs.Status.Details[i])
result.Message += CollectInfo(rs.Name, rs.Status.Details[i])
}
result.States = append(result.States, State{
RuleSetName: name,
RuleSetName: rs.Name,
Detail: rs.Status.Details[i],
})
}
if !findStatus {
result.States = append(result.States, State{
RuleSetName: name,
RuleSetName: rs.Name,
Detail: &appsv1alpha1.Detail{
Passed: false,
},
Expand Down
123 changes: 53 additions & 70 deletions pkg/controllers/ruleset/ruleset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
Expand All @@ -46,7 +45,8 @@ import (
"kusionstack.io/kafed/pkg/controllers/ruleset/processor"
"kusionstack.io/kafed/pkg/controllers/ruleset/register"
rulesetutils "kusionstack.io/kafed/pkg/controllers/ruleset/utils"
"kusionstack.io/kafed/pkg/controllers/utils"
controllerutils "kusionstack.io/kafed/pkg/controllers/utils"
"kusionstack.io/kafed/pkg/utils"
)

const (
Expand All @@ -56,10 +56,6 @@ const (
rulesetTerminatingLabel = "ruleset.kusionstack.io/terminating"
)

var (
podEventQueues []workqueue.DelayingInterface
)

// NewReconciler returns a new reconcile.Reconciler
func newReconciler(mgr manager.Manager) reconcile.Reconciler {
return &RuleSetReconciler{
Expand All @@ -79,7 +75,16 @@ func addToMgr(mgr manager.Manager, r reconcile.Reconciler) (controller.Controlle
if err != nil {
return nil, err
}

err = mgr.GetCache().IndexField(context.TODO(), &appsv1alpha1.RuleSet{}, rulesetutils.FieldIndexRuleSet, func(obj client.Object) []string {
rs, ok := obj.(*appsv1alpha1.RuleSet)
if !ok {
return nil
}
return rs.Status.Targets
})
if err != nil {
return nil, err
}
// Watch for changes to RuleSet
err = c.Watch(&source.Kind{Type: &appsv1alpha1.RuleSet{}}, &RulesetEventHandler{})
if err != nil {
Expand Down Expand Up @@ -118,9 +123,8 @@ func (r *RuleSetReconciler) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, err
}

if !rulesetutils.RulesetVersionExpectation.SatisfiedExpectations(utils.ObjectKey(ruleSet), ruleSet.ResourceVersion) {
r.Info(fmt.Sprintf("expected ruleset %s update, resource version %s, retry later", utils.ObjectKey(ruleSet), ruleSet.ResourceVersion))
fmt.Printf("expected ruleset %s update, resource version %s, retry later", utils.ObjectKey(ruleSet), ruleSet.ResourceVersion)
if !rulesetutils.RulesetVersionExpectation.SatisfiedExpectations(controllerutils.ObjectKey(ruleSet), ruleSet.ResourceVersion) {
r.Info(fmt.Sprintf("expected ruleset %s update, resource version %s, retry later", controllerutils.ObjectKey(ruleSet), ruleSet.ResourceVersion))
return reconcile.Result{}, nil
}

Expand All @@ -142,21 +146,21 @@ func (r *RuleSetReconciler) Reconcile(ctx context.Context, request reconcile.Req
return reconcile.Result{}, nil
}

return reconcile.Result{}, utils.RemoveFinalizer(ctx, r.Client, ruleSet, cleanUpFinalizer)
return reconcile.Result{}, controllerutils.RemoveFinalizer(ctx, r.Client, ruleSet, cleanUpFinalizer)
}
msg := fmt.Sprintf("can not delete ruleset: there are some pods waiting for process by ruleset %s/%s. Please terminate pods first or label ruleset kafed.kusionstack.io/terminating=true to force delete it", ruleSet.Namespace, ruleSet.Name)
result.RequeueAfter = 5 * time.Second
r.recorder.Event(ruleSet, corev1.EventTypeWarning, "BlockProtection", msg)
} else if !controllerutil.ContainsFinalizer(ruleSet, cleanUpFinalizer) {
if err := utils.AddFinalizer(ctx, r.Client, ruleSet, cleanUpFinalizer); err != nil {
if err := controllerutils.AddFinalizer(ctx, r.Client, ruleSet, cleanUpFinalizer); err != nil {
return result, fmt.Errorf("fail to add finalizer on RuleSet %s: %s", request, err)
}
}

selectedPodNames := sets.String{}
for _, pod := range selectedPods.Items {
if !rulesetutils.PodVersionExpectation.SatisfiedExpectations(utils.ObjectKey(&pod), pod.ResourceVersion) {
r.Info(fmt.Sprintf("expected pod %s update, resource version %s, retry later", utils.ObjectKey(&pod), pod.ResourceVersion))
if !rulesetutils.PodVersionExpectation.SatisfiedExpectations(controllerutils.ObjectKey(&pod), pod.ResourceVersion) {
r.Info(fmt.Sprintf("expected pod %s update, resource version %s, retry later", controllerutils.ObjectKey(&pod), pod.ResourceVersion))
return reconcile.Result{}, nil
}
selectedPodNames.Insert(pod.Name)
Expand All @@ -172,22 +176,12 @@ func (r *RuleSetReconciler) Reconcile(ctx context.Context, request reconcile.Req
continue
}

if _, err := r.updateRuleSetOnPod(ctx, ruleSet.Name, name, ruleSet.Namespace, rulesetutils.MoveRulesetAnno); err != nil {
if _, err := r.updateRuleSetOnPod(ctx, ruleSet.Name, name, ruleSet.Namespace, rulesetutils.MoveAllRuleSetInfo); err != nil {
r.Info(fmt.Sprintf("fail to remove ruleset on pod %s, %v", name, err))
return result, err
}
}

// own selected pods
for name, pod := range targetPods {
newPod, err := r.updateRuleSetOnPod(ctx, ruleSet.Name, pod.Name, pod.Namespace, rulesetutils.AddRuleSetAnno)
if err != nil {
r.Info(fmt.Sprintf("fail to add ruleset on pod %s, %v", name, err))
return result, err
}
targetPods[name] = newPod
}

// process rules
shouldRetry, interval, details, ruleStates := r.process(ruleSet, targetPods)

Expand Down Expand Up @@ -221,22 +215,43 @@ func (r *RuleSetReconciler) Reconcile(ctx context.Context, request reconcile.Req
}

if !equalStatus(newStatus, &ruleSet.Status) {
rulesetutils.RulesetVersionExpectation.ExpectUpdate(utils.ObjectKey(ruleSet), ruleSet.ResourceVersion)
rulesetutils.RulesetVersionExpectation.ExpectUpdate(controllerutils.ObjectKey(ruleSet), ruleSet.ResourceVersion)
ruleSet.Status = *newStatus
if err := r.Status().Update(ctx, ruleSet); err != nil {
rulesetutils.RulesetVersionExpectation.DeleteExpectations(utils.ObjectKey(ruleSet))
r.Error(err, fmt.Sprintf("fail to update ruleset %s status", utils.ObjectKey(ruleSet)))
rulesetutils.RulesetVersionExpectation.DeleteExpectations(controllerutils.ObjectKey(ruleSet))
r.Error(err, fmt.Sprintf("fail to update ruleset %s status", controllerutils.ObjectKey(ruleSet)))
return reconcile.Result{}, err
}
}
pods := make([]*corev1.Pod, 0, len(targetPods))
for _, pod := range targetPods {
pods = append(pods, pod)
}
return res, r.syncPodsDetail(ctx, ruleSet.Name, pods, details)
}

oldDetails := map[string]*appsv1alpha1.Detail{}
func (r *RuleSetReconciler) syncPodsDetail(ctx context.Context, ruleSetName string, pods []*corev1.Pod, details map[string]*appsv1alpha1.Detail) error {
_, err := controllerutils.SlowStartBatch(len(pods), 1, false, func(i int, _ error) error {
return r.updatePodDetail(ctx, pods[i], ruleSetName, details[pods[i].Name])
})
return err
}

for i, de := range ruleSet.Status.Details {
oldDetails[de.Name] = ruleSet.Status.Details[i]
func (r *RuleSetReconciler) updatePodDetail(ctx context.Context, pod *corev1.Pod, ruleSetName string, detail *appsv1alpha1.Detail) error {
detailAnno := appsv1alpha1.AnnotationRuleSetDetailPrefix + "/" + ruleSetName
var newDetail string
if detail != nil {
newDetail = utils.DumpJSON(&appsv1alpha1.Detail{Stage: detail.Stage, Passed: detail.Passed})
} else {
newDetail = utils.DumpJSON(&appsv1alpha1.Detail{Stage: "Unknown", Passed: true})
}
if pod.Annotations != nil && pod.Annotations[detailAnno] == newDetail {
return nil
}
addQueues(ruleSet, details, oldDetails)
return res, nil
patch := client.RawPatch(types.MergePatchType, controllerutils.GetLabelAnnoPatchBytes(nil, nil, nil, map[string]string{detailAnno: newDetail}))
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Patch(ctx, pod, patch)
})
}

func (r *RuleSetReconciler) process(rs *appsv1alpha1.RuleSet, pods map[string]*corev1.Pod) (shouldRetry bool, interval *time.Duration, details map[string]*appsv1alpha1.Detail, ruleStates []*appsv1alpha1.RuleState) {
Expand Down Expand Up @@ -270,45 +285,10 @@ func (r *RuleSetReconciler) process(rs *appsv1alpha1.RuleSet, pods map[string]*c
return shouldRetry, interval, details, ruleStates
}

func addQueues(rs *appsv1alpha1.RuleSet, details map[string]*appsv1alpha1.Detail, oldDetails map[string]*appsv1alpha1.Detail) {
if len(podEventQueues) == 0 {
return
}
for name, old := range oldDetails {
detail, ok := details[name]
if !ok {
addToEveryQueue(rs.Namespace, name)
continue
}
if !equality.Semantic.DeepEqual(old, detail) {
addToEveryQueue(rs.Namespace, name)
}
}
for name := range details {
_, ok := oldDetails[name]
if !ok {
addToEveryQueue(rs.Namespace, name)
continue
}
}
}

func addToEveryQueue(namespace, name string) {
for _, q := range podEventQueues {
if q.ShuttingDown() {
continue
}
q.Add(types.NamespacedName{
Name: name,
Namespace: namespace,
})
}
}

func (r *RuleSetReconciler) cleanUpRuleSetPods(ctx context.Context, ruleSet *appsv1alpha1.RuleSet) error {
for _, name := range ruleSet.Status.Targets {
if _, err := r.updateRuleSetOnPod(ctx, ruleSet.Name, name, ruleSet.Namespace, rulesetutils.MoveRulesetAnno); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("fail to remove RuleSet %s on pod %s: %v", utils.ObjectKey(ruleSet), name, err)
if _, err := r.updateRuleSetOnPod(ctx, ruleSet.Name, name, ruleSet.Namespace, rulesetutils.MoveAllRuleSetInfo); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("fail to remove RuleSet %s on pod %s: %v", controllerutils.ObjectKey(ruleSet), name, err)
}
}
return nil
Expand All @@ -318,6 +298,9 @@ func (r *RuleSetReconciler) updateRuleSetOnPod(ctx context.Context, ruleSet, nam
pod := &corev1.Pod{}
return pod, retry.RetryOnConflict(retry.DefaultRetry, func() error {
if err := r.Get(ctx, types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
if fn(pod, ruleSet) {
Expand Down
40 changes: 0 additions & 40 deletions pkg/controllers/ruleset/ruleset_controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,10 @@ limitations under the License.
package ruleset

import (
"context"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"

"kusionstack.io/kafed/pkg/controllers/ruleset/checker"
"kusionstack.io/kafed/pkg/controllers/ruleset/register"
Expand All @@ -52,33 +45,6 @@ func RuleSetManager() ManagerInterface {
return defaultManager
}

func RegisterListenChan(ctx context.Context) *source.Channel {
ch := make(chan event.GenericEvent, 32)
go func() {
q := loadPodEventQueue()
for {
select {
case <-ctx.Done():
q.ShutDown()
break
default:
item, shutdown := q.Get()
if shutdown {
break
}
tp, ok := item.(types.NamespacedName)
if !ok {
q.Done(item)
continue
}
ch <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Namespace: tp.Namespace, Name: tp.Name}}}
q.Done(item)
}
}
}()
return &source.Channel{Source: ch}
}

func AddUnAvailableFunc(f func(pod *corev1.Pod) (bool, *int64)) {
register.UnAvailableFuncList = append(register.UnAvailableFuncList, f)
}
Expand All @@ -90,12 +56,6 @@ func newRulesetManager() ManagerInterface {
}
}

func loadPodEventQueue() workqueue.DelayingInterface {
podEventQueue := workqueue.NewDelayingQueue()
podEventQueues = append(podEventQueues, podEventQueue)
return podEventQueue
}

type rsManager struct {
register register.Register
checker checker.Check
Expand Down
1 change: 1 addition & 0 deletions pkg/controllers/ruleset/ruleset_controller_suit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func TestMain(m *testing.M) {
Host: "127.0.0.1:10001",
}
initRulesetManager()

config, err := env.Start()
if err != nil {
panic(err)
Expand Down
Loading

0 comments on commit b4cceb9

Please sign in to comment.