Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature]Replace update #147

Merged
merged 15 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/collaset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ const (
// CollaSetInPlaceOnlyPodUpdateStrategyType indicates that CollaSet will always update Pod in-place, instead of
// recreating pod. It will encounter an error on original Kubernetes cluster.
CollaSetInPlaceOnlyPodUpdateStrategyType PodUpdateStrategyType = "InPlaceOnly"

// CollaSetReplaceUpdatePodUpdateStrategyType indicates that CollaSet will always update Pod by replacement: it
// creates a new Pod first and deletes the original Pod once the newly created Pod is service available.
CollaSetReplaceUpdatePodUpdateStrategyType PodUpdateStrategyType = "ReplaceUpdate"
)

// CollaSetSpec defines the desired state of CollaSet
Expand Down
4 changes: 4 additions & 0 deletions apis/apps/v1alpha1/well_known_labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const (
PodDeletionIndicationLabelKey = "podopslifecycle.kusionstack.io/to-delete" // users can use this label to indicate a pod to delete

PodInstanceIDLabelKey = "collaset.kusionstack.io/instance-id" // used to attach Pod instance ID on Pod

PodReplacePairOriginName = "collaset.kusionstack.io/replace-pair-origin-name" // used to indicate the original Pod name for replacement.

PodReplacePairNewId = "collaset.kusionstack.io/replace-pair-new-id" // used to indicate the new created Pod instance ID for replacement.
)

const (
Expand Down
5 changes: 5 additions & 0 deletions pkg/controllers/collaset/podcontrol/pod_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Interface interface {
CreatePod(pod *corev1.Pod) (*corev1.Pod, error)
DeletePod(pod *corev1.Pod) error
UpdatePod(pod *corev1.Pod) error
PatchPod(pod *corev1.Pod, patch client.Patch) error
}

func NewRealPodControl(client client.Client, scheme *runtime.Scheme) Interface {
Expand Down Expand Up @@ -83,6 +84,10 @@ func (pc *RealPodControl) UpdatePod(pod *corev1.Pod) error {
return pc.client.Update(context.TODO(), pod)
}

// PatchPod applies the given patch to pod through the control's client and
// returns whatever error the client reports.
func (pc *RealPodControl) PatchPod(pod *corev1.Pod, patch client.Patch) error {
	ctx := context.TODO()
	return pc.client.Patch(ctx, pod, patch)
}

func (pc *RealPodControl) getPodSetPods(pods []*corev1.Pod, selector *metav1.LabelSelector, owner client.Object) ([]*corev1.Pod, error) {
// Use ControllerRefManager to adopt/orphan as needed.
cm, err := refmanagerutil.NewRefManager(pc.client, selector, owner, pc.scheme)
Expand Down
30 changes: 26 additions & 4 deletions pkg/controllers/collaset/synccontrol/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,35 @@ import (
"kusionstack.io/operating/pkg/controllers/utils/podopslifecycle"
)

func getPodsToDelete(filteredPods []*collasetutils.PodWrapper, diff int) []*collasetutils.PodWrapper {
sort.Sort(ActivePodsForDeletion(filteredPods))
start := len(filteredPods) - diff
func getPodsToDelete(filteredPods []*collasetutils.PodWrapper, replaceMapping map[string]*collasetutils.PodWrapper, diff int) []*collasetutils.PodWrapper {
targetsPods := getTargetsDeletePods(filteredPods, replaceMapping)
sort.Sort(ActivePodsForDeletion(targetsPods))
start := len(targetsPods) - diff
if start < 0 {
start = 0
}
return filteredPods[start:]
needDeletePods := targetsPods[start:]
for _, pod := range needDeletePods {
if replacePairPod, exist := replaceMapping[pod.Name]; exist && replacePairPod != nil {
needDeletePods = append(needDeletePods, replacePairPod)
}
}

return needDeletePods
}

// getTargetsDeletePods returns the pods that take part in the scale-in
// ordering: every pod in filteredPods whose name is a key of replaceMapping,
// in the order they appear in filteredPods. Pods whose names are not keys of
// replaceMapping are left out.
//
// The result is built with append so it never contains nil entries and never
// indexes past its length, even when the number of matching pods differs from
// len(replaceMapping).
func getTargetsDeletePods(filteredPods []*collasetutils.PodWrapper, replaceMapping map[string]*collasetutils.PodWrapper) []*collasetutils.PodWrapper {
	targetPods := make([]*collasetutils.PodWrapper, 0, len(replaceMapping))
	for _, pod := range filteredPods {
		if _, exist := replaceMapping[pod.Name]; exist {
			targetPods = append(targetPods, pod)
		}
	}

	return targetPods
}

type ActivePodsForDeletion []*collasetutils.PodWrapper
Expand Down
166 changes: 154 additions & 12 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package synccontrol
import (
"context"
"fmt"
"strings"
"time"

"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
Expand Down Expand Up @@ -178,8 +180,9 @@ func (r *RealSyncControl) Scale(

logger := r.logger.WithValues("collaset", commonutils.ObjectKeyString(cls))
var recordedRequeueAfter *time.Duration
replacePodMap := classifyPodReplacingMapping(podWrappers)

diff := int(realValue(cls.Spec.Replicas)) - len(podWrappers)
diff := int(realValue(cls.Spec.Replicas)) - len(replacePodMap)
scaling := false

if diff > 0 {
Expand Down Expand Up @@ -259,7 +262,7 @@ func (r *RealSyncControl) Scale(
return succCount > 0, recordedRequeueAfter, err
} else if diff < 0 {
// chose the pods to scale in
podsToScaleIn := getPodsToDelete(podWrappers, diff*-1)
podsToScaleIn := getPodsToDelete(podWrappers, replacePodMap, diff*-1)
// filter out Pods need to trigger PodOpsLifecycle
podCh := make(chan *collasetutils.PodWrapper, len(podsToScaleIn))
for i := range podsToScaleIn {
Expand Down Expand Up @@ -394,6 +397,47 @@ func (r *RealSyncControl) Scale(
return scaling, recordedRequeueAfter, nil
}

// classify the pair relationship for Pod replacement.
func classifyPodReplacingMapping(podWrappers []*collasetutils.PodWrapper) map[string]*collasetutils.PodWrapper {
var podNameMap = make(map[string]*collasetutils.PodWrapper)
var podIdMap = make(map[string]*collasetutils.PodWrapper)
for _, podWrapper := range podWrappers {
podNameMap[podWrapper.Name] = podWrapper
instanceId := podWrapper.Labels[appsv1alpha1.PodInstanceIDLabelKey]
podIdMap[instanceId] = podWrapper
}

var replacePodMapping = make(map[string]*collasetutils.PodWrapper)
for _, podWrapper := range podWrappers {
name := podWrapper.Name
if podWrapper.DeletionTimestamp != nil {
replacePodMapping[name] = nil
continue
}

if podWrapper.Labels != nil && podWrapper.Labels[appsv1alpha1.PodDeletionIndicationLabelKey] != "" {
dbug-dk marked this conversation as resolved.
Show resolved Hide resolved
replacePodMapping[name] = nil
continue
}

if replacePairNewIdStr, exist := podWrapper.Labels[appsv1alpha1.PodReplacePairNewId]; exist {
if pairNewPod, exist := podIdMap[replacePairNewIdStr]; exist {
replacePodMapping[name] = pairNewPod
continue
}
} else if replaceOriginStr, exist := podWrapper.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
if originPod, exist := podNameMap[replaceOriginStr]; exist {
if originPod.Labels[appsv1alpha1.PodReplacePairNewId] == podWrapper.Labels[appsv1alpha1.PodInstanceIDLabelKey] {
continue
}
}
}

replacePodMapping[name] = nil
}
return replacePodMapping
}

func extractAvailableContexts(diff int, ownedIDs map[int]*appsv1alpha1.ContextDetail, podInstanceIDSet map[int]struct{}) []*appsv1alpha1.ContextDetail {
availableContexts := make([]*appsv1alpha1.ContextDetail, diff)

Expand Down Expand Up @@ -431,9 +475,16 @@ func (r *RealSyncControl) Update(
// 3. prepare Pods to begin PodOpsLifecycle
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
for i, podInfo := range podToUpdate {
// The pod is in a "replacing" state, always requires further update processing.
if podInfo.isInReplacing {
podCh <- podToUpdate[i]
dbug-dk marked this conversation as resolved.
Show resolved Hide resolved
continue
}

if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}

if podopslifecycle.IsDuringOps(collasetutils.UpdateOpsLifecycleAdapter, podInfo) {
continue
}
Expand Down Expand Up @@ -493,6 +544,11 @@ func (r *RealSyncControl) Update(
ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr)
}
}
if podInfo.isInReplacing {
podCh <- podToUpdate[i]
continue
}

if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}
Expand Down Expand Up @@ -520,6 +576,10 @@ func (r *RealSyncControl) Update(

// 6. update Pod
updater := newPodUpdater(ctx, r.client, cls)
// When PodUpdatePolicy is ReplaceUpdate, during the AnalyseAndGetUpdatedPod process, first record the original pod and the corresponding new pod to be created.
// Then, proceed to create them in batches subsequently.
var needReplaceOriginPods []*PodUpdateInfo
var replacePairNewCreatePods []*corev1.Pod
succCount, err = controllerutils.SlowStartBatch(len(podCh), controllerutils.SlowStartInitialBatchSize, false, func(_ int, _ error) error {
podInfo := <-podCh
// analyse Pod to get update information
Expand Down Expand Up @@ -552,6 +612,11 @@ func (r *RealSyncControl) Update(
return err
}
}
} else if cls.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType {
if updatedPod != nil {
needReplaceOriginPods = append(needReplaceOriginPods, podInfo)
replacePairNewCreatePods = append(replacePairNewCreatePods, updatedPod)
}
} else {
// 6.2 if pod has changes not in-place supported, recreate it
if err = r.podControl.DeletePod(podInfo.Pod); err != nil {
Expand All @@ -573,6 +638,56 @@ func (r *RealSyncControl) Update(
return nil
})

// batch create replace pair new pods
if cls.Spec.UpdateStrategy.PodUpdatePolicy == appsv1alpha1.CollaSetReplaceUpdatePodUpdateStrategyType && len(needReplaceOriginPods) > 0 {
if err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
ownedIDs, err = podcontext.AllocateID(r.client, cls, resources.UpdatedRevision.Name, len(podUpdateInfos)+len(needReplaceOriginPods))
return err
}); err != nil {
return false, nil, fmt.Errorf("fail to allocate %d IDs using context when create Pods for ReplaceUpdate: %s", len(needReplaceOriginPods), err)
}
// collect instance ID in used from owned Pods
podInstanceIDSet := collasetutils.CollectPodInstanceID(podWrappers)
// find IDs and their contexts which have not been used by owned Pods
availableContext := extractAvailableContexts(len(needReplaceOriginPods), ownedIDs, podInstanceIDSet)
succCount, err = controllerutils.SlowStartBatch(len(needReplaceOriginPods), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
originPodInfo := needReplaceOriginPods[i]
needCreatePod := replacePairNewCreatePods[i]
// add instance id and replace pair label
instanceId := fmt.Sprintf("%d", availableContext[i].ID)
needCreatePod.Labels[appsv1alpha1.PodInstanceIDLabelKey] = instanceId
needCreatePod.Labels[appsv1alpha1.PodReplacePairOriginName] = originPodInfo.GetName()
newPod := needCreatePod.DeepCopy()
if newCreatedPod, err := r.podControl.CreatePod(newPod); err == nil {
r.recorder.Eventf(originPodInfo.Pod,
corev1.EventTypeNormal,
"CreatePairPod",
"succeed to create replace pair Pod %s/%s to from revision %s to revision %s by replace update",
originPodInfo.Namespace,
originPodInfo.Name,
originPodInfo.CurrentRevision.Name, resources.UpdatedRevision.Name)
if err := collasetutils.ActiveExpectations.ExpectCreate(cls, expectations.Pod, newCreatedPod.Name); err != nil {
return err
}

patch := client.RawPatch(types.StrategicMergePatchType, []byte(fmt.Sprintf(`{"metadata":{"labels":{"%s":"%s"}}}`, appsv1alpha1.PodReplacePairNewId, instanceId)))
if err = r.podControl.PatchPod(originPodInfo.Pod, patch); err != nil {
return fmt.Errorf("fail to update origin pod %s/%s pair label %s when updating by replaceUpdate: %s", originPodInfo.Namespace, originPodInfo.Name, newCreatedPod.Name, err)
}
} else {
r.recorder.Eventf(originPodInfo.Pod,
corev1.EventTypeNormal,
"UpdatePod",
"failed to create replace pair Pod %s/%s to from revision %s to revision %s by replace update",
originPodInfo.Namespace,
originPodInfo.Name,
originPodInfo.CurrentRevision.Name, resources.UpdatedRevision.Name)
return err
}
return nil
})
}

updating = updating || succCount > 0
if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, err, "UpdateFailed", err.Error())
Expand All @@ -596,17 +711,44 @@ func (r *RealSyncControl) Update(
}

if finished {
logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
if podInfo.isInReplacing {
replacePairNewPodInfo := podInfo.replacePairNewPodInfo
if err = r.podControl.DeletePod(podInfo.Pod); err != nil {
return fmt.Errorf("failed to delete origin pod %s/%s when replace finish %s/%s: %s", podInfo.Namespace, podInfo.Name, replacePairNewPodInfo.Namespace, replacePairNewPodInfo.Name, err)
}

if _, exist := replacePairNewPodInfo.Labels[appsv1alpha1.PodReplacePairOriginName]; exist {
deletePatchStr := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/metadata/labels/%s"}]`, strings.ReplaceAll(appsv1alpha1.PodReplacePairOriginName, "/", "~1")))
if err = r.podControl.PatchPod(replacePairNewPodInfo.Pod, client.RawPatch(types.JSONPatchType, deletePatchStr)); err != nil {
return fmt.Errorf("failed to remove new pod pair label when replace finish %s/%s: %s", replacePairNewPodInfo.Namespace, replacePairNewPodInfo.Name, err)
}
}

logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, replacePairNewPodInfo.Pod); err != nil {
dbug-dk marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", replacePairNewPodInfo.Namespace, replacePairNewPodInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, replacePairNewPodInfo.Name, replacePairNewPodInfo.ResourceVersion); err != nil {
return err
}
r.recorder.Eventf(replacePairNewPodInfo.Pod,
corev1.EventTypeNormal,
"UpdateReady", "pod %s/%s update finished", replacePairNewPodInfo.Namespace, replacePairNewPodInfo.Name)
}
} else {
logger.V(1).Info("try to finish update PodOpsLifecycle for Pod", "pod", commonutils.ObjectKeyString(podInfo.Pod))
if updated, err := podopslifecycle.Finish(r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod); err != nil {
return fmt.Errorf("failed to finish PodOpsLifecycle for updating Pod %s/%s: %s", podInfo.Namespace, podInfo.Name, err)
} else if updated {
// add an expectation for this pod update, before next reconciling
if err := collasetutils.ActiveExpectations.ExpectUpdate(cls, expectations.Pod, podInfo.Name, podInfo.ResourceVersion); err != nil {
return err
}
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
}
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdateReady", "pod %s/%s update finished", podInfo.Namespace, podInfo.Name)
}
} else {
r.recorder.Eventf(podInfo.Pod,
Expand Down
Loading
Loading