Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koordlet: fix proxy container service and revise cpu.max updater #1885

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 37 additions & 12 deletions pkg/koordlet/resourceexecutor/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func init() {
)
// special cases
DefaultCgroupUpdaterFactory.Register(NewCgroupUpdaterWithUpdateFunc(CgroupUpdateCPUSharesFunc), sysutil.CPUSharesName)
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfValueIsLarger),
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterWithConditionFunc(CgroupUpdateWithUnlimitedFunc, MergeConditionIfCFSQuotaIsLarger),
sysutil.CPUCFSQuotaName,
)
DefaultCgroupUpdaterFactory.Register(NewMergeableCgroupUpdaterIfValueLarger,
Expand Down Expand Up @@ -432,16 +432,6 @@ func MergeFuncUpdateCgroup(resource ResourceUpdater, mergeCondition MergeConditi
return resource, cgroupFileWrite(c.parentDir, c.file, mergedValue)
}

func MergeFuncUpdateCgroupUnlimited(resource ResourceUpdater, mergeCondition MergeConditionFunc) (ResourceUpdater, error) {
c := resource.(*CgroupResourceUpdater)
// NOTE: convert "-1" to "max", since some cgroups-v2 files only accept "max" to unlimit resource instead of "-1".
// DO NOT use it on the cgroups which has a valid value of "-1".
if c.value == sysutil.CgroupUnlimitedSymbolStr && sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 {
c.value = sysutil.CgroupMaxSymbolStr
}
return MergeFuncUpdateCgroup(c, mergeCondition)
}

// MergeConditionIfValueIsLarger returns a merge condition where only do update when the new value is larger.
func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, error) {
var newV, oldV int64
Expand All @@ -454,7 +444,7 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err
return newValue, false, fmt.Errorf("new value is not int64, err: %v", err)
}
}
if oldValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max"
if oldValue == sysutil.CgroupMaxSymbolStr || oldValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max"
oldV = int64(math.MaxInt64)
} else {
oldV, err = strconv.ParseInt(oldValue, 10, 64)
Expand All @@ -465,6 +455,41 @@ func MergeConditionIfValueIsLarger(oldValue, newValue string) (string, bool, err
return newValue, newV > oldV, nil
}

func MergeConditionIfCFSQuotaIsLarger(oldValue, newValue string) (string, bool, error) {
var newV, oldV int64
var err error
if newValue == sysutil.CgroupMaxSymbolStr || newValue == sysutil.CgroupUnlimitedSymbolStr {
newV = int64(math.MaxInt64)
} else {
newV, err = strconv.ParseInt(newValue, 10, 64)
if err != nil {
return newValue, false, fmt.Errorf("new value is not int64, err: %v", err)
}
}

// cgroup-v2 content: "max 100000", "100000 100000"
if sysutil.GetCurrentCgroupVersion() == sysutil.CgroupVersionV2 {
oldV, err = sysutil.ParseCPUCFSQuotaV2(oldValue)
if err != nil {
return newValue, false, fmt.Errorf("cannot parse old value %s, err: %v", oldValue, err)
}
if oldV == -1 {
oldV = int64(math.MaxInt64)
}
} else { // cgroup-v1 content: "-1", "100000"
if oldValue == sysutil.CgroupUnlimitedSymbolStr { // compatible with cgroup valued "max"
oldV = int64(math.MaxInt64)
} else {
oldV, err = strconv.ParseInt(oldValue, 10, 64)
if err != nil {
return newValue, false, fmt.Errorf("old value is not int64, err: %v", err)
}
}
}

return newValue, newV > oldV, nil
}

// MergeConditionIfCPUSetIsLooser returns a merge condition where only do update when the new cpuset value is looser.
func MergeConditionIfCPUSetIsLooser(oldValue, newValue string) (string, bool, error) {
v, err := cpuset.Parse(newValue)
Expand Down
150 changes: 150 additions & 0 deletions pkg/koordlet/resourceexecutor/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,156 @@ func TestCgroupResourceUpdater_Update(t *testing.T) {
}
}

func TestCgroupResourceUpdater_MergeUpdate(t *testing.T) {
type fields struct {
UseCgroupsV2 bool
initialValue string
}
type args struct {
resourceType sysutil.ResourceType
parentDir string
value string
}
tests := []struct {
name string
fields fields
args args
wantErr bool
wantMerged string
wantFinal string
}{
{
name: "merge update cfs quota",
fields: fields{
initialValue: "10000",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "-1",
},
wantMerged: "-1",
wantFinal: "-1",
wantErr: false,
},
{
name: "merge update cfs quota (case 1)",
fields: fields{
initialValue: "-1",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "10000",
},
wantMerged: "-1",
wantFinal: "10000",
wantErr: false,
},
{
name: "failed to merge update cfs quota on cgroup v2",
fields: fields{
UseCgroupsV2: true,
initialValue: "invalid content",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "-1",
},
wantErr: true,
},
{
name: "merge update cfs quota on cgroup v2",
fields: fields{
UseCgroupsV2: true,
initialValue: "10000 100000",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "-1",
},
wantMerged: "max", // should be `max 100000` in the real cgroup
wantFinal: "max", // should be `max 100000` in the real cgroup
wantErr: false,
},
{
name: "merge update cfs quota on cgroup v2 (case 1)",
fields: fields{
UseCgroupsV2: true,
initialValue: "max 100000",
},
args: args{
resourceType: sysutil.CPUCFSQuotaName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "200000",
},
wantMerged: "max 100000",
wantFinal: "200000", // should be `200000 100000` in the real cgroup
wantErr: false,
},
{
name: "merge update min",
fields: fields{
UseCgroupsV2: true,
initialValue: "1048576",
},
args: args{
resourceType: sysutil.MemoryMinName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "2097152",
},
wantMerged: "2097152",
wantFinal: "2097152",
wantErr: false,
},
{
name: "merge update min (case 1)",
fields: fields{
UseCgroupsV2: true,
initialValue: "2097152",
},
args: args{
resourceType: sysutil.MemoryMinName,
parentDir: "/kubepods.slice/kubepods.slice-podxxx",
value: "1048576",
},
wantMerged: "2097152",
wantFinal: "1048576",
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper := sysutil.NewFileTestUtil(t)
defer helper.Cleanup()
helper.SetCgroupsV2(tt.fields.UseCgroupsV2)

u, gotErr := DefaultCgroupUpdaterFactory.New(tt.args.resourceType, tt.args.parentDir, tt.args.value, nil)
assert.NoError(t, gotErr)
c, ok := u.(*CgroupResourceUpdater)
assert.True(t, ok)
if tt.fields.initialValue != "" {
helper.WriteCgroupFileContents(tt.args.parentDir, c.file, tt.fields.initialValue)
}

mergedUpdater, gotErr := u.MergeUpdate()
assert.Equal(t, tt.wantErr, gotErr != nil, gotErr)
if tt.wantErr {
return
}
assert.NotNil(t, mergedUpdater)
gotErr = mergedUpdater.update()
assert.NoError(t, gotErr)
assert.Equal(t, tt.wantMerged, helper.ReadCgroupFileContents(c.parentDir, c.file))
gotErr = u.update()
assert.NoError(t, gotErr)
assert.Equal(t, tt.wantFinal, helper.ReadCgroupFileContents(c.parentDir, c.file))
})
}
}

