Skip to content

Commit

Permalink
Support label the pods that need to be deleted before the preparing p…
Browse files Browse the repository at this point in the history
…hase (#161)

support label the pods that need to be deleted before the preparing phase
  • Loading branch information
Eikykun committed Mar 11, 2024
1 parent 5eae342 commit 0cdef01
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 55 deletions.
1 change: 1 addition & 0 deletions apis/apps/v1alpha1/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
PodDeletionIndicationLabelKey = "podopslifecycle.kusionstack.io/to-delete" // users can use this label to indicate a pod to delete
PodReplaceIndicationLabelKey = "podopslifecycle.kusionstack.io/to-replace" // users can use this label to indicate a pod to replace
PodReplaceByReplaceUpdateLabelKey = "podopslifecycle.kusionstack.io/replaced-by-replace-update"
PodPreparingDeleteLabel = "podopslifecycle.kusionstack.io/preparing-to-delete"

PodInstanceIDLabelKey = "collaset.kusionstack.io/instance-id" // used to attach Pod instance ID on Pod

Expand Down
38 changes: 24 additions & 14 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,9 @@ func (r *RealSyncControl) Update(
// 2. decide Pod update candidates
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
updater := newPodUpdater(ctx, r.client, cls)
updating := false
analysedPod := sets.NewString()

if cls.Spec.UpdateStrategy.PodUpdatePolicy != appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType {
// 3. prepare Pods to begin PodOpsLifecycle
Expand All @@ -680,11 +682,20 @@ func (r *RealSyncControl) Update(

// 4. begin podOpsLifecycle parallel

succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, err error) error {
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(int, error) error {
podInfo := <-podCh

// fulfill Pod update information
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
analysedPod.Insert(podInfo.Name)
logger.V(1).Info("try to begin PodOpsLifecycle for updating Pod of CollaSet", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Begin(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
if updated, err := podopslifecycle.Begin(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod, func(obj client.Object) (bool, error) {
if !podInfo.OnlyMetadataChanged && !podInfo.InPlaceUpdateSupport {
return podopslifecycle.WhenBeginDelete(obj)
}
return false, nil
}); err != nil {
return fmt.Errorf("fail to begin PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
Expand Down Expand Up @@ -767,36 +778,35 @@ func (r *RealSyncControl) Update(
}

// 6. update Pod
updater := newPodUpdater(ctx, r.client, cls)
succCount, err := controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, _ error) error {
podInfo := <-podCh
// analyse Pod to get update information
inPlaceSupport, onlyMetadataChanged, updatedPod, err := updater.AnalyseAndGetUpdatedPod(resources.UpdatedRevision, podInfo)
if err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
if !analysedPod.Has(podInfo.Name) {
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
return fmt.Errorf("fail to analyse pod %s/%s in-place update support: %s", podInfo.Namespace, podInfo.Name, err)
}
}

logger.V(1).Info("before pod update operation",
"pod", commonutils.ObjectKeyString(podInfo.Pod),
"revision.from", podInfo.CurrentRevision.Name,
"revision.to", resources.UpdatedRevision.Name,
"inPlaceUpdate", inPlaceSupport,
"onlyMetadataChanged", onlyMetadataChanged,
"inPlaceUpdate", podInfo.InPlaceUpdateSupport,
"onlyMetadataChanged", podInfo.OnlyMetadataChanged,
)
if onlyMetadataChanged || inPlaceSupport {
if podInfo.OnlyMetadataChanged || podInfo.InPlaceUpdateSupport {
// 6.1 if pod template changes only include metadata or support in-place update, just apply these changes to pod directly
if err = r.podControl.UpdatePod(updatedPod); err != nil {
if err = r.podControl.UpdatePod(podInfo.UpdatedPod); err != nil {
return fmt.Errorf("fail to update Pod %s/%s when updating by in-place: %s", podInfo.Namespace, podInfo.Name, err)
} else {
podInfo.Pod = updatedPod
podInfo.Pod = podInfo.UpdatedPod
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"succeed to update Pod %s/%s to from revision %s to revision %s by in-place",
podInfo.Namespace, podInfo.Name,
podInfo.CurrentRevision.Name,
resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, updatedPod.ResourceVersion); err != nil {
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.UpdatedPod.ResourceVersion); err != nil {
return err
}
}
Expand Down
60 changes: 31 additions & 29 deletions pkg/controllers/collaset/synccontrol/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ import (
type PodUpdateInfo struct {
*utils.PodWrapper

UpdatedPod *corev1.Pod

InPlaceUpdateSupport bool
OnlyMetadataChanged bool

// indicate if this pod has up-to-date revision from its owner, like CollaSet
IsUpdatedRevision bool
// carry the pod's current revision
Expand Down Expand Up @@ -248,8 +253,7 @@ func (o orderByDefault) Less(i, j int) bool {
}

type PodUpdater interface {
AnalyseAndGetUpdatedPod(revision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) (
inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error)
FulfillPodUpdatedInfo(revision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) error
GetPodUpdateFinishStatus(podUpdateInfo *PodUpdateInfo) (bool, string, error)
}

Expand Down Expand Up @@ -284,10 +288,9 @@ type inPlaceIfPossibleUpdater struct {
client.Client
}

func (u *inPlaceIfPossibleUpdater) AnalyseAndGetUpdatedPod(
func (u *inPlaceIfPossibleUpdater) FulfillPodUpdatedInfo(
updatedRevision *appsv1.ControllerRevision,
podUpdateInfo *PodUpdateInfo) (
inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error) {
podUpdateInfo *PodUpdateInfo) error {

// 1. build pod from current and updated revision
ownerRef := metav1.NewControllerRef(u.collaSet, appsv1alpha1.GroupVersion.WithKind("CollaSet"))
Expand All @@ -296,33 +299,33 @@ func (u *inPlaceIfPossibleUpdater) AnalyseAndGetUpdatedPod(
return utilspoddecoration.PatchListOfDecorations(in, podUpdateInfo.CurrentPodDecorations)
})
if err != nil {
err = fmt.Errorf("fail to build Pod from current revision %s: %v", podUpdateInfo.CurrentRevision.Name, err)
return
return fmt.Errorf("fail to build Pod from current revision %s: %v", podUpdateInfo.CurrentRevision.Name, err)
}

// TODO: use cache
updatedPod, err = collasetutils.NewPodFrom(u.collaSet, ownerRef, updatedRevision, func(in *corev1.Pod) error {
podUpdateInfo.UpdatedPod, err = collasetutils.NewPodFrom(u.collaSet, ownerRef, updatedRevision, func(in *corev1.Pod) error {
return utilspoddecoration.PatchListOfDecorations(in, podUpdateInfo.UpdatedPodDecorations)
})
if err != nil {
err = fmt.Errorf("fail to build Pod from updated revision %s: %v", updatedRevision.Name, err)
return
return fmt.Errorf("fail to build Pod from updated revision %s: %v", updatedRevision.Name, err)
}

// 2. compare current and updated pods. Only pod image and metadata are supported to update in-place
// TODO: use cache
inPlaceUpdateSupport, onlyMetadataChanged = u.diffPod(currentPod, updatedPod)
podUpdateInfo.InPlaceUpdateSupport, podUpdateInfo.OnlyMetadataChanged = u.diffPod(currentPod, podUpdateInfo.UpdatedPod)
// 3. if pod has changes more than metadata and image
if !inPlaceUpdateSupport {
return false, onlyMetadataChanged, nil, nil
if !podUpdateInfo.InPlaceUpdateSupport {
return nil
}

inPlaceUpdateSupport = true
updatedPod, err = utils.PatchToPod(currentPod, updatedPod, podUpdateInfo.Pod)
podUpdateInfo.UpdatedPod, err = utils.PatchToPod(currentPod, podUpdateInfo.UpdatedPod, podUpdateInfo.Pod)
if err != nil {
return err
}

if onlyMetadataChanged {
if updatedPod.Annotations != nil {
delete(updatedPod.Annotations, appsv1alpha1.LastPodStatusAnnotationKey)
if podUpdateInfo.OnlyMetadataChanged {
if podUpdateInfo.UpdatedPod.Annotations != nil {
delete(podUpdateInfo.UpdatedPod.Annotations, appsv1alpha1.LastPodStatusAnnotationKey)
}
} else {
containerCurrentStatusMapping := map[string]*corev1.ContainerStatus{}
Expand All @@ -332,7 +335,7 @@ func (u *inPlaceIfPossibleUpdater) AnalyseAndGetUpdatedPod(
}

podStatus := &PodStatus{ContainerStates: map[string]*ContainerStatus{}}
for _, container := range updatedPod.Spec.Containers {
for _, container := range podUpdateInfo.UpdatedPod.Spec.Containers {
podStatus.ContainerStates[container.Name] = &ContainerStatus{
// store image of each container in updated Pod
LatestImage: container.Image,
Expand All @@ -349,16 +352,15 @@ func (u *inPlaceIfPossibleUpdater) AnalyseAndGetUpdatedPod(

podStatusStr, err := json.Marshal(podStatus)
if err != nil {
return inPlaceUpdateSupport, onlyMetadataChanged, updatedPod, err
return err
}

if updatedPod.Annotations == nil {
updatedPod.Annotations = map[string]string{}
if podUpdateInfo.UpdatedPod.Annotations == nil {
podUpdateInfo.UpdatedPod.Annotations = map[string]string{}
}
updatedPod.Annotations[appsv1alpha1.LastPodStatusAnnotationKey] = string(podStatusStr)
podUpdateInfo.UpdatedPod.Annotations[appsv1alpha1.LastPodStatusAnnotationKey] = string(podStatusStr)
}

return
return nil
}

func (u inPlaceIfPossibleUpdater) diffPod(currentPod, updatedPod *corev1.Pod) (inPlaceSetUpdateSupport bool, onlyMetadataChanged bool) {
Expand Down Expand Up @@ -461,7 +463,7 @@ func (u *inPlaceIfPossibleUpdater) GetPodUpdateFinishStatus(podUpdateInfo *PodUp
type inPlaceOnlyPodUpdater struct {
}

func (u *inPlaceOnlyPodUpdater) AnalyseAndGetUpdatedPod(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) (inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error) {
func (u *inPlaceOnlyPodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) (inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error) {

return
}
Expand All @@ -473,8 +475,8 @@ func (u *inPlaceOnlyPodUpdater) GetPodUpdateFinishStatus(_ *PodUpdateInfo) (fini
type recreatePodUpdater struct {
}

func (u *recreatePodUpdater) AnalyseAndGetUpdatedPod(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) (inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error) {
return false, false, nil, nil
func (u *recreatePodUpdater) FulfillPodUpdatedInfo(_ *appsv1.ControllerRevision, _ *PodUpdateInfo) error {
return nil
}

func (u *recreatePodUpdater) GetPodUpdateFinishStatus(podInfo *PodUpdateInfo) (finished bool, msg string, err error) {
Expand All @@ -488,7 +490,7 @@ type replaceUpdatePodUpdater struct {
client.Client
}

func (u *replaceUpdatePodUpdater) AnalyseAndGetUpdatedPod(updatedRevision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) (inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error) {
func (u *replaceUpdatePodUpdater) FulfillPodUpdatedInfo(updatedRevision *appsv1.ControllerRevision, podUpdateInfo *PodUpdateInfo) (err error) {
// when replaceUpdate, inPlaceUpdateSupport and onlyMetadataChanged always false

// 1. judge replace pair new pod is updated revision, if not, delete.
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/collaset/utils/lifecycle_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (a *CollaSetScaleInOpsLifecycleAdapter) AllowMultiType() bool {

// WhenBegin will be executed when begin a lifecycle
func (a *CollaSetScaleInOpsLifecycleAdapter) WhenBegin(pod client.Object) (bool, error) {
return false, nil
return podopslifecycle.WhenBeginDelete(pod)
}

// WhenFinish will be executed when finish a lifecycle
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/poddeletion/lifecycle_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (a *PodDeleteOpsLifecycleAdapter) AllowMultiType() bool {
}

// WhenBegin will be executed when begin a lifecycle
func (a *PodDeleteOpsLifecycleAdapter) WhenBegin(_ client.Object) (bool, error) {
return false, nil
func (a *PodDeleteOpsLifecycleAdapter) WhenBegin(obj client.Object) (bool, error) {
return podopslifecycle.WhenBeginDelete(obj)
}

// WhenFinish will be executed when finish a lifecycle
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/utils/podopslifecycle/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ var (
OpsLifecycleTypeDelete OperationType = "delete"
)

type UpdateFunc func(object client.Object) (bool, error)

// LifecycleAdapter helps CRD Operators to easily access PodOpsLifecycle
type LifecycleAdapter interface {
// GetID indicates ID of one PodOpsLifecycle
Expand Down
39 changes: 39 additions & 0 deletions pkg/controllers/utils/podopslifecycle/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
Copyright 2024 The KusionStack Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podopslifecycle

import (
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1"
)

var WhenBeginDelete UpdateFunc = func(obj client.Object) (bool, error) {
return AddLabel(obj.(*corev1.Pod), appsv1alpha1.PodPreparingDeleteLabel, strconv.FormatInt(time.Now().UnixNano(), 10)), nil
}

func AddLabel(po *corev1.Pod, k, v string) bool {
if po.Labels == nil {
po.Labels = map[string]string{}
}
if _, ok := po.Labels[k]; !ok {
po.Labels[k] = v
return true
}
return false
}
27 changes: 18 additions & 9 deletions pkg/controllers/utils/podopslifecycle/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func IsDuringOps(adapter LifecycleAdapter, obj client.Object) bool {
}

// Begin is used for an CRD Operator to begin a lifecycle
func Begin(c client.Client, adapter LifecycleAdapter, obj client.Object) (updated bool, err error) {
func Begin(c client.Client, adapter LifecycleAdapter, obj client.Object, updateFunc ...UpdateFunc) (updated bool, err error) {
if obj.GetLabels() == nil {
obj.SetLabels(map[string]string{})
}
Expand Down Expand Up @@ -72,14 +72,14 @@ func Begin(c client.Client, adapter LifecycleAdapter, obj client.Object) (update
}
}

updated, err = adapter.WhenBegin(obj)
updated, err = DefaultUpdateAll(obj, append(updateFunc, adapter.WhenBegin)...)
if err != nil {
return
}

if needUpdate || updated {
err = c.Update(context.Background(), obj)
return true, err
return err == nil, err
}

return false, nil
Expand Down Expand Up @@ -113,7 +113,7 @@ func AllowOps(adapter LifecycleAdapter, operationDelaySeconds int32, obj client.
}

// Finish is used for an CRD Operator to finish a lifecycle
func Finish(c client.Client, adapter LifecycleAdapter, obj client.Object) (updated bool, err error) {
func Finish(c client.Client, adapter LifecycleAdapter, obj client.Object, updateFunc ...UpdateFunc) (updated bool, err error) {
operatingID, hasID := checkOperatingID(adapter, obj)
operationType, hasType := checkOperationType(adapter, obj)

Expand All @@ -128,18 +128,16 @@ func Finish(c client.Client, adapter LifecycleAdapter, obj client.Object) (updat
deleteOperationType(adapter, obj)
}

updated, err = adapter.WhenFinish(obj)
updated, err = DefaultUpdateAll(obj, append(updateFunc, adapter.WhenFinish)...)
if err != nil {
return
}
if needUpdate || updated {
err = c.Update(context.Background(), obj)
if err != nil {
return
}
return err == nil, err
}

return needUpdate, err
return false, err
}

func checkOperatingID(adapter LifecycleAdapter, obj client.Object) (val string, ok bool) {
Expand Down Expand Up @@ -207,3 +205,14 @@ func queryByOperationType(adapter LifecycleAdapter, obj client.Object) sets.Stri

return res
}

func DefaultUpdateAll(pod client.Object, updateFunc ...UpdateFunc) (updated bool, err error) {
for _, f := range updateFunc {
ok, updateErr := f(pod)
if updateErr != nil {
return updated, updateErr
}
updated = updated || ok
}
return updated, nil
}

0 comments on commit 0cdef01

Please sign in to comment.