Skip to content

Commit

Permalink
koordlet: fix proxy container service and revise cpu.max updater
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Jan 31, 2024
1 parent ad36a0b commit fc29d58
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 17 deletions.
36 changes: 25 additions & 11 deletions pkg/koordlet/resourceexecutor/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func init() {
)
// special cases
DefaultCgroupUpdaterFactory.Register(NewCgroupUpdaterWithUpdateFunc(CgroupUpdateCPUSharesFunc), sysutil.CPUSharesName)
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfValueIsLarger),
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfCFSQuotaIsLarger),
sysutil.CPUCFSQuotaName,
)
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterIfValueLarger,
Expand Down Expand Up @@ -432,16 +432,6 @@ func MergeFuncUpdateCgroup(resource ResourceUpdater, mergeCondition MergeConditi
return resource, cgroupFileWrite(c.parentDir, c.file, mergedValue)
}

func MergeFuncUpdateCgroupUnlimited(resource ResourceUpdater, mergeCondition MergeConditionFunc) (ResourceUpdater, error) {
c := resource.(*CgroupResourceUpdater)
// NOTE: convert "-1" to "max", since some cgroups-v2 files only accept "max" to unlimit resource instead of "-1".
// DO NOT use it on the cgroups which has a valid value of "-1".
if c.value == sysutil.CgroupUnlimitedSymbolStr && sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 {
c.value = sysutil.CgroupMaxSymbolStr
}
return MergeFuncUpdateCgroup(c, mergeCondition)
}

// MergeConditionIfValueIsLarger returns a merge condition where only do update when the new value is larger.
func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, error) {
var newV, oldV int64
Expand All @@ -465,6 +455,30 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err
return newValue, newV > oldV, nil
}

func MergeConditionIfCFSQuotaIsLarger(oldValue, newValue string) (string, bool, error) {
var newV, oldV int64
var err error
if newValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr {
newV = int64(math.MaxInt64)
} else {
newV, err = strconv.ParseInt(newValue, 10, 64)
if err != nil {
return newValue, false, fmt.Errorf("new value is not int64, err: %v", err)
}
}

// content: "max 100000", "100000 100000"
oldV, err = sysutil.ParseCPUCFSQuotaV2(oldValue)
if err != nil {
return newValue, false, fmt.Errorf("cannot parse old value %s, err: %v", oldValue, err)
}
if oldV == -1 {
oldV = int64(math.MaxInt64)
}

return newValue, newV > oldV, nil
}

// MergeConditionIfCPUSetIsLooser returns a merge condition where only do update when the new cpuset value is looser.
func MergeConditionIfCPUSetIsLooser(oldValue, newValue string) (string, bool, error) {
v, err := cpuset.Parse(newValue)
Expand Down
5 changes: 4 additions & 1 deletion pkg/koordlet/runtimehooks/protocol/container_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func (c *ContainerContext) FromProxy(req *runtimeapi.ContainerResourceHookReques
c.Request.FromProxy(req)
}

func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse) {
func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse, executor resourceexecutor.ResourceUpdateExecutor) {
if c.executor == nil {
c.executor = executor
}
c.injectForExt()
c.Response.ProxyDone(resp)
c.Update()
Expand Down
10 changes: 5 additions & 5 deletions pkg/koordlet/runtimehooks/proxyserver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *server) PreCreateContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreCreateContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreCreateContainerHook response for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -91,7 +91,7 @@ func (s *server) PreStartContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreStartContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreStartContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -109,7 +109,7 @@ func (s *server) PostStartContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStartContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PostStartContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -127,7 +127,7 @@ func (s *server) PostStopContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStopContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PostStopContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -145,7 +145,7 @@ func (s *server) PreUpdateContainerResourcesHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreUpdateContainerResources, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreUpdateContainerResourcesHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand Down
179 changes: 179 additions & 0 deletions pkg/koordlet/runtimehooks/proxyserver/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package proxyserver

import (
"context"
"path/filepath"
"testing"

"github.com/stretchr/testify/assert"

"github.com/koordinator-sh/koordinator/apis/extension"
runtimeapi "github.com/koordinator-sh/koordinator/apis/runtime/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
)

func TestServer(t *testing.T) {
t.Run("test not panic", func(t *testing.T) {
// create and start server
tmpDir := t.TempDir()
opt := Options{
Network: "unix",
Address: filepath.Join(tmpDir, "koordlet.sock"),
HostEndpoint: filepath.Join(tmpDir, "koordlet.sock"),
FailurePolicy: "Ignore",
PluginFailurePolicy: "Ignore",
ConfigFilePath: filepath.Join(tmpDir, "hookserver.d"),
DisableStages: map[string]struct{}{},
Executor: resourceexecutor.NewTestResourceExecutor(),
}
s, err := NewServer(opt)
assert.NoError(t, err)
assert.NotNil(t, s)
stopCh := make(chan struct{})
defer close(stopCh)
opt.Executor.Run(stopCh)
err = s.Setup()
assert.NoError(t, err)
err = s.Start()
assert.NoError(t, err)
defer s.Stop()

ss, ok := s.(*server)
assert.True(t, ok)
assert.NotNil(t, ss)
// PreRunPodSandboxHook
podResp, err := ss.PreRunPodSandboxHook(context.TODO(), &runtimeapi.PodSandboxHookRequest{
PodMeta: &runtimeapi.PodSandboxMetadata{
Name: "test-pod",
Namespace: "test-ns",
Uid: "xxxxxx",
},
Labels: map[string]string{
extension.LabelPodQoS: string(extension.QoSLS),
},
CgroupParent: "kubepods/pod-xxxxxx/",
})
assert.NoError(t, err)
assert.NotNil(t, podResp)
// PostStopPodSandboxHook
podResp, err = ss.PostStopPodSandboxHook(context.TODO(), &runtimeapi.PodSandboxHookRequest{
PodMeta: &runtimeapi.PodSandboxMetadata{
Name: "test-pod",
Namespace: "test-ns",
Uid: "xxxxxx",
},
Labels: map[string]string{
extension.LabelPodQoS: string(extension.QoSLS),
},
CgroupParent: "kubepods/pod-xxxxxx/",
})
assert.NoError(t, err)
assert.NotNil(t, podResp)
// PreCreateContainerHook
containerResp, err := ss.PreCreateContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{
PodMeta: &runtimeapi.PodSandboxMetadata{
Name: "test-pod",
Namespace: "test-ns",
Uid: "xxxxxx",
},
ContainerMeta: &runtimeapi.ContainerMetadata{
Name: "test-container",
Id: "123",
},
PodLabels: map[string]string{
extension.LabelPodQoS: string(extension.QoSLS),
},
PodCgroupParent: "kubepods/pod-xxxxxx/",
})
assert.NoError(t, err)
assert.NotNil(t, containerResp)
// PreStartContainerHook
containerResp, err = ss.PreStartContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{
PodMeta: &runtimeapi.PodSandboxMetadata{
Name: "test-pod",
Namespace: "test-ns",
Uid: "xxxxxx",
},
ContainerMeta: &runtimeapi.ContainerMetadata{
Name: "test-container",
Id: "123",
},
PodLabels: map[string]string{
extension.LabelPodQoS: string(extension.QoSLS),
},
PodCgroupParent: "kubepods/pod-xxxxxx/",
})
assert.NoError(t, err)
assert.NotNil(t, containerResp)
// PostStartContainerHook
containerResp, err = ss.PostStartContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{
PodMeta: &runtimeapi.PodSandboxMetadata{
Name: "test-pod",
Namespace: "test-ns",
Uid: "xxxxxx",
},
ContainerMeta: &runtimeapi.ContainerMetadata{
Name: "test-container",
Id: "123",
},
PodLabels: map[string]string{
extension.LabelPodQoS: string(extension.QoSLS),
},
PodCgroupParent: "kubepods/pod-xxxxxx/",
})
assert.NoError(t, err)
assert.NotNil(t, containerResp)
// PostStopContainerHook
containerResp, err = ss.PostStopContainerHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{
PodMeta: &runtimeapi.PodSandboxMetadata{
Name: "test-pod",
Namespace: "test-ns",
Uid: "xxxxxx",
},
ContainerMeta: &runtimeapi.ContainerMetadata{
Name: "test-container",
Id: "123",
},
PodLabels: map[string]string{
extension.LabelPodQoS: string(extension.QoSLS),
},
PodCgroupParent: "kubepods/pod-xxxxxx/",
})
assert.NoError(t, err)
assert.NotNil(t, containerResp)
// PreUpdateContainerResourcesHook
containerResp, err = ss.PreUpdateContainerResourcesHook(context.TODO(), &runtimeapi.ContainerResourceHookRequest{
PodMeta: &runtimeapi.PodSandboxMetadata{
Name: "test-pod",
Namespace: "test-ns",
Uid: "xxxxxx",
},
ContainerMeta: &runtimeapi.ContainerMetadata{
Name: "test-container",
Id: "123",
},
PodLabels: map[string]string{
extension.LabelPodQoS: string(extension.QoSLS),
},
PodCgroupParent: "kubepods/pod-xxxxxx/",
})
assert.NoError(t, err)
assert.NotNil(t, containerResp)
})
}

0 comments on commit fc29d58

Please sign in to comment.