Skip to content

Commit

Permalink
podopslifecycle uses ruleset
Browse files Browse the repository at this point in the history
  • Loading branch information
shaofan-hs committed Aug 17, 2023
1 parent 7c2684b commit abc9ea7
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 37 deletions.
3 changes: 3 additions & 0 deletions apis/apps/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ package v1alpha1

const (
PodOperationProtectionFinalizerPrefix = "prot.lifecycle.kafed.kusionstack.io"

PodOpsLifecyclePreCheckStage = "precheck"
PodOpsLifecyclePostCheckStage = "postcheck"
)

// +kubebuilder:object:generate=false
Expand Down
122 changes: 91 additions & 31 deletions pkg/controllers/podopslifecycle/podopslifecycle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -36,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

"kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/controllers/ruleset"
"kusionstack.io/kafed/pkg/controllers/utils/expectations"
"kusionstack.io/kafed/pkg/log"
)
Expand All @@ -52,17 +54,6 @@ func Add(mgr manager.Manager) error {
return AddToMgr(mgr, NewReconciler(mgr))
}

func NewReconciler(mgr manager.Manager) reconcile.Reconciler {
logger := log.New(logf.Log.WithName("podopslifecycle-controller"))
expectation = expectations.NewResourceVersionExpectation(logger)

r := &ReconcilePodOpsLifecycle{
Client: mgr.GetClient(),
logger: logger,
}
return r
}

