diff --git a/pkg/koordlet/resourceexecutor/updater.go b/pkg/koordlet/resourceexecutor/updater.go index 48d8643e4..7e1f06f9f 100644 --- a/pkg/koordlet/resourceexecutor/updater.go +++ b/pkg/koordlet/resourceexecutor/updater.go @@ -58,7 +58,7 @@ func init() { ) // special cases DefaultCgroupUpdaterFactory.Register(NewCgroupUpdaterWithUpdateFunc(CgroupUpdateCPUSharesFunc), sysutil.CPUSharesName) - DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfValueIsLarger), + DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfCFSQuotaIsLarger), sysutil.CPUCFSQuotaName, ) DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterIfValueLarger, @@ -432,16 +432,6 @@ func MergeFuncUpdateCgroup(resource ResourceUpdater, mergeCondition MergeConditi return resource, cgroupFileWrite(c.parentDir, c.file, mergedValue) } -func MergeFuncUpdateCgroupUnlimited(resource ResourceUpdater, mergeCondition MergeConditionFunc) (ResourceUpdater, error) { - c := resource.(*CgroupResourceUpdater) - // NOTE: convert "-1" to "max", since some cgroups-v2 files only accept "max" to unlimit resource instead of "-1". - // DO NOT use it on the cgroups which has a valid value of "-1". - if c.value == sysutil.CgroupUnlimitedSymbolStr && sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 { - c.value = sysutil.CgroupMaxSymbolStr - } - return MergeFuncUpdateCgroup(c, mergeCondition) -} - // MergeConditionIfValueIsLarger returns a merge condition where only do update when the new value is larger. 
func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, error) {
 	var newV, oldV int64
@@ -465,6 +455,35 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err
 	return newValue, newV > oldV, nil
 }
 
+// MergeConditionIfCFSQuotaIsLarger returns a merge condition where the update is only done when the new cfs
+// quota is larger than the old one. A new value equal to sysutil.CgroupMaxSymbolStr or
+// sysutil.CgroupUnlimitedSymbolStr, and an old value parsed as -1, are both compared as math.MaxInt64.
+func MergeConditionIfCFSQuotaIsLarger(oldValue, newValue string) (string, bool, error) {
+	var newV, oldV int64
+	var err error
+	if newValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr {
+		newV = int64(math.MaxInt64)
+	} else {
+		newV, err = strconv.ParseInt(newValue, 10, 64)
+		if err != nil {
+			return newValue, false, fmt.Errorf("new value is not int64, err: %w", err)
+		}
+	}
+
+	// content: "max 100000", "100000 100000"
+	// NOTE(review): the old value is always parsed with ParseCPUCFSQuotaV2; confirm it also accepts the
+	// single-field cgroups-v1 content of cpu.cfs_quota_us (e.g. "-1", "100000"), otherwise v1 merges fail here.
+	oldV, err = sysutil.ParseCPUCFSQuotaV2(oldValue)
+	if err != nil {
+		return newValue, false, fmt.Errorf("cannot parse old value %s, err: %w", oldValue, err)
+	}
+	if oldV == -1 {
+		oldV = int64(math.MaxInt64)
+	}
+
+	return newValue, newV > oldV, nil
+}
+
 // MergeConditionIfCPUSetIsLooser returns a merge condition where only do update when the new cpuset value is looser.
func MergeConditionIfCPUSetIsLooser(oldValue, newValue string) (string, bool, error) { v, err := cpuset.Parse(newValue) diff --git a/pkg/koordlet/runtimehooks/protocol/container_context.go b/pkg/koordlet/runtimehooks/protocol/container_context.go index 4a9789485..7ade6ac45 100644 --- a/pkg/koordlet/runtimehooks/protocol/container_context.go +++ b/pkg/koordlet/runtimehooks/protocol/container_context.go @@ -225,7 +225,10 @@ func (c *ContainerContext) FromProxy(req *runtimeapi.ContainerResourceHookReques c.Request.FromProxy(req) } -func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse) { +func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse, executor resourceexecutor.ResourceUpdateExecutor) { + if c.executor == nil { + c.executor = executor + } c.injectForExt() c.Response.ProxyDone(resp) c.Update() diff --git a/pkg/koordlet/runtimehooks/proxyserver/service.go b/pkg/koordlet/runtimehooks/proxyserver/service.go index 53320bf19..a2638cc1b 100644 --- a/pkg/koordlet/runtimehooks/proxyserver/service.go +++ b/pkg/koordlet/runtimehooks/proxyserver/service.go @@ -73,7 +73,7 @@ func (s *server) PreCreateContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreCreateContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PreCreateContainerHook response for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -91,7 +91,7 @@ func (s *server) PreStartContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreStartContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send 
PreStartContainerHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -109,7 +109,7 @@ func (s *server) PostStartContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStartContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PostStartContainerHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -127,7 +127,7 @@ func (s *server) PostStopContainerHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStopContainer, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PostStopContainerHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err @@ -145,7 +145,7 @@ func (s *server) PreUpdateContainerResourcesHook(ctx context.Context, containerCtx := &protocol.ContainerContext{} containerCtx.FromProxy(req) err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreUpdateContainerResources, containerCtx) - containerCtx.ProxyDone(resp) + containerCtx.ProxyDone(resp, s.options.Executor) klog.V(5).Infof("send PreUpdateContainerResourcesHook for pod %v container %v response %v", req.PodMeta.String(), req.ContainerMeta.String(), resp.String()) return resp, err diff --git a/pkg/koordlet/runtimehooks/proxyserver/service_test.go b/pkg/koordlet/runtimehooks/proxyserver/service_test.go new file mode 100644 index 000000000..b6b308d5b --- /dev/null +++ b/pkg/koordlet/runtimehooks/proxyserver/service_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2022 The Koordinator Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package proxyserver + +import ( + "context" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/koordinator-sh/koordinator/apis/extension" + runtimeapi "github.com/koordinator-sh/koordinator/apis/runtime/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" +) + +func TestServer(t *testing.T) { + t.Run("test not panic", func(t *testing.T) { + // create and start server + tmpDir := t.TempDir() + opt := Options{ + Network: "unix", + Address: filepath.Join(tmpDir, "koordlet.sock"), + HostEndpoint: filepath.Join(tmpDir, "koordlet.sock"), + FailurePolicy: "Ignore", + PluginFailurePolicy: "Ignore", + ConfigFilePath: filepath.Join(tmpDir, "hookserver.d"), + DisableStages: map[string]struct{}{}, + Executor: resourceexecutor.NewTestResourceExecutor(), + } + s, err := NewServer(opt) + assert.NoError(t, err) + assert.NotNil(t, s) + stopCh := make(chan struct{}) + defer close(stopCh) + opt.Executor.Run(stopCh) + err = s.Setup() + assert.NoError(t, err) + err = s.Start() + assert.NoError(t, err) + defer s.Stop() + + ss, ok := s.(*server) + assert.True(t, ok) + assert.NotNil(t, ss) + // PreRunPodSandboxHook + podResp, err := ss.PreRunPodSandboxHook(context.TODO(), &runtimeapi.PodSandboxHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + Labels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + 
}, + CgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, podResp) + // PostStopPodSandboxHook + podResp, err = ss.PostStopPodSandboxHook(context.TODO(), &runtimeapi.PodSandboxHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + Labels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + CgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, podResp) + // PreCreateContainerHook + containerResp, err := ss.PreCreateContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PreStartContainerHook + containerResp, err = ss.PreStartContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PostStartContainerHook + containerResp, err = ss.PostStartContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + 
PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PostStopContainerHook + containerResp, err = ss.PostStopContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + // PreUpdateContainerResourcesHook + containerResp, err = ss.PreUpdateContainerResourcesHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{ + PodMeta: &runtimeapi.PodSandboxMetadata{ + Name: "test-pod", + Namespace: "test-ns", + Uid: "xxxxxx", + }, + ContainerMeta: &runtimeapi.ContainerMetadata{ + Name: "test-container", + Id: "123", + }, + PodLabels: map[string]string{ + extension.LabelPodQoS: string(extension.QoSLS), + }, + PodCgroupParent: "kubepods/pod-xxxxxx/", + }) + assert.NoError(t, err) + assert.NotNil(t, containerResp) + }) +}