diff --git a/apis/apps/pub/inplace_update.go b/apis/apps/pub/inplace_update.go index 01122b66fc..a26cf1a886 100644 --- a/apis/apps/pub/inplace_update.go +++ b/apis/apps/pub/inplace_update.go @@ -94,7 +94,8 @@ type InPlaceUpdateContainerBatch struct { // InPlaceUpdateContainerStatus records the statuses of the container that are mainly used // to determine whether the InPlaceUpdate is completed. type InPlaceUpdateContainerStatus struct { - ImageID string `json:"imageID,omitempty"` + ImageID string `json:"imageID,omitempty"` + Resource v1.ResourceRequirements `json:"resource,omitempty"` } // InPlaceUpdateStrategy defines the strategies for in-place update. diff --git a/apis/apps/pub/zz_generated.deepcopy.go b/apis/apps/pub/zz_generated.deepcopy.go index f0a501645a..9a1e639bb5 100644 --- a/apis/apps/pub/zz_generated.deepcopy.go +++ b/apis/apps/pub/zz_generated.deepcopy.go @@ -50,6 +50,7 @@ func (in *InPlaceUpdateContainerBatch) DeepCopy() *InPlaceUpdateContainerBatch { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *InPlaceUpdateContainerStatus) DeepCopyInto(out *InPlaceUpdateContainerStatus) { *out = *in + in.Resource.DeepCopyInto(&out.Resource) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new InPlaceUpdateContainerStatus. 
@@ -90,7 +91,7 @@ func (in *InPlaceUpdateState) DeepCopyInto(out *InPlaceUpdateState) { in, out := &in.LastContainerStatuses, &out.LastContainerStatuses *out = make(map[string]InPlaceUpdateContainerStatus, len(*in)) for key, val := range *in { - (*out)[key] = val + (*out)[key] = *val.DeepCopy() } } if in.NextContainerImages != nil { diff --git a/pkg/util/inplaceupdate/inplace_update_defaults.go b/pkg/util/inplaceupdate/inplace_update_defaults.go index 9cff5205fd..09e99f3a27 100644 --- a/pkg/util/inplaceupdate/inplace_update_defaults.go +++ b/pkg/util/inplaceupdate/inplace_update_defaults.go @@ -45,6 +45,8 @@ func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions { } if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceWorkloadVerticalScaling) { + registerVerticalUpdate() + if opts.CalculateSpec == nil { opts.CalculateSpec = defaultCalculateInPlaceUpdateSpecWithVerticalUpdate } @@ -60,7 +62,6 @@ func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions { if opts.CheckContainersUpdateCompleted == nil { opts.CheckContainersUpdateCompleted = defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate } - } else { if opts.CalculateSpec == nil { opts.CalculateSpec = defaultCalculateInPlaceUpdateSpec @@ -77,7 +78,6 @@ func SetOptionsDefaults(opts *UpdateOptions) *UpdateOptions { if opts.CheckContainersUpdateCompleted == nil { opts.CheckContainersUpdateCompleted = defaultCheckContainersInPlaceUpdateCompleted } - } return opts @@ -498,6 +498,8 @@ func checkAllContainersHashConsistent(pod *v1.Pod, runtimeContainerMetaSet *apps func defaultPatchUpdateSpecToPodWithVerticalUpdate(pod *v1.Pod, spec *UpdateSpec, state *appspub.InPlaceUpdateState) (*v1.Pod, error) { klog.V(5).Infof("Begin to in-place update pod %s/%s with update spec %v, state %v", pod.Namespace, pod.Name, util.DumpJSON(spec), util.DumpJSON(state)) + registerVerticalUpdate() + state.NextContainerImages = make(map[string]string) state.NextContainerRefMetadata = make(map[string]metav1.ObjectMeta) 
state.NextContainerResources = make(map[string]v1.ResourceRequirements) @@ -566,12 +568,7 @@ func defaultPatchUpdateSpecToPodWithVerticalUpdate(pod *v1.Pod, spec *UpdateSpec containersImageChanged.Insert(c.Name) } if resourceExists { - for key, quantity := range newResource.Limits { - c.Resources.Limits[key] = quantity - } - for key, quantity := range newResource.Requests { - c.Resources.Requests[key] = quantity - } + verticalUpdateOperator.UpdateContainerResource(c, &newResource) containersResourceChanged.Insert(c.Name) } } else { @@ -579,16 +576,32 @@ func defaultPatchUpdateSpecToPodWithVerticalUpdate(pod *v1.Pod, spec *UpdateSpec state.NextContainerResources[c.Name] = newResource } } + + // This provides a hook for vertical updates, + // so that internal enterprise implementations can update pod resources here at once + verticalUpdateOperator.UpdatePodResource(pod) + for _, c := range pod.Status.ContainerStatuses { if containersImageChanged.Has(c.Name) { if state.LastContainerStatuses == nil { state.LastContainerStatuses = map[string]appspub.InPlaceUpdateContainerStatus{} } - state.LastContainerStatuses[c.Name] = appspub.InPlaceUpdateContainerStatus{ImageID: c.ImageID} + if cs, ok := state.LastContainerStatuses[c.Name]; !ok { + state.LastContainerStatuses[c.Name] = appspub.InPlaceUpdateContainerStatus{ImageID: c.ImageID} + } else { + cs.ImageID = c.ImageID + state.LastContainerStatuses[c.Name] = cs + } + if containersResourceChanged.Has(c.Name) { + verticalUpdateOperator.SyncContainerResource(&c, state) } - // TODO(LavenderQAQ): The status of resource needs to be printed } + // This provides a hook for vertical updates, + // so that internal enterprise implementations can sync pod resources here at once + verticalUpdateOperator.SyncPodResource(pod, state) + // update annotations and labels for the containers to update for cName, objMeta := range spec.ContainerRefMetadata { if containersToUpdate.Has(cName) { @@ -780,6 +792,8 @@ func defaultCalculateInPlaceUpdateSpecWithVerticalUpdate(oldRevision,
newRevisio // If the imageID in containerStatuses has not been changed, we assume that kubelet has not updated // containers in Pod. func DefaultCheckInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod) error { + registerVerticalUpdate() + if _, isInGraceState := appspub.GetInPlaceUpdateGrace(pod); isInGraceState { return fmt.Errorf("still in grace period of in-place update") } @@ -798,6 +812,8 @@ func DefaultCheckInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod) error { } func defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod, inPlaceUpdateState *appspub.InPlaceUpdateState) error { + registerVerticalUpdate() + runtimeContainerMetaSet, err := appspub.GetRuntimeContainerMetaSet(pod) if err != nil { return err @@ -840,7 +856,11 @@ func defaultCheckContainersInPlaceUpdateCompletedWithVerticalUpdate(pod *v1.Pod, return fmt.Errorf("container %s imageID not changed", cs.Name) } } - // TODO(LavenderQAQ): Check the vertical updating status of the container + // Determine whether the vertical update was successful by the resource values in the pod's spec and status + // TODO(LavenderQAQ): The third parameter here should be passed to the resources value in the status field of all containers and will need to be modified after the k8s api upgrade. + if !verticalUpdateOperator.IsUpdateCompleted(pod, containerResources[cs.Name], containerResources[cs.Name], inPlaceUpdateState.LastContainerStatuses[cs.Name]) { + return fmt.Errorf("container %s resources not changed", cs.Name) + } delete(inPlaceUpdateState.LastContainerStatuses, cs.Name) } } diff --git a/pkg/util/inplaceupdate/inplace_update_vertical.go b/pkg/util/inplaceupdate/inplace_update_vertical.go new file mode 100644 index 0000000000..b1c1659a7a --- /dev/null +++ b/pkg/util/inplaceupdate/inplace_update_vertical.go @@ -0,0 +1,91 @@ +/* +Copyright 2023 The Kruise Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package inplaceupdate + +import ( + appspub "github.com/openkruise/kruise/apis/apps/pub" + v1 "k8s.io/api/core/v1" +) + +// For In-place workload vertical scaling +type VerticalUpdateInterface interface { + // Get the expected resource values of the container and its current status + SyncContainerResource(container *v1.ContainerStatus, state *appspub.InPlaceUpdateState) + // Pass in the container to be modified and the expected resource values. + UpdateContainerResource(container *v1.Container, resource *v1.ResourceRequirements) + // Get the expected resource values of all containers in the pod and their current status + SyncPodResource(pod *v1.Pod, state *appspub.InPlaceUpdateState) + // All containers of a pod can be updated at once within this interface. + UpdatePodResource(pod *v1.Pod) + // To determine whether the container has been successfully vertical updated, + // pass in the expected resources of the container and its current status, + // as well as the information for the entire pod for compatibility with some internal vertical scaling implementations. 
+ IsUpdateCompleted(pod *v1.Pod, containerResourcesInSpec v1.ResourceRequirements, containerResourcesInStatus v1.ResourceRequirements, lastContainerStatus appspub.InPlaceUpdateContainerStatus) bool +} + +var verticalUpdateOperator VerticalUpdateInterface = nil + +// To register vertical update operations, +// you can register different vertical update implementations here +func registerVerticalUpdate() { + if verticalUpdateOperator == nil { + verticalUpdateOperator = &VerticalUpdate{} + } +} + +// VerticalUpdate represents the vertical scaling of k8s standard +type VerticalUpdate struct{} + +var _ VerticalUpdateInterface = &VerticalUpdate{} + +// Get the resource status from the container and synchronize it to state +func (v *VerticalUpdate) SyncContainerResource(container *v1.ContainerStatus, state *appspub.InPlaceUpdateState) { + // TODO(LavenderQAQ): Need to write the status synchronization module after api upgrade +} + +// UpdateResource implements vertical updates by directly modifying the container's resources, +// conforming to the k8s community standard +func (v *VerticalUpdate) UpdateContainerResource(container *v1.Container, newResource *v1.ResourceRequirements) { + for key, quantity := range newResource.Limits { + container.Resources.Limits[key] = quantity + } + for key, quantity := range newResource.Requests { + container.Resources.Requests[key] = quantity + } +} + +// Get the resource status from the pod and synchronize it to state +func (v *VerticalUpdate) SyncPodResource(pod *v1.Pod, state *appspub.InPlaceUpdateState) { + // TODO(LavenderQAQ): Need to write the status synchronization module after api upgrade +} + +// For the community-standard vertical scale-down implementation, +// there is no need to do anything here because the container has already been updated in the UpdateContainerResource interface +func (v *VerticalUpdate) UpdatePodResource(pod *v1.Pod) { + return +} + +// IsUpdateCompleted directly determines whether the current container is 
vertically updated by the spec and status of the container, +// which conforms to the k8s community standard +func (v *VerticalUpdate) IsUpdateCompleted(pod *v1.Pod, containerResourcesInSpec v1.ResourceRequirements, containerResourcesInStatus v1.ResourceRequirements, lastContainerStatus appspub.InPlaceUpdateContainerStatus) bool { + return true +} + +// Internal implementation of vertical updates +// type VerticalUpdateInternal struct{} + +// var _ VerticalUpdateInterface = &VerticalUpdateInternal{}