func AddToMgr(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New(controllerName, mgr, controller.Options{MaxConcurrentReconciles: 5, Reconciler: r})
if err != nil {
Expand All @@ -83,9 +74,29 @@ func AddToMgr(mgr manager.Manager, r reconcile.Reconciler) error {

var _ reconcile.Reconciler = &ReconcilePodOpsLifecycle{}

func NewReconciler(mgr manager.Manager) *ReconcilePodOpsLifecycle {
logger := log.New(logf.Log.WithName("podopslifecycle-controller"))
expectation = expectations.NewResourceVersionExpectation(logger)

r := &ReconcilePodOpsLifecycle{
Client: mgr.GetClient(),
ruleSetManager: ruleset.RuleSetManager(),

logger: logger,
recorder: mgr.GetEventRecorderFor(controllerName),
expectation: expectation,
}
r.registerStages()

return r
}

type ReconcilePodOpsLifecycle struct {
client.Client
logger *log.Logger
ruleSetManager ruleset.ManagerInterface
logger *log.Logger
recorder record.EventRecorder
expectation *expectations.ResourceVersionExpectation
}

func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand All @@ -97,13 +108,42 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc
if err != nil {
r.logger.Warningf("failed to get pod %s: %s", key, err)
if errors.IsNotFound(err) {
expectation.DeleteExpectations(key)
r.expectation.DeleteExpectations(key)
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

if !expectation.SatisfiedExpectations(key, pod.ResourceVersion) {
state, err := r.ruleSetManager.GetState(r.Client, pod)
if err != nil {
r.logger.Errorf("failed to get pod %s state: %s", key, err)
return reconcile.Result{}, err
}
if state.Passed {
var labels map[string]string
if state.InStage(v1alpha1.PodOpsLifecyclePreCheckStage) {
labels, err = r.preCheckStage(pod)
} else if state.InStage(v1alpha1.PodOpsLifecyclePostCheckStage) {
labels, err = r.postCheckStage(pod)
}
if err != nil {
return reconcile.Result{}, err
}

if len(labels) > 0 {
expectation.ExpectUpdate(key, pod.ResourceVersion)
err = r.addLabels(ctx, pod, labels)
if err != nil {
r.logger.Errorf("failed to update pod %s: %s", key, err)
expectation.DeleteExpectations(key)

return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
}

if !r.expectation.SatisfiedExpectations(key, pod.ResourceVersion) {
r.logger.V(2).Infof("skip pod %s with no satisfied", key)
return reconcile.Result{}, nil
}
Expand All @@ -112,30 +152,33 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc
if err != nil {
return reconcile.Result{}, err
}
fmt.Println(idToLabelsMap)

expected := map[string]bool{
v1alpha1.PodPrepareLabelPrefix: false, // set readiness gate to false
v1alpha1.PodCompleteLabelPrefix: true, // set readiness gate to true
}
for _, labels := range idToLabelsMap {
for k, v := range expected {
if _, ok := labels[k]; ok {
needUpdate, _ := r.setServiceReadiness(pod, v)
if needUpdate {
expectation.ExpectUpdate(key, pod.ResourceVersion)

if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Client.Status().Update(ctx, pod)
}); err != nil {
r.logger.Errorf("failed to update pod status %s: %s", key, err)
expectation.DeleteExpectations(key)

return reconcile.Result{}, err
}
break
if _, ok := labels[k]; !ok {
continue
}

needUpdate, _ := r.setServiceReadiness(pod, v)
if needUpdate {
r.expectation.ExpectUpdate(key, pod.ResourceVersion)

if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Client.Status().Update(ctx, pod)
}); err != nil {
r.logger.Errorf("failed to update pod status %s: %s", key, err)
r.expectation.DeleteExpectations(key)

return reconcile.Result{}, err
}
return reconcile.Result{}, nil // only need set once
break
}
return reconcile.Result{}, nil // only need set once
}
}

Expand Down Expand Up @@ -186,7 +229,7 @@ func (r *ReconcilePodOpsLifecycle) setServiceReadiness(pod *corev1.Pod, isReady
return true, fmt.Sprintf("update service readiness gate to: %s", string(status))
}

func (r *ReconcilePodOpsLifecycle) stagePreCheck(pod *corev1.Pod) (labels map[string]string, err error) {
func (r *ReconcilePodOpsLifecycle) preCheckStage(pod *corev1.Pod) (labels map[string]string, err error) {
idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return nil, err
Expand Down Expand Up @@ -214,7 +257,7 @@ func (r *ReconcilePodOpsLifecycle) stagePreCheck(pod *corev1.Pod) (labels map[st
return
}

func (r *ReconcilePodOpsLifecycle) stagePostCheck(pod *corev1.Pod) (labels map[string]string, err error) {
func (r *ReconcilePodOpsLifecycle) postCheckStage(pod *corev1.Pod) (labels map[string]string, err error) {
idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return nil, err
Expand Down Expand Up @@ -245,6 +288,23 @@ func (r *ReconcilePodOpsLifecycle) addLabels(ctx context.Context, pod *corev1.Po
})
}

func (r *ReconcilePodOpsLifecycle) registerStages() {
r.ruleSetManager.RegisterStage(v1alpha1.PodOpsLifecyclePreCheckStage, func(po client.Object) bool {
labels := po.GetLabels()
if labels == nil {
return false
}
return labelHasPrefix(labels, v1alpha1.PodPreCheckLabelPrefix)
})
r.ruleSetManager.RegisterStage(v1alpha1.PodOpsLifecyclePostCheckStage, func(po client.Object) bool {
labels := po.GetLabels()
if labels == nil {
return false
}
return labelHasPrefix(labels, v1alpha1.PodPostCheckLabelPrefix)
})
}

func labelHasPrefix(labels map[string]string, prefix string) bool {
for k := range labels {
if strings.HasPrefix(k, prefix) {
Expand Down
123 changes: 117 additions & 6 deletions pkg/controllers/podopslifecycle/podopslifecycle_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/controllers/ruleset"
"kusionstack.io/kafed/pkg/controllers/ruleset/checker"
)

var (
env *envtest.Environment
mgr manager.Manager
request chan reconcile.Request
env *envtest.Environment
podOpsLifecycle *ReconcilePodOpsLifecycle
mgr manager.Manager
request chan reconcile.Request

ctx context.Context
cancel context.CancelFunc
Expand Down Expand Up @@ -69,8 +72,11 @@ var _ = BeforeSuite(func() {
})
Expect(err).NotTo(HaveOccurred())

podOpsLifecycle = NewReconciler(mgr)
podOpsLifecycle.ruleSetManager = &mockRuleSetManager{}

var r reconcile.Reconciler
r, request = testReconcile(NewReconciler(mgr))
r, request = testReconcile(podOpsLifecycle)
err = AddToMgr(mgr, r)
Expect(err).NotTo(HaveOccurred())

Expand Down Expand Up @@ -104,8 +110,9 @@ var _ = Describe("podopslifecycle controller", func() {
},
},
}
id = "123"
time = "1402144848"
operationType = "restart"
id = "123"
time = "1402144848"
)

AfterEach(func() {
Expand All @@ -126,6 +133,85 @@ var _ = Describe("podopslifecycle controller", func() {
}
})

It("update pod with stage pre-check", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
},
Spec: podSpec,
}
err := mgr.GetClient().Create(context.Background(), pod)
Expect(err).NotTo(HaveOccurred())

<-request

pod = &corev1.Pod{}
err = mgr.GetAPIReader().Get(context.Background(), client.ObjectKey{
Name: "test",
Namespace: "default",
}, pod)
Expect(err).NotTo(HaveOccurred())
Expect(pod.Status.Conditions).To(HaveLen(0))

podOpsLifecycle.ruleSetManager = &mockRuleSetManager{CheckState: &checker.CheckState{
States: []checker.State{
{
Detail: v1alpha1.Detail{
Stage: v1alpha1.PodOpsLifecyclePreCheckStage,
},
},
},
Passed: true,
}}

pod.ObjectMeta.Labels = map[string]string{
fmt.Sprintf("%s/%s", v1alpha1.PodPreCheckLabelPrefix, id): time,
fmt.Sprintf("%s/%s", v1alpha1.PodOperationTypeLabelPrefix, id): operationType,
}
err = mgr.GetClient().Update(context.Background(), pod)
Expect(err).NotTo(HaveOccurred())

<-request

pod = &corev1.Pod{}
err = mgr.GetAPIReader().Get(context.Background(), client.ObjectKey{
Name: "test",
Namespace: "default",
}, pod)
Expect(err).NotTo(HaveOccurred())

Expect(pod.GetLabels()).To(HaveKey(fmt.Sprintf("%s/%s", v1alpha1.PodPreCheckedLabelPrefix, id)))
Expect(pod.GetLabels()).To(HaveKey(fmt.Sprintf("%s/%s", v1alpha1.PodOperationPermissionLabelPrefix, operationType)))
})

It("create pod with label prepare", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "default",
Labels: map[string]string{
fmt.Sprintf("%s/%s", v1alpha1.PodPrepareLabelPrefix, id): time,
},
},
Spec: podSpec,
}
err := mgr.GetClient().Create(context.Background(), pod)
Expect(err).NotTo(HaveOccurred())

<-request

pod = &corev1.Pod{}
err = mgr.GetAPIReader().Get(context.Background(), client.ObjectKey{
Name: "test",
Namespace: "default",
}, pod)
Expect(err).NotTo(HaveOccurred())
Expect(pod.Status.Conditions).To(HaveLen(1))
Expect(string(pod.Status.Conditions[0].Type)).To(Equal(v1alpha1.ReadinessGatePodServiceReady))
Expect(pod.Status.Conditions[0].Status).To(Equal(corev1.ConditionFalse))
})

It("create pod with label complete", func() {
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -206,6 +292,31 @@ func testReconcile(inner reconcile.Reconciler) (reconcile.Reconciler, chan recon
return fn, requests
}

var _ ruleset.ManagerInterface = &mockRuleSetManager{}

type mockRuleSetManager struct {
*checker.CheckState
}

func (rsm *mockRuleSetManager) RegisterStage(key string, inStage func(obj client.Object) bool) error {
return nil
}

func (rsm *mockRuleSetManager) RegisterCondition(opsCondition string, inCondition func(obj client.Object) bool) error {
return nil
}

func (rsm *mockRuleSetManager) SetupRuleSetController(manager.Manager) error {
return nil
}

func (rsm *mockRuleSetManager) GetState(client.Client, client.Object) (checker.CheckState, error) {
if rsm.CheckState == nil {
return checker.CheckState{}, nil
}
return *rsm.CheckState, nil
}

func TestPodOpsLifecycleController(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "podopslifecycle controller suite test")
Expand Down

0 comments on commit abc9ea7

Please sign in to comment.