add change qos validator and recreate pod when qos changed
Signed-off-by: Abner-1 <yuanyuxing.yyx@alibaba-inc.com>
ABNER-1 committed Dec 20, 2024
1 parent 6968bd8 commit 58849d1
Showing 6 changed files with 288 additions and 9 deletions.
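Background for the change below: a pod's QoS class is derived from its containers' requests and limits (Guaranteed when every container sets cpu and memory limits equal to its requests, BestEffort when nothing is set, Burstable otherwise), and an in-place resize is not allowed to change that class, which is why this commit falls back to recreating the pod. The snippet that follows is a minimal illustrative sketch (the main function and pod specs are invented for this example, not taken from the repository); it uses the same qos helper package that the diff below imports.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
)

func main() {
	// Burstable: requests are set, but no limits.
	burstable := &v1.Pod{Spec: v1.PodSpec{Containers: []v1.Container{{
		Name: "nginx",
		Resources: v1.ResourceRequirements{
			Requests: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("1"),
				v1.ResourceMemory: resource.MustParse("2Gi"),
			},
		},
	}}}}

	// Guaranteed: limits now equal requests for both cpu and memory.
	guaranteed := burstable.DeepCopy()
	guaranteed.Spec.Containers[0].Resources.Limits = v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("1"),
		v1.ResourceMemory: resource.MustParse("2Gi"),
	}

	fmt.Println(qos.GetPodQOS(burstable), qos.GetPodQOS(guaranteed)) // Burstable Guaranteed
}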
10 changes: 8 additions & 2 deletions pkg/util/inplaceupdate/inplace_update_defaults.go
@@ -326,8 +326,6 @@ func defaultCalculateInPlaceUpdateSpec(oldRevision, newRevision *apps.Controller
continue
}

// TODO(Abner-1): if pod qos changed, we should recreate the pod.
// I will resolve it in another PR
if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceWorkloadVerticalScaling) &&
containerResourcesPatchRexp.MatchString(op.Path) {
err = verticalUpdateImpl.UpdateInplaceUpdateMetadata(&op, oldTemp, updateSpec)
@@ -339,6 +337,14 @@ func defaultCalculateInPlaceUpdateSpec(oldRevision, newRevision *apps.Controller
}
return nil
}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceWorkloadVerticalScaling) &&
len(updateSpec.ContainerResources) != 0 {
// when container resource changes exist, we should check whether the pod QoS changed
if changed := verticalUpdateImpl.IsPodQoSChanged(oldTemp, newTemp); changed {
klog.InfoS("cannot in-place update because pod QoS changed, will recreate the pod")
return nil
}
}

if len(metadataPatches) > 0 {
if utilfeature.DefaultFeatureGate.Enabled(features.InPlaceUpdateEnvFromMetadata) {
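To make the new guard concrete, here is a hedged sketch of the scenario it rejects (a hypothetical helper, written as if it sat in pkg/util/inplaceupdate next to the code above, and assuming the verticalUpdateImpl instance referenced in the hunk is reachable at package scope): adding limits equal to the existing requests flips the pod from Burstable to Guaranteed, IsPodQoSChanged reports true, and defaultCalculateInPlaceUpdateSpec returns nil, so the controller recreates the pod instead of patching it in place.

// Hypothetical illustration, not part of this commit.
func qosGuardExample() bool {
	oldTmpl := &v1.PodTemplateSpec{Spec: v1.PodSpec{Containers: []v1.Container{{
		Name: "nginx",
		Resources: v1.ResourceRequirements{
			// Burstable: requests only, no limits.
			Requests: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("1"),
				v1.ResourceMemory: resource.MustParse("2Gi"),
			},
		},
	}}}}

	newTmpl := oldTmpl.DeepCopy()
	// Adding limits equal to requests makes the pod Guaranteed.
	newTmpl.Spec.Containers[0].Resources.Limits = v1.ResourceList{
		v1.ResourceCPU:    resource.MustParse("1"),
		v1.ResourceMemory: resource.MustParse("2Gi"),
	}

	// true -> defaultCalculateInPlaceUpdateSpec returns nil and the pod is recreated.
	return verticalUpdateImpl.IsPodQoSChanged(oldTmpl, newTmpl)
}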
81 changes: 81 additions & 0 deletions pkg/util/inplaceupdate/inplace_update_defaults_test.go
@@ -1036,6 +1036,30 @@ func TestDefaultCalculateInPlaceUpdateSpec(t *testing.T) {
},
}
}
qosRevisionGetter := func(imageChanged, resourceChanged, qosChanged, otherChanged bool, updateContainerNum int) *apps.ControllerRevision {
base := getFakeControllerRevisionData()
if imageChanged {
base = strings.Replace(base, `"image": "nginx:stable-alpine22"`, `"image": "nginx:stable-alpine23"`, updateContainerNum)
}
if resourceChanged {
base = strings.Replace(base, `"cpu": "1",`, `"cpu": "2",`, updateContainerNum)
if qosChanged {
base = strings.Replace(base, `"memory": "2Gi",`, `"memory": "4Gi",`, updateContainerNum)
}
}
if otherChanged {
base = strings.Replace(base, `"imagePullPolicy": "Always",`, `"imagePullPolicy": "222",`, updateContainerNum)
}
return &apps.ControllerRevision{
ObjectMeta: metav1.ObjectMeta{
Name: "new-revision",
Annotations: map[string]string{},
},
Data: runtime.RawExtension{
Raw: []byte(base),
},
}
}
// Define your test cases
tests := []struct {
name string
@@ -1163,6 +1187,63 @@
opts: &UpdateOptions{},
expectedResult: nil,
},

// change qos
{
vpaEnabled: true,
name: "change qos of two containers",
oldRevision: baseRevision,
newRevision: qosRevisionGetter(false, true, true, false, 2),
opts: &UpdateOptions{},
expectedResult: nil,
},
{
vpaEnabled: true,
name: "change qos-related resources of one container (pod QoS stays Burstable)",
oldRevision: baseRevision,
newRevision: qosRevisionGetter(false, true, true, false, 1),
opts: &UpdateOptions{},
expectedResult: &UpdateSpec{
Revision: "new-revision",
ContainerResources: map[string]v1.ResourceRequirements{
"nginx": {
Requests: v1.ResourceList{
"cpu": resource.MustParse("2"),
"memory": resource.MustParse("4Gi"),
},
},
},
},
},
{
vpaEnabled: true,
name: "change qos-related resources and image of one container (pod QoS stays Burstable)",
oldRevision: baseRevision,
newRevision: qosRevisionGetter(true, true, true, false, 1),
opts: &UpdateOptions{},
expectedResult: &UpdateSpec{
Revision: "new-revision",
ContainerImages: map[string]string{
"nginx": "nginx:stable-alpine23",
},
ContainerResources: map[string]v1.ResourceRequirements{
"nginx": {
Requests: v1.ResourceList{
"cpu": resource.MustParse("2"),
"memory": resource.MustParse("4Gi"),
},
},
},
},
},
{
vpaEnabled: true,
name: "change qos and other fields of one container",
oldRevision: baseRevision,
newRevision: qosRevisionGetter(true, true, true, true, 1),
opts: &UpdateOptions{},
expectedResult: nil,
},
}

for _, tt := range tests {
17 changes: 17 additions & 0 deletions pkg/util/inplaceupdate/inplace_update_vertical.go
@@ -26,13 +26,17 @@ import (
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
)

// VerticalUpdateInterface is the interface for in-place workload vertical scaling
type VerticalUpdateInterface interface {
// UpdateInplaceUpdateMetadata validates and applies the resource patch to the UpdateSpec.
UpdateInplaceUpdateMetadata(op *jsonpatch.Operation, oldTemp *v1.PodTemplateSpec, updateSpec *UpdateSpec) error

// IsPodQoSChanged checks whether the pod QoS class differs between the old and new templates
IsPodQoSChanged(oldTemp, newTemp *v1.PodTemplateSpec) bool

// UpdateResource updates some or all containers of a pod at once;
// container-level or pod-level resources can be updated through this interface
UpdateResource(pod *v1.Pod, expectedResources map[string]*v1.ResourceRequirements)
@@ -102,6 +106,19 @@ func (v *NativeVerticalUpdate) UpdateInplaceUpdateMetadata(op *jsonpatch.Operati
return nil
}

func (v *NativeVerticalUpdate) IsPodQoSChanged(oldTemp, newTemp *v1.PodTemplateSpec) bool {
oldPod := &v1.Pod{Spec: *oldTemp.Spec.DeepCopy()}
newPod := &v1.Pod{Spec: *newTemp.Spec.DeepCopy()}
return qos.GetPodQOS(oldPod) != qos.GetPodQOS(newPod)
}

// updateContainerResource implements vertical updates by directly modifying the container's resources,
// conforming to the k8s community standard
func (v *NativeVerticalUpdate) updateContainerResource(container *v1.Container, newResource *v1.ResourceRequirements) {
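A note on the IsPodQoSChanged implementation above: it delegates the classification to the upstream qos.GetPodQOS helper, the same helper the rest of Kubernetes uses to compute a pod's QoS class, rather than re-deriving the Guaranteed/Burstable/BestEffort rules inside Kruise, so the validator stays consistent if upstream ever adjusts those rules.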
86 changes: 80 additions & 6 deletions test/e2e/apps/cloneset.go
@@ -26,12 +26,6 @@ import (

"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/test/e2e/framework"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -44,6 +38,13 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
imageutils "k8s.io/kubernetes/test/utils/image"
utilpointer "k8s.io/utils/pointer"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
kruiseclientset "github.com/openkruise/kruise/pkg/client/clientset/versioned"
"github.com/openkruise/kruise/pkg/controller/cloneset/utils"
"github.com/openkruise/kruise/pkg/util"
"github.com/openkruise/kruise/test/e2e/framework"
)

var _ = SIGDescribe("CloneSet", func() {
@@ -1071,6 +1072,10 @@ var _ = SIGDescribe("CloneSet", func() {
testUpdateVolumeClaimTemplates(tester, randStr, c)
})

ginkgo.It(`change resource and qos -> pod should be recreated`, func() {
testChangePodQOS(tester, randStr, c)
})

})

framework.KruiseDescribe("CloneSet pre-download images", func() {
@@ -1119,6 +1124,75 @@ var _ = SIGDescribe("CloneSet", func() {
})
})

func testChangePodQOS(tester *framework.CloneSetTester, randStr string, c clientset.Interface) {
cs := tester.NewCloneSet("clone-"+randStr, 1, appsv1alpha1.CloneSetUpdateStrategy{Type: appsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType})
cs.Spec.Template.Spec.Containers[0].Image = NginxImage
cs.Spec.Template.ObjectMeta.Labels["test-env"] = "foo"
cs.Spec.Template.Spec.Containers[0].Env = append(cs.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{
Name: "TEST_ENV",
ValueFrom: &v1.EnvVarSource{FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.labels['test-env']"}},
})
cs, err := tester.CreateCloneSet(cs)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(cs.Spec.UpdateStrategy.Type).To(gomega.Equal(appsv1alpha1.InPlaceIfPossibleCloneSetUpdateStrategyType))

ginkgo.By("Wait for replicas satisfied")
gomega.Eventually(func() int32 {
cs, err = tester.GetCloneSet(cs.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return cs.Status.Replicas
}, 3*time.Second, time.Second).Should(gomega.Equal(int32(1)))

ginkgo.By("Wait for all pods ready")
gomega.Eventually(func() int32 {
cs, err = tester.GetCloneSet(cs.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return cs.Status.ReadyReplicas
}, 120*time.Second, 3*time.Second).Should(gomega.Equal(int32(1)))

pods, err := tester.ListPodsForCloneSet(cs.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(len(pods)).Should(gomega.Equal(1))
oldPodUID := pods[0].UID

ginkgo.By("Update resource and qos")
err = tester.UpdateCloneSet(cs.Name, func(cs *appsv1alpha1.CloneSet) {
cs.Spec.Template.Spec.Containers[0].Resources = v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
}
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ginkgo.By("Wait for CloneSet generation consistent")
gomega.Eventually(func() bool {
cs, err = tester.GetCloneSet(cs.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return cs.Generation == cs.Status.ObservedGeneration
}, 10*time.Second, 3*time.Second).Should(gomega.Equal(true))

ginkgo.By("Wait for all pods updated and ready")
gomega.Eventually(func() int32 {
cs, err = tester.GetCloneSet(cs.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
return cs.Status.UpdatedReadyReplicas
}, 180*time.Second, 3*time.Second).Should(gomega.Equal(int32(1)))

ginkgo.By("Verify the pod UID changed")
pods, err = tester.ListPodsForCloneSet(cs.Name)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(len(pods)).Should(gomega.Equal(1))
newPodUID := pods[0].UID

gomega.Expect(oldPodUID).ShouldNot(gomega.Equal(newPodUID))
}

func checkPVCsDoRecreate(numsOfPVCs int, recreate bool) func(instanceIds, newInstanceIds, pvcIds sets.String, pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) {
return func(instanceIds, newInstanceIds, pvcIds sets.String, pods []*v1.Pod, pvcs []*v1.PersistentVolumeClaim) {
gomega.Expect(len(pvcs)).Should(gomega.Equal(numsOfPVCs))
100 changes: 100 additions & 0 deletions test/e2e/apps/statefulset.go
@@ -1646,6 +1646,106 @@ var _ = SIGDescribe("StatefulSet", func() {
}
})

framework.ConformanceIt("should perform recreate update when pod QoS changed", func() {
ginkgo.By("Creating a new StatefulSet")
ss := framework.NewStatefulSet(ssName, ns, headlessSvcName, 3, nil, nil, labels)
sst := framework.NewStatefulSetTester(c, kc)
sst.SetHTTPProbe(ss)
ss.Spec.UpdateStrategy = appsv1beta1.StatefulSetUpdateStrategy{
Type: apps.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1beta1.RollingUpdateStatefulSetStrategy{
PodUpdatePolicy: appsv1beta1.InPlaceIfPossiblePodUpdateStrategyType,
InPlaceUpdateStrategy: &appspub.InPlaceUpdateStrategy{GracePeriodSeconds: 10},
},
}
ss.Spec.Template.ObjectMeta.Labels = map[string]string{"test-env": "foo"}
for k, v := range labels {
ss.Spec.Template.ObjectMeta.Labels[k] = v
}
ss.Spec.Template.Spec.Containers[0].Env = append(ss.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{
Name: "TEST_ENV",
ValueFrom: &v1.EnvVarSource{FieldRef: &v1.ObjectFieldSelector{FieldPath: "metadata.labels['test-env']"}},
})
ss.Spec.Template.Spec.ReadinessGates = append(ss.Spec.Template.Spec.ReadinessGates, v1.PodReadinessGate{ConditionType: appspub.InPlaceUpdateReady})
ss, err := kc.AppsV1beta1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
sst.WaitForRunningAndReady(*ss.Spec.Replicas, ss)
ss = sst.WaitForStatus(ss)
currentRevision, updateRevision := ss.Status.CurrentRevision, ss.Status.UpdateRevision
gomega.Expect(currentRevision).To(gomega.Equal(updateRevision),
fmt.Sprintf("StatefulSet %s/%s created with update revision %s not equal to current revision %s",
ss.Namespace, ss.Name, updateRevision, currentRevision))
pods := sst.GetPodList(ss)
for i := range pods.Items {
gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(currentRevision),
fmt.Sprintf("Pod %s/%s revision %s is not equal to current revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
pods.Items[i].Labels[apps.StatefulSetRevisionLabel],
currentRevision))
}

ginkgo.By("Restoring Pods to the current revision")
sst.DeleteStatefulPodAtIndex(0, ss)
sst.DeleteStatefulPodAtIndex(1, ss)
sst.DeleteStatefulPodAtIndex(2, ss)
sst.WaitForRunningAndReady(3, ss)
ss = sst.GetStatefulSet(ss.Namespace, ss.Name)
pods = sst.GetPodList(ss)
for i := range pods.Items {
gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(currentRevision),
fmt.Sprintf("Pod %s/%s revision %s is not equal to current revision %s",
pods.Items[i].Namespace,
pods.Items[i].Name,
pods.Items[i].Labels[apps.StatefulSetRevisionLabel],
currentRevision))
}

updateTime := time.Now()
ginkgo.By("Updating stateful set template: change pod qos")
var partition int32 = 3
ss, err = framework.UpdateStatefulSetWithRetries(kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) {
update.Spec.Template.Spec.Containers[0].Resources = v1.ResourceRequirements{
Requests: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse("1"),
v1.ResourceMemory: resource.MustParse("1Gi"),
},
}
if update.Spec.UpdateStrategy.RollingUpdate == nil {
update.Spec.UpdateStrategy.RollingUpdate = &appsv1beta1.RollingUpdateStatefulSetStrategy{}
}
update.Spec.UpdateStrategy.RollingUpdate.Partition = &partition
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ginkgo.By("Creating a new revision")
ss = sst.WaitForStatus(ss)
currentRevision, updateRevision = ss.Status.CurrentRevision, ss.Status.UpdateRevision
gomega.Expect(currentRevision).NotTo(gomega.Equal(updateRevision),
"Current revision should not equal update revision during rolling update")
ss, err = framework.UpdateStatefulSetWithRetries(kc, ns, ss.Name, func(update *appsv1beta1.StatefulSet) {
partition = 0
update.Spec.UpdateStrategy.RollingUpdate.Partition = &partition
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())

ginkgo.By("Recreating Pods at the new revision")
sst.WaitForPodUpdatedAndRunning(ss, pods.Items[0].Name, currentRevision)
sst.WaitForRunningAndReady(3, ss)

ss = sst.GetStatefulSet(ss.Namespace, ss.Name)
pods = sst.GetPodList(ss)
for i := range pods.Items {
gomega.Expect(pods.Items[i].Status.ContainerStatuses[0].RestartCount).To(gomega.Equal(int32(0)))
gomega.Expect(pods.Items[i].CreationTimestamp.After(updateTime)).To(gomega.Equal(true))
gomega.Expect(pods.Items[i].Labels[apps.StatefulSetRevisionLabel]).To(gomega.Equal(updateRevision))
}
})

/*
Release : v1.9
Testname: StatefulSet, Scaling
3 changes: 2 additions & 1 deletion tools/hack/run-kruise-e2e-test.sh
@@ -18,8 +18,9 @@ set -ex
export KUBECONFIG=${HOME}/.kube/config
make ginkgo
set +e
./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] InplaceVPA' test/e2e
#./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] InplaceVPA' test/e2e
#./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] CloneSet' test/e2e
./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] StatefulSet' test/e2e
#./bin/ginkgo -p -timeout 60m -v --focus='\[apps\] (CloneSet|InplaceVPA)' test/e2e

retVal=$?
