Skip to content

Commit

Permalink
feat, init podopslifecycle (#16)
Browse files Browse the repository at this point in the history
* feat, podopslifecycle

---------

Co-authored-by: shaofan-hs <shaofan-hs@hotmail.com>
  • Loading branch information
shaofan-hs and shaofan-hs committed Aug 8, 2023
1 parent ad262df commit ee2487e
Show file tree
Hide file tree
Showing 16 changed files with 2,082 additions and 2 deletions.
23 changes: 23 additions & 0 deletions apis/apps/v1alpha1/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,26 @@ package v1alpha1
const (
	// PodScalingInPhaseLabel indicates that a pod is scaling in.
	// NOTE(review): this key uses the "cafed" domain while the other labels
	// in this file use "kafed" — confirm the spelling is intentional.
	PodScalingInPhaseLabel = "cafed.kusionstack.io/scaling-in"
)

// Label prefixes describing the operation lifecycle of a pod.
// NOTE(review): presumably each prefix is combined with a suffix as
// "<prefix>/<suffix>" to form the full label key — confirm against callers.
const (
	// PodOperatingLabelPrefix indicates that a pod is operating.
	PodOperatingLabelPrefix = "operating.kafed.kusionstack.io"
	// PodOperationTypeLabelPrefix indicates the type of the operation.
	PodOperationTypeLabelPrefix = "operation-type.kafed.kusionstack.io"
	// PodOperationPermissionLabelPrefix indicates the permission of the operation.
	PodOperationPermissionLabelPrefix = "operation-permission.kafed.kusionstack.io"
	// PodUndoOperationTypeLabelPrefix indicates the type of an operation that has been canceled.
	PodUndoOperationTypeLabelPrefix = "undo-operation-type.kafed.kusionstack.io"
	// PodDoneOperationTypeLabelPrefix indicates the type of an operation that has been done.
	PodDoneOperationTypeLabelPrefix = "done-operation-type.kafed.kusionstack.io"

	// PodPreCheckLabelPrefix indicates that a pod is in the pre-check phase.
	PodPreCheckLabelPrefix = "pre-check.lifecycle.kafed.kusionstack.io"
	// PodPreCheckedLabelPrefix indicates that a pod has finished the pre-check phase.
	PodPreCheckedLabelPrefix = "pre-checked.lifecycle.kafed.kusionstack.io"
	// PodPrepareLabelPrefix indicates that a pod is in the prepare phase.
	PodPrepareLabelPrefix = "prepare.lifecycle.kafed.kusionstack.io"
	// PodOperateLabelPrefix indicates that a pod is in the operate phase.
	PodOperateLabelPrefix = "operate.lifecycle.kafed.kusionstack.io"
	// PodOperatedLabelPrefix indicates that a pod has finished the operate phase.
	PodOperatedLabelPrefix = "operated.lifecycle.kafed.kusionstack.io"
	// PodPostCheckLabelPrefix indicates that a pod is in the post-check phase.
	PodPostCheckLabelPrefix = "post-check.lifecycle.kafed.kusionstack.io"
	// PodPostCheckedLabelPrefix indicates that a pod has finished the post-check phase.
	PodPostCheckedLabelPrefix = "post-checked.lifecycle.kafed.kusionstack.io"
	// PodCompleteLabelPrefix indicates that a pod has finished all phases.
	PodCompleteLabelPrefix = "complete.lifecycle.kafed.kusionstack.io"
)

var (
	// WellKnownLabelPrefixesWithID lists lifecycle label prefixes declared in
	// this file. Presumably these are the prefixes whose label keys end in an
	// operation ID — TODO confirm against the code that consumes this slice.
	// NOTE(review): PodOperationPermissionLabelPrefix and
	// PodDoneOperationTypeLabelPrefix are not part of the list; verify that
	// the omission is deliberate.
	WellKnownLabelPrefixesWithID = []string{PodOperatingLabelPrefix, PodOperationTypeLabelPrefix, PodPreCheckLabelPrefix, PodPreCheckedLabelPrefix,
		PodPrepareLabelPrefix, PodUndoOperationTypeLabelPrefix, PodOperateLabelPrefix, PodOperatedLabelPrefix, PodPostCheckLabelPrefix,
		PodPostCheckedLabelPrefix, PodCompleteLabelPrefix}
)
4 changes: 4 additions & 0 deletions apis/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ limitations under the License.
*/

package apis

const (
	// EnvTestMode holds the string "ENV_TEST_MODE".
	// NOTE(review): presumably the name of an environment variable that
	// switches on a test mode — confirm against the code that reads it.
	EnvTestMode = "ENV_TEST_MODE"
)
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ go 1.19

require (
github.com/docker/distribution v2.8.1+incompatible
github.com/go-logr/logr v1.2.3
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.26.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
k8s.io/api v0.22.6
k8s.io/apimachinery v0.22.6
k8s.io/client-go v0.22.6
Expand All @@ -33,7 +35,6 @@ require (
github.com/evanphx/json-patch v4.11.0+incompatible // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/zapr v0.4.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
Expand All @@ -49,10 +50,10 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.19.0 // indirect
Expand Down
29 changes: 29 additions & 0 deletions pkg/controllers/add_podopslifecycle.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
Copyright 2023 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"kusionstack.io/kafed/pkg/controllers/podopslifecycle"
"kusionstack.io/kafed/pkg/features"
"kusionstack.io/kafed/pkg/utils/feature"
)

// init appends podopslifecycle.Add to AddToManagerFuncs, but only when the
// PodOpsLifecycle feature gate is enabled at package initialisation time.
func init() {
	if !feature.DefaultFeatureGate.Enabled(features.PodOpsLifecycle) {
		return
	}
	AddToManagerFuncs = append(AddToManagerFuncs, podopslifecycle.Add)
}
255 changes: 255 additions & 0 deletions pkg/controllers/podopslifecycle/podopslifecycle_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
/*
Copyright 2023 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podopslifecycle

import (
"context"
"fmt"
"strconv"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

"kusionstack.io/kafed/apis/apps/v1alpha1"
"kusionstack.io/kafed/pkg/controllers/utils/expectations"
"kusionstack.io/kafed/pkg/log"
)

const (
	// controllerName is the name handed to controller.New when the
	// controller is registered with the manager.
	controllerName = "podopslifecycle-controller"
)

var (
	// expectation is the package-level resource-version expectation store.
	// It is assigned in NewReconciler and consulted by Reconcile to decide
	// whether a pod should be processed or skipped.
	expectation *expectations.ResourceVersionExpectation
)

// Add builds the pod ops lifecycle reconciler with NewReconciler and
// registers it on mgr through AddToMgr, returning AddToMgr's error.
func Add(mgr manager.Manager) error {
	reconciler := NewReconciler(mgr)
	return AddToMgr(mgr, reconciler)
}

// NewReconciler returns a ReconcilePodOpsLifecycle that uses the manager's
// client and a logger named "podopslifecycle-controller".
//
// As a side effect it (re)assigns the package-level expectation variable to
// a fresh ResourceVersionExpectation built on the same logger.
func NewReconciler(mgr manager.Manager) reconcile.Reconciler {
	namedLogger := log.New(logf.Log.WithName("podopslifecycle-controller"))
	expectation = expectations.NewResourceVersionExpectation(namedLogger)

	return &ReconcilePodOpsLifecycle{
		Client: mgr.GetClient(),
		logger: namedLogger,
	}
}

// AddToMgr creates a controller named controllerName on mgr, backed by r
// with at most 5 concurrent reconciles, and makes it watch Pod objects.
//
// Pod events are enqueued per object and filtered through a PodPredicate
// whose NeedOpsLifecycle callback accepts every pod. The first error from
// controller creation or from installing the watch is returned.
func AddToMgr(mgr manager.Manager, r reconcile.Reconciler) error {
	opts := controller.Options{MaxConcurrentReconciles: 5, Reconciler: r}
	c, err := controller.New(controllerName, mgr, opts)
	if err != nil {
		return err
	}

	podFilter := &PodPredicate{
		// Every pod is currently considered relevant.
		NeedOpsLifecycle: func(oldPod, newPod *corev1.Pod) bool {
			return true
		},
	}
	if watchErr := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}, podFilter); watchErr != nil {
		return watchErr
	}
	return nil
}

// Compile-time check that *ReconcilePodOpsLifecycle implements reconcile.Reconciler.
var _ reconcile.Reconciler = &ReconcilePodOpsLifecycle{}

// ReconcilePodOpsLifecycle reconciles Pod objects for the pod ops lifecycle
// controller.
type ReconcilePodOpsLifecycle struct {
	// Client is the embedded client used to get and update pods.
	client.Client
	// logger receives the reconciler's log output.
	logger *log.Logger
}

// Reconcile processes a single pod request.
//
// It fetches the pod; a NotFound error clears the pod's expectations and
// ends the reconcile without error, any other error is returned. The pod is
// skipped while its resource-version expectation is not satisfied.
//
// For every label set returned by PodIDAndTypesMap it looks for the prepare
// and complete lifecycle prefixes and sets the service readiness condition
// to false or true respectively. A changed condition is written with a
// status update; if the condition already has the desired value the
// reconcile ends immediately.
//
// Go does not specify map iteration order, so when one label set carries
// both prefixes the one visited first decides the outcome.
// NOTE(review): the retried closure re-sends the same pod object on every
// attempt — confirm whether refetching the pod on conflict is intended.
func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
	key := fmt.Sprintf("%s/%s", request.Namespace, request.Name)
	r.logger.V(0).Infof("Reconcile Pod %s", key)

	pod := &corev1.Pod{}
	if getErr := r.Client.Get(ctx, request.NamespacedName, pod); getErr != nil {
		r.logger.Warningf("failed to get pod %s: %s", key, getErr)
		if !errors.IsNotFound(getErr) {
			return reconcile.Result{}, getErr
		}
		// The pod is gone, so its expectations are no longer needed.
		expectation.DeleteExpectations(key)
		return reconcile.Result{}, nil
	}

	if satisfied := expectation.SatisfiedExpectations(key, pod.ResourceVersion); !satisfied {
		r.logger.V(2).Infof("skip pod %s with no satisfied", key)
		return reconcile.Result{}, nil
	}

	idToLabelsMap, _, err := PodIDAndTypesMap(pod)
	if err != nil {
		return reconcile.Result{}, err
	}

	// Desired readiness value keyed by the lifecycle prefix that triggers it.
	readinessByPrefix := map[string]bool{
		v1alpha1.PodPrepareLabelPrefix:  false, // set readiness gate to false
		v1alpha1.PodCompleteLabelPrefix: true,  // set readiness gate to true
	}
	for _, idLabels := range idToLabelsMap {
		for prefix, ready := range readinessByPrefix {
			if _, present := idLabels[prefix]; !present {
				continue
			}

			changed, _ := r.setServiceReadiness(pod, ready)
			if !changed {
				return reconcile.Result{}, nil // only need set once
			}

			// Record the expectation before writing the status.
			expectation.ExpectUpdate(key, pod.ResourceVersion)
			updateErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
				return r.Client.Status().Update(ctx, pod)
			})
			if updateErr != nil {
				r.logger.Errorf("failed to update pod status %s: %s", key, updateErr)
				expectation.DeleteExpectations(key)

				return reconcile.Result{}, updateErr
			}
			// Updated for this label set; move on to the next one.
			break
		}
	}

	return reconcile.Result{}, nil
}

// setServiceReadiness sets the ReadinessGatePodServiceReady condition on the
// in-memory pod to True when isReady is true and to False otherwise. It only
// mutates pod; it performs no API call.
//
// The first result reports whether pod was modified; the second is a short
// description of the change. It returns (false, "") when the pod spec does
// not declare that readiness gate or when the condition already carries the
// requested status.
func (r *ReconcilePodOpsLifecycle) setServiceReadiness(pod *corev1.Pod, isReady bool) (bool, string) {
	hasGate := false
	for i := range pod.Spec.ReadinessGates {
		if pod.Spec.ReadinessGates[i].ConditionType == v1alpha1.ReadinessGatePodServiceReady {
			hasGate = true
			break
		}
	}
	if !hasGate {
		return false, ""
	}

	// Scan from the end so that, should the condition appear more than once,
	// the last occurrence is the one that is selected.
	pos := -1
	for i := len(pod.Status.Conditions) - 1; i >= 0; i-- {
		if pod.Status.Conditions[i].Type == v1alpha1.ReadinessGatePodServiceReady {
			pos = i
			break
		}
	}

	desired := corev1.ConditionFalse
	if isReady {
		desired = corev1.ConditionTrue
	}

	if pos < 0 { // append readiness gate
		pod.Status.Conditions = append(pod.Status.Conditions, corev1.PodCondition{
			Type:               v1alpha1.ReadinessGatePodServiceReady,
			Status:             desired,
			LastTransitionTime: metav1.Now(),
		})
		return true, fmt.Sprintf("append service readiness gate to: %s", string(desired))
	}

	cond := &pod.Status.Conditions[pos]
	if cond.Status == desired {
		return false, ""
	}

	// update readiness gate
	cond.Status = desired
	cond.LastTransitionTime = metav1.Now()
	return true, fmt.Sprintf("update service readiness gate to: %s", string(desired))
}

// stagePreCheck computes the labels that are still missing on pod for the
// pre-check stage. It does not modify pod.
//
// For every ID returned by PodIDAndTypesMap that has an operation-type
// entry it proposes two keys, each only if pod does not carry it yet:
// "<PodOperationPermissionLabelPrefix>/<operation type>" and
// "<PodPreCheckedLabelPrefix>/<ID>". Every proposed label gets the current
// Unix time in seconds, in decimal, as its value.
//
// It returns nil and the error when PodIDAndTypesMap fails.
func (r *ReconcilePodOpsLifecycle) stagePreCheck(pod *corev1.Pod) (labels map[string]string, err error) {
	idToLabelsMap, _, err := PodIDAndTypesMap(pod)
	if err != nil {
		return nil, err
	}

	labels = map[string]string{}
	now := strconv.FormatInt(time.Now().Unix(), 10)
	current := pod.GetLabels()
	for id, idLabels := range idToLabelsMap {
		opType, hasType := idLabels[v1alpha1.PodOperationTypeLabelPrefix]
		if !hasType {
			continue
		}

		candidates := [...]string{
			fmt.Sprintf("%s/%s", v1alpha1.PodOperationPermissionLabelPrefix, opType),
			fmt.Sprintf("%s/%s", v1alpha1.PodPreCheckedLabelPrefix, id),
		}
		for _, labelKey := range candidates {
			if _, exists := current[labelKey]; exists {
				continue
			}
			labels[labelKey] = now
		}
	}

	return labels, nil
}

// stagePostCheck computes the labels that are still missing on pod for the
// post-check stage. It does not modify pod.
//
// For every ID returned by PodIDAndTypesMap it proposes the key
// "<PodPostCheckedLabelPrefix>/<ID>" unless pod already carries it. Every
// proposed label gets the current Unix time in seconds, in decimal, as its
// value.
//
// It returns nil and the error when PodIDAndTypesMap fails.
func (r *ReconcilePodOpsLifecycle) stagePostCheck(pod *corev1.Pod) (labels map[string]string, err error) {
	idToLabelsMap, _, err := PodIDAndTypesMap(pod)
	if err != nil {
		return nil, err
	}

	labels = map[string]string{}
	now := strconv.FormatInt(time.Now().Unix(), 10)
	current := pod.GetLabels()
	for id := range idToLabelsMap {
		labelKey := fmt.Sprintf("%s/%s", v1alpha1.PodPostCheckedLabelPrefix, id)
		if _, exists := current[labelKey]; exists {
			continue
		}
		labels[labelKey] = now
	}

	return labels, nil
}

// addLabels copies every entry of labels into pod.Labels, creating the map
// when it is nil and overwriting keys that already exist, then writes the
// pod through the client inside retry.RetryOnConflict with the default
// backoff. The error of that call is returned.
// NOTE(review): each retry re-sends the same pod object — confirm whether a
// refetch on conflict is intended.
func (r *ReconcilePodOpsLifecycle) addLabels(ctx context.Context, pod *corev1.Pod, labels map[string]string) error {
	if pod.Labels == nil {
		pod.Labels = make(map[string]string, len(labels))
	}
	for name, value := range labels {
		pod.Labels[name] = value
	}

	update := func() error {
		return r.Client.Update(ctx, pod)
	}
	return retry.RetryOnConflict(retry.DefaultBackoff, update)
}

// labelHasPrefix reports whether at least one key of labels starts with
// prefix. Label values are ignored. A nil or empty map yields false, and an
// empty prefix matches any key.
func labelHasPrefix(labels map[string]string, prefix string) bool {
	matched := false
	for name := range labels {
		if matched = strings.HasPrefix(name, prefix); matched {
			break
		}
	}
	return matched
}
Loading

0 comments on commit ee2487e

Please sign in to comment.