Skip to content

Commit

Permalink
pub support get total replicas from annotations (#1135)
Browse files Browse the repository at this point in the history
Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>

Signed-off-by: liheng.zms <liheng.zms@alibaba-inc.com>
  • Loading branch information
zmberg committed Dec 13, 2022
1 parent cdb3b02 commit a0659ec
Show file tree
Hide file tree
Showing 13 changed files with 188 additions and 47 deletions.
3 changes: 3 additions & 0 deletions apis/policy/v1alpha1/podunavailablebudget_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ const (
PubUpdateOperation PubOperation = "UPDATE"
PubDeleteOperation PubOperation = "DELETE"
PubEvictOperation PubOperation = "EVICT"
// PubProtectTotalReplicas is the annotation that specifies the total replicas protected by the pub,
// rather than workload.spec.replicas. It must be used together with pub.spec.selector.
PubProtectTotalReplicas = "pub.kruise.io/protect-total-replicas"
// PodPubNoProtectionAnnotation marks a pod as not protected by pub, to support the scenario of forced pod deletion
PodPubNoProtectionAnnotation = "pub.kruise.io/no-protect"
)
Expand Down
6 changes: 5 additions & 1 deletion pkg/control/pubcontrol/pub_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package pubcontrol
import (
"context"
"reflect"
"strconv"
"strings"

appspub "github.com/openkruise/kruise/apis/apps/pub"
Expand Down Expand Up @@ -92,7 +93,6 @@ func (c *commonControl) GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget)
if err = c.List(context.TODO(), podList, listOptions, utilclient.DisableDeepCopy); err != nil {
return nil, 0, err
}

matchedPods := make([]*corev1.Pod, 0, len(podList.Items))
for i := range podList.Items {
pod := &podList.Items[i]
Expand All @@ -104,6 +104,10 @@ func (c *commonControl) GetPodsForPub(pub *policyv1alpha1.PodUnavailableBudget)
if err != nil {
return nil, 0, err
}
if expectedCount == 0 && pub.Annotations[policyv1alpha1.PubProtectTotalReplicas] != "" {
expectedCount, _ := strconv.ParseInt(pub.Annotations[policyv1alpha1.PubProtectTotalReplicas], 10, 32)
return matchedPods, int32(expectedCount), nil
}
return matchedPods, expectedCount, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/control/pubcontrol/pub_control_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ func PodUnavailableBudgetValidatePod(client client.Client, control PubControl, p
// if there is no matching PodUnavailableBudget, just return true
} else if pub == nil {
return true, "", nil
// if desired available == 0, then allow all requests
} else if pub.Status.DesiredAvailable == 0 {
return true, "", nil
} else if !isNeedPubProtection(pub, operation) {
klog.V(3).Infof("pod(%s/%s) operation(%s) is not in pub(%s) protection", pod.Namespace, pod.Name, pub.Name)
return true, "", nil
Expand Down
1 change: 1 addition & 0 deletions pkg/control/pubcontrol/pub_control_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
UnavailablePods: map[string]metav1.Time{},
DisruptedPods: map[string]metav1.Time{},
UnavailableAllowed: 0,
DesiredAvailable: 1,
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
Expand All @@ -159,6 +162,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
Expand All @@ -173,6 +179,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
Expand All @@ -187,6 +196,9 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}
return false
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return true
},
}); err != nil {
return err
}
Expand Down Expand Up @@ -301,7 +313,6 @@ func (r *ReconcilePodUnavailableBudget) syncPodUnavailableBudget(pub *policyv1al
} else {
pubClone = pub.DeepCopy()
}

