Skip to content

Commit

Permalink
koordlet: revise base collectors and system status check (#1877)
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Feb 26, 2024
1 parent 85de463 commit 370fd51
Show file tree
Hide file tree
Showing 11 changed files with 163 additions and 35 deletions.
1 change: 1 addition & 0 deletions pkg/koordlet/koordlet.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {
klog.Infof("NODE_NAME is %v, start time %v", nodeName, float64(time.Now().Unix()))
metrics.RecordKoordletStartTime(nodeName, float64(time.Now().Unix()))

system.InitSupportConfigs()
klog.Infof("sysconf: %+v, agentMode: %v", system.Conf, system.AgentMode)
klog.Infof("kernel version INFO: %+v", system.HostSystemInfo)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,15 @@ func (n *nodeResourceCollector) collectNodeResUsed() {
}
nodeMetrics = append(nodeMetrics, cpuUsageMetrics)

for _, deviceCollector := range n.deviceCollectors {
if metric, _ := deviceCollector.GetNodeMetric(); metric != nil {
for name, deviceCollector := range n.deviceCollectors {
if !deviceCollector.Enabled() {
klog.V(6).Infof("skip node metrics from the disabled device collector %s", name)
continue
}

if metric, err := deviceCollector.GetNodeMetric(); err != nil {
klog.Warningf("get node metrics from the device collector %s failed, err: %s", name, err)
} else {
nodeMetrics = append(nodeMetrics, metric...)
}
if info := deviceCollector.Infos(); info != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,15 @@ DirectMap1G: 0 kB`)
CPUTick: 0,
Timestamp: testLastCPUStatTime,
}
testDeviceCollector := &fakeDeviceCollector{isEnabled: true}

c := &nodeResourceCollector{
started: atomic.NewBool(false),
appendableDB: metricCache,
metricDB: metricCache,
lastNodeCPUStat: testLastCPUStat,
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{},
"TestDeviceCollector": testDeviceCollector,
},
sharedState: framework.NewSharedState(),
}
Expand All @@ -167,6 +168,24 @@ DirectMap1G: 0 kB`)
assert.Equal(t, wantCPU, nodeCPU.Value)
assert.Equal(t, wantMemory, nodeMemory.Value)

// test collect without device collector
testDeviceCollector.isEnabled = false
testDeviceCollector.getNodeMetric = func() ([]metriccache.MetricSample, error) {
panic("should not be called")
}
assert.NotPanics(t, func() {
c.collectNodeResUsed()
})
assert.True(t, c.Started())
// validate collected values
// assert collected time is less than 10s
got, err = testGetNodeMetrics(t, c.metricDB, testNow, 5*time.Second)
wantCPU = testCPUUsage
assert.Equal(t, wantCPU, float64(got.Cpu().MilliValue()/1000))
// MemTotal - MemAvailable
wantMemory = float64(524288 * 1024)
assert.Equal(t, wantMemory, float64(got.Memory().Value()))

// test first cpu collection
c.lastNodeCPUStat = nil
assert.NotPanics(t, func() {
Expand All @@ -185,9 +204,18 @@ DirectMap1G: 0 kB`)

type fakeDeviceCollector struct {
framework.DeviceCollector
isEnabled bool
getNodeMetric func() ([]metriccache.MetricSample, error)
}

func (f *fakeDeviceCollector) Enabled() bool {
return f.isEnabled
}

func (f *fakeDeviceCollector) GetNodeMetric() ([]metriccache.MetricSample, error) {
if f.getNodeMetric != nil {
return f.getNodeMetric()
}
return nil, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,11 @@ func (p *podResourceCollector) collectPodResUsed() {

metrics = append(metrics, cpuUsageMetric, memUsageMetric)
for deviceName, deviceCollector := range p.deviceCollectors {
if !deviceCollector.Enabled() {
klog.V(6).Infof("skip pod metrics from the disabled device collector %s, pod %s", deviceName, podKey)
continue
}

if deviceMetrics, err := deviceCollector.GetPodMetric(uid, meta.CgroupDir, pod.Status.ContainerStatuses); err != nil {
klog.V(4).Infof("get pod %s device usage failed for %v, error: %v", podKey, deviceName, err)
} else if len(metrics) > 0 {
Expand Down Expand Up @@ -262,6 +267,11 @@ func (p *podResourceCollector) collectContainerResUsed(meta *statesinformer.PodM
containerMetrics = append(containerMetrics, cpuUsageMetric, memUsageMetric)

for deviceName, deviceCollector := range p.deviceCollectors {
if !deviceCollector.Enabled() {
klog.V(6).Infof("skip container metrics from the disabled device collector %s, container %s", deviceName, containerKey)
continue
}

if metrics, err := deviceCollector.GetContainerMetric(containerStat.ContainerID, meta.CgroupDir, containerStat); err != nil {
klog.Warningf("get container %s device usage failed for %v, error: %v", containerKey, deviceName, err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func Test_collector_collectPodResUsed(t *testing.T) {
}
type fields struct {
podFilterOption framework.PodFilter
deviceCollectors map[string]framework.DeviceCollector
getPodMetas []*statesinformer.PodMeta
initPodLastStat func(lastState *gocache.Cache)
initContainerLastStat func(lastState *gocache.Cache)
Expand All @@ -96,6 +97,9 @@ func Test_collector_collectPodResUsed(t *testing.T) {
name: "cgroups v1",
fields: fields{
podFilterOption: framework.DefaultPodFilter,
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{isEnabled: true},
},
getPodMetas: []*statesinformer.PodMeta{
{
CgroupDir: testPodMetaDir,
Expand Down Expand Up @@ -150,6 +154,9 @@ total_unevictable 0
name: "cgroups v2",
fields: fields{
podFilterOption: framework.DefaultPodFilter,
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{isEnabled: true},
},
getPodMetas: []*statesinformer.PodMeta{
{
CgroupDir: testPodMetaDir,
Expand Down Expand Up @@ -212,9 +219,20 @@ unevictable 0
},
},
{
name: "cgroups v1, filter non-running pods",
name: "cgroups v1, filter non-running pods, skip disabled device collector",
fields: fields{
podFilterOption: &framework.TerminatedPodFilter{},
deviceCollectors: map[string]framework.DeviceCollector{
"TestDeviceCollector": &fakeDeviceCollector{
isEnabled: false,
getPodMetric: func(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
panic("should not be called")
},
getContainerMetric: func(uid, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
panic("should not be called")
},
},
},
getPodMetas: []*statesinformer.PodMeta{
{
CgroupDir: testPodMetaDir,
Expand Down Expand Up @@ -304,7 +322,8 @@ total_unevictable 0
},
})
collector.Setup(&framework.Context{
State: framework.NewSharedState(),
State: framework.NewSharedState(),
DeviceCollectors: tt.fields.deviceCollectors,
})
c := collector.(*podResourceCollector)
tt.fields.initPodLastStat(c.lastPodCPUStat)
Expand Down Expand Up @@ -350,3 +369,28 @@ func Test_podResourceCollector_Run(t *testing.T) {
close(stopCh)
})
}

type fakeDeviceCollector struct {
framework.DeviceCollector
isEnabled bool
getPodMetric func(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error)
getContainerMetric func(uid, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error)
}

func (f *fakeDeviceCollector) Enabled() bool {
return f.isEnabled
}

func (f *fakeDeviceCollector) GetPodMetric(uid, podParentDir string, cs []corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
if f.getPodMetric != nil {
return f.getPodMetric(uid, podParentDir, cs)
}
return nil, nil
}

func (f *fakeDeviceCollector) GetContainerMetric(containerID, podParentDir string, c *corev1.ContainerStatus) ([]metriccache.MetricSample, error) {
if f.getContainerMetric != nil {
return f.getContainerMetric(containerID, podParentDir, c)
}
return nil, nil
}
2 changes: 1 addition & 1 deletion pkg/koordlet/runtimehooks/runtimehooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook,
}
nriServer, err = nri.NewNriServer(nriServerOptions)
if err != nil {
klog.Errorf("new nri mode runtimehooks server error: %v", err)
klog.Warningf("new nri mode runtimehooks server error: %v", err)
}
} else {
klog.V(4).Info("nri mode runtimehooks is disabled")
Expand Down
17 changes: 13 additions & 4 deletions pkg/koordlet/util/system/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"os"

"go.uber.org/atomic"
"k8s.io/klog/v2"
)

const (
Expand Down Expand Up @@ -57,10 +58,20 @@ func init() {
}
}

func initSupportConfigs() {
// InitSupportConfigs initializes the system support status.
// e.g. the cgroup version, resctrl capability
func InitSupportConfigs() {
// $ getconf CLK_TCK > jiffies
if err := initJiffies(); err != nil {
klog.Warningf("failed to get Jiffies, use the default %v, err: %v", Jiffies, err)
}
initCgroupsVersion()
HostSystemInfo = collectVersionInfo()
_, _ = IsSupportResctrl()
if isResctrlSupported, err := IsSupportResctrl(); err != nil {
klog.Warningf("failed to check resctrl support status, use %d, err: %v", isResctrlSupported, err)
} else {
klog.V(4).Infof("resctrl supported: %v", isResctrlSupported)
}
}

func NewHostModeConfig() *Config {
Expand Down Expand Up @@ -110,6 +121,4 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.PouchEndpoint, "pouch-endpoint", c.PouchEndpoint, "pouch endPoint")

fs.StringVar(&c.DefaultRuntimeType, "default-runtime-type", c.DefaultRuntimeType, "default runtime type during runtime hooks handle request, candidates are containerd/docker/pouch.")

initSupportConfigs()
}
51 changes: 51 additions & 0 deletions pkg/koordlet/util/system/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,54 @@ func Test_InitFlags(t *testing.T) {
assert.NotNil(t, cfg)
})
}

func Test_InitSupportConfigs(t *testing.T) {
type fields struct {
prepareFn func(helper *FileTestUtil)
}
type expects struct {
hostSystemInfo VersionInfo
}
tests := []struct {
name string
fields fields
expects expects
}{
{
name: "not anolis os, not support resctrl",
fields: fields{
prepareFn: func(helper *FileTestUtil) {},
},
expects: expects{
hostSystemInfo: VersionInfo{
IsAnolisOS: false,
},
},
},
{
name: "anolis os, not support resctrl",
fields: fields{
prepareFn: func(helper *FileTestUtil) {
bvtResource, _ := GetCgroupResource(CPUBVTWarpNsName)
helper.WriteCgroupFileContents("", bvtResource, "0")
},
},
expects: expects{
hostSystemInfo: VersionInfo{
IsAnolisOS: true,
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper := NewFileTestUtil(t)
defer helper.Cleanup()
if tt.fields.prepareFn != nil {
tt.fields.prepareFn(helper)
}
InitSupportConfigs()
assert.Equal(t, tt.expects.hostSystemInfo, HostSystemInfo)
})
}
}
17 changes: 1 addition & 16 deletions pkg/koordlet/util/system/resctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func IsSupportResctrl() (bool, error) {
return false, err
}
// Kernel cmdline not set resctrl features does not ensure feature must be disabled.
klog.V(4).Infof("isResctrlAvailableByKernelCmd result, cpuSupport: %v, kernelSupport: %v",
klog.Infof("IsSupportResctrl result, cpuSupport: %v, kernelSupport: %v",
cpuSupport, kernelCmdSupport)
isSupportResctrl = cpuSupport
isInit = true
Expand Down Expand Up @@ -560,21 +560,6 @@ func CheckAndTryEnableResctrlCat() error {
return nil
}

func InitCatGroupIfNotExist(group string) error {
path := GetResctrlGroupRootDirPath(group)
_, err := os.Stat(path)
if err == nil {
return nil
} else if !os.IsNotExist(err) {
return fmt.Errorf("check dir %v for group %s but got unexpected err: %v", path, group, err)
}
err = os.Mkdir(path, 0755)
if err != nil {
return fmt.Errorf("create dir %v failed for group %s, err: %v", path, group, err)
}
return nil
}

func CheckResctrlSchemataValid() error {
schemataPath := GetResctrlSchemataFilePath("")
schemataRaw, err := ReadResctrlSchemataRaw(schemataPath, -1)
Expand Down
9 changes: 1 addition & 8 deletions pkg/koordlet/util/system/system_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,12 @@ var (
Jiffies = float64(10 * time.Millisecond)
)

func init() {
// $ getconf CLK_TCK > jiffies
if err := initJiffies(); err != nil {
klog.Warningf("failed to get Jiffies, use the default %v, err: %v", Jiffies, err)
}
}

// initJiffies use command "getconf CLK_TCK" to fetch the clock tick on current host,
// if the command doesn't exist, uses the default value 10ms for jiffies
func initJiffies() error {
getconf, err := exec.LookPath("getconf")
if err != nil {
return nil
return err
}
cmd := exec.Command(getconf, "CLK_TCK")
var out bytes.Buffer
Expand Down
2 changes: 1 addition & 1 deletion pkg/koordlet/util/system/util_test_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewFileTestUtil(t testing.TB) *FileTestUtil {
Conf.SysFSRootDir = filepath.Join(tempDir, "fs")
Conf.VarRunRootDir = tempDir

initSupportConfigs()
InitSupportConfigs()

return &FileTestUtil{
TempDir: tempDir,
Expand Down

0 comments on commit 370fd51

Please sign in to comment.