Skip to content

Commit

Permalink
Merge pull request #26 from KusionStack/podopslifecycle3
Browse files Browse the repository at this point in the history
register podopslifecycle webhook
  • Loading branch information
Eikykun committed Aug 17, 2023
2 parents 035bb84 + 8e008b3 commit becbf99
Show file tree
Hide file tree
Showing 25 changed files with 803 additions and 442 deletions.
7 changes: 7 additions & 0 deletions apis/apps/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ limitations under the License.

package v1alpha1

const (
PodOperationProtectionFinalizerPrefix = "prot.lifecycle.kafed.kusionstack.io"

PodOpsLifecyclePreCheckStage = "pre-check"
PodOpsLifecyclePostCheckStage = "post-check"
)

// +kubebuilder:object:generate=false
type PodAvailableConditions struct {
ExpectedFinalizers []string `json:"expectedFinalizers,omitempty"` // indicate the expected finalizers of a pod
Expand Down
4 changes: 3 additions & 1 deletion apis/apps/v1alpha1/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package v1alpha1

// pod ops lifecyle labels
const (
ControlledByPodOpsLifecycle = "podopslifecycle.kusionstack.io/control" // indicate a pod is controlled by podopslifecycle

PodOperatingLabelPrefix = "operating.podopslifecycle.kusionstack.io" // indicate a pod is operating
PodOperationTypeLabelPrefix = "operation-type.podopslifecycle.kusionstack.io" // indicate the type of operation
PodOperationPermissionLabelPrefix = "operation-permission.podopslifecycle.kusionstack.io" // indicate the permission of operation
Expand All @@ -44,6 +46,6 @@ const (

var (
WellKnownLabelPrefixesWithID = []string{PodOperatingLabelPrefix, PodOperationTypeLabelPrefix, PodPreCheckLabelPrefix, PodPreCheckedLabelPrefix,
PodPrepareLabelPrefix, PodUndoOperationTypeLabelPrefix, PodOperateLabelPrefix, PodOperatedLabelPrefix, PodPostCheckLabelPrefix,
PodPrepareLabelPrefix, PodDoneOperationTypeLabelPrefix, PodUndoOperationTypeLabelPrefix, PodOperateLabelPrefix, PodOperatedLabelPrefix, PodPostCheckLabelPrefix,
PodPostCheckedLabelPrefix, PodCompleteLabelPrefix}
)
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.8.1+incompatible
github.com/go-logr/logr v1.2.3
github.com/google/uuid v1.3.0
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.26.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -42,7 +43,6 @@ require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/gofuzz v1.1.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gnostic v0.5.5 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,22 @@
/*
Copyright 2023 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package register
package controllers

import (
"sigs.k8s.io/controller-runtime/pkg/client"
"kusionstack.io/kafed/pkg/controllers/ruleset"
)

type KindCache struct {
newFunc map[string]func() client.Object
}

func (k *KindCache) AddNewKind(gvk string, newFunc func() client.Object) {
k.newFunc[gvk] = newFunc
}

func (k *KindCache) GetNewObject(gvk string) client.Object {
f, ok := k.newFunc[gvk]
if !ok {
return nil
}
return f()
func init() {
AddToManagerFuncs = append(AddToManagerFuncs, ruleset.RuleSetManager().SetupRuleSetController)
}
114 changes: 83 additions & 31 deletions pkg/controllers/podopslifecycle/podopslifecycle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand All @@ -36,6 +37,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

"kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/controllers/ruleset"
"kusionstack.io/kafed/pkg/controllers/utils/expectations"
"kusionstack.io/kafed/pkg/log"
)
Expand All @@ -52,17 +54,6 @@ func Add(mgr manager.Manager) error {
return AddToMgr(mgr, NewReconciler(mgr))
}

func NewReconciler(mgr manager.Manager) reconcile.Reconciler {
logger := log.New(logf.Log.WithName("podopslifecycle-controller"))
expectation = expectations.NewResourceVersionExpectation(logger)

r := &ReconcilePodOpsLifecycle{
Client: mgr.GetClient(),
logger: logger,
}
return r
}

func AddToMgr(mgr manager.Manager, r reconcile.Reconciler) error {
c, err := controller.New(controllerName, mgr, controller.Options{MaxConcurrentReconciles: 5, Reconciler: r})
if err != nil {
Expand All @@ -83,9 +74,29 @@ func AddToMgr(mgr manager.Manager, r reconcile.Reconciler) error {

var _ reconcile.Reconciler = &ReconcilePodOpsLifecycle{}

func NewReconciler(mgr manager.Manager) *ReconcilePodOpsLifecycle {
logger := log.New(logf.Log.WithName("podopslifecycle-controller"))
expectation = expectations.NewResourceVersionExpectation(logger)

r := &ReconcilePodOpsLifecycle{
Client: mgr.GetClient(),
ruleSetManager: ruleset.RuleSetManager(),

logger: logger,
recorder: mgr.GetEventRecorderFor(controllerName),
expectation: expectation,
}
r.registerStages()

return r
}

type ReconcilePodOpsLifecycle struct {
client.Client
logger *log.Logger
ruleSetManager ruleset.ManagerInterface
logger *log.Logger
recorder record.EventRecorder
expectation *expectations.ResourceVersionExpectation
}

func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
Expand All @@ -97,13 +108,40 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc
if err != nil {
r.logger.Warningf("failed to get pod %s: %s", key, err)
if errors.IsNotFound(err) {
expectation.DeleteExpectations(key)
r.expectation.DeleteExpectations(key)
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}

if !expectation.SatisfiedExpectations(key, pod.ResourceVersion) {
state, err := r.ruleSetManager.GetState(r.Client, pod)
if err != nil {
r.logger.Errorf("failed to get pod %s state: %s", key, err)
return reconcile.Result{}, err
}
if state.Passed {
var labels map[string]string
if state.InStage(v1alpha1.PodOpsLifecyclePreCheckStage) {
labels, err = r.preCheckStage(pod)
} else if state.InStage(v1alpha1.PodOpsLifecyclePostCheckStage) {
labels, err = r.postCheckStage(pod)
}
if err != nil {
return reconcile.Result{}, err
}

if len(labels) > 0 {
expectation.ExpectUpdate(key, pod.ResourceVersion)
err = r.addLabels(ctx, pod, labels)
if err != nil {
r.logger.Errorf("failed to update pod %s: %s", key, err)
expectation.DeleteExpectations(key)
}
return reconcile.Result{}, err
}
}

if !r.expectation.SatisfiedExpectations(key, pod.ResourceVersion) {
r.logger.V(2).Infof("skip pod %s with no satisfied", key)
return reconcile.Result{}, nil
}
Expand All @@ -112,30 +150,33 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc
if err != nil {
return reconcile.Result{}, err
}
fmt.Println(idToLabelsMap)

expected := map[string]bool{
v1alpha1.PodPrepareLabelPrefix: false, // set readiness gate to false
v1alpha1.PodCompleteLabelPrefix: true, // set readiness gate to true
}
for _, labels := range idToLabelsMap {
for k, v := range expected {
if _, ok := labels[k]; ok {
needUpdate, _ := r.setServiceReadiness(pod, v)
if needUpdate {
expectation.ExpectUpdate(key, pod.ResourceVersion)

if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Client.Status().Update(ctx, pod)
}); err != nil {
r.logger.Errorf("failed to update pod status %s: %s", key, err)
expectation.DeleteExpectations(key)

return reconcile.Result{}, err
}
break
if _, ok := labels[k]; !ok {
continue
}

needUpdate, _ := r.setServiceReadiness(pod, v)
if needUpdate {
r.expectation.ExpectUpdate(key, pod.ResourceVersion)

if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Client.Status().Update(ctx, pod)
}); err != nil {
r.logger.Errorf("failed to update pod status %s: %s", key, err)
r.expectation.DeleteExpectations(key)

return reconcile.Result{}, err
}
return reconcile.Result{}, nil // only need set once
break
}
return reconcile.Result{}, nil // only need set once
}
}

Expand Down Expand Up @@ -186,7 +227,7 @@ func (r *ReconcilePodOpsLifecycle) setServiceReadiness(pod *corev1.Pod, isReady
return true, fmt.Sprintf("update service readiness gate to: %s", string(status))
}

func (r *ReconcilePodOpsLifecycle) stagePreCheck(pod *corev1.Pod) (labels map[string]string, err error) {
func (r *ReconcilePodOpsLifecycle) preCheckStage(pod *corev1.Pod) (labels map[string]string, err error) {
idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return nil, err
Expand Down Expand Up @@ -214,7 +255,7 @@ func (r *ReconcilePodOpsLifecycle) stagePreCheck(pod *corev1.Pod) (labels map[st
return
}

func (r *ReconcilePodOpsLifecycle) stagePostCheck(pod *corev1.Pod) (labels map[string]string, err error) {
func (r *ReconcilePodOpsLifecycle) postCheckStage(pod *corev1.Pod) (labels map[string]string, err error) {
idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return nil, err
Expand Down Expand Up @@ -245,6 +286,17 @@ func (r *ReconcilePodOpsLifecycle) addLabels(ctx context.Context, pod *corev1.Po
})
}

func (r *ReconcilePodOpsLifecycle) registerStages() {
r.ruleSetManager.RegisterStage(v1alpha1.PodOpsLifecyclePreCheckStage, func(po client.Object) bool {
labels := po.GetLabels()
return labels != nil && labelHasPrefix(labels, v1alpha1.PodPreCheckLabelPrefix)
})
r.ruleSetManager.RegisterStage(v1alpha1.PodOpsLifecyclePostCheckStage, func(po client.Object) bool {
labels := po.GetLabels()
return labels != nil && labelHasPrefix(labels, v1alpha1.PodPostCheckLabelPrefix)
})
}

func labelHasPrefix(labels map[string]string, prefix string) bool {
for k := range labels {
if strings.HasPrefix(k, prefix) {
Expand Down
Loading

0 comments on commit becbf99

Please sign in to comment.