informerCached := &policyv1alpha1.PodUnavailableBudget{}
if err := r.Get(context.TODO(), types.NamespacedName{Namespace: pub.Namespace,
Name: pub.Name}, informerCached); err == nil {
Expand Down
85 changes: 74 additions & 11 deletions pkg/controller/podunavailablebudget/pub_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,9 @@ var (
Kind: "PodUnavailableBudget",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pub-test",
Namespace: "default",
Name: "pub-test",
Annotations: map[string]string{},
},
Spec: policyv1alpha1.PodUnavailableBudgetSpec{
Selector: &metav1.LabelSelector{
Expand Down Expand Up @@ -262,7 +263,7 @@ func TestPubReconcile(t *testing.T) {
},
},
{
name: "select matched deployment, selector and maxUnavailable 30%",
name: "select matched deployment(Deletion), selector and maxUnavailable 30%",
getPods: func() []*corev1.Pod {
var matchedPods []*corev1.Pod
for i := 0; int32(i) < *deploymentDemo.Spec.Replicas; i++ {
Expand All @@ -273,7 +274,10 @@ func TestPubReconcile(t *testing.T) {
return matchedPods
},
getDeployment: func() *apps.Deployment {
return deploymentDemo.DeepCopy()
obj := deploymentDemo.DeepCopy()
t := metav1.Now()
obj.DeletionTimestamp = &t
return obj
},
getReplicaSet: func() *apps.ReplicaSet {
return replicaSetDemo.DeepCopy()
Expand All @@ -285,10 +289,10 @@ func TestPubReconcile(t *testing.T) {
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
return policyv1alpha1.PodUnavailableBudgetStatus{
UnavailableAllowed: 3,
UnavailableAllowed: 0,
CurrentAvailable: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 7,
TotalReplicas: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 0,
TotalReplicas: 0,
}
},
},
Expand Down Expand Up @@ -353,7 +357,7 @@ func TestPubReconcile(t *testing.T) {
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
return policyv1alpha1.PodUnavailableBudgetStatus{
UnavailableAllowed: *deploymentDemo.Spec.Replicas,
UnavailableAllowed: 0,
CurrentAvailable: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 0,
TotalReplicas: *deploymentDemo.Spec.Replicas,
Expand Down Expand Up @@ -387,7 +391,7 @@ func TestPubReconcile(t *testing.T) {
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
return policyv1alpha1.PodUnavailableBudgetStatus{
UnavailableAllowed: *deploymentDemo.Spec.Replicas,
UnavailableAllowed: 0,
CurrentAvailable: *deploymentDemo.Spec.Replicas,
DesiredAvailable: 0,
TotalReplicas: *deploymentDemo.Spec.Replicas,
Expand Down Expand Up @@ -754,6 +758,66 @@ func TestPubReconcile(t *testing.T) {
return *status
},
},
{
name: "test select matched deployment, 10 UnavailablePods(5 ready), 10 DisruptionPods(5 delay) and 5 deletion",
getPods: func() []*corev1.Pod {
var matchedPods []*corev1.Pod
for i := 0; i < 100; i++ {
pod := podDemo.DeepCopy()
pod.OwnerReferences = nil
pod.Name = fmt.Sprintf("%s-%d", pod.Name, i)
if i >= 20 && i < 25 {
pod.DeletionTimestamp = &metav1.Time{Time: time.Now()}
}
matchedPods = append(matchedPods, pod)
}
return matchedPods
},
getDeployment: func() *apps.Deployment {
object := deploymentDemo.DeepCopy()
object.Spec.Replicas = utilpointer.Int32Ptr(100)
return object
},
getReplicaSet: func() *apps.ReplicaSet {
object := replicaSetDemo.DeepCopy()
object.Spec.Replicas = utilpointer.Int32Ptr(100)
return object
},
getPub: func() *policyv1alpha1.PodUnavailableBudget {
pub := pubDemo.DeepCopy()

pub.Annotations[policyv1alpha1.PubProtectTotalReplicas] = "50"
for i := 0; i < 10; i++ {
if i >= 0 && i < 5 {
pub.Status.UnavailablePods[fmt.Sprintf("test-pod-%d", i)] = metav1.Time{Time: time.Now().Add(-10 * time.Second)}
} else {
pub.Status.UnavailablePods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
}
for i := 10; i < 20; i++ {
if i >= 10 && i < 15 {
pub.Status.DisruptedPods[fmt.Sprintf("test-pod-%d", i)] = metav1.Time{Time: time.Now().Add(-125 * time.Second)}
} else {
pub.Status.DisruptedPods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
}
return pub
},
expectPubStatus: func() policyv1alpha1.PodUnavailableBudgetStatus {
status := pubDemo.Status.DeepCopy()
for i := 5; i < 10; i++ {
status.UnavailablePods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
for i := 15; i < 20; i++ {
status.DisruptedPods[fmt.Sprintf("test-pod-%d", i)] = metav1.Now()
}
status.TotalReplicas = 50
status.DesiredAvailable = 35
status.CurrentAvailable = 85
status.UnavailableAllowed = 50
return *status
},
},
}

for _, cs := range cases {
Expand All @@ -774,7 +838,6 @@ func TestPubReconcile(t *testing.T) {
controllerFinder: &controllerfinder.ControllerFinder{Client: fakeClient},
pubControl: pubcontrol.NewPubControl(fakeClient),
}

_, err := reconciler.syncPodUnavailableBudget(pub)
if err != nil {
t.Fatalf("sync PodUnavailableBudget failed: %s", err.Error())
Expand All @@ -784,7 +847,7 @@ func TestPubReconcile(t *testing.T) {
t.Fatalf("getLatestPub failed: %s", err.Error())
}
if !isPubStatusEqual(cs.expectPubStatus(), newPub.Status) {
t.Fatalf("expect pub status(%v) but get(%v)", cs.expectPubStatus(), newPub.Status)
t.Fatalf("expect pub status(%s) but get(%s)", util.DumpJSON(cs.expectPubStatus()), util.DumpJSON(newPub.Status))
}
_ = util.GlobalCache.Delete(pub)
})
Expand Down
30 changes: 14 additions & 16 deletions pkg/controller/podunavailablebudget/pub_pod_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func (e *SetEnqueueRequestForPUB) Update(evt event.UpdateEvent, q workqueue.Rate

// Delete implements EventHandler.
// It hands the deleted object and the work queue to addSetRequest, so a
// delete event goes through the same enqueue path as the other events that
// call that helper.
func (e *SetEnqueueRequestForPUB) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) {
e.addSetRequest(evt.Object, q)
}

// Generic implements EventHandler
Expand Down Expand Up @@ -240,23 +241,21 @@ func (e *SetEnqueueRequestForPUB) addSetRequest(object client.Object, q workqueu
targetRef.Name, namespace = obj.Name, obj.Namespace
temLabels = obj.Spec.Template.Labels
}
default:
return
}

// fetch matched pub
pubList := &policyv1alpha1.PodUnavailableBudgetList{}
if err := e.mgr.GetClient().List(context.TODO(), pubList, &client.ListOptions{Namespace: namespace}); err != nil {
klog.Errorf("SetEnqueueRequestForPUB list pub failed: %s", err.Error())
return
}
var matchedPubs []policyv1alpha1.PodUnavailableBudget
var matched policyv1alpha1.PodUnavailableBudget
for _, pub := range pubList.Items {
// if targetReference isn't nil, it takes priority
if pub.Spec.TargetReference != nil {
// belongs to the same workload
if pubcontrol.IsReferenceEqual(targetRef, pub.Spec.TargetReference) {
matchedPubs = append(matchedPubs, pub)
matched = pub
break
}
} else {
// This error is irreversible, so continue
Expand All @@ -268,18 +267,17 @@ func (e *SetEnqueueRequestForPUB) addSetRequest(object client.Object, q workqueu
if labelSelector.Empty() || !labelSelector.Matches(labels.Set(temLabels)) {
continue
}
matchedPubs = append(matchedPubs, pub)
matched = pub
break
}
}

for _, pub := range matchedPubs {
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: pub.Name,
Namespace: pub.Namespace,
},
})
klog.V(3).Infof("workload(%s/%s) replicas changed, and reconcile pub(%s/%s)",
namespace, targetRef.Name, pub.Namespace, pub.Name)
}
q.Add(reconcile.Request{
NamespacedName: types.NamespacedName{
Name: matched.Name,
Namespace: matched.Namespace,
},
})
klog.V(3).Infof("workload(%s/%s) changed, and reconcile pub(%s/%s)",
namespace, targetRef.Name, matched.Namespace, matched.Name)
}
7 changes: 4 additions & 3 deletions pkg/util/controllerfinder/controller_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,10 @@ func (r *ControllerFinder) GetExpectedScaleForPods(pods []*corev1.Pod) (int32, e
workload, err := r.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, pod.Namespace, ref.Name, ref.UID)
if err != nil && !errors.IsNotFound(err) {
return 0, err
}
if workload != nil && workload.Metadata.DeletionTimestamp.IsZero() {
controllerScale[workload.UID] = workload.Scale
} else if workload != nil && workload.Metadata.DeletionTimestamp.IsZero() {
controllerScale[ref.UID] = workload.Scale
} else {
controllerScale[ref.UID] = 0
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/util/controllerfinder/pods_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
if err != nil {
return nil, -1, err
}
if rs == nil {
if rs == nil || !rs.DeletionTimestamp.IsZero() {
return nil, 0, nil
}
workloadReplicas = *rs.Spec.Replicas
Expand All @@ -54,7 +54,7 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
obj, err := r.GetScaleAndSelectorForRef(apiVersion, kind, ns, name, "")
if err != nil {
return nil, -1, err
} else if obj == nil {
} else if obj == nil || !obj.Metadata.DeletionTimestamp.IsZero() {
return nil, 0, nil
}
workloadReplicas = obj.Scale
Expand All @@ -64,7 +64,7 @@ func (r *ControllerFinder) GetPodsForRef(apiVersion, kind, ns, name string, acti
obj, err := r.GetScaleAndSelectorForRef(apiVersion, kind, ns, name, "")
if err != nil {
return nil, -1, err
} else if obj == nil {
} else if obj == nil || !obj.Metadata.DeletionTimestamp.IsZero() {
return nil, 0, nil
}
workloadReplicas = obj.Scale
Expand Down
10 changes: 0 additions & 10 deletions pkg/webhook/pod/validating/pod_unavailable_budget.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,5 @@ func (p *PodCreateHandler) podUnavailableBudgetValidatingPod(ctx context.Context
if checkPod.Annotations[pubcontrol.PodRelatedPubAnnotation] == "" {
return true, "", nil
}

// Get the workload corresponding to the pod, if it has been deleted then it is not protected
if ref := metav1.GetControllerOf(checkPod); ref != nil {
workload, err := p.finders.GetScaleAndSelectorForRef(ref.APIVersion, ref.Kind, checkPod.Namespace, ref.Name, ref.UID)
if err != nil {
return false, "", err
} else if workload == nil || !workload.Metadata.DeletionTimestamp.IsZero() {
return true, "", nil
}
}
return pubcontrol.PodUnavailableBudgetValidatePod(p.Client, p.pubControl, checkPod, operation, dryRun)
}
Loading

0 comments on commit a0659ec

Please sign in to comment.