Skip to content

Commit

Permalink
feat(CollaSet): supports operationDelaySeconds in Scaling-in and Upda…
Browse files Browse the repository at this point in the history
…ting Pods (#79)

* feat(CollaSet): supports operationDelaySeconds in Scale and Update operation

* fix golint

* fix UT
  • Loading branch information
wu8685 committed Aug 31, 2023
1 parent b623161 commit 55052b3
Show file tree
Hide file tree
Showing 13 changed files with 296 additions and 115 deletions.
31 changes: 28 additions & 3 deletions apis/apps/v1alpha1/collaset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import (
type CollaSetConditionType string

const (
CollaSetSyncPod CollaSetConditionType = "SyncPod"
CollaSetScale CollaSetConditionType = "Scale"
CollaSetUpdate CollaSetConditionType = "Update"
CollaSetScale CollaSetConditionType = "Scale"
CollaSetUpdate CollaSetConditionType = "Update"
)

// PersistentVolumeClaimRetentionPolicyType is a string enumeration of the policies that will determine
Expand Down Expand Up @@ -98,9 +97,11 @@ type CollaSetSpec struct {
// UpdateStrategy indicates the CollaSetUpdateStrategy that will be
// employed to update Pods in the CollaSet when a revision is made to
// Template.
// +optional
UpdateStrategy UpdateStrategy `json:"updateStrategy,omitempty"`

// ScaleStrategy indicates the strategy detail that will be used during pod scaling.
// +optional
ScaleStrategy ScaleStrategy `json:"scaleStrategy,omitempty"`

// Indicate the number of histories to be conserved
Expand All @@ -113,11 +114,15 @@ type ScaleStrategy struct {
// Context indicates the pool from which to allocate Pod instance ID. CollaSets are allowed to share the
// same Context. It is not allowed to change.
// Context defaults to be CollaSet's name.
// +optional
Context string `json:"context,omitempty"`

// PodToExclude indicates the pods which will be orphaned by CollaSet.
// +optional
PodToExclude []string `json:"podToExclude,omitempty"`

// PodToInclude indicates the pods which will be adapted by CollaSet.
// +optional
PodToInclude []string `json:"podToInclude,omitempty"`

// PersistentVolumeClaimRetentionPolicy describes the lifecycle of PersistentVolumeClaim
Expand All @@ -126,25 +131,33 @@ type ScaleStrategy struct {
// by deleting persistent volume claims when their CollaSet is deleted, or when their pod is scaled down.
// +optional
PersistentVolumeClaimRetentionPolicy *PersistentVolumeClaimRetentionPolicy `json:"persistentVolumeClaimRetentionPolicy,omitempty"`

// OperationDelaySeconds indicates how many seconds it should delay before operating scale.
// +optional
OperationDelaySeconds *int32 `json:"operationDelaySeconds,omitempty"`
}

// PersistentVolumeClaimRetentionPolicy describes what happens to the PVCs created from
// CollaSet VolumeClaimTemplates when the CollaSet is deleted or scaled down.
type PersistentVolumeClaimRetentionPolicy struct {
// WhenDeleted specifies what happens to PVCs created from CollaSet
// VolumeClaimTemplates when the CollaSet is deleted. The default policy
// of `Delete` causes those PVCs to be deleted. The `Retain` policy
// causes PVCs to not be affected by CollaSet deletion.
// +optional
WhenDeleted PersistentVolumeClaimRetentionPolicyType `json:"whenDeleted,omitempty"`

// WhenScaled specifies what happens to PVCs created from CollaSet
// VolumeClaimTemplates when the CollaSet is scaled down. The default
// policy of `Retain` causes PVCs to not be affected by a scaledown. The
// `Delete` policy causes the associated PVCs for any excess pods above
// the replica count to be deleted.
// +optional
WhenScaled PersistentVolumeClaimRetentionPolicyType `json:"whenScaled,omitempty"`
}

// ByPartition controls the update progress through a partition value.
type ByPartition struct {
// Partition controls the update progress by indicating how many pods should be updated.
// Defaults to nil (all pods will be updated).
// +optional
Partition *int32 `json:"partition,omitempty"`
}

Expand All @@ -154,19 +167,26 @@ type ByLabel struct {
// RollingUpdateCollaSetStrategy is used to communicate the parameters of a rolling update.
type RollingUpdateCollaSetStrategy struct {
// ByPartition indicates that the update progress is controlled by a partition value.
// +optional
ByPartition *ByPartition `json:"byPartition,omitempty"`

// ByLabel indicates that the update progress is controlled by attaching a label to pods.
// +optional
ByLabel *ByLabel `json:"byLabel,omitempty"`
}

// UpdateStrategy describes how the Pods of a CollaSet are updated when a revision
// is made to its Template.
type UpdateStrategy struct {
// RollingUpdate is used to communicate the parameters of a rolling update.
// +optional
RollingUpdate *RollingUpdateCollaSetStrategy `json:"rollingUpdate,omitempty"`

// PodUpdatePolicy indicates the policy by which pods are updated.
// Note that it is serialized under the JSON key `podUpgradePolicy`.
// +optional
PodUpdatePolicy PodUpdateStrategyType `json:"podUpgradePolicy,omitempty"`

// OperationDelaySeconds indicates how many seconds to delay before operating an update.
// +optional
OperationDelaySeconds *int32 `json:"operationDelaySeconds,omitempty"`
}

// CollaSetStatus defines the observed state of CollaSet
Expand All @@ -177,9 +197,11 @@ type CollaSetStatus struct {
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

// CurrentRevision, if not empty, indicates the version of the CollaSet.
// +optional
CurrentRevision string `json:"currentRevision,omitempty"`

// UpdatedRevision, if not empty, indicates the version of the CollaSet currently updated.
// +optional
UpdatedRevision string `json:"updatedRevision,omitempty"`

// Count of hash collisions for the DaemonSet. The DaemonSet controller
Expand All @@ -201,12 +223,15 @@ type CollaSetStatus struct {
AvailableReplicas int32 `json:"availableReplicas,omitempty"`

// Replicas is the most recently observed number of replicas.
// +optional
Replicas int32 `json:"replicas,omitempty"`

// The number of pods in updated version.
// +optional
UpdatedReplicas int32 `json:"updatedReplicas,omitempty"`

// OperatingReplicas indicates the number of pods that are in the pod ops lifecycle and have not finished the update phase.
// +optional
OperatingReplicas int32 `json:"operatingReplicas,omitempty"`

// UpdatedReadyReplicas indicates the number of the pod with updated revision and ready condition
Expand Down
10 changes: 10 additions & 0 deletions apis/apps/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions config/crd/bases/apps.kusionstack.io_collasets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ spec:
It is not allowed to change. Context defaults to be CollaSet's
name.
type: string
operationDelaySeconds:
description: OperationDelaySeconds indicates how many seconds
it should delay before operating scale.
format: int32
type: integer
persistentVolumeClaimRetentionPolicy:
description: PersistentVolumeClaimRetentionPolicy describes the
lifecycle of PersistentVolumeClaim created from volumeClaimTemplates.
Expand Down Expand Up @@ -190,6 +195,11 @@ spec:
will be employed to update Pods in the CollaSet when a revision
is made to Template.
properties:
operationDelaySeconds:
description: OperationDelaySeconds indicates how many seconds
it should delay before operating update.
format: int32
type: integer
podUpgradePolicy:
description: PodUpdatePolicy indicates the policy by to update
pods.
Expand Down
28 changes: 16 additions & 12 deletions pkg/controllers/collaset/collaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package collaset
import (
"context"
"fmt"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -160,38 +161,41 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
UpdatedRevision: updatedRevision.Name,
}

newStatus, err = r.DoReconcile(instance, updatedRevision, revisions, newStatus)
requeueAfter, newStatus, err := r.DoReconcile(instance, updatedRevision, revisions, newStatus)
// update status anyway
if err := r.updateStatus(ctx, instance, newStatus); err != nil {
return ctrl.Result{}, fmt.Errorf("fail to update status of CollaSet %s: %s", req, err)
return ctrl.Result{RequeueAfter: requeueAfter}, fmt.Errorf("fail to update status of CollaSet %s: %s", req, err)
}

return ctrl.Result{}, err
return ctrl.Result{RequeueAfter: requeueAfter}, err
}

func (r *CollaSetReconciler) DoReconcile(instance *appsv1alpha1.CollaSet, updatedRevision *appsv1.ControllerRevision, revisions []*appsv1.ControllerRevision, newStatus *appsv1alpha1.CollaSetStatus) (*appsv1alpha1.CollaSetStatus, error) {
podWrappers, newStatus, syncErr := r.doSync(instance, updatedRevision, revisions, newStatus)
return calculateStatus(instance, newStatus, updatedRevision, podWrappers, syncErr), syncErr
func (r *CollaSetReconciler) DoReconcile(instance *appsv1alpha1.CollaSet, updatedRevision *appsv1.ControllerRevision, revisions []*appsv1.ControllerRevision, newStatus *appsv1alpha1.CollaSetStatus) (time.Duration, *appsv1alpha1.CollaSetStatus, error) {
podWrappers, newStatus, requeueAfter, syncErr := r.doSync(instance, updatedRevision, revisions, newStatus)
return requeueAfter, calculateStatus(instance, newStatus, updatedRevision, podWrappers, syncErr), syncErr
}

// doSync is responsible for reconciling Pods with the CollaSet spec.
// 1. sync Pods to prepare information, especially IDs, for the following Scale and Update
// 2. scale Pods to match the Pod number indicated in `spec.replicas`. If an error is returned or scaling is still in progress, update will be skipped.
// 3. update Pods, to update each Pod to the updated revision indicated by `spec.template`
func (r *CollaSetReconciler) doSync(instance *appsv1alpha1.CollaSet, updatedRevision *appsv1.ControllerRevision, revisions []*appsv1.ControllerRevision, newStatus *appsv1alpha1.CollaSetStatus) ([]*collasetutils.PodWrapper, *appsv1alpha1.CollaSetStatus, error) {
func (r *CollaSetReconciler) doSync(instance *appsv1alpha1.CollaSet, updatedRevision *appsv1.ControllerRevision, revisions []*appsv1.ControllerRevision, newStatus *appsv1alpha1.CollaSetStatus) ([]*collasetutils.PodWrapper, *appsv1alpha1.CollaSetStatus, time.Duration, error) {
synced, podWrappers, ownedIDs, err := r.syncControl.SyncPods(instance, updatedRevision, newStatus)
if err != nil || synced {
return podWrappers, newStatus, err
return podWrappers, newStatus, 0, err
}

scaling, err := r.syncControl.Scale(instance, podWrappers, revisions, updatedRevision, ownedIDs, newStatus)
scaling, scaleRequeueAfter, err := r.syncControl.Scale(instance, podWrappers, revisions, updatedRevision, ownedIDs, newStatus)
if err != nil || scaling {
return podWrappers, newStatus, err
return podWrappers, newStatus, scaleRequeueAfter, err
}

_, err = r.syncControl.Update(instance, podWrappers, revisions, updatedRevision, ownedIDs, newStatus)
_, updateRequeueAfter, err := r.syncControl.Update(instance, podWrappers, revisions, updatedRevision, ownedIDs, newStatus)
if updateRequeueAfter > 0 && (scaleRequeueAfter == 0 || updateRequeueAfter < scaleRequeueAfter) {
return podWrappers, newStatus, updateRequeueAfter, err
}

return podWrappers, newStatus, err
return podWrappers, newStatus, scaleRequeueAfter, err
}

func calculateStatus(instance *appsv1alpha1.CollaSet, newStatus *appsv1alpha1.CollaSetStatus, updatedRevision *appsv1.ControllerRevision, podWrappers []*collasetutils.PodWrapper, syncErr error) *appsv1alpha1.CollaSetStatus {
Expand Down
25 changes: 16 additions & 9 deletions pkg/controllers/collaset/collaset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,10 @@ var _ = Describe("collaset controller", func() {
return c.Get(context.TODO(), types.NamespacedName{Namespace: podToDelete.Namespace, Name: podToDelete.Name}, podToDelete) != nil
}, 5*time.Second, 1*time.Second).Should(BeTrue())

// scale in pods
// scale in pods with delay seconds
Expect(updateCollaSetWithRetry(c, cs.Namespace, cs.Name, func(cls *appsv1alpha1.CollaSet) bool {
cls.Spec.Replicas = int32Pointer(0)
cls.Spec.ScaleStrategy.OperationDelaySeconds = int32Pointer(1)
return true
})).Should(BeNil())

Expand All @@ -162,17 +163,20 @@ var _ = Describe("collaset controller", func() {
pod := &podList.Items[i]
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.ScaleInOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = "true"
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
}

Eventually(func() bool {
Eventually(func() error {
Expect(c.List(context.TODO(), podList, client.InNamespace(cs.Namespace))).Should(BeNil())
return len(podList.Items) == 0
}, 5*time.Second, 1*time.Second).Should(BeTrue())
Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, cs)).Should(BeNil())
Expect(expectedStatusReplicas(c, cs, 0, 0, 0, 0, 0, 0, 0, 0)).Should(BeNil())
if len(podList.Items) != 0 {
return fmt.Errorf("expected 0 pods, got %d", len(podList.Items))
}

Expect(c.Get(context.TODO(), types.NamespacedName{Namespace: cs.Namespace, Name: cs.Name}, cs)).Should(BeNil())
return expectedStatusReplicas(c, cs, 0, 0, 0, 0, 0, 0, 0, 0)
}, 5*time.Second, 1*time.Second).Should(BeNil())
})

It("update reconcile", func() {
Expand Down Expand Up @@ -206,6 +210,9 @@ var _ = Describe("collaset controller", func() {
},
},
},
UpdateStrategy: appsv1alpha1.UpdateStrategy{
OperationDelaySeconds: int32Pointer(1),
},
},
}

Expand Down Expand Up @@ -247,13 +254,13 @@ var _ = Describe("collaset controller", func() {
continue
}

if podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, pod) {
if _, allowed := podopslifecycle.AllowOps(collasetutils.UpdateOpsLifecycleAdapter, 0, pod); allowed {
continue
}
// allow Pod to do update
Expect(updatePodWithRetry(c, pod.Namespace, pod.Name, func(pod *corev1.Pod) bool {
labelOperate := fmt.Sprintf("%s/%s", appsv1alpha1.PodOperateLabelPrefix, collasetutils.UpdateOpsLifecycleAdapter.GetID())
pod.Labels[labelOperate] = "true"
pod.Labels[labelOperate] = fmt.Sprintf("%d", time.Now().UnixNano())
return true
})).Should(BeNil())
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/collaset/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
appsalphav1 "kusionstack.io/kafed/apis/apps/v1alpha1"
)

func getCollaSetPatch(ds *appsalphav1.CollaSet) ([]byte, error) {
dsBytes, err := json.Marshal(ds)
func getCollaSetPatch(cls *appsalphav1.CollaSet) ([]byte, error) {
dsBytes, err := json.Marshal(cls)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 55052b3

Please sign in to comment.