Skip to content

Commit

Permalink
fix, podopslifecycle webhook (#60)
Browse files Browse the repository at this point in the history
* fix, podopslifecycle webhook

* fix

* fix, comments
  • Loading branch information
shaofan-hs committed Aug 26, 2023
1 parent fbed9ba commit 49126be
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 191 deletions.
6 changes: 3 additions & 3 deletions pkg/controllers/collaset/collaset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ func expectedStatusReplicas(c client.Client, cls *appsv1alpha1.CollaSet, schedul
}

func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn func(cls *appsv1alpha1.CollaSet) bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
cls := &appsv1alpha1.CollaSet{}
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, cls); err != nil {
return err
Expand All @@ -650,7 +650,7 @@ func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn f
}

func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod := &corev1.Pod{}
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil {
return err
Expand All @@ -665,7 +665,7 @@ func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(p
}

func updatePodStatusWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod := &corev1.Pod{}
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil {
return err
Expand Down
11 changes: 6 additions & 5 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package synccontrol

import (
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (sc *RealSyncControl) SyncPods(instance *appsv1alpha1.CollaSet, updatedRevi

// get owned IDs
var ownedIDs map[int]*appsv1alpha1.ContextDetail
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
ownedIDs, err = podcontext.AllocateID(sc.client, instance, updatedRevision.Name, int(replicasRealValue(instance.Spec.Replicas)))
return err
}); err != nil {
Expand Down Expand Up @@ -126,7 +127,7 @@ func (sc *RealSyncControl) SyncPods(instance *appsv1alpha1.CollaSet, updatedRevi

if needUpdateContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s when sync", instance.Namespace, instance.Name)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return podcontext.UpdateToPodContext(sc.client, instance, ownedIDs)
}); err != nil {
return false, nil, ownedIDs, fmt.Errorf("fail to update ResourceContext when reclaiming IDs: %s", err)
Expand Down Expand Up @@ -248,7 +249,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll
// mark these Pods to scalingIn
if needUpdateContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s when scaling in Pod", set.Namespace, set.Name)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return podcontext.UpdateToPodContext(sc.client, set, ownedIDs)
})

Expand Down Expand Up @@ -303,7 +304,7 @@ func (sc *RealSyncControl) Scale(set *appsv1alpha1.CollaSet, podWrappers []*coll

if needUpdatePodContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s after scaling", set.Namespace, set.Name)
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
return podcontext.UpdateToPodContext(sc.client, set, ownedIDs)
}); err != nil {
return scaling, fmt.Errorf("fail to reset ResourceContext: %s", err)
Expand Down Expand Up @@ -401,7 +402,7 @@ func (sc *RealSyncControl) Update(set *appsv1alpha1.CollaSet, podWrapers []*coll
// 5. mark Pod to use updated revision before updating it.
if needUpdateContext {
klog.V(1).Infof("try to update ResourceContext for CollaSet %s/%s", set.Namespace, set.Name)
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
return podcontext.UpdateToPodContext(sc.client, set, ownedIDs)
})

Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/poddeletion/poddeletion_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ var _ = Describe("Pod Deletion controller", func() {
})

func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod := &corev1.Pod{}
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil {
return err
Expand All @@ -125,7 +125,7 @@ func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(p
}

func updatePodStatusWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod := &corev1.Pod{}
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil {
return err
Expand Down
129 changes: 84 additions & 45 deletions pkg/controllers/podopslifecycle/podopslifecycle_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -116,45 +117,48 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc
return reconcile.Result{}, err
}

if !r.expectation.SatisfiedExpectations(key, pod.ResourceVersion) {
klog.Errorf("skip pod %s with no satisfied", key)
return reconcile.Result{}, nil
}

idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return reconcile.Result{}, err
}
if len(idToLabelsMap) == 0 {
updated, err := r.addServiceAvailable(pod)
if updated {
return reconcile.Result{}, err
}

updated, err = r.updateServiceReadiness(ctx, pod, true)
if updated {
return reconcile.Result{}, err
}
}

state, err := r.ruleSetManager.GetState(r.Client, pod)
if err != nil {
klog.Errorf("failed to get pod %s state: %s", key, err)
return reconcile.Result{}, err
}

var labels map[string]string

if state.InStageAndPassed() {
switch state.Stage {
case v1alpha1.PodOpsLifecyclePreTrafficOffStage:
labels, err = r.preTrafficOffStage(pod)
labels, err = r.preTrafficOffStage(pod, idToLabelsMap)
case v1alpha1.PodOpsLifecyclePreTrafficOnStage:
labels, err = r.preTrafficOnStage(pod)
labels, err = r.preTrafficOnStage(pod, idToLabelsMap)
}
}

klog.Infof("pod %s in stage %q, labels: %v, error: %v", key, state.Stage, labels, err)
if err != nil {
return reconcile.Result{}, err
}

if len(labels) > 0 {
expectation.ExpectUpdate(key, pod.ResourceVersion)
err = r.addLabels(ctx, pod, labels)
if err != nil {
expectation.DeleteExpectations(key)
}
return reconcile.Result{}, err
}

if !r.expectation.SatisfiedExpectations(key, pod.ResourceVersion) {
klog.Errorf("skip pod %s with no satisfied", key)
return reconcile.Result{}, nil
}

idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return reconcile.Result{}, err
return reconcile.Result{}, r.addLabels(ctx, pod, labels)
}

