Skip to content

Commit

Permalink
refactor: define vertically updated interfaces for different implemen…
Browse files Browse the repository at this point in the history
…tations

Signed-off-by: LavenderQAQ <lavenderqaq.cs@gmail.com>
  • Loading branch information
LavenderQAQ committed Aug 19, 2023
1 parent bef6a4e commit ff3e27e
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 15 deletions.
3 changes: 2 additions & 1 deletion apis/apps/pub/inplace_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ type InPlaceUpdateContainerBatch struct {
// InPlaceUpdateContainerStatus records the statuses of the container that are mainly used
// to determine whether the InPlaceUpdate is completed.
type InPlaceUpdateContainerStatus struct {
ImageID string `json:"imageID,omitempty"`
ImageID string `json:"imageID,omitempty"`
Resource v1.ResourceRequirements `json:"resource,omitempty"`
}

// InPlaceUpdateStrategy defines the strategies for in-place update.
Expand Down
3 changes: 2 additions & 1 deletion apis/apps/pub/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 37 additions & 13 deletions pkg/util/inplaceupdate/inplace_update_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions {
}

if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceWorkloadVerticalScaling) {
registerVerticalUpdate()

if opts.CalculateSpec == nil {
opts.CalculateSpec = defaultCalculateInPlaceUpdateSpecWithVerticalUpdate
}
Expand All @@ -60,7 +62,6 @@ func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions {
if opts.CheckContainersUpdateCompleted == nil {
opts.CheckContainersUpdateCompleted = defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate
}

} else {
if opts.CalculateSpec == nil {
opts.CalculateSpec = defaultCalculateInPlaceUpdateSpec
Expand All @@ -77,7 +78,6 @@ func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions {
if opts.CheckContainersUpdateCompleted == nil {
opts.CheckContainersUpdateCompleted = defaultCheckContainersInPlaceUpdateCompleted
}

}

return opts
Expand Down Expand Up @@ -498,6 +498,8 @@ func checkAllContainersHashConsistent(pod *v1.Pod, runtimeContainerMetaSet *apps
func defaultPatchUpdateSpecToPodWithVerticalUpdate(pod *v1.Pod, spec *UpdateSpec, state *appspub.InPlaceUpdateState) (*v1.Pod, error) {
klog.V(5).Infof("Begin to in-place update pod %s/%s with update spec %v, state %v", pod.Namespace, pod.Name, util.DumpJSON(spec), util.DumpJSON(state))

registerVerticalUpdate()

state.NextContainerImages = make(map[string]string)
state.NextContainerRefMetadata = make(map[string]metav1.ObjectMeta)
state.NextContainerResources = make(map[string]v1.ResourceRequirements)
Expand Down Expand Up @@ -566,29 +568,39 @@ func defaultPatchUpdateSpecToPodWithVerticalUpdate(pod *v1.Pod, spec *UpdateSpec
containersImageChanged.Insert(c.Name)
}
if resourceExists {
for key, quantity := range newResource.Limits {
c.Resources.Limits[key] = quantity
}
for key, quantity := range newResource.Requests {
c.Resources.Requests[key] = quantity
}
verticalUpdateOperator.UpdateContainerResource(c, &newResource)
containersResourceChanged.Insert(c.Name)
}
} else {
state.NextContainerImages[c.Name] = newImage
state.NextContainerResources[c.Name] = newResource
}
}

// This provides a hook for vertical updates,
// so that internal enterprise implementations can update pod resources here at once
verticalUpdateOperator.UpdatePodResource(pod)

for _, c := range pod.Status.ContainerStatuses {
if containersImageChanged.Has(c.Name) {
if state.LastContainerStatuses == nil {
state.LastContainerStatuses = map[string]appspub.InPlaceUpdateContainerStatus{}
}
state.LastContainerStatuses[c.Name] = appspub.InPlaceUpdateContainerStatus{ImageID: c.ImageID}
if cs, ok := state.LastContainerStatuses[c.Name]; !ok {
state.LastContainerStatuses[c.Name] = appspub.InPlaceUpdateContainerStatus{ImageID: c.ImageID}
} else {
cs.ImageID = c.ImageID
}
}
if containersResourceChanged.Has(c.Name) {
verticalUpdateOperator.SyncContainerResource(&c, state)
}
// TODO(LavenderQAQ): The status of resource needs to be printed
}

// This provides a hook for vertical updates,
// so that internal enterprise implementations can sync pod resources here at once
verticalUpdateOperator.SyncPodResource(pod, state)

// update annotations and labels for the containers to update
for cName, objMeta := range spec.ContainerRefMetadata {
if containersToUpdate.Has(cName) {
Expand Down Expand Up @@ -780,6 +792,8 @@ func defaultCalculateInPlaceUpdateSpecWithVerticalUpdate(oldRevision, newRevisio
// If the imageID in containerStatuses has not been changed, we assume that kubelet has not updated
// containers in Pod.
func DefaultCheckInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod) error {
registerVerticalUpdate()

if _, isInGraceState := appspub.GetInPlaceUpdateGrace(pod); isInGraceState {
return fmt.Errorf("still in grace period of in-place update")
}
Expand All @@ -794,10 +808,16 @@ func DefaultCheckInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod) error {
return fmt.Errorf("existing containers to in-place update in next batches")
}

if ok := verticalUpdateOperator.IsPodUpdateCompleted(pod); !ok {
return fmt.Errorf("waiting for pod vertical update")
}

return defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate(pod, &inPlaceUpdateState)
}

func defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod, inPlaceUpdateState *appspub.InPlaceUpdateState) error {
registerVerticalUpdate()

runtimeContainerMetaSet, err := appspub.GetRuntimeContainerMetaSet(pod)
if err != nil {
return err
Expand All @@ -822,11 +842,11 @@ func defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod,
}

containerImages := make(map[string]string, len(pod.Spec.Containers))
containerResources := make(map[string]v1.ResourceRequirements, len(pod.Spec.Containers))
containers := make(map[string]*v1.Container, len(pod.Spec.Containers))
for i := range pod.Spec.Containers {
c := &pod.Spec.Containers[i]
containerImages[c.Name] = c.Image
containerResources[c.Name] = c.Resources
containers[c.Name] = c
if len(strings.Split(c.Image, ":")) <= 1 {
containerImages[c.Name] = fmt.Sprintf("%s:latest", c.Image)
}
Expand All @@ -840,7 +860,11 @@ func defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod,
return fmt.Errorf("container %s imageID not changed", cs.Name)
}
}
// TODO(LavenderQAQ): Check the vertical updating status of the container
// Determine whether the vertical update was successful by the resource values in the pod's spec and status
// TODO(LavenderQAQ): The third parameter here should be passed to the resources value in the status field of all containers and will need to be modified after the k8s api upgrade.
if !verticalUpdateOperator.IsContainerUpdateCompleted(pod, containers[cs.Name], &cs, inPlaceUpdateState.LastContainerStatuses[cs.Name]) {
return fmt.Errorf("container %s resources not changed", cs.Name)
}
delete(inPlaceUpdateState.LastContainerStatuses, cs.Name)
}
}
Expand Down
97 changes: 97 additions & 0 deletions pkg/util/inplaceupdate/inplace_update_vertical.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2023 The Kruise Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inplaceupdate

import (
appspub "github.com/openkruise/kruise/apis/apps/pub"
v1 "k8s.io/api/core/v1"
)

// For In-place workload vertical scaling
type VerticalUpdateInterface interface {
// Get the expected resource values of the container and its current status
SyncContainerResource(container *v1.ContainerStatus, state *appspub.InPlaceUpdateState)
// Pass in the container to be modified and the expected resource values.
UpdateContainerResource(container *v1.Container, resource *v1.ResourceRequirements)
// Get the expected resource values of all containers in the pod and their current status
SyncPodResource(pod *v1.Pod, state *appspub.InPlaceUpdateState)
// All containers of a pod can be updated at once within this interface.
UpdatePodResource(pod *v1.Pod)
// To determine whether the container has been successfully vertical updated
IsContainerUpdateCompleted(pod *v1.Pod, container *v1.Container, containerStatus *v1.ContainerStatus, lastContainerStatus appspub.InPlaceUpdateContainerStatus) bool
// To determine whether the pod has been successfully vertical updated
IsPodUpdateCompleted(pod *v1.Pod) bool
}

var verticalUpdateOperator VerticalUpdateInterface = nil

// To register vertical update operations,
// you can register different vertical update implementations here
func registerVerticalUpdate() {
if verticalUpdateOperator == nil {
verticalUpdateOperator = &VerticalUpdate{}
}
}

// VerticalUpdate represents the vertical scaling of k8s standard
type VerticalUpdate struct{}

var _ VerticalUpdateInterface = &VerticalUpdate{}

// Get the resource status from the container and synchronize it to state
func (v *VerticalUpdate) SyncContainerResource(container *v1.ContainerStatus, state *appspub.InPlaceUpdateState) {
// TODO(LavenderQAQ): Need to write the status synchronization module after api upgrade
}

// UpdateResource implements vertical updates by directly modifying the container's resources,
// conforming to the k8s community standard
func (v *VerticalUpdate) UpdateContainerResource(container *v1.Container, newResource *v1.ResourceRequirements) {
for key, quantity := range newResource.Limits {
container.Resources.Limits[key] = quantity
}
for key, quantity := range newResource.Requests {
container.Resources.Requests[key] = quantity
}
}

// Get the resource status from the pod and synchronize it to state
func (v *VerticalUpdate) SyncPodResource(pod *v1.Pod, state *appspub.InPlaceUpdateState) {
// TODO(LavenderQAQ): Need to write the status synchronization module after api upgrade
}

// For the community-standard vertical scale-down implementation,
// there is no need to do anything here because the container has already been updated in the UpdateContainerResource interface
func (v *VerticalUpdate) UpdatePodResource(pod *v1.Pod) {
return
}

// IsUpdateCompleted directly determines whether the current container is vertically updated by the spec and status of the container,
// which conforms to the k8s community standard
func (v *VerticalUpdate) IsContainerUpdateCompleted(pod *v1.Pod, container *v1.Container, containerStatus *v1.ContainerStatus, lastContainerStatus appspub.InPlaceUpdateContainerStatus) bool {
return true
}

// IsUpdateCompleted directly determines whether the current pod is vertically updated by the spec and status of the container,
// which conforms to the k8s community standard
func (v *VerticalUpdate) IsPodUpdateCompleted(pod *v1.Pod) bool {
return true
}

// Internal implementation of vertical updates
// type VerticalUpdateInternal struct{}

// var _ VerticalUpdateInterface = &VerticalUpdateInternal{}

0 comments on commit ff3e27e

Please sign in to comment.