koordlet: fix core sched conflicts with group identity
Signed-off-by: saintube <saintube@foxmail.com>
saintube committed Jan 12, 2024
1 parent 7c7a844 commit 9a7b20d
Showing 11 changed files with 741 additions and 140 deletions.
6 changes: 6 additions & 0 deletions pkg/koordlet/koordlet.go
@@ -37,6 +37,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/prediction"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
statesinformerimpl "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/impl"
@@ -63,6 +64,7 @@ type daemon struct {
qosManager qosmanager.QOSManager
runtimeHook runtimehooks.RuntimeHook
predictServer prediction.PredictServer
executor resourceexecutor.ResourceUpdateExecutor
}

func NewDaemon(config *config.Configuration) (Daemon, error) {
@@ -115,6 +117,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {
qosManager: qosManager,
runtimeHook: runtimeHook,
predictServer: predictServer,
executor: resourceexecutor.NewResourceUpdateExecutor(),
}

return d, nil
@@ -163,6 +166,9 @@ func (d *daemon) Run(stopCh <-chan struct{}) {
}
}()

// start the resource executor before the modules that write resources
go d.executor.Run(stopCh)

// start qos manager
go func() {
if err := d.qosManager.Run(stopCh); err != nil {
2 changes: 2 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
@@ -69,6 +69,8 @@ const (
CPUNormalization featuregate.Feature = "CPUNormalization"

// CoreSched manages Linux Core Scheduling cookies for containers that enable the core sched.
// NOTE: CoreSched is an alternative CPU QoS policy and is mutually exclusive with the Group Identity feature.
// Enabling CoreSched should disable the Group Identity cgroups, e.g. by resetting all the `cpu.bvt_warp_ns` files.
//
// owner: @saintube @zwzhang0107
// alpha: v1.4
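The NOTE in this hunk states the constraint that drives the whole commit: the kernel cannot honor core sched cookies while the group identity's bvt=-1 is in effect, so the two CPU QoS policies must never be active at the same time. A minimal sketch of that rule as a hypothetical validation helper (koordlet does not ship this exact guard; the real reconciliation happens per cgroup in the coresched and groupidentity hooks below):

package main

import "fmt"

// validateCPUQoSPolicies is illustrative only: it rejects a configuration
// where both mutually exclusive CPU QoS policies are switched on together.
func validateCPUQoSPolicies(coreSchedEnabled, groupIdentityEnabled bool) error {
    if coreSchedEnabled && groupIdentityEnabled {
        return fmt.Errorf("CoreSched and GroupIdentity are mutually exclusive CPU QoS policies")
    }
    return nil
}

func main() {
    fmt.Println(validateCPUQoSPolicies(true, true)) // error: pick one policy
}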
32 changes: 30 additions & 2 deletions pkg/koordlet/runtimehooks/hooks/coresched/core_sched.go
@@ -69,6 +69,7 @@ type Plugin struct {
sysSupported *bool
supportedMsg string
sysEnabled bool
giSupported *bool

cookieCache *gocache.Cache // core-sched-group-id -> cookie id, set<pid>; if the group has had cookie
cookieCacheRWMutex sync.RWMutex
@@ -151,6 +152,24 @@ func (p *Plugin) IsCacheInited() bool {
return p.initialized.Load()
}

// TryDisableGroupIdentity tries to disable the group identity via sysctl so that the core sched can be enabled safely.
func (p *Plugin) TryDisableGroupIdentity() error {
if p.giSupported == nil {
bvtConfigPath := sysutil.GetProcSysFilePath(sysutil.KernelSchedGroupIdentityEnable)
p.giSupported = pointer.Bool(sysutil.FileExists(bvtConfigPath))
}
if !*p.giSupported { // the kernel does not expose the group identity sysctl, so there is nothing to disable
return nil
}

err := sysutil.SetSchedGroupIdentity(false)
if err != nil {
return err
}

return nil
}

func (p *Plugin) SetKubeQOSCPUIdle(proto protocol.HooksProtocol) error {
kubeQOSCtx := proto.(*protocol.KubeQOSContext)
if kubeQOSCtx == nil {
@@ -200,8 +219,10 @@ func (p *Plugin) SetContainerCookie(proto protocol.HooksProtocol) error {
podUID := containerCtx.Request.PodMeta.UID
// only process sandbox container or container has valid ID
if len(podUID) <= 0 || len(containerCtx.Request.ContainerMeta.ID) <= 0 {
return fmt.Errorf("invalid container ID for plugin %s, pod UID %s, container ID %s",
name, podUID, containerCtx.Request.ContainerMeta.ID)
klog.V(5).Infof("aborted to manage cookie for container %s/%s, err: empty pod UID %s or container ID %s",
containerCtx.Request.PodMeta.String(), containerCtx.Request.ContainerMeta.Name,
podUID, containerCtx.Request.ContainerMeta.ID)
return nil
}

if !p.rule.IsInited() || !p.IsCacheInited() {
@@ -219,6 +240,13 @@ func (p *Plugin) SetContainerCookie(proto protocol.HooksProtocol) error {
// 1. disabled -> enabled: Add or Assign.
// 2. keep enabled: Check the differences of cookie, group ID and the PIDs, and do Assign.
if isEnabled {
// FIXME(saintube): Currently the kernel's core scheduling feature is mutually exclusive with the group identity's
// bvt=-1, so we have to check whether the group identity can be disabled before creating core sched cookies.
// This check will be removed once the conflict between the kernel features is resolved.
if err := p.TryDisableGroupIdentity(); err != nil {
return fmt.Errorf("failed to disable group identity for core sched, err: %w", err)
}

return p.enableContainerCookie(containerCtx, groupID)
}
// else pod disables
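The new giSupported field follows the same lazy capability probe as the existing sysSupported: the first TryDisableGroupIdentity call checks whether the kernel exposes the group identity sysctl, caches the answer in a *bool, and only then asks sysutil to flip it off. A condensed standalone sketch of that flow, where the sysctl path and helper calls are assumptions standing in for sysutil.GetProcSysFilePath(sysutil.KernelSchedGroupIdentityEnable) and sysutil.SetSchedGroupIdentity:

package main

import (
    "fmt"
    "os"
)

// assumed path of the group identity sysctl on bvt-capable kernels
const assumedBvtSysctl = "/proc/sys/kernel/sched_group_identity_enabled"

type coreSchedPlugin struct {
    giSupported *bool // nil until the first probe, then cached
}

func (p *coreSchedPlugin) tryDisableGroupIdentity() error {
    if p.giSupported == nil {
        _, err := os.Stat(assumedBvtSysctl)
        supported := err == nil
        p.giSupported = &supported
    }
    if !*p.giSupported {
        return nil // no group identity knob on this kernel, nothing to disable
    }
    // stands in for sysutil.SetSchedGroupIdentity(false)
    return os.WriteFile(assumedBvtSysctl, []byte("0"), 0644)
}

func main() {
    p := &coreSchedPlugin{}
    fmt.Println(p.tryDisableGroupIdentity())
}

The sketch skips locking; in the plugin itself the cookie bookkeeping is guarded by cookieCacheRWMutex.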
117 changes: 115 additions & 2 deletions pkg/koordlet/runtimehooks/hooks/coresched/core_sched_test.go
@@ -139,6 +139,97 @@ func TestPluginSystemSupported(t *testing.T) {
}
}

func TestPluginTryDisableGroupIdentity(t *testing.T) {
type fields struct {
prepareFn func(helper *sysutil.FileTestUtil)
giSupported *bool
}
tests := []struct {
name string
fields fields
wantErr bool
wantField *bool
wantExtra func(t *testing.T, helper *sysutil.FileTestUtil)
}{
{
name: "system does not support sysctl for group identity",
fields: fields{
giSupported: nil,
},
wantErr: false,
wantField: pointer.Bool(false),
},
{
name: "already know system does not support sysctl for group identity",
fields: fields{
giSupported: pointer.Bool(false),
},
wantErr: false,
wantField: pointer.Bool(false),
},
{
name: "successfully disable group identity",
fields: fields{
prepareFn: func(helper *sysutil.FileTestUtil) {
bvtConfigPath := sysutil.GetProcSysFilePath(sysutil.KernelSchedGroupIdentityEnable)
helper.WriteFileContents(bvtConfigPath, "1")
},
},
wantErr: false,
wantField: pointer.Bool(true),
wantExtra: func(t *testing.T, helper *sysutil.FileTestUtil) {
bvtConfigPath := sysutil.GetProcSysFilePath(sysutil.KernelSchedGroupIdentityEnable)
got := helper.ReadFileContents(bvtConfigPath)
assert.Equal(t, "0", got)
},
},
{
name: "successfully disable group identity for known sysctl support",
fields: fields{
prepareFn: func(helper *sysutil.FileTestUtil) {
bvtConfigPath := sysutil.GetProcSysFilePath(sysutil.KernelSchedGroupIdentityEnable)
helper.WriteFileContents(bvtConfigPath, "1")
},
giSupported: pointer.Bool(true),
},
wantErr: false,
wantField: pointer.Bool(true),
wantExtra: func(t *testing.T, helper *sysutil.FileTestUtil) {
bvtConfigPath := sysutil.GetProcSysFilePath(sysutil.KernelSchedGroupIdentityEnable)
got := helper.ReadFileContents(bvtConfigPath)
assert.Equal(t, "0", got)
},
},
{
name: "failed to disable group identity",
fields: fields{
giSupported: pointer.Bool(true),
},
wantErr: true,
wantField: pointer.Bool(true),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper := sysutil.NewFileTestUtil(t)
defer helper.Cleanup()
if tt.fields.prepareFn != nil {
tt.fields.prepareFn(helper)
}

p := newPlugin()
p.Setup(hooks.Options{})
p.giSupported = tt.fields.giSupported
gotErr := p.TryDisableGroupIdentity()
assert.Equal(t, tt.wantErr, gotErr != nil)
assert.Equal(t, tt.wantField, p.giSupported)
if tt.wantExtra != nil {
tt.wantExtra(t, helper)
}
})
}
}

func TestPlugin_SetContainerCookie(t *testing.T) {
type fields struct {
prepareFn func(helper *sysutil.FileTestUtil)
@@ -180,7 +271,7 @@ func TestPlugin_SetContainerCookie(t *testing.T) {
wantErr: true,
},
{
name: "missing container ID",
name: "abort for missing container ID",
fields: fields{
plugin: newPlugin(),
},
@@ -193,7 +284,7 @@ func TestPlugin_SetContainerCookie(t *testing.T) {
CgroupParent: "kubepods.slice/kubepods-podxxxxxx.slice/cri-containerd-yyyyyy.scope",
},
},
wantErr: true,
wantErr: false,
},
{
name: "rule has not initialized",
@@ -214,6 +305,28 @@ func TestPlugin_SetContainerCookie(t *testing.T) {
},
wantErr: false,
},
{
name: "failed to disable group identity",
fields: fields{
plugin: testGetEnabledPlugin(),
preparePluginFn: func(p *Plugin) {
p.giSupported = pointer.Bool(true)
},
},
arg: &protocol.ContainerContext{
Request: protocol.ContainerRequest{
PodMeta: protocol.PodMeta{
Name: "test-pod",
UID: "xxxxxx",
},
ContainerMeta: protocol.ContainerMeta{
ID: "containerd://yyyyyy",
},
CgroupParent: "kubepods.slice/kubepods-podxxxxxx.slice/cri-containerd-yyyyyy.scope",
},
},
wantErr: true,
},
{
name: "add cookie for LS container correctly",
fields: fields{
2 changes: 1 addition & 1 deletion pkg/koordlet/runtimehooks/hooks/coresched/rule.go
@@ -172,7 +172,7 @@ func (p *Plugin) ruleUpdateCb(target *statesinformer.CallbackTarget) error {
return nil
}

// TBD: try to enable the kernel feature if needed
// check the kernel feature and enable if needed
if supported, msg := p.SystemSupported(); !supported {
klog.V(4).Infof("plugin %s is not supported by system, msg: %s", name, msg)
return nil
3 changes: 2 additions & 1 deletion pkg/koordlet/runtimehooks/hooks/groupidentity/bvt.go
@@ -106,9 +106,10 @@ func (b *bvtPlugin) initialize() error {
}

// try to enable the bvt kernel feature via sysctl when the sysctl config is disabled or unknown
// https://github.com/koordinator-sh/koordinator/pull/1172
err := sysutil.SetSchedGroupIdentity(true)
if err != nil {
return fmt.Errorf("cannot enable kernel sysctl for bvt, err: %v", err)
return fmt.Errorf("cannot enable kernel sysctl for bvt, err: %w", err)
}
b.kernelEnabled = pointer.Bool(true)
klog.V(4).Infof("hook plugin %s is successfully initialized", name)
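Besides the added reference link, the functional change in this hunk is switching the error verb from %v to %w, so the underlying sysctl failure stays inspectable with errors.Is/errors.As instead of being flattened into plain text. A small self-contained illustration of the difference (not koordlet code; the probed path is arbitrary):

package main

import (
    "errors"
    "fmt"
    "io/fs"
    "os"
)

func main() {
    _, err := os.ReadFile("/proc/sys/kernel/nonexistent_knob") // arbitrary missing file

    wrapped := fmt.Errorf("cannot enable kernel sysctl for bvt, err: %w", err)
    flattened := fmt.Errorf("cannot enable kernel sysctl for bvt, err: %v", err)

    // %w keeps the error chain, so callers can still match the root cause.
    fmt.Println(errors.Is(wrapped, fs.ErrNotExist)) // true
    // %v only keeps the message text; the chain is lost.
    fmt.Println(errors.Is(flattened, fs.ErrNotExist)) // false
}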
72 changes: 68 additions & 4 deletions pkg/koordlet/runtimehooks/hooks/groupidentity/rule.go
@@ -155,22 +155,46 @@ func (b *bvtPlugin) ruleUpdateCb(target *statesinformer.CallbackTarget) error {
klog.V(5).Infof("hook plugin rule is nil, nothing to do for plugin %v", name)
return nil
}

qosCgroupMap := map[string]struct{}{}
for _, kubeQOS := range []corev1.PodQOSClass{
corev1.PodQOSGuaranteed, corev1.PodQOSBurstable, corev1.PodQOSBestEffort} {
bvtValue := r.getKubeQOSDirBvtValue(kubeQOS)
kubeQOSCgroupPath := koordletutil.GetPodQoSRelativePath(kubeQOS)
e := audit.V(3).Group(string(kubeQOS)).Reason(name).Message("set bvt to %v", bvtValue)
bvtUpdater, err := resourceexecutor.DefaultCgroupUpdaterFactory.New(sysutil.CPUBVTWarpNsName, kubeQOSCgroupPath, strconv.FormatInt(bvtValue, 10), e)
if err != nil {
klog.Infof("bvtupdater create failed, dir %v, error %v", kubeQOSCgroupPath, err)
klog.Infof("bvt updater create failed, dir %v, error %v", kubeQOSCgroupPath, err)
}
if _, err := b.executor.Update(true, bvtUpdater); err != nil {
if _, err = b.executor.Update(true, bvtUpdater); err != nil {
klog.Infof("update kube qos %v cpu bvt failed, dir %v, error %v", kubeQOS, kubeQOSCgroupPath, err)
}
qosCgroupMap[kubeQOSCgroupPath] = struct{}{}
}

if target == nil {
return fmt.Errorf("callback target is nil")
}

// FIXME(saintube): Currently the kernel's core scheduling feature is mutually exclusive with the group identity's
// bvt=-1, so we have to check and disable the bvt of all the BE cgroups for the Group Identity before creating the
// core sched cookies. To keep the cgroup tree's configuration consistent, we list and update the cgroups in level
// order when the group identity is globally disabled.
podCgroupMap := map[string]int64{}
// pod-level
for _, kubeQOS := range []corev1.PodQOSClass{corev1.PodQOSGuaranteed, corev1.PodQOSBurstable, corev1.PodQOSBestEffort} {
bvtValue := r.getKubeQOSDirBvtValue(kubeQOS)
podCgroupDirs, err := koordletutil.GetCgroupPathsByTargetDepth(kubeQOS, sysutil.CPUBVTWarpNsName, koordletutil.PodCgroupPathRelativeDepth)
if err != nil {
return fmt.Errorf("get pod cgroup paths failed, qos %s, err: %w", kubeQOS, err)
}
for _, cgroupDir := range podCgroupDirs {
if _, ok := qosCgroupMap[cgroupDir]; ok { // exclude qos cgroup
continue
}
podCgroupMap[cgroupDir] = bvtValue
}
}
for _, podMeta := range target.Pods {
podQOS := ext.GetPodQoSClassRaw(podMeta.Pod)
podKubeQOS := podMeta.Pod.Status.QOSClass
@@ -179,12 +203,13 @@ func (b *bvtPlugin) ruleUpdateCb(target *statesinformer.CallbackTarget) error {
e := audit.V(3).Pod(podMeta.Pod.Namespace, podMeta.Pod.Name).Reason(name).Message("set bvt to %v", podBvt)
bvtUpdater, err := resourceexecutor.DefaultCgroupUpdaterFactory.New(sysutil.CPUBVTWarpNsName, podCgroupPath, strconv.FormatInt(podBvt, 10), e)
if err != nil {
klog.Infof("bvtupdater create failed, dir %v, error %v", podCgroupPath, err)
klog.Infof("bvt updater create failed, dir %v, error %v", podCgroupPath, err)
}
if _, err := b.executor.Update(true, bvtUpdater); err != nil {
if _, err = b.executor.Update(true, bvtUpdater); err != nil {
klog.Infof("update pod %s cpu bvt failed, dir %v, error %v",
util.GetPodKey(podMeta.Pod), podCgroupPath, err)
}
delete(podCgroupMap, podCgroupPath)
}
for _, hostApp := range target.HostApplications {
hostCtx := protocol.HooksProtocolBuilder.HostApp(&hostApp)
Expand All @@ -195,6 +220,45 @@ func (b *bvtPlugin) ruleUpdateCb(target *statesinformer.CallbackTarget) error {
klog.V(5).Infof("set host application %v bvt value finished", hostApp.Name)
}
}
// handle the remaining pod cgroups, which may belong to dangling pods
for podCgroupDir, bvtValue := range podCgroupMap {
e := audit.V(3).Reason(name).Message("set bvt to %v", bvtValue)
bvtUpdater, err := resourceexecutor.DefaultCgroupUpdaterFactory.New(sysutil.CPUBVTWarpNsName, podCgroupDir, strconv.FormatInt(bvtValue, 10), e)
if err != nil {
klog.Infof("bvt updater create failed, dir %v, error %v", podCgroupDir, err)
}
if _, err = b.executor.Update(true, bvtUpdater); err != nil {
klog.Infof("update remaining pod cpu bvt failed, dir %v, error %v", podCgroupDir, err)
}
}

// container-level
// NOTE: Although we do not set the container-level cpu.bvt_warp_ns directly (it is inherited from the pod level),
// we still have to handle the container level when we want to disable the group identity.
if !r.getEnable() {
for _, kubeQOS := range []corev1.PodQOSClass{corev1.PodQOSGuaranteed, corev1.PodQOSBurstable, corev1.PodQOSBestEffort} {
bvtValue := r.getKubeQOSDirBvtValue(kubeQOS)
containerCgroupDirs, err := koordletutil.GetCgroupPathsByTargetDepth(kubeQOS, sysutil.CPUBVTWarpNsName, koordletutil.ContainerCgroupPathRelativeDepth)
if err != nil {
return fmt.Errorf("get container cgroup paths failed, qos %s, err: %w", kubeQOS, err)
}
for _, cgroupDir := range containerCgroupDirs {
if _, ok := podCgroupMap[cgroupDir]; ok {
continue
}

e := audit.V(3).Reason(name).Message("set bvt to %v", bvtValue)
bvtUpdater, err := resourceexecutor.DefaultCgroupUpdaterFactory.New(sysutil.CPUBVTWarpNsName, cgroupDir, strconv.FormatInt(bvtValue, 10), e)
if err != nil {
klog.Infof("bvt updater create failed, dir %v, error %v", cgroupDir, err)
}
if _, err = b.executor.Update(true, bvtUpdater); err != nil {
klog.Infof("update container cpu bvt failed, dir %v, error %v", cgroupDir, err)
}
}
}
}

return nil
}

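The level-order reset above leans on koordletutil.GetCgroupPathsByTargetDepth to enumerate the pod- and container-level directories that expose cpu.bvt_warp_ns. A rough standalone approximation of that enumeration, assuming a cgroup v1 CPU hierarchy and a fixed mount point (the real helper resolves paths through koordlet's system utilities instead):

package main

import (
    "fmt"
    "io/fs"
    "os"
    "path/filepath"
    "strings"
)

// listCgroupDirsWithFile returns the directories exactly depth levels below
// root that contain the given resource file, e.g. cpu.bvt_warp_ns.
func listCgroupDirsWithFile(root, resourceFile string, depth int) ([]string, error) {
    var dirs []string
    err := filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error {
        if walkErr != nil || !d.IsDir() {
            return walkErr
        }
        rel, _ := filepath.Rel(root, path)
        if rel == "." {
            return nil // keep walking below the root itself
        }
        if len(strings.Split(rel, string(os.PathSeparator))) != depth {
            return nil // not at the target level yet
        }
        if _, statErr := os.Stat(filepath.Join(path, resourceFile)); statErr == nil {
            dirs = append(dirs, path)
        }
        return filepath.SkipDir // do not descend past the target depth
    })
    return dirs, err
}

func main() {
    // assumed kubepods layout: pod directories sit one level under the QoS directory
    dirs, err := listCgroupDirsWithFile("/sys/fs/cgroup/cpu/kubepods.slice/kubepods-besteffort.slice", "cpu.bvt_warp_ns", 1)
    fmt.Println(dirs, err)
}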