expected := map[string]bool{
Expand All @@ -176,9 +180,30 @@ func (r *ReconcilePodOpsLifecycle) Reconcile(ctx context.Context, request reconc
}
}
}
return reconcile.Result{}, nil
}

_, err = r.updateServiceReadiness(ctx, pod, true)
return reconcile.Result{}, err
func (r *ReconcilePodOpsLifecycle) addServiceAvailable(pod *corev1.Pod) (bool, error) {
if pod.Labels == nil {
return false, nil
}
if _, ok := pod.Labels[v1alpha1.PodServiceAvailableLabel]; ok {
return false, nil
}

satisfied, _, err := controllerutils.SatisfyExpectedFinalizers(pod) // whether all expected finalizers are satisfied
if err != nil || !satisfied {
return false, err
}

if !controllerutils.IsPodReady(pod) {
return false, nil
}

labels := map[string]string{
v1alpha1.PodServiceAvailableLabel: strconv.FormatInt(time.Now().Unix(), 10),
}
return true, r.addLabels(context.Background(), pod, labels)
}

func (r *ReconcilePodOpsLifecycle) updateServiceReadiness(ctx context.Context, pod *corev1.Pod, isReady bool) (bool, error) {
Expand All @@ -189,9 +214,18 @@ func (r *ReconcilePodOpsLifecycle) updateServiceReadiness(ctx context.Context, p

key := controllerKey(pod)
r.expectation.ExpectUpdate(key, pod.ResourceVersion)
if err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
newPod := &corev1.Pod{}
err := r.Client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, newPod)
if err != nil {
return err
}
needUpdate, _ := r.setServiceReadiness(newPod, isReady)
if !needUpdate {
return nil
}

if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Client.Status().Update(ctx, pod)
return r.Client.Status().Update(ctx, newPod)
}); err != nil {
klog.Errorf("failed to update pod status %s: %s", key, err)
r.expectation.DeleteExpectations(key)
Expand Down Expand Up @@ -248,12 +282,7 @@ func (r *ReconcilePodOpsLifecycle) setServiceReadiness(pod *corev1.Pod, isReady
return true, fmt.Sprintf("update service readiness gate to: %s", string(status))
}

func (r *ReconcilePodOpsLifecycle) preTrafficOffStage(pod *corev1.Pod) (labels map[string]string, err error) {
idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return nil, err
}

func (r *ReconcilePodOpsLifecycle) preTrafficOffStage(pod *corev1.Pod, idToLabelsMap map[string]map[string]string) (labels map[string]string, err error) {
labels = map[string]string{}
currentTime := strconv.FormatInt(time.Now().Unix(), 10)
for k, v := range idToLabelsMap {
Expand All @@ -276,17 +305,12 @@ func (r *ReconcilePodOpsLifecycle) preTrafficOffStage(pod *corev1.Pod) (labels m
return
}

func (r *ReconcilePodOpsLifecycle) preTrafficOnStage(pod *corev1.Pod) (labels map[string]string, err error) {
idToLabelsMap, _, err := PodIDAndTypesMap(pod)
if err != nil {
return nil, err
}

func (r *ReconcilePodOpsLifecycle) preTrafficOnStage(pod *corev1.Pod, idToLabelsMap map[string]map[string]string) (labels map[string]string, err error) {
labels = map[string]string{}
currentTime := strconv.FormatInt(time.Now().Unix(), 10)
for k := range idToLabelsMap {
key := fmt.Sprintf("%s/%s", v1alpha1.PodPostCheckedLabelPrefix, k)
if _, ok := pod.GetLabels()[key]; !ok {
if _, ok := pod.Labels[key]; !ok {
labels[key] = currentTime // post-checked
}
}
Expand All @@ -295,16 +319,31 @@ func (r *ReconcilePodOpsLifecycle) preTrafficOnStage(pod *corev1.Pod) (labels ma
}

func (r *ReconcilePodOpsLifecycle) addLabels(ctx context.Context, pod *corev1.Pod, labels map[string]string) error {
if pod.Labels == nil {
pod.Labels = map[string]string{}
}
for k, v := range labels {
pod.Labels[k] = v
if len(labels) == 0 {
return nil
}

return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return r.Client.Update(ctx, pod)
key := controllerKey(pod)
expectation.ExpectUpdate(key, pod.ResourceVersion)
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
newPod := &corev1.Pod{}
err := r.Client.Get(ctx, types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}, newPod)
if err != nil {
return err
}
if newPod.Labels == nil {
newPod.Labels = map[string]string{}
}
for k, v := range labels {
newPod.Labels[k] = v
}
return r.Client.Update(ctx, newPod)
})
if err != nil {
klog.Errorf("failed to update pod %s with labels: %v: %s", key, labels, err)
expectation.DeleteExpectations(key)
}
return err
}

func (r *ReconcilePodOpsLifecycle) initRuleSetManager() {
Expand Down
40 changes: 2 additions & 38 deletions pkg/controllers/podopslifecycle/podopslifecycle_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,8 @@ var _ = Describe("podopslifecycle controller", func() {
},
},
}
operationType = "restart"
id = "123"
time = "1402144848"
id = "123"
time = "1402144848"
)

AfterEach(func() {
Expand Down Expand Up @@ -156,41 +155,6 @@ var _ = Describe("podopslifecycle controller", func() {
Expect(pod.Status.Conditions).To(HaveLen(1))
Expect(string(pod.Status.Conditions[0].Type)).To(Equal(v1alpha1.ReadinessGatePodServiceReady))
Expect(pod.Status.Conditions[0].Status).To(Equal(corev1.ConditionTrue))

podOpsLifecycle.ruleSetManager = &mockRuleSetManager{
CheckState: &checker.CheckState{
Stage: v1alpha1.PodOpsLifecyclePreTrafficOffStage,
States: []checker.State{
{
Detail: &v1alpha1.Detail{
Stage: v1alpha1.PodOpsLifecyclePreTrafficOffStage,
Passed: true,
},
},
},
},
}

pod.ObjectMeta.Labels = map[string]string{
v1alpha1.ControlledByPodOpsLifecycle: "true",
fmt.Sprintf("%s/%s", v1alpha1.PodOperateLabelPrefix, id): time,
fmt.Sprintf("%s/%s", v1alpha1.PodPreCheckLabelPrefix, id): time,
fmt.Sprintf("%s/%s", v1alpha1.PodOperationTypeLabelPrefix, id): operationType,
}
err = mgr.GetClient().Update(context.Background(), pod)
Expect(err).NotTo(HaveOccurred())

<-request

pod = &corev1.Pod{}
err = mgr.GetAPIReader().Get(context.Background(), client.ObjectKey{
Name: "test",
Namespace: "default",
}, pod)
Expect(err).NotTo(HaveOccurred())

Expect(pod.GetLabels()).To(HaveKey(fmt.Sprintf("%s/%s", v1alpha1.PodPreCheckedLabelPrefix, id)))
Expect(pod.GetLabels()).To(HaveKey(fmt.Sprintf("%s/%s", v1alpha1.PodOperationPermissionLabelPrefix, operationType)))
})

It("create pod with label prepare", func() {
Expand Down
7 changes: 1 addition & 6 deletions pkg/controllers/podopslifecycle/predicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package podopslifecycle

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"sigs.k8s.io/controller-runtime/pkg/event"
)

Expand Down Expand Up @@ -51,11 +50,7 @@ func (pp *PodPredicate) Update(evt event.UpdateEvent) bool {
if oldPod == nil && newPod == nil {
return false
}
if !pp.NeedOpsLifecycle(oldPod, newPod) {
return false
}

return !equality.Semantic.DeepEqual(oldPod.ObjectMeta.Annotations, newPod.ObjectMeta.Annotations) || !equality.Semantic.DeepEqual(oldPod.ObjectMeta.Labels, newPod.ObjectMeta.Labels)
return pp.NeedOpsLifecycle(oldPod, newPod)
}

func (pp *PodPredicate) Generic(evt event.GenericEvent) bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func expectedStatusReplicas(c client.Client, cls *appsv1alpha1.CollaSet, schedul
}

func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn func(cls *appsv1alpha1.CollaSet) bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
cls := &appsv1alpha1.CollaSet{}
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, cls); err != nil {
return err
Expand All @@ -289,7 +289,7 @@ func updateCollaSetWithRetry(c client.Client, namespace, name string, updateFn f
}

func updatePodWithRetry(c client.Client, namespace, name string, updateFn func(pod *corev1.Pod) bool) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
pod := &corev1.Pod{}
if err := c.Get(context.TODO(), types.NamespacedName{Namespace: namespace, Name: name}, pod); err != nil {
return err
Expand Down
Loading

0 comments on commit 49126be

Please sign in to comment.