Skip to content

Commit

Permalink
Refactor inplaceOnlyPodUpdater plugin (#250)
Browse files Browse the repository at this point in the history
* support register inplaceOnlyPodUpdater

* remove shit codes

* rename parameters of GenericPodUpdater

* refactor recreate pod

* (1) refactor Setup; (2) remove ctx from filed

* refactor GetPodUpdateFinishStatus
  • Loading branch information
ColdsteelRail committed Aug 19, 2024
1 parent d3f1f41 commit 9ee5e51
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 116 deletions.
53 changes: 38 additions & 15 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"kusionstack.io/operating/pkg/controllers/collaset/podcontrol"
"kusionstack.io/operating/pkg/controllers/collaset/pvccontrol"
collasetutils "kusionstack.io/operating/pkg/controllers/collaset/utils"
ojutils "kusionstack.io/operating/pkg/controllers/operationjob/utils"
controllerutils "kusionstack.io/operating/pkg/controllers/utils"
"kusionstack.io/operating/pkg/controllers/utils/expectations"
utilspoddecoration "kusionstack.io/operating/pkg/controllers/utils/poddecoration"
Expand Down Expand Up @@ -627,19 +628,19 @@ func (r *RealSyncControl) Update(

// 2. decide Pod update candidates
candidates := decidePodToUpdate(cls, podUpdateInfos)
activePodToUpdate := filterOutPlaceHolderUpdateInfos(candidates)
podCh := make(chan *PodUpdateInfo, len(activePodToUpdate))
updater := newPodUpdater(ctx, r.client, cls, r.podControl, r.recorder)
podToUpdate := filterOutPlaceHolderUpdateInfos(candidates)
podCh := make(chan *PodUpdateInfo, len(podToUpdate))
updater := newPodUpdater(r.client, cls, r.podControl, r.recorder)
updating := false

// 3. filter already updated revision,
for i, podInfo := range activePodToUpdate {
for i, podInfo := range podToUpdate {
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged && !podInfo.PvcTmpHashChanged {
continue
}

// 3.1 fulfillPodUpdateInfo to all not updatedRevision pod
if err = updater.FulfillPodUpdatedInfo(resources.UpdatedRevision, podInfo); err != nil {
if err = updater.FulfillPodUpdatedInfo(ctx, resources.UpdatedRevision, podInfo); err != nil {
logger.Error(err, fmt.Sprintf("fail to analyse pod %s/%s in-place update support", podInfo.Namespace, podInfo.Name))
continue
}
Expand All @@ -648,17 +649,17 @@ func (r *RealSyncControl) Update(
continue
}

podCh <- activePodToUpdate[i]
podCh <- podToUpdate[i]
}

// 4. begin pod update lifecycle
updating, err = updater.BeginUpdatePod(resources, podCh)
updating, err = updater.BeginUpdatePod(ctx, resources, podCh)
if err != nil {
return updating, recordedRequeueAfter, err
}

// 5. (1) filter out pods not allow to ops now, such as OperationDelaySeconds strategy; (2) update PlaceHolder Pods resourceContext revision
recordedRequeueAfter, err = updater.FilterAllowOpsPods(candidates, ownedIDs, resources, podCh)
recordedRequeueAfter, err = updater.FilterAllowOpsPods(ctx, candidates, ownedIDs, resources, podCh)
if err != nil {
collasetutils.AddOrUpdateCondition(resources.NewStatus,
appsv1alpha1.CollaSetScale, err, "UpdateFailed",
Expand Down Expand Up @@ -687,7 +688,7 @@ func (r *RealSyncControl) Update(
return err
}
} else {
if err = updater.UpgradePod(podInfo); err != nil {
if err = updater.UpgradePod(ctx, podInfo); err != nil {
return err
}
}
Expand All @@ -703,25 +704,47 @@ func (r *RealSyncControl) Update(
collasetutils.AddOrUpdateCondition(resources.NewStatus, appsv1alpha1.CollaSetUpdate, nil, "Updated", "")
}

podToUpdateSet := sets.String{}
for i := range podToUpdate {
podToUpdateSet.Insert(podToUpdate[i].Name)
}
// 7. try to finish all Pods'PodOpsLifecycle if its update is finished.
succCount, err = controllerutils.SlowStartBatch(len(activePodToUpdate), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
podInfo := activePodToUpdate[i]
succCount, err = controllerutils.SlowStartBatch(len(podUpdateInfos), controllerutils.SlowStartInitialBatchSize, false, func(i int, _ error) error {
podInfo := podUpdateInfos[i]

if !podInfo.isDuringOps || podInfo.PlaceHolder {
return nil
}

if !podInfo.isDuringOps {
if !podInfo.isAllowOps {
// pod is not included by podToUpdate, not allowOps, but isDuringOps, just cancel
if !podToUpdateSet.Has(podInfo.Name) {
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePodCanceled",
"pod %s/%s update is canceled due to not started and not included partition %s",
podInfo.Namespace, podInfo.Name, podInfo.CurrentRevision.Name)
return ojutils.CancelOpsLifecycle(ctx, r.client, collasetutils.UpdateOpsLifecycleAdapter, podInfo.Pod)
}
// not allowedOps, skip GetPodUpdateFinishStatus
return nil
}

// check Pod is during updating, and it is finished or not
finished, msg, err := updater.GetPodUpdateFinishStatus(podInfo)
finished, msg, err := updater.GetPodUpdateFinishStatus(ctx, podInfo)
if err != nil {
return fmt.Errorf("failed to get pod %s/%s update finished: %s", podInfo.Namespace, podInfo.Name, err)
}

if finished {
err := updater.FinishUpdatePod(podInfo)
if err != nil {
if err := updater.FinishUpdatePod(ctx, podInfo); err != nil {
return err
}
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
"UpdatePodFinished",
"pod %s/%s is finished for upgrade to revision %s",
podInfo.Namespace, podInfo.Name, podInfo.CurrentRevision.Name)
} else {
r.recorder.Eventf(podInfo.Pod,
corev1.EventTypeNormal,
Expand Down
Loading

0 comments on commit 9ee5e51

Please sign in to comment.