Skip to content

Commit

Permalink
performance optimization pub
Browse files Browse the repository at this point in the history
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
  • Loading branch information
zmberg committed Apr 29, 2022
1 parent 5b52b6e commit 9604082
Show file tree
Hide file tree
Showing 33 changed files with 423 additions and 405 deletions.
12 changes: 6 additions & 6 deletions pkg/control/pubcontrol/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,6 @@ import (
)

type PubControl interface {
// Common
// get PodUnavailableBudget
GetPodUnavailableBudget() *policyv1alpha1.PodUnavailableBudget
// IsPodReady indicates whether pod is fully ready
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
Expand All @@ -37,13 +34,16 @@ type PubControl interface {
// return two parameters
// 1. podList
// 2. expectedCount, the default is workload.Replicas
GetPodsForPub() ([]*corev1.Pod, int32, error)
GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget) ([]*corev1.Pod, int32, error)

// webhook
// determine if this change to pod might cause unavailability
IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool
// get pub for pod
GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error)
}

func NewPubControl(pub *policyv1alpha1.PodUnavailableBudget, controllerFinder *controllerfinder.ControllerFinder, client client.Client) PubControl {
return &commonControl{PodUnavailableBudget: pub, controllerFinder: controllerFinder, Client: client}
// NewPubControl builds a PubControl backed by the given client. The
// controller finder it relies on is created from that same client.
func NewPubControl(client client.Client) PubControl {
	return &commonControl{
		Client:           client,
		controllerFinder: controllerfinder.NewControllerFinder(client),
	}
}
36 changes: 24 additions & 12 deletions pkg/control/pubcontrol/pub_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
"github.com/openkruise/kruise/pkg/control/sidecarcontrol"
"github.com/openkruise/kruise/pkg/util"
utilclient "github.com/openkruise/kruise/pkg/util/client"
"github.com/openkruise/kruise/pkg/util/controllerfinder"
"github.com/openkruise/kruise/pkg/util/inplaceupdate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
kubecontroller "k8s.io/kubernetes/pkg/controller"
Expand All @@ -35,14 +37,9 @@ import (

// commonControl is the PubControl implementation returned by NewPubControl.
type commonControl struct {
// Embedded client; its Get and List methods are called directly on commonControl.
client.Client
// Embedded budget, handed back by GetPodUnavailableBudget.
*policyv1alpha1.PodUnavailableBudget
// Passed in by, or created in, NewPubControl.
controllerFinder *controllerfinder.ControllerFinder
}

// GetPodUnavailableBudget returns the PodUnavailableBudget embedded in this control.
func (c *commonControl) GetPodUnavailableBudget() *policyv1alpha1.PodUnavailableBudget {
return c.PodUnavailableBudget
}

func (c *commonControl) IsPodReady(pod *corev1.Pod) bool {
// 1. pod.Status.Phase == v1.PodRunning
// 2. pod.condition PodReady == true
Expand All @@ -56,7 +53,7 @@ func (c *commonControl) IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool
}
// If pod.spec changed, pod will be in unavailable condition
if !reflect.DeepEqual(oldPod.Spec, newPod.Spec) {
klog.V(3).Infof("pod(%s.%s) specification changed, and maybe cause unavailability", newPod.Namespace, newPod.Name)
klog.V(3).Infof("pod(%s/%s) specification changed, and maybe cause unavailability", newPod.Namespace, newPod.Name)
return true
}
// pod other changes will not cause unavailability situation, then return false
Expand All @@ -67,8 +64,7 @@ func (c *commonControl) IsPodUnavailableChanged(oldPod, newPod *corev1.Pod) bool
// return two parameters
// 1. podList
// 2. expectedCount, the default is workload.Replicas
func (c *commonControl) GetPodsForPub() ([]*corev1.Pod, int32, error) {
pub := c.GetPodUnavailableBudget()
func (c *commonControl) GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget) ([]*corev1.Pod, int32, error) {
// if targetReference isn't nil, priority to take effect
var listOptions *client.ListOptions
if pub.Spec.TargetReference != nil {
Expand All @@ -87,7 +83,7 @@ func (c *commonControl) GetPodsForPub() ([]*corev1.Pod, int32, error) {
}
listOptions = &client.ListOptions{Namespace: pub.Namespace, LabelSelector: labelSelector}
podList := &corev1.PodList{}
if err := c.List(context.TODO(), podList, listOptions); err != nil {
if err = c.List(context.TODO(), podList, listOptions, utilclient.DisableDeepCopy); err != nil {
return nil, 0, err
}

Expand All @@ -102,7 +98,6 @@ func (c *commonControl) GetPodsForPub() ([]*corev1.Pod, int32, error) {
if err != nil {
return nil, 0, err
}

return matchedPods, expectedCount, nil
}

Expand All @@ -119,7 +114,7 @@ func (c *commonControl) IsPodStateConsistent(pod *corev1.Pod) bool {
}

if !util.IsPodContainerDigestEqual(sets.NewString(container.Name), pod) {
klog.V(5).Infof("pod(%s.%s) container(%s) image is inconsistent", pod.Namespace, pod.Name, container.Name)
klog.V(5).Infof("pod(%s/%s) container(%s) image is inconsistent", pod.Namespace, pod.Name, container.Name)
return false
}
}
Expand All @@ -138,13 +133,30 @@ func (c *commonControl) IsPodStateConsistent(pod *corev1.Pod) bool {

// whether other containers is consistent
if err := inplaceupdate.DefaultCheckInPlaceUpdateCompleted(pod); err != nil {
klog.V(5).Infof("check pod(%s.%s) InPlaceUpdate failed: %s", pod.Namespace, pod.Name, err.Error())
klog.V(5).Infof("check pod(%s/%s) InPlaceUpdate failed: %s", pod.Namespace, pod.Name, err.Error())
return false
}

return true
}

// GetPubForPod looks up the PodUnavailableBudget named by the pod's
// PodRelatedPubAnnotation annotation, in the pod's own namespace.
// It returns (nil, nil) when the annotation is missing or empty, or when the
// referenced pub does not exist; any other lookup error is returned unchanged.
func (c *commonControl) GetPubForPod(pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error) {
	// Indexing a nil or empty map yields "", so a single lookup covers every
	// "no annotation" case.
	pubName := pod.Annotations[PodRelatedPubAnnotation]
	if pubName == "" {
		return nil, nil
	}

	key := client.ObjectKey{Namespace: pod.Namespace, Name: pubName}
	pub := &policyv1alpha1.PodUnavailableBudget{}
	switch err := c.Get(context.TODO(), key, pub); {
	case err == nil:
		return pub, nil
	case errors.IsNotFound(err):
		// A dangling annotation is not an error for the caller.
		klog.Warningf("pod(%s/%s) pub(%s) Is NotFound", pod.Namespace, pod.Name, pubName)
		return nil, nil
	default:
		return nil, err
	}
}

func getSidecarSetsInPod(pod *corev1.Pod) (sidecarSets, containers sets.String) {
containers = sets.NewString()
sidecarSets = sets.NewString()
Expand Down
104 changes: 13 additions & 91 deletions pkg/control/pubcontrol/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,10 @@ import (
policyv1alpha1 "github.com/openkruise/kruise/apis/policy/v1alpha1"
kubeClient "github.com/openkruise/kruise/pkg/client"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/pkg/util/controllerfinder"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -59,21 +55,23 @@ const (

// Marked pods will not be pub-protected, solving the scenario of force pod deletion
PodPubNoProtectionAnnotation = "pub.kruise.io/no-protect"

// related-pub annotation in pod
PodRelatedPubAnnotation = "kruise.io/related-pub"
)

// return values:
// 1. allowed(bool) indicates whether to allow this update operation
// 2. reason(string) describes why the operation is not allowed; empty when it is allowed
// 3. err(error)
func PodUnavailableBudgetValidatePod(client client.Client, pod *corev1.Pod, control PubControl, operation Operation, dryRun bool) (allowed bool, reason string, err error) {
pub := control.GetPodUnavailableBudget()
func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, pub *policyv1alpha1.PodUnavailableBudget, pod *corev1.Pod, operation Operation, dryRun bool) (allowed bool, reason string, err error) {
// If the pod is not ready, it doesn't count towards healthy and we should not decrement
if !control.IsPodReady(pod) {
klog.V(3).Infof("pod(%s.%s) is not ready, then don't need check pub", pod.Namespace, pod.Name)
klog.V(3).Infof("pod(%s/%s) is not ready, then don't need check pub", pod.Namespace, pod.Name)
return true, "", nil
}
// pod is in pub.Status.DisruptedPods or pub.Status.UnavailablePods, then don't need check it
if isPodRecordedInPub(pod.Name, pub) {
klog.V(5).Infof("pod(%s.%s) already is recorded in pub(%s.%s)", pod.Namespace, pod.Name, pub.Namespace, pub.Name)
klog.V(5).Infof("pod(%s/%s) already is recorded in pub(%s/%s)", pod.Namespace, pod.Name, pub.Namespace, pub.Name)
return true, "", nil
}

Expand Down Expand Up @@ -129,10 +127,10 @@ func PodUnavailableBudgetValidatePod(client client.Client, pod *corev1.Pod, cont

// If this is a dry-run, we don't need to go any further than that.
if dryRun {
klog.V(5).Infof("pod(%s) operation for pub(%s.%s) is a dry run", pod.Name, pubClone.Namespace, pubClone.Name)
klog.V(5).Infof("pod(%s) operation for pub(%s/%s) is a dry run", pod.Name, pubClone.Namespace, pubClone.Name)
return nil
}
klog.V(3).Infof("pub(%s.%s) update status(disruptedPods:%d, unavailablePods:%d, expectedCount:%d, desiredAvailable:%d, currentAvailable:%d, unavailableAllowed:%d)",
klog.V(3).Infof("pub(%s/%s) update status(disruptedPods:%d, unavailablePods:%d, expectedCount:%d, desiredAvailable:%d, currentAvailable:%d, unavailableAllowed:%d)",
pubClone.Namespace, pubClone.Name, len(pubClone.Status.DisruptedPods), len(pubClone.Status.UnavailablePods),
pubClone.Status.TotalReplicas, pubClone.Status.DesiredAvailable, pubClone.Status.CurrentAvailable, pubClone.Status.UnavailableAllowed)
start = time.Now()
Expand All @@ -152,15 +150,15 @@ func PodUnavailableBudgetValidatePod(client client.Client, pod *corev1.Pod, cont
klog.V(3).Infof("Webhook cost of pub(%s/%s): conflict times %v, cost of Get %v, cost of Update %v",
pub.Namespace, pub.Name, conflictTimes, costOfGet, costOfUpdate)
if err != nil && err != wait.ErrWaitTimeout {
klog.V(3).Infof("pod(%s.%s) operation(%s) for pub(%s.%s) failed: %s", pod.Namespace, pod.Name, operation, pub.Namespace, pub.Name, err.Error())
klog.V(3).Infof("pod(%s/%s) operation(%s) for pub(%s/%s) failed: %s", pod.Namespace, pod.Name, operation, pub.Namespace, pub.Name, err.Error())
return false, err.Error(), nil
} else if err == wait.ErrWaitTimeout {
err = errors.NewTimeoutError(fmt.Sprintf("couldn't update PodUnavailableBudget %s due to conflicts", pub.Name), 10)
klog.Errorf("pod(%s.%s) operation(%s) failed: %s", pod.Namespace, pod.Name, operation, err.Error())
klog.Errorf("pod(%s/%s) operation(%s) failed: %s", pod.Namespace, pod.Name, operation, err.Error())
return false, err.Error(), nil
}

klog.V(3).Infof("admit pod(%s.%s) operation(%s) for pub(%s.%s)", pod.Namespace, pod.Name, operation, pub.Namespace, pub.Name)
klog.V(3).Infof("admit pod(%s/%s) operation(%s) for pub(%s/%s)", pod.Namespace, pod.Name, operation, pub.Namespace, pub.Name)
return true, "", nil
}

Expand All @@ -183,10 +181,10 @@ func checkAndDecrement(podName string, pub *policyv1alpha1.PodUnavailableBudget,

if operation == UpdateOperation {
pub.Status.UnavailablePods[podName] = metav1.Time{Time: time.Now()}
klog.V(3).Infof("pod(%s) is recorded in pub(%s.%s) UnavailablePods", podName, pub.Namespace, pub.Name)
klog.V(3).Infof("pod(%s) is recorded in pub(%s/%s) UnavailablePods", podName, pub.Namespace, pub.Name)
} else {
pub.Status.DisruptedPods[podName] = metav1.Time{Time: time.Now()}
klog.V(3).Infof("pod(%s) is recorded in pub(%s.%s) DisruptedPods", podName, pub.Namespace, pub.Name)
klog.V(3).Infof("pod(%s) is recorded in pub(%s/%s) DisruptedPods", podName, pub.Namespace, pub.Name)
}
return nil
}
Expand All @@ -200,79 +198,3 @@ func isPodRecordedInPub(podName string, pub *policyv1alpha1.PodUnavailableBudget
}
return false
}

// GetPodUnavailableBudgetForPod returns a PodUnavailableBudget from the pod's
// namespace that covers the pod, or (nil, nil) when the pod has no labels or
// no pub matches.
//
// A pub with spec.targetReference matches when the pod's controller and the
// referenced object resolve, through finders, to the same workload. Otherwise
// the pub matches when its non-empty spec.selector selects the pod's labels.
// When several pubs match, the first one in list order is returned and a
// warning is logged. Errors from listing pubs or from resolving references
// are returned to the caller.
func GetPodUnavailableBudgetForPod(kClient client.Client, finders *controllerfinder.ControllerFinder, pod *corev1.Pod) (*policyv1alpha1.PodUnavailableBudget, error) {
	if len(pod.Labels) == 0 {
		return nil, nil
	}

	pubList := &policyv1alpha1.PodUnavailableBudgetList{}
	if err := kClient.List(context.TODO(), pubList, &client.ListOptions{Namespace: pod.Namespace}); err != nil {
		return nil, err
	}

	// The pod's controller, and the workload it resolves to, depend only on
	// the pod. Resolve them at most once (lazily, on the first pub that needs
	// them) instead of once per pub.
	ownerRef := metav1.GetControllerOf(pod)
	var (
		podRef         *controllerfinder.ScaleAndSelector
		podRefResolved bool
	)

	var matchedPubs []policyv1alpha1.PodUnavailableBudget
	// Iterate by index so each pub is not copied just to be inspected.
	for i := range pubList.Items {
		pub := &pubList.Items[i]

		// if targetReference isn't nil, it takes priority over the selector
		if targetRef := pub.Spec.TargetReference; targetRef != nil {
			if ownerRef == nil {
				continue
			}
			if !podRefResolved {
				// recursively resolve the pod's controller reference,
				// e.g. ref.Kind=ReplicaSet resolves to podRef.Kind=Deployment
				resolved, err := finders.GetScaleAndSelectorForRef(ownerRef.APIVersion, ownerRef.Kind, pod.Namespace, ownerRef.Name, ownerRef.UID)
				if err != nil {
					return nil, err
				}
				podRef, podRefResolved = resolved, true
			}
			pubRef, err := finders.GetScaleAndSelectorForRef(targetRef.APIVersion, targetRef.Kind, pub.Namespace, targetRef.Name, "")
			if err != nil {
				return nil, err
			}
			if podRef == nil || pubRef == nil {
				continue
			}
			// both references belong to the same workload
			if isReferenceEqual(podRef, pubRef) {
				matchedPubs = append(matchedPubs, *pub)
			}
			continue
		}

		// An invalid selector cannot be fixed here, so skip this pub.
		labelSelector, err := util.GetFastLabelSelector(pub.Spec.Selector)
		if err != nil {
			continue
		}
		// If a PUB with a nil or empty selector creeps in, it should match nothing, not everything.
		if labelSelector.Empty() || !labelSelector.Matches(labels.Set(pod.Labels)) {
			continue
		}
		matchedPubs = append(matchedPubs, *pub)
	}

	if len(matchedPubs) == 0 {
		klog.V(6).Infof("could not find PodUnavailableBudget for pod %s in namespace %s with labels: %v", pod.Name, pod.Namespace, pod.Labels)
		return nil, nil
	}
	if len(matchedPubs) > 1 {
		klog.Warningf("Pod %q/%q matches multiple PodUnavailableBudgets. Choose %q arbitrarily.", pod.Namespace, pod.Name, matchedPubs[0].Name)
	}

	return &matchedPubs[0], nil
}

// isReferenceEqual reports whether two resolved references denote the same
// object: equal API group (the version part of APIVersion is not compared),
// Kind, Name and UID. An APIVersion that fails to parse never compares equal.
func isReferenceEqual(ref1, ref2 *controllerfinder.ScaleAndSelector) bool {
	first, err := schema.ParseGroupVersion(ref1.APIVersion)
	if err != nil {
		return false
	}
	second, err := schema.ParseGroupVersion(ref2.APIVersion)
	if err != nil {
		return false
	}

	switch {
	case first.Group != second.Group,
		ref1.Kind != ref2.Kind,
		ref1.Name != ref2.Name,
		ref1.UID != ref2.UID:
		return false
	}
	return true
}
Loading

0 comments on commit 9604082

Please sign in to comment.