Skip to content

Commit

Permalink
koordlet: fix proxy container service and revise cpu.max updater (koo…
Browse files Browse the repository at this point in the history
…rdinator-sh#1885)

Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Feb 22, 2024
1 parent d69789c commit cd9257a
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 18 deletions.
49 changes: 37 additions & 12 deletions pkg/koordlet/resourceexecutor/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func init() {
)
// special cases
DefaultCgroupUpdaterFactory.Register(NewCgroupUpdaterWithUpdateFunc(CgroupUpdateCPUSharesFunc), sysutil.CPUSharesName)
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfValueIsLarger),
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfCFSQuotaIsLarger),
sysutil.CPUCFSQuotaName,
)
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterIfValueLarger,
Expand Down Expand Up @@ -432,16 +432,6 @@ func MergeFuncUpdateCgroup(resource ResourceUpdater, mergeCondition MergeConditi
return resource, cgroupFileWrite(c.parentDir, c.file, mergedValue)
}

// MergeFuncUpdateCgroupUnlimited performs a merge update through MergeFuncUpdateCgroup
// after normalizing the "unlimited" value for the current cgroup version.
//
// On cgroups-v2 some files only accept "max" (not "-1") to lift a limit, so a value
// equal to sysutil.CgroupUnlimitedSymbolStr is rewritten to sysutil.CgroupMaxSymbolStr
// before delegating. The rewrite is applied in place on the given updater.
//
// NOTE: do not use this for cgroup files where "-1" is itself a valid value.
// The resource must be a *CgroupResourceUpdater; any other type panics on the assertion.
func MergeFuncUpdateCgroupUnlimited(resource ResourceUpdater, mergeCondition MergeConditionFunc) (ResourceUpdater, error) {
	updater := resource.(*CgroupResourceUpdater)
	if updater.value == sysutil.CgroupUnlimitedSymbolStr {
		// Only cgroups-v2 needs the "max" spelling; v1 keeps the value as given.
		if sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 {
			updater.value = sysutil.CgroupMaxSymbolStr
		}
	}
	return MergeFuncUpdateCgroup(updater, mergeCondition)
}

// MergeConditionIfValueIsLarger returns a merge condition where only do update when the new value is larger.
func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, error) {
var newV, oldV int64
Expand All @@ -454,7 +444,7 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err
return newValue, false, fmt.Errorf("new value is not int64, err: %v", err)
}
}
if oldValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max"
if oldValue == sysutil.CgroupMaxSymbolStr || oldValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max"
oldV = int64(math.MaxInt64)
} else {
oldV, err = strconv.ParseInt(oldValue, 10, 64)
Expand All @@ -465,6 +455,41 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err
return newValue, newV > oldV, nil
}

// MergeConditionIfCFSQuotaIsLarger returns a merge condition for the CPU CFS quota
// where the update is only done when the new quota is larger than the current one.
//
// newValue is the desired quota: either a base-10 integer or one of the unlimited
// symbols (sysutil.CgroupMaxSymbolStr, sysutil.CgroupUnlimitedSymbolStr), which compare
// as math.MaxInt64. oldValue is the current content of the cgroup file:
//   - cgroups-v2: parsed by sysutil.ParseCPUCFSQuotaV2 (e.g. "max 100000", "100000 100000");
//     a parsed quota of -1 is treated as unlimited.
//   - cgroups-v1: "-1" (unlimited) or a base-10 integer (e.g. "100000").
//
// It returns newValue unchanged, whether the merge update should be applied, and an
// error (wrapping the parse failure) when either value cannot be parsed.
func MergeConditionIfCFSQuotaIsLarger(oldValue, newValue string) (string, bool, error) {
	var newV, oldV int64
	var err error

	// An unlimited new quota is larger than any finite quota.
	if newValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr {
		newV = int64(math.MaxInt64)
	} else {
		newV, err = strconv.ParseInt(newValue, 10, 64)
		if err != nil {
			return newValue, false, fmt.Errorf("new value is not int64, err: %w", err)
		}
	}

	if sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 {
		// cgroup-v2 content: "max 100000", "100000 100000"
		oldV, err = sysutil.ParseCPUCFSQuotaV2(oldValue)
		if err != nil {
			return newValue, false, fmt.Errorf("cannot parse old value %s, err: %w", oldValue, err)
		}
		// -1 from the parser means the current quota is unlimited.
		if oldV == -1 {
			oldV = int64(math.MaxInt64)
		}
	} else if oldValue == sysutil.CgroupUnlimitedSymbolStr {
		// cgroup-v1 content: "-1" means the current quota is unlimited.
		oldV = int64(math.MaxInt64)
	} else {
		// cgroup-v1 content: a plain integer quota, e.g. "100000".
		oldV, err = strconv.ParseInt(oldValue, 10, 64)
		if err != nil {
			return newValue, false, fmt.Errorf("old value is not int64, err: %w", err)
		}
	}

	// Strictly larger only: an equal quota (including unlimited vs. unlimited) is not merged.
	return newValue, newV > oldV, nil
}

// MergeConditionIfCPUSetIsLooser returns a merge condition where only do update when the new cpuset value is looser.
func MergeConditionIfCPUSetIsLooser(oldValue, newValue string) (string, bool, error) {
v, err := cpuset.Parse(newValue)
Expand Down
150 changes: 150 additions & 0 deletions pkg/koordlet/resourceexecutor/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,156 @@ func TestCgroupResourceUpdater_Update(t *testing.T) {
}
}

// TestCgroupResourceUpdater_MergeUpdate exercises the two-phase update of a cgroup
// updater created by DefaultCgroupUpdaterFactory: first MergeUpdate() followed by
// update() on the returned updater, then update() on the original updater. The cgroup
// file content is asserted after each phase (wantMerged, then wantFinal).
func TestCgroupResourceUpdater_MergeUpdate(t *testing.T) {
// fields describes the simulated cgroup environment before the updater runs.
type fields struct {
// UseCgroupsV2 is passed to helper.SetCgroupsV2 to select the cgroup version.
UseCgroupsV2 bool
// initialValue is written to the cgroup file before MergeUpdate; skipped when empty.
initialValue string
}
// args are the arguments handed to DefaultCgroupUpdaterFactory.New.
type args struct {
resourceType sysutil.ResourceType
parentDir string
value string
}
tests := []struct {
name string
fields fields
args args
// wantErr is whether MergeUpdate is expected to fail; the remaining checks are skipped then.
wantErr bool
// wantMerged is the expected file content after update() on the merged updater.
wantMerged string
// wantFinal is the expected file content after update() on the original updater.
wantFinal string
}{
// cgroup v1 (UseCgroupsV2 unset): current quota 10000, new quota -1.
{
name: "merge update cfs quota",
fields: fields{
initialValue: "10000",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "-1",
},
wantMerged: "-1",
wantFinal: "-1",
wantErr: false,
},
// cgroup v1: current quota -1, new quota 10000; the file keeps -1 after the merge
// phase and only the final update writes 10000.
{
name: "merge update cfs quota (case 1)",
fields: fields{
initialValue: "-1",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "10000",
},
wantMerged: "-1",
wantFinal: "10000",
wantErr: false,
},
// cgroup v2: unparsable current content makes MergeUpdate return an error.
{
name: "failed to merge update cfs quota on cgroup v2",
fields: fields{
UseCgroupsV2: true,
initialValue: "invalid content",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "-1",
},
wantErr: true,
},
// cgroup v2: current content "10000 100000", new quota -1; the file is expected to read "max".
{
name: "merge update cfs quota on cgroup v2",
fields: fields{
UseCgroupsV2: true,
initialValue: "10000 100000",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "-1",
},
wantMerged: "max", // should be `max 100000` in the real cgroup
wantFinal: "max", // should be `max 100000` in the real cgroup
wantErr: false,
},
// cgroup v2: current content "max 100000", new quota 200000; the file is unchanged
// after the merge phase and the final update writes 200000.
{
name: "merge update cfs quota on cgroup v2 (case 1)",
fields: fields{
UseCgroupsV2: true,
initialValue: "max 100000",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "200000",
},
wantMerged: "max 100000",
wantFinal: "200000", // should be `200000 100000` in the real cgroup
wantErr: false,
},
// MemoryMinName: new value 2097152 is larger than the current 1048576 and is
// already written in the merge phase.
{
name: "merge update min",
fields: fields{
UseCgroupsV2: true,
initialValue: "1048576",
},
args: args{
resourceType: sysutil.MemoryMinName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "2097152",
},
wantMerged: "2097152",
wantFinal: "2097152",
wantErr: false,
},
// MemoryMinName: new value 1048576 is smaller than the current 2097152; the file
// is unchanged after the merge phase and the final update writes 1048576.
{
name: "merge update min (case 1)",
fields: fields{
UseCgroupsV2: true,
initialValue: "2097152",
},
args: args{
resourceType: sysutil.MemoryMinName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "1048576",
},
wantMerged: "2097152",
wantFinal: "1048576",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Each case gets its own file test helper, cleaned up when the subtest returns.
helper := sysutil.NewFileTestUtil(t)
defer helper.Cleanup()
helper.SetCgroupsV2(tt.fields.UseCgroupsV2)

u, gotErr := DefaultCgroupUpdaterFactory.New(tt.args.resourceType, tt.args.parentDir, tt.args.value, nil)
assert.NoError(t, gotErr)
// The concrete updater is needed to learn which cgroup file (c.file) to seed and read back.
c, ok := u.(*CgroupResourceUpdater)
assert.True(t, ok)
if tt.fields.initialValue != "" {
helper.WriteCgroupFileContents(tt.args.parentDir, c.file, tt.fields.initialValue)
}

// Phase 1: merge update, then apply the updater it returns.
mergedUpdater, gotErr := u.MergeUpdate()
assert.Equal(t, tt.wantErr, gotErr != nil, gotErr)
if tt.wantErr {
return
}
assert.NotNil(t, mergedUpdater)
gotErr = mergedUpdater.update()
assert.NoError(t, gotErr)
assert.Equal(t, tt.wantMerged, helper.ReadCgroupFileContents(c.parentDir, c.file))
// Phase 2: apply the original updater and check the final file content.
gotErr = u.update()
assert.NoError(t, gotErr)
assert.Equal(t, tt.wantFinal, helper.ReadCgroupFileContents(c.parentDir, c.file))
})
}
}

func TestDefaultResourceUpdater_Update(t *testing.T) {
type fields struct {
initialValue string
Expand Down
5 changes: 4 additions & 1 deletion pkg/koordlet/runtimehooks/protocol/container_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func (c *ContainerContext) FromProxy(req *runtimeapi.ContainerResourceHookReques
c.Request.FromProxy(req)
}

func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse) {
func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse, executor resourceexecutor.ResourceUpdateExecutor) {
if c.executor == nil {
c.executor = executor
}
c.injectForExt()
c.Response.ProxyDone(resp)
c.Update()
Expand Down
10 changes: 5 additions & 5 deletions pkg/koordlet/runtimehooks/proxyserver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *server) PreCreateContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreCreateContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreCreateContainerHook response for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -91,7 +91,7 @@ func (s *server) PreStartContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreStartContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreStartContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -109,7 +109,7 @@ func (s *server) PostStartContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStartContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PostStartContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -127,7 +127,7 @@ func (s *server) PostStopContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStopContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PostStopContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -145,7 +145,7 @@ func (s *server) PreUpdateContainerResourcesHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreUpdateContainerResources, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreUpdateContainerResourcesHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand Down
Loading

0 comments on commit cd9257a

Please sign in to comment.