func TestDefaultResourceUpdater_Update(t *testing.T) {
type fields struct {
initialValue string
Expand Down
5 changes: 4 additions & 1 deletion pkg/koordlet/runtimehooks/protocol/container_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func (c *ContainerContext) FromProxy(req *runtimeapi.ContainerResourceHookReques
c.Request.FromProxy(req)
}

func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse) {
func (c *ContainerContext) ProxyDone(resp *runtimeapi.ContainerResourceHookResponse, executor resourceexecutor.ResourceUpdateExecutor) {
if c.executor == nil {
c.executor = executor
}
c.injectForExt()
c.Response.ProxyDone(resp)
c.Update()
Expand Down
10 changes: 5 additions & 5 deletions pkg/koordlet/runtimehooks/proxyserver/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *server) PreCreateContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreCreateContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreCreateContainerHook response for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -91,7 +91,7 @@ func (s *server) PreStartContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreStartContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreStartContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -109,7 +109,7 @@ func (s *server) PostStartContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStartContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PostStartContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -127,7 +127,7 @@ func (s *server) PostStopContainerHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PostStopContainer, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PostStopContainerHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand All @@ -145,7 +145,7 @@ func (s *server) PreUpdateContainerResourcesHook(ctx context.Context,
containerCtx := &protocol.ContainerContext{}
containerCtx.FromProxy(req)
err := hooks.RunHooks(s.options.PluginFailurePolicy, rmconfig.PreUpdateContainerResources, containerCtx)
containerCtx.ProxyDone(resp)
containerCtx.ProxyDone(resp, s.options.Executor)
klog.V(5).Infof("send PreUpdateContainerResourcesHook for pod %v container %v response %v",
req.PodMeta.String(), req.ContainerMeta.String(), resp.String())
return resp, err
Expand Down
Loading
Loading