From cd9257a391ae3a63a00256b0e9a5062fe0c45bb1 Mon Sep 17 00:00:00 2001 From: Frame Han Date: Wed, 31 Jan 2024 20:17:12 +0800 Subject: [PATCH] koordlet: fix proxy container service and revise cpu.max updater (#1885) Signed-off-by: saintube --- pkg/koordlet/resourceexecutor/updater.go | 49 +++-- pkg/koordlet/resourceexecutor/updater_test.go | 150 +++++++++++++++ .../protocol/container_context.go | 5 +- .../runtimehooks/proxyserver/service.go | 10 +- .../runtimehooks/proxyserver/service_test.go | 179 ++++++++++++++++++ 5 files changed, 375 insertions(+), 18 deletions(-) create mode 100644 pkg/koordlet/runtimehooks/proxyserver/service_test.go diff --git a/pkg/koordlet/resourceexecutor/updater.go b/pkg/koordlet/resourceexecutor/updater.go index 48d8643e4..4a2a8f939 100644 --- a/pkg/koordlet/resourceexecutor/updater.go +++ b/pkg/koordlet/resourceexecutor/updater.go @@ -58,7 +58,7 @@ func init() { ) // special cases DefaultCgroupUpdaterFactory.Register(NewCgroupUpdaterWithUpdateFunc(CgroupUpdateCPUSharesFunc), sysutil.CPUSharesName) - DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfValueIsLarger), + DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfCFSQuotaIsLarger), sysutil.CPUCFSQuotaName, ) DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterIfValueLarger, @@ -432,16 +432,6 @@ func MergeFuncUpdateCgroup(resource ResourceUpdater, mergeCondition MergeConditi return resource, cgroupFileWrite(c.parentDir, c.file, mergedValue) } -func MergeFuncUpdateCgroupUnlimited(resource ResourceUpdater, mergeCondition MergeConditionFunc) (ResourceUpdater, error) { - c := resource.(*CgroupResourceUpdater) - // NOTE: convert "-1" to "max", since some cgroups-v2 files only accept "max" to unlimit resource instead of "-1". - // DO NOT use it on the cgroups which has a valid value of "-1". - if c.value == sysutil.CgroupUnlimitedSymbolStr && sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 { - c.value = sysutil.CgroupMaxSymbolStr - } - return MergeFuncUpdateCgroup(c, mergeCondition) -} - // MergeConditionIfValueIsLarger returns a merge condition where only do update when the new value is larger. func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, error) { var newV, oldV int64 @@ -454,7 +444,7 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err return newValue, false, fmt.Errorf("new value is not int64, err: %v", err) } } - if oldValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max" + if oldValue == sysutil.CgroupMaxSymbolStr || oldValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max" oldV = int64(math.MaxInt64) } else { oldV, err = strconv.ParseInt(oldValue, 10, 64) @@ -465,6 +455,41 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err return newValue, newV > oldV, nil } +func MergeConditionIfCFSQuotaIsLarger(oldValue, newValue string) (string, bool, error) { + var newV, oldV int64 + var err error + if newValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr { + newV = int64(math.MaxInt64) + } else { + newV, err = strconv.ParseInt(newValue, 10, 64) + if err != nil { + return newValue, false, fmt.Errorf("new value is not int64, err: %v", err) + } + } + + // cgroup-v2 content: "max 100000", "100000 100000" + if sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 { + oldV, err = sysutil.ParseCPUCFSQuotaV2(oldValue) + if err != nil { + return newValue, false, fmt.Errorf("cannot parse old value %s, err: %v", oldValue, err) + } + if oldV == -1 { + oldV = int64(math.MaxInt64) + } + } else { // cgroup-v1 content: "-1", "100000" + if oldValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max" + oldV = int64(math.MaxInt64) + } else { + oldV, err = strconv.ParseInt(oldValue, 10, 64) + if err != nil { + return newValue, false, fmt.Errorf("old value is not int64, err: %v", err) + } + } + } + + return newValue, newV > oldV, nil +} + // MergeConditionIfCPUSetIsLooser returns a merge condition where only do update when the new cpuset value is looser. func MergeConditionIfCPUSetIsLooser(oldValue, newValue string) (string, bool, error) { v, err := cpuset.Parse(newValue) diff --git a/pkg/koordlet/resourceexecutor/updater_test.go b/pkg/koordlet/resourceexecutor/updater_test.go index 15025b468..dbb5054ac 100644 --- a/pkg/koordlet/resourceexecutor/updater_test.go +++ b/pkg/koordlet/resourceexecutor/updater_test.go @@ -179,6 +179,156 @@ func TestCgroupResourceUpdater_Update(t *testing.T) { } } +func TestCgroupResourceUpdater_MergeUpdate(t *testing.T) { + type fields struct { + UseCgroupsV2 bool + initialValue string + } + type args struct { + resourceType sysutil.ResourceType + parentDir string + value string + } + tests := []struct { + name string + fields fields + args args + wantErr bool + wantMerged string + wantFinal string + }{ + { + name: "merge update cfs quota", + fields: fields{ + initialValue: "10000", + }, + args: args{ + resourceType: sysutil.CPUCFSQuotaName, + parentDir: "/kubepods.slice/kubepods.slice-podxxx", + value: "-1", + }, + wantMerged: "-1", + wantFinal: "-1", + wantErr: false, + }, + { + name: "merge update cfs quota (case 1)", + fields: fields{ + initialValue: "-1", + }, + args: args{ + resourceType: sysutil.CPUCFSQuotaName, + parentDir: "/kubepods.slice/kubepods.slice-podxxx", + value: "10000", + }, + wantMerged: "-1", + wantFinal: "10000", + wantErr: false, + }, + { + name: "failed to merge update cfs quota on cgroup v2", + fields: fields{ + UseCgroupsV2: true, + initialValue: "invalid content", + }, + args: args{ + resourceType: sysutil.CPUCFSQuotaName, + parentDir: "/kubepods.slice/kubepods.slice-podxxx", + value: "-1", + }, + wantErr: true, + }, + { + name: "merge update cfs quota on cgroup v2", + fields: fields{ + UseCgroupsV2: true, + initialValue: "10000 100000", + }, + args: args{ + resourceType: sysutil.CPUCFSQuotaName, + parentDir: "/kubepods.slice/kubepods.slice-podxxx", + value: "-1", + }, + wantMerged: "max", // should be `max 100000` in the real cgroup + wantFinal: "max", // should be `max 100000` in the real cgroup + wantErr: false, + }, + { + name: "merge update cfs quota on cgroup v2 (case 1)", + fields: fields{ + UseCgroupsV2: true, + initialValue: "max 100000", + }, + args: args{ + resourceType: sysutil.CPUCFSQuotaName, + parentDir: "/kubepods.slice/kubepods.slice-podxxx", + value: "200000", + }, + wantMerged: "max 100000", + wantFinal: "200000", // should be `200000 100000` in the real cgroup + wantErr: false, + }, + { + name: "merge update min", + fields: fields{ + UseCgroupsV2: true, + initialValue: "1048576", + }, + args: args{ + resourceType: sysutil.MemoryMinName, + parentDir: "/kubepods.slice/kubepods.slice-podxxx", + value: "2097152", + }, + wantMerged: "2097152", + wantFinal: "2097152", + wantErr: false, + }, + { + name: "merge update min (case 1)", + fields: fields{ + UseCgroupsV2: true, + initialValue: "2097152", + }, + args: args{ + resourceType: sysutil.MemoryMinName, + parentDir: "/kubepods.slice/kubepods.slice-podxxx", + value: "1048576", + }, + wantMerged: "2097152", + wantFinal: "1048576", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + helper := sysutil.NewFileTestUtil(t) + defer helper.Cleanup() + helper.SetCgroupsV2(tt.fields.UseCgroupsV2) + + u, gotErr := DefaultCgroupUpdaterFactory.New(tt.args.resourceType, tt.args.parentDir, tt.args.value, nil) + assert.NoError(t, gotErr) + c, ok := u.(*CgroupResourceUpdater) + assert.True(t, ok) + if tt.fields.initialValue != "" { + helper.WriteCgroupFileContents(tt.args.parentDir, c.file, tt.fields.initialValue) + } + + mergedUpdater, gotErr := u.MergeUpdate() + assert.Equal(t, tt.wantErr, gotErr != nil, gotErr) + if tt.wantErr { + return + } + assert.NotNil(t, mergedUpdater) + gotErr = mergedUpdater.update() + assert.NoError(t, gotErr) + assert.Equal(t, tt.wantMerged, helper.ReadCgroupFileContents(c.parentDir, c.file)) + gotErr = u.update() + assert.NoError(t, gotErr) + assert.Equal(t, tt.wantFinal, helper.ReadCgroupFileContents(c.parentDir, c.file)) + }) + } +} + func TestDefaultResourceUpdater_Update(t *testing.T) { type fields struct { initialValue string diff --git a/pkg/koordlet/runtimehooks/protocol/container_context.go b/pkg/koordlet/runtimehooks/protocol/container_context.go index 4a9789485..7ade6ac45 100644 --- a/pkg/koordlet/runtimehooks/protocol/container_context.go +++ b/pkg/koordlet/runtimehooks/protocol/container_context.go @@ -225,7 +225,10 @@ func (c *ContainerContext) FromProxy(req *runtimeapi.ContainerResourceHookReques c.Request.FromProxy(req) } -func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse) { +func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse, executor resourceexecutor.ResourceUpdateExecutor) { + if c.executor == nil { + c.executor = executor + } c.injectForExt() c.Response.ProxyDone(resp) c.Update() diff --git a/pkg/koordlet/runtimehooks/proxyserver/service.go b/pkg/koordlet/runtimehooks/proxyserver/service.go index 53320bf19..a2638cc1b 100644 --- a/pkg/koordlet/runtimehooks/proxyserver/service.go +++ b/pkg/koordlet/runtimehooks/proxyserver/service.go @@ -73,7 +73,7 @@ func (s *server) PreCreateContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreCreateContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PreCreateContainerHook response for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -91,7 +91,7 @@ func (s *server) PreStartContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreStartContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PreStartContainerHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -109,7 +109,7 @@ func (s *server) PostStartContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStartContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PostStartContainerHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -127,7 +127,7 @@ func (s *server) PostStopContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStopContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PostStopContainerHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -145,7 +145,7 @@ func (s *server) PreUpdateContainerResourcesHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreUpdateContainerResources, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PreUpdateContainerResourcesHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err diff --git a/pkg/koordlet/runtimehooks/proxyserver/service_test.go b/pkg/koordlet/runtimehooks/proxyserver/service_test.go new file mode 100644 index 000000000..b6b308d5b --- /dev/null +++ b/pkg/koordlet/runtimehooks/proxyserver/service_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxyserver + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/koordinator-sh/koordinator/apis/extension" + runtimeapi "github.com/koordinator-sh/koordinator/apis/runtime/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" +) + +func TestServer(t *testing.T) { + t.Run("test not panic", func(t *testing.T) { + // create and start server + tmpDir := t.TempDir() + opt := Options{ + Network: "unix", + Address: filepath.Join(tmpDir, "koordlet.sock"), + HostEndpoint: filepath.Join(tmpDir, "koordlet.sock"), + FailurePolicy: "Ignore", + PluginFailurePolicy: "Ignore", + ConfigFilePath: filepath.Join(tmpDir, "hookserver.d"), + DisableStages: map[string]struct{}{}, + Executor: resourceexecutor.NewTestResourceExecutor(), + } + s, err := NewServer(opt) + assert.NoError(t, err) + assert.NotNil(t, s) + stopCh := make(chan struct{}) + defer close(stopCh) + opt.Executor.Run(stopCh) + err = s.Setup() + assert.NoError(t, err) + err = s.Start() + assert.NoError(t, err) + defer s.Stop() + + ss, ok := s.(*server) + assert.True(t, ok) + assert.NotNil(t, ss) + // PreRunPodSandboxHook + podResp, err := ss.PreRunPodSandboxHook(context.TODO(), &runtimeapi.PodSandboxHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + Labels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + CgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, podResp) + // PostStopPodSandboxHook + podResp, err = ss.PostStopPodSandboxHook(context.TODO(), &runtimeapi.PodSandboxHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + Labels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + CgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, podResp) + // PreCreateContainerHook + containerResp, err := ss.PreCreateContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PreStartContainerHook + containerResp, err = ss.PreStartContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PostStartContainerHook + containerResp, err = ss.PostStartContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PostStopContainerHook + containerResp, err = ss.PostStopContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PreUpdateContainerResourcesHook + containerResp, err = ss.PreUpdateContainerResourcesHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + }) +}