Skip to content

Commit

Permalink
add prometheus for pub and deletion protection
Browse files Browse the repository at this point in the history
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
  • Loading branch information
zmberg committed Jul 27, 2023
1 parent 4e35a1d commit 614876c
Show file tree
Hide file tree
Showing 21 changed files with 141 additions and 58 deletions.
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
appsv1beta1 "github.com/openkruise/kruise/apis/apps/v1beta1"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
extclient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
"github.com/openkruise/kruise/pkg/controller"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
Expand Down Expand Up @@ -186,6 +187,7 @@ func main() {
setupLog.Error(err, "unable to start ControllerFinder")
os.Exit(1)
}
pubcontrol.InitPubControl(mgr.GetClient(), mgr.GetEventRecorderFor("pub-controller"))

setupLog.Info("register field index")
if err := fieldindex.RegisterFieldIndexes(mgr.GetCache()); err != nil {
Expand Down
17 changes: 13 additions & 4 deletions pkg/control/pubcontrol/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type PubControl interface {
var PubControl pubControl
var recorder record.EventRecorder
var kclient client.Client

type pubControl interface {
// IsPodReady indicates whether pod is fully ready
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
Expand All @@ -41,9 +47,12 @@ type PubControl interface {
IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool
// get pub for pod
GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error)
// get pod controller of
GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference
}

func NewPubControl(client client.Client) PubControl {
controllerFinder := controllerfinder.Finder
return &commonControl{controllerFinder: controllerFinder, Client: client}
func InitPubControl(cli client.Client, rec record.EventRecorder) {
recorder = rec
kclient = cli
PubControl = &commonControl{controllerFinder: controllerfinder.Finder, Client: cli}
}
5 changes: 5 additions & 0 deletions pkg/control/pubcontrol/pub_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
Expand Down Expand Up @@ -168,6 +169,10 @@ func (c *commonControl) GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavai
return pub, nil
}

func (c *commonControl) GetPodControllerOf(pod *corev1.Pod) *metav1.OwnerReference {
return metav1.GetControllerOf(pod)
}

func getSidecarSetsInPod(pod *corev1.Pod) (sidecarSets, containers sets.String) {
containers = sets.NewString()
sidecarSets = sets.NewString()
Expand Down
42 changes: 34 additions & 8 deletions pkg/control/pubcontrol/pub_control_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
kubeClient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/util"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -35,7 +36,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

const (
Expand All @@ -55,24 +56,39 @@ const (
PodRelatedPubAnnotation = "kruise.io/related-pub"
)

var (
PodUnavailableBudgetMetrics = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "pod_unavailable_budget",
Help: "Pod Unavailable Budget Metrics",
// kind = CloneSet, Deployment, StatefulSet, etc.
// name = workload.name, if pod don't have workload, then name is pod.name
}, []string{"kind", "name", "username"},
)
)

func init() {
metrics.Registry.MustRegister(PodUnavailableBudgetMetrics)
}

// parameters:
// 1. allowed(bool) indicates whether to allow this update operation
// 2. err(error)
func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, pod *corev1.Pod, operation policyv1alpha1.PubOperation, dryRun bool) (allowed bool, reason string, err error) {
func PodUnavailableBudgetValidatePod(pod *corev1.Pod, operation policyv1alpha1.PubOperation, username string, dryRun bool) (allowed bool, reason string, err error) {
klog.V(3).Infof("validating pod(%s/%s) operation(%s) for PodUnavailableBudget", pod.Namespace, pod.Name, operation)
// pods that contain annotations[pod.kruise.io/pub-no-protect]="true" will be ignore
// and will no longer check the pub quota
if pod.Annotations[policyv1alpha1.PodPubNoProtectionAnnotation] == "true" {
klog.V(3).Infof("pod(%s/%s) contains annotations[%s]=true, then don't need check pub", pod.Namespace, pod.Name, policyv1alpha1.PodPubNoProtectionAnnotation)
return true, "", nil
// If the pod is not ready, it doesn't count towards healthy and we should not decrement
} else if !control.IsPodReady(pod) {
} else if !PubControl.IsPodReady(pod) {
klog.V(3).Infof("pod(%s/%s) is not ready, then don't need check pub", pod.Namespace, pod.Name)
return true, "", nil
}

// pub for pod
pub, err := control.GetPubForPod(pod)
pub, err := PubControl.GetPubForPod(pod)
if err != nil {
return false, "", err
// if there is no matching PodUnavailableBudget, just return true
Expand Down Expand Up @@ -119,7 +135,7 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
}

informerCached := &policyv1alpha1.PodUnavailableBudget{}
if err := client.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
if err := kclient.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
Name: pub.Name}, informerCached); err == nil {
var localRV, informerRV int64
_ = runtime.Convert_string_To_int64(&pubClone.ResourceVersion, &localRV, nil)
Expand All @@ -133,8 +149,18 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p

// Try to verify-and-decrement
// If it was false already, or if it becomes false during the course of our retries,
err := checkAndDecrement(pod.Name, pubClone, operation)
err = checkAndDecrement(pod.Name, pubClone, operation)
if err != nil {
var kind, name string
if ref := PubControl.GetPodControllerOf(pod); ref != nil {
kind = ref.Kind
name = ref.Name
} else {
kind = "unknown"
name = pod.Name
}
PodUnavailableBudgetMetrics.WithLabelValues(kind, name, username).Add(1)
recorder.Eventf(pod, corev1.EventTypeWarning, "PubPreventPodDeletion", fmt.Sprintf("openkruise pub prevents pod deletion"))
return err
}

Expand All @@ -147,15 +173,15 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
pubClone.Namespace, pubClone.Name, len(pubClone.Status.DisruptedPods), len(pubClone.Status.UnavailablePods),
pubClone.Status.TotalReplicas, pubClone.Status.DesiredAvailable, pubClone.Status.CurrentAvailable, pubClone.Status.UnavailableAllowed)
start = time.Now()
err = client.Status().Update(context.TODO(), pubClone)
err = kclient.Status().Update(context.TODO(), pubClone)
costOfUpdate += time.Since(start)
if err == nil {
if err = util.GlobalCache.Add(pubClone); err != nil {
klog.Errorf("Add cache failed for PodUnavailableBudget(%s/%s): %s", pub.Namespace, pub.Name, err.Error())
}
return nil
}
// if conflict, then retry
// if conflicts, then retry
conflictTimes++
refresh = true
return err
Expand Down
9 changes: 5 additions & 4 deletions pkg/control/pubcontrol/pub_control_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
utilpointer "k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -276,8 +277,8 @@ func TestPodUnavailableBudgetValidatePod(t *testing.T) {
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.getPub()).Build()
control := NewPubControl(fakeClient)
allow, _, err := PodUnavailableBudgetValidatePod(fakeClient, control, cs.getPod(), cs.operation, false)
InitPubControl(fakeClient, record.NewFakeRecorder(10))
allow, _, err := PodUnavailableBudgetValidatePod(cs.getPod(), cs.operation, "fake-user", false)
if err != nil {
t.Fatalf("PodUnavailableBudgetValidatePod failed: %s", err.Error())
}
Expand Down Expand Up @@ -383,9 +384,9 @@ func TestGetPodUnavailableBudgetForPod(t *testing.T) {
for _, cs := range cases {
t.Run(cs.name, func(t *testing.T) {
fakeClient := fake.NewClientBuilder().WithScheme(scheme).WithObjects(cs.getDeployment(), cs.getReplicaSet(), cs.getPub()).Build()
control := NewPubControl(fakeClient)
InitPubControl(fakeClient, record.NewFakeRecorder(10))
pod := cs.getPod()
pub, err := control.GetPubForPod(pod)
pub, err := PubControl.GetPubForPod(pod)
if err != nil {
t.Fatalf("GetPubForPod failed: %s", err.Error())
}
Expand Down
3 changes: 0 additions & 3 deletions pkg/controller/cloneset/sync/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package sync

import (
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
Expand Down Expand Up @@ -49,7 +48,6 @@ type realControl struct {
inplaceControl inplaceupdate.Interface
recorder record.EventRecorder
controllerFinder *controllerfinder.ControllerFinder
pubControl pubcontrol.PubControl
}

func New(c client.Client, recorder record.EventRecorder) Interface {
Expand All @@ -59,6 +57,5 @@ func New(c client.Client, recorder record.EventRecorder) Interface {
lifecycleControl: lifecycle.New(c),
recorder: recorder,
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(c),
}
}
8 changes: 2 additions & 6 deletions pkg/controller/cloneset/sync/cloneset_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,9 @@ import (

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/features"
"github.com/openkruise/kruise/pkg/util"
utilfeature "github.com/openkruise/kruise/pkg/util/feature"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
"github.com/openkruise/kruise/pkg/util/lifecycle"
"github.com/openkruise/kruise/pkg/util/specifieddelete"
Expand Down Expand Up @@ -133,7 +129,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
for _, idx := range waitUpdateIndexes {
pod := pods[idx]
// Determine the pub before updating the pod
if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) {
/*if utilfeature.DefaultFeatureGate.Enabled(features.PodUnavailableBudgetUpdateGate) {
allowed, _, err := pubcontrol.PodUnavailableBudgetValidatePod(c.Client, c.pubControl, pod, policyv1alpha1.PubUpdateOperation, false)
if err != nil {
return err
Expand All @@ -142,7 +138,7 @@ func (c *realControl) Update(cs *appsv1alpha1.CloneSet,
clonesetutils.DurationStore.Push(key, time.Second)
return nil
}
}
}*/
duration, err := c.updatePod(cs, coreControl, targetRevision, revisions, pod, pvcs)
if duration > 0 {
clonesetutils.DurationStore.Push(key, duration)
Expand Down
2 changes: 0 additions & 2 deletions pkg/controller/cloneset/sync/cloneset_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/openkruise/kruise/apis"
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
"github.com/openkruise/kruise/pkg/control/pubcontrol"
clonesetcore "github.com/openkruise/kruise/pkg/controller/cloneset/core"
clonesetutils "github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/features"
Expand Down Expand Up @@ -957,7 +956,6 @@ func TestUpdate(t *testing.T) {
inplaceupdate.New(fakeClient, clonesetutils.RevisionAdapterImpl),
record.NewFakeRecorder(10),
&controllerfinder.ControllerFinder{Client: fakeClient},
pubcontrol.NewPubControl(fakeClient),
}
currentRevision := mc.updateRevision
if len(mc.revisions) > 0 {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ func newReconciler(mgr manager.Manager) reconcile.Reconciler {
Scheme: mgr.GetScheme(),
recorder: mgr.GetEventRecorderFor("podunavailablebudget-controller"),
controllerFinder: controllerfinder.Finder,
pubControl: pubcontrol.NewPubControl(mgr.GetClient()),
}
}

Expand Down Expand Up @@ -203,7 +202,6 @@ type ReconcilePodUnavailableBudget struct {
Scheme *runtime.Scheme
recorder record.EventRecorder
controllerFinder *controllerfinder.ControllerFinder
pubControl pubcontrol.PubControl
}

// +kubebuilder:rbac:groups=policy.kruise.io,resources=podunavailablebudgets,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -251,7 +249,7 @@ func (r *ReconcilePodUnavailableBudget) Reconcile(_ context.Context, req ctrl.Re

func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1alpha1.PodUnavailableBudget) (*time.Time, error) {
currentTime := time.Now()
pods, expectedCount, err := r.pubControl.GetPodsForPub(pub)
pods, expectedCount, err := pubcontrol.PubControl.GetPodsForPub(pub)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -319,7 +317,7 @@ func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1al
// unavailablePods contains information about pods whose specification changed(in-place update), in case of informer cache latency, after 5 seconds to remove it.
var disruptedPods, unavailablePods map[string]metav1.Time
disruptedPods, unavailablePods, recheckTime = r.buildDisruptedAndUnavailablePods(pods, pubClone, currentTime)
currentAvailable := countAvailablePods(pods, disruptedPods, unavailablePods, r.pubControl)
currentAvailable := countAvailablePods(pods, disruptedPods, unavailablePods)

start = time.Now()
updateErr := r.updatePubStatus(pubClone, currentAvailable, desiredAvailable, expectedCount, disruptedPods, unavailablePods)
Expand Down Expand Up @@ -361,7 +359,7 @@ func (r *ReconcilePodUnavailableBudget) patchRelatedPubAnnotationInPod(pub *poli
return nil
}

func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[string]metav1.Time, control pubcontrol.PubControl) (currentAvailable int32) {
func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[string]metav1.Time) (currentAvailable int32) {
recordPods := sets.String{}
for pName := range disruptedPods {
recordPods.Insert(pName)
Expand All @@ -379,7 +377,7 @@ func countAvailablePods(pods []*corev1.Pod, disruptedPods, unavailablePods map[s
continue
}
// pod consistent and ready
if control.IsPodStateConsistent(pod) && control.IsPodReady(pod) {
if pubcontrol.PubControl.IsPodStateConsistent(pod) && pubcontrol.PubControl.IsPodReady(pod) {
currentAvailable++
}
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/controller/podunavailablebudget/pub_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1148,13 +1148,12 @@ func TestPubReconcile(t *testing.T) {
for _, obj := range cs.getReplicaSet() {
_ = fakeClient.Create(context.TODO(), obj)
}

pubcontrol.InitPubControl(fakeClient, record.NewFakeRecorder(10))
controllerfinder.Finder = &controllerfinder.ControllerFinder{Client: fakeClient}
reconciler := ReconcilePodUnavailableBudget{
Client: fakeClient,
recorder: record.NewFakeRecorder(10),
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
pubControl: pubcontrol.NewPubControl(fakeClient),
}
_, err := reconciler.syncPodUnavailableBudget(pub)
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions pkg/controller/podunavailablebudget/pub_pod_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,12 @@ var _ handler.EventHandler = &enqueueRequestForPod{}
func newEnqueueRequestForPod(c client.Client) handler.EventHandler {
e := &enqueueRequestForPod{client: c}
e.controllerFinder = controllerfinder.Finder
e.pubControl = pubcontrol.NewPubControl(c)
return e
}

type enqueueRequestForPod struct {
client client.Client
controllerFinder *controllerfinder.ControllerFinder
pubControl pubcontrol.PubControl
}

func (p *enqueueRequestForPod) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
Expand All @@ -78,7 +76,7 @@ func (p *enqueueRequestForPod) addPod(q workqueue.RateLimitingInterface, obj run
}
var pub *policyv1alpha1.PodUnavailableBudget
if pod.Annotations[pubcontrol.PodRelatedPubAnnotation] != "" {
pub, _ = p.pubControl.GetPubForPod(pod)
pub, _ = pubcontrol.PubControl.GetPubForPod(pod)
}
if pub == nil {
return
Expand Down Expand Up @@ -142,11 +140,11 @@ func (p *enqueueRequestForPod) updatePod(q workqueue.RateLimitingInterface, old,
return
}

pub, _ := p.pubControl.GetPubForPod(newPod)
pub, _ := pubcontrol.PubControl.GetPubForPod(newPod)
if pub == nil {
return
}
if isReconcile, enqueueDelayTime := isPodAvailableChanged(oldPod, newPod, pub, p.pubControl); isReconcile {
if isReconcile, enqueueDelayTime := isPodAvailableChanged(oldPod, newPod, pub); isReconcile {
q.AddAfter(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: pub.Name,
Expand All @@ -157,7 +155,7 @@ func (p *enqueueRequestForPod) updatePod(q workqueue.RateLimitingInterface, old,

}

func isPodAvailableChanged(oldPod, newPod *corev1.Pod, pub *policyv1alpha1.PodUnavailableBudget, control pubcontrol.PubControl) (bool, time.Duration) {
func isPodAvailableChanged(oldPod, newPod *corev1.Pod, pub *policyv1alpha1.PodUnavailableBudget) (bool, time.Duration) {
var enqueueDelayTime time.Duration
// If the pod's deletion timestamp is set, remove endpoint from ready address.
if oldPod.DeletionTimestamp.IsZero() && !newPod.DeletionTimestamp.IsZero() {
Expand All @@ -169,6 +167,7 @@ func isPodAvailableChanged(oldPod, newPod *corev1.Pod, pub *policyv1alpha1.PodUn
return false, enqueueDelayTime
}

control := pubcontrol.PubControl
// If the pod's readiness has changed, the associated endpoint address
// will move from the unready endpoints set to the ready endpoints.
// So for the purposes of an endpoint, a readiness change on a pod
Expand Down
Loading

0 comments on commit 614876c

Please sign in to comment.