Skip to content

Commit

Permalink
issue fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Eikykun committed Dec 21, 2023
1 parent 59718ba commit 77da55b
Show file tree
Hide file tree
Showing 24 changed files with 839 additions and 356 deletions.
4 changes: 2 additions & 2 deletions apis/apps/v1alpha1/poddecoration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type PodDecorationPodTemplate struct {
Metadata []*PodDecorationPodTemplateMeta `json:"metadata,omitempty"`

// InitContainers is the init containers needs to be attached to a pod.
// If there is a container with the same name, PodDecoration will override it entirely.
// If there is a container with the same name, PodDecoration will retain old Container.
InitContainers []*corev1.Container `json:"initContainers,omitempty"`

// Containers is the containers need to be attached to a pod.
Expand All @@ -63,6 +63,7 @@ type PodDecorationPodTemplate struct {
PrimaryContainers []*PrimaryContainerPatch `json:"primaryContainers,omitempty"`

// Volumes will be attached to a pod spec volume.
// If there is a volume with the same name, new volume will replace it.
Volumes []corev1.Volume `json:"volumes,omitempty"`

// If specified, the pod's scheduling constraints
Expand Down Expand Up @@ -152,7 +153,6 @@ type PodDecorationInjectStrategy struct {
// Group provides the name of the group this PodDecoration belongs to.
// Only one PodDecoration is active when multiple PodDecorations share the same group value.
Group string `json:"group,omitempty"`

// Weight indicates the priority to apply for a group of PodDecorations with same group value.
// The greater one has higher priority to apply.
// Default value is 0.
Expand Down
9 changes: 9 additions & 0 deletions apis/apps/v1alpha1/resourcecontext_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,15 @@ func (cd *ContextDetail) Put(key, value string) {
cd.Data[key] = value
}

// Get is used to get the specified key from Data.
func (cd *ContextDetail) Get(key string) (string, bool) {
if cd.Data == nil {
return "", false
}
val, ok := cd.Data[key]
return val, ok
}

// Remove is used to remove the specified key from Data .
func (cd *ContextDetail) Remove(key string) {
if cd.Data == nil {
Expand Down
6 changes: 4 additions & 2 deletions config/crd/bases/apps.kusionstack.io_poddecorations.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2336,7 +2336,7 @@ spec:
initContainers:
description: InitContainers is the init containers needs to be
attached to a pod. If there is a container with the same name,
PodDecoration will override it entirely.
PodDecoration will retain old Container.
items:
description: A single application container that you want to
run within a pod.
Expand Down Expand Up @@ -3782,7 +3782,9 @@ spec:
type: object
type: array
volumes:
description: Volumes will be attached to a pod spec volume.
description: Volumes will be attached to a pod spec volume. If
there is a volume with the same name, new volume will replace
it.
items:
description: Volume represents a named volume in a pod that
may be accessed by any container in the pod.
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/collaset/collaset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,11 @@ func (r *CollaSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
UpdatedRevision: updatedRevision,
NewStatus: newStatus,
}
resources.PodDecorations, resources.OldRevisionDecorations, err = utils.GetEffectiveDecorationsByCollaSet(ctx, r.Client, instance)
resources.PDGetter, err = utils.NewPodDecorationGetter(ctx, r.Client, instance.Namespace)
if err != nil {
return ctrl.Result{}, fmt.Errorf("fail to get effective pod decorations by CollaSet %s: %s", key, err)
}

Check warning on line 178 in pkg/controllers/collaset/collaset_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/collaset/collaset_controller.go#L177-L178

Added lines #L177 - L178 were not covered by tests
for _, pd := range resources.PodDecorations {
for _, pd := range resources.PDGetter.GetLatestDecorations() {
if pd.Status.ObservedGeneration != pd.Generation {
logger.Info("wait for PodDecoration ObservedGeneration", "CollaSet", key, "PodDecoration", commonutils.ObjectKeyString(pd))
return ctrl.Result{}, nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/collaset/podcontext/podcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ import (
)

const (
OwnerContextKey = "Owner"
RevisionContextDataKey = "Revision"
OwnerContextKey = "Owner"
RevisionContextDataKey = "Revision"
PodDecorationRevisionKey = "PodDecorationRevisions"
)

func AllocateID(c client.Client, instance *appsv1alpha1.CollaSet, defaultRevision string, replicas int) (map[int]*appsv1alpha1.ContextDetail, error) {
Expand Down
60 changes: 44 additions & 16 deletions pkg/controllers/collaset/synccontrol/sync_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,23 +203,43 @@ func (r *RealSyncControl) Scale(

// scale out new Pods with updatedRevision
// TODO use cache
pod, err := collasetutils.NewPodFrom(cls, metav1.NewControllerRef(cls, appsv1alpha1.GroupVersion.WithKind("CollaSet")), revision)
pod, err := collasetutils.NewPodFrom(
cls,
metav1.NewControllerRef(cls, appsv1alpha1.GroupVersion.WithKind("CollaSet")),
revision,
func(in *corev1.Pod) (localErr error) {
in.Labels[appsv1alpha1.PodInstanceIDLabelKey] = fmt.Sprintf("%d", availableIDContext.ID)
revisionsInfo, ok := availableIDContext.Get(podcontext.PodDecorationRevisionKey)
var pds map[string]*appsv1alpha1.PodDecoration
if !ok {
// get default PodDecorations if no revision in context
pds, localErr = resources.PDGetter.GetLatestDecorationsByTargetLabel(ctx, in.Labels)
if localErr != nil {
return localErr
}
} else {
// upgrade by recreate pod case
infos, marshallErr := utilspoddecoration.UnmarshallFromString(revisionsInfo)
if marshallErr != nil {
return marshallErr
}
var revisions []string
for _, info := range infos {
revisions = append(revisions, info.Revision)
}
pds, localErr = resources.PDGetter.GetDecorationByRevisions(ctx, revisions...)
if localErr != nil {
return localErr
}
}
logger.Info("get pod effective decorations before create it", "EffectivePodDecorations", utilspoddecoration.BuildInfo(pds))
return utilspoddecoration.PatchListOfDecorations(in, pds)
},
)
if err != nil {
return fmt.Errorf("fail to new Pod from revision %s: %s", revision.Name, err)
}
newPod := pod.DeepCopy()
// allocate new Pod a instance ID
newPod.Labels[appsv1alpha1.PodInstanceIDLabelKey] = fmt.Sprintf("%d", availableIDContext.ID)

// get PodDecorations which selected newPod
podDecorations := utilspoddecoration.GetPodEffectiveDecorations(newPod, resources.PodDecorations, resources.OldRevisionDecorations)
// patch pod with PodDecorations
if patchErr := utilspoddecoration.PatchListOfDecorations(newPod, podDecorations); patchErr != nil {
msg := fmt.Sprintf("fail to patch pod %s by PodDecoration, %v", commonutils.ObjectKeyString(newPod), patchErr)
logger.Error(patchErr, msg)
r.recorder.Eventf(cls, corev1.EventTypeWarning, "PodDecorationPatch", msg)
}

logger.V(1).Info("try to create Pod with revision of collaSet", "revision", revision.Name)
if pod, err = r.podControl.CreatePod(newPod); err != nil {
return err
Expand Down Expand Up @@ -401,8 +421,10 @@ func (r *RealSyncControl) Update(
logger := r.logger.WithValues("collaset", commonutils.ObjectKeyString(cls))
var recordedRequeueAfter *time.Duration
// 1. scan and analysis pods update info
podUpdateInfos := attachPodUpdateInfo(podWrappers, resources)

podUpdateInfos, err := attachPodUpdateInfo(ctx, podWrappers, resources)
if err != nil {
return false, nil, fmt.Errorf("fail to attach pod update info, %v", err)
}
// 2. decide Pod update candidates
podToUpdate := decidePodToUpdate(cls, podUpdateInfos)

Expand Down Expand Up @@ -464,7 +486,13 @@ func (r *RealSyncControl) Update(
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.RevisionContextDataKey, resources.UpdatedRevision.Name)
}

if podInfo.PodDecorationChanged {
decorationStr := utilspoddecoration.GetDecorationInfoString(podInfo.UpdatedPodDecorations)
if val, ok := ownedIDs[podInfo.ID].Get(podcontext.PodDecorationRevisionKey); !ok || val != decorationStr {
needUpdateContext = true
ownedIDs[podInfo.ID].Put(podcontext.PodDecorationRevisionKey, decorationStr)
}
}
if podInfo.IsUpdatedRevision && !podInfo.PodDecorationChanged {
continue
}
Expand Down
78 changes: 38 additions & 40 deletions pkg/controllers/collaset/synccontrol/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "kusionstack.io/operating/apis/apps/v1alpha1"
Expand All @@ -47,24 +48,45 @@ type PodUpdateInfo struct {
// indicates effected PodDecorations changed
PodDecorationChanged bool

//OldPodDecorations
CurrentPodDecorations map[string]*appsv1alpha1.PodDecoration
UpdatedPodDecorations map[string]*appsv1alpha1.PodDecoration

// indicates the PodOpsLifecycle is started.
isDuringOps bool
}

func attachPodUpdateInfo(pods []*collasetutils.PodWrapper, resource *collasetutils.RelatedResources) []*PodUpdateInfo {
func attachPodUpdateInfo(ctx context.Context, pods []*collasetutils.PodWrapper, resource *collasetutils.RelatedResources) ([]*PodUpdateInfo, error) {
podUpdateInfoList := make([]*PodUpdateInfo, len(pods))

for i, pod := range pods {
updateInfo := &PodUpdateInfo{
PodWrapper: pod,
}
currentPDs, err := resource.PDGetter.GetCurrentDecorationsOnPod(ctx, pod.Pod)
if err != nil {
return nil, err
}
updatedPDs, err := resource.PDGetter.GetUpdatedDecorationsByOldPod(ctx, pod.Pod)
if err != nil {
return nil, err
}

decorations := utilspoddecoration.GetPodEffectiveDecorations(pod.Pod, resource.PodDecorations, resource.OldRevisionDecorations)
updateInfo.UpdatedPodDecorations = decorations
updateInfo.PodDecorationChanged = utilspoddecoration.ShouldUpdateDecorationInfo(pod.Pod, decorations)
if len(currentPDs) != len(updatedPDs) {
updateInfo.PodDecorationChanged = true
} else {
revisionSets := sets.NewString()
for rev := range currentPDs {
revisionSets.Insert(rev)
}
for rev := range updatedPDs {
if !revisionSets.Has(rev) {
updateInfo.PodDecorationChanged = true
break
}
}
}
updateInfo.CurrentPodDecorations = currentPDs
updateInfo.UpdatedPodDecorations = updatedPDs

// decide this pod current revision, or nil if not indicated
if pod.Labels != nil {
Expand All @@ -90,7 +112,7 @@ func attachPodUpdateInfo(pods []*collasetutils.PodWrapper, resource *collasetuti
podUpdateInfoList[i] = updateInfo
}

return podUpdateInfoList
return podUpdateInfoList, nil
}

func decidePodToUpdate(cls *appsv1alpha1.CollaSet, podInfos []*PodUpdateInfo) []*PodUpdateInfo {
Expand All @@ -105,6 +127,7 @@ func decidePodToUpdateByLabel(_ *appsv1alpha1.CollaSet, podInfos []*PodUpdateInf
for i := range podInfos {
if _, exist := podInfos[i].Labels[appsv1alpha1.CollaSetUpdateIndicateLabelKey]; exist {
podToUpdate = append(podToUpdate, podInfos[i])
continue
}
if podInfos[i].PodDecorationChanged {
podToUpdate = append(podToUpdate, podInfos[i])
Expand Down Expand Up @@ -197,52 +220,31 @@ func (u *inPlaceIfPossibleUpdater) AnalyseAndGetUpdatedPod(
updatedRevision *appsv1.ControllerRevision,
podUpdateInfo *PodUpdateInfo) (
inPlaceUpdateSupport bool, onlyMetadataChanged bool, updatedPod *corev1.Pod, err error) {

// 1. build pod from current and updated revision
ownerRef := metav1.NewControllerRef(u.collaSet, appsv1alpha1.GroupVersion.WithKind("CollaSet"))
// TODO: use cache
currentPod, err := collasetutils.NewPodFrom(u.collaSet, ownerRef, podUpdateInfo.CurrentRevision)
currentPod, err := collasetutils.NewPodFrom(u.collaSet, ownerRef, podUpdateInfo.CurrentRevision, func(in *corev1.Pod) error {
return utilspoddecoration.PatchListOfDecorations(in, podUpdateInfo.CurrentPodDecorations)
})
if err != nil {
err = fmt.Errorf("fail to build Pod from current revision %s: %v", podUpdateInfo.CurrentRevision.Name, err)
return
}

// TODO: use cache
updatedPod, err = collasetutils.NewPodFrom(u.collaSet, ownerRef, updatedRevision)
updatedPod, err = collasetutils.NewPodFrom(u.collaSet, ownerRef, updatedRevision, func(in *corev1.Pod) error {
return utilspoddecoration.PatchListOfDecorations(in, podUpdateInfo.UpdatedPodDecorations)
})
if err != nil {
err = fmt.Errorf("fail to build Pod from updated revision %s: %v", updatedRevision.Name, err)
return
}

// 2.1 patch PodDecorations on current pod
if podUpdateInfo.PodDecorationChanged {
var notFound bool
var currentPodDecorations map[string]*appsv1alpha1.PodDecoration
notFound, currentPodDecorations, err = utilspoddecoration.GetPodDecorationsByPodAnno(u.ctx, u.Client, podUpdateInfo.Pod)

if err != nil {
return false, false, nil, err
}
// if NotFound PD, recreate pod.
if notFound {
return false, false, nil, err
}
if err = utilspoddecoration.PatchListOfDecorations(currentPod, currentPodDecorations); err != nil {
return false, false, nil, err
}
} else {
if err = utilspoddecoration.PatchListOfDecorations(currentPod, podUpdateInfo.UpdatedPodDecorations); err != nil {
return false, false, nil, err
}
}
// 2.1 patch PodDecorations on updated pod
if err = utilspoddecoration.PatchListOfDecorations(updatedPod, podUpdateInfo.UpdatedPodDecorations); err != nil {
return false, false, nil, err
}

// 3. compare current and updated pods. Only pod image and metadata are supported to update in-place
// 2. compare current and updated pods. Only pod image and metadata are supported to update in-place
// TODO: use cache
inPlaceUpdateSupport, onlyMetadataChanged = u.diffPod(currentPod, updatedPod)
// 4. if pod has changes more than metadata and image
// 3. if pod has changes more than metadata and image
if !inPlaceUpdateSupport {
return false, onlyMetadataChanged, nil, nil
}
Expand Down Expand Up @@ -291,10 +293,6 @@ func (u *inPlaceIfPossibleUpdater) AnalyseAndGetUpdatedPod(
return
}

func (u *inPlaceIfPossibleUpdater) patchPodDecorations() {

}

func (u *inPlaceIfPossibleUpdater) diffPod(currentPod, updatedPod *corev1.Pod) (inPlaceSetUpdateSupport bool, onlyMetadataChanged bool) {
if len(currentPod.Spec.Containers) != len(updatedPod.Spec.Containers) {
return false, false
Expand Down
11 changes: 8 additions & 3 deletions pkg/controllers/collaset/utils/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func GetPodInstanceID(pod *corev1.Pod) (int, error) {
return int(id), nil
}

func NewPodFrom(owner metav1.Object, ownerRef *metav1.OwnerReference, revision *appsv1.ControllerRevision) (*corev1.Pod, error) {
func NewPodFrom(owner metav1.Object, ownerRef *metav1.OwnerReference, revision *appsv1.ControllerRevision, updateFn ...func(*corev1.Pod) error) (*corev1.Pod, error) {

Check warning on line 72 in pkg/controllers/collaset/utils/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/collaset/utils/pod.go#L72

Added line #L72 was not covered by tests
pod, err := GetPodFromRevision(revision)
if err != nil {
return pod, err
Expand All @@ -82,12 +82,17 @@ func NewPodFrom(owner metav1.Object, ownerRef *metav1.OwnerReference, revision *
pod.Labels[appsv1.ControllerRevisionHashLabelKey] = revision.Name

utils.ControllByKusionStack(pod)
for _, fn := range updateFn {
if err = fn(pod); err != nil {
return pod, err
}

Check warning on line 88 in pkg/controllers/collaset/utils/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/collaset/utils/pod.go#L85-L88

Added lines #L85 - L88 were not covered by tests
}
return pod, nil
}

func GetPodRevisionPatch(revision *appsv1.ControllerRevision) ([]byte, error) {
var raw map[string]interface{}
if err := json.Unmarshal([]byte(revision.Data.Raw), &raw); err != nil {
if err := json.Unmarshal(revision.Data.Raw, &raw); err != nil {

Check warning on line 95 in pkg/controllers/collaset/utils/pod.go

View check run for this annotation

Codecov / codecov/patch

pkg/controllers/collaset/utils/pod.go#L95

Added line #L95 was not covered by tests
return nil, err
}

Expand Down Expand Up @@ -115,7 +120,7 @@ func ApplyPatchFromRevision(pod *corev1.Pod, revision *appsv1.ControllerRevision
return clone, nil
}

// PatchToPod Use three way merge to get a updated pod.
// PatchToPod Use three-way merge to get a updated pod.
func PatchToPod(currentRevisionPod, updateRevisionPod, currentPod *corev1.Pod) (*corev1.Pod, error) {
currentRevisionPodBytes, err := json.Marshal(currentRevisionPod)
if err != nil {
Expand Down
Loading

0 comments on commit 77da55b

Please sign in to comment.