diff --git a/pkg/koordlet/metricsadvisor/be_collector.go b/pkg/koordlet/metricsadvisor/be_collector.go
new file mode 100644
index 000000000..6979d1a6e
--- /dev/null
+++ b/pkg/koordlet/metricsadvisor/be_collector.go
@@ -0,0 +1,176 @@
+/*
+ Copyright 2022 The Koordinator Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package metricsadvisor
+
+import (
+	"fmt"
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/klog/v2"
+
+	apiext "github.com/koordinator-sh/koordinator/apis/extension"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
+	"github.com/koordinator-sh/koordinator/pkg/util"
+)
+
+// collectBECPUResourceMetric gathers the CPU usage, the effective CPU limit and the summed
+// CPU request of the BestEffort root cgroup and inserts them into the metric cache as one
+// BECPUResourceMetric. It logs and skips the insertion when an input cannot be read or when
+// no usage sample is available for this round.
+func (c *collector) collectBECPUResourceMetric() {
+	klog.V(6).Info("collectBECPUResourceMetric start")
+
+	realMilliLimit, err := getBECPURealMilliLimit()
+	if err != nil {
+		klog.Errorf("getBECPURealMilliLimit failed, error: %v", err)
+		return
+	}
+
+	beCPURequest := c.getBECPURequestSum()
+
+	beCPUUsageCores, err := c.getBECPUUsageCores()
+	if err != nil {
+		klog.Errorf("getBECPUUsageCores failed, error: %v", err)
+		return
+	}
+
+	// A nil usage without an error means there is nothing to report for this round.
+	if beCPUUsageCores == nil {
+		klog.Info("beCPUUsageCores is nil")
+		return
+	}
+
+	beCPUMetric := metriccache.BECPUResourceMetric{
+		CPUUsed:      *beCPUUsageCores,
+		CPURealLimit: *resource.NewMilliQuantity(int64(realMilliLimit), resource.DecimalSI),
+		CPURequest:   beCPURequest,
+	}
+
+	collectTime := time.Now()
+	err = c.metricCache.InsertBECPUResourceMetric(collectTime, &beCPUMetric)
+	if err != nil {
+		klog.Errorf("InsertBECPUResourceMetric failed, error: %v", err)
+		return
+	}
+	klog.V(6).Info("collectBECPUResourceMetric finished")
+}
+
+// getBECPURealMilliLimit returns, in milli-cores, the CPU the BestEffort root cgroup can
+// actually use: 1000 per CPU in its cpuset, further capped by cfs quota / cfs period when a
+// quota is set. It returns an error if a cgroup value cannot be read or the period is invalid.
+func getBECPURealMilliLimit() (int, error) {
+	cpuSet, err := util.GetRootCgroupCurCPUSet(corev1.PodQOSBestEffort)
+	if err != nil {
+		return 0, err
+	}
+	limit := len(cpuSet) * 1000
+
+	cfsQuota, err := util.GetRootCgroupCurCFSQuota(corev1.PodQOSBestEffort)
+	if err != nil {
+		return 0, err
+	}
+
+	// -1 means not suppress by cfs_quota
+	if cfsQuota == -1 {
+		return limit, nil
+	}
+
+	cfsPeriod, err := util.GetRootCgroupCurCFSPeriod(corev1.PodQOSBestEffort)
+	if err != nil {
+		return 0, err
+	}
+	// A zero period would panic in the division below and a negative one would flip its sign.
+	if cfsPeriod <= 0 {
+		return 0, fmt.Errorf("invalid cfs period %d for the best-effort root cgroup", cfsPeriod)
+	}
+
+	limitByCfsQuota := int(cfsQuota * 1000 / cfsPeriod)
+	if limitByCfsQuota < limit {
+		limit = limitByCfsQuota
+	}
+
+	return limit, nil
+}
+
+// getBECPURequestSum returns, as a milli-quantity, the sum of the BE CPU requests of all pods
+// reported by the states informer whose QoS class is BE.
+func (c *collector) getBECPURequestSum() resource.Quantity {
+	requestSum := int64(0)
+	for _, podMeta := range c.statesInformer.GetAllPods() {
+		pod := podMeta.Pod
+		if apiext.GetPodQoSClass(pod) == apiext.QoSBE {
+			podCPUReq := util.GetPodBEMilliCPURequest(pod)
+			// skip non-positive values so they cannot reduce the sum
+			if podCPUReq > 0 {
+				requestSum += podCPUReq
+			}
+		}
+	}
+	return *resource.NewMilliQuantity(requestSum, resource.DecimalSI)
+}
+
+// getBECPUUsageCores returns the CPU usage of the BestEffort root cgroup since the previous
+// call, derived from the cpuacct tick counter and the time elapsed between the two samples.
+// The current sample always replaces the stored one. A nil quantity with a nil error is
+// returned when no rate can be derived: on the first sample, when the tick counter went
+// backwards, or when no time has elapsed. An error is returned if the counter cannot be read.
+func (c *collector) getBECPUUsageCores() (*resource.Quantity, error) {
+	klog.V(6).Info("getBECPUUsageCores start")
+
+	collectTime := time.Now()
+
+	currentCPUTick, err := util.GetRootCgroupCPUStatUsageTicks(corev1.PodQOSBestEffort)
+	if err != nil {
+		klog.Warningf("failed to collect be cgroup usage, error: %v", err)
+		return nil, err
+	}
+
+	lastCPUStat := c.context.lastBECPUStat
+	c.context.lastBECPUStat = contextRecord{
+		cpuTick: currentCPUTick,
+		ts:      collectTime,
+	}
+
+	if lastCPUStat.cpuTick == 0 {
+		klog.V(6).Infof("ignore the first cpu stat collection")
+		return nil, nil
+	}
+
+	// The tick counter is unsigned, so a counter that went backwards would wrap the
+	// subtraction below into a huge bogus usage; skip this round and start over from the
+	// sample just stored.
+	if currentCPUTick < lastCPUStat.cpuTick {
+		klog.Warningf("be cgroup cpu tick went backwards, last %d, current %d, skip this collection", lastCPUStat.cpuTick, currentCPUTick)
+		return nil, nil
+	}
+
+	// A non-positive interval would yield an infinite, NaN or negative rate.
+	elapsed := collectTime.Sub(lastCPUStat.ts)
+	if elapsed <= 0 {
+		klog.Warningf("non-positive interval %v since last be cpu stat, skip this collection", elapsed)
+		return nil, nil
+	}
+
+	// NOTICE: do subtraction and division first to avoid overflow
+	cpuUsageValue := float64(currentCPUTick-lastCPUStat.cpuTick) / float64(elapsed) * jiffies
+	// 1.0 CPU = 1000 Milli-CPU
+	cpuUsageCores := resource.NewMilliQuantity(int64(cpuUsageValue*1000), resource.DecimalSI)
+	klog.V(6).Infof("collectBECPUUsageCores finished %.2f", cpuUsageValue)
+	return cpuUsageCores, nil
+}
diff --git a/pkg/koordlet/metricsadvisor/be_collector_test.go b/pkg/koordlet/metricsadvisor/be_collector_test.go
new file mode 100644
index 000000000..8d840f6d0
--- /dev/null
+++ b/pkg/koordlet/metricsadvisor/be_collector_test.go
@@ -0,0 +1,256 @@
+/*
+ Copyright 2022 The Koordinator Authors.
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+*/
+
+package metricsadvisor
+
+import (
+	"testing"
+	"time"
+
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	apiext "github.com/koordinator-sh/koordinator/apis/extension"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
+	mock_statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer"
+	"github.com/koordinator-sh/koordinator/pkg/util"
+	"github.com/koordinator-sh/koordinator/pkg/util/system"
+)
+
+func Test_collectBECPUResourceMetric(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+
+	metricCache, _ := metriccache.NewMetricCache(metriccache.NewDefaultConfig())
+	mockStatesInformer := mock_statesinformer.NewMockStatesInformer(ctrl)
+	collector := collector{context: newCollectContext(), metricCache: metricCache, statesInformer: mockStatesInformer}
+
+	// prepare be request: the BE pod's containers request 500 and 1000 batch-cpu, the LS pod must not be counted; expect 1500 milliCores
+	bePod := mockBEPod()
+	lsPod := mockLSPod()
+	mockStatesInformer.EXPECT().GetAllPods().Return([]*statesinformer.PodMeta{{Pod: bePod}, {Pod: lsPod}}).AnyTimes()
+
+	// prepare BECPUUsageCores data: the last sample is 1200000 ticks taken about 1 second ago and the stat file now sums to 1200400 ticks; expect 4 cores usage
+	collector.context.lastBECPUStat = contextRecord{cpuTick: 1200000, ts: time.Now().Add(-1 * time.Second)}
+	helper := system.NewFileTestUtil(t)
+	helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CpuacctStat, "user 400380\nsystem 800020\n")
+
+	// prepare limit data: cpuset "1-15" holds 15 cpus but cfs quota / period = 800000 / 100000 is smaller; expect 8 cores limit
+	helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CPUSet, "1-15")
+	helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CPUCFSQuota, "800000")
+	helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CPUCFSPeriod, "100000")
+
+	collector.collectBECPUResourceMetric()
+
+	oldStartTime := time.Unix(0, 0)
+	now := time.Now()
+	params := &metriccache.QueryParam{
+		Aggregate: metriccache.AggregationTypeLast,
+		Start:     &oldStartTime,
+		End:       &now,
+	}
+
+	got := collector.metricCache.GetBECPUResourceMetric(params)
+	gotMetric := got.Metric
+
+	assert.Equal(t, int64(1500), gotMetric.CPURequest.MilliValue(), "checkRequest")
+	assert.Equal(t, int64(4), gotMetric.CPUUsed.Value(), "checkUsage")
+	assert.Equal(t, int64(8), gotMetric.CPURealLimit.Value(), "checkLimit")
+}
+
+func Test_getBECPUUsageCores(t *testing.T) {
+	tests := []struct {
+		name                 string
+		cpuacctStat          string
+		lastBeCPUStat        *contextRecord
+		expectCPUUsedCores   *resource.Quantity
+		expectCurrentCPUTick uint64
+		expectNil            bool
+		expectError          bool
+	}{
+		{
+			name:                 "test_get_first_time",
+			cpuacctStat:          "user 400000\nsystem 800000\n",
+			lastBeCPUStat:        nil,
+			expectCPUUsedCores:   nil,
+			expectCurrentCPUTick: 1200000,
+			expectNil:            true,
+			expectError:          false,
+		},
+		{
+			name:                 "test_get_correct",
+			cpuacctStat:          "user 400380\nsystem 800020\n",
+			lastBeCPUStat:        &contextRecord{cpuTick: 1200000},
+			expectCPUUsedCores:   resource.NewQuantity(4, resource.DecimalSI),
+			expectCurrentCPUTick: 1200400,
+			expectNil:            false,
+			expectError:          false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			helper := system.NewFileTestUtil(t)
+			helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CpuacctStat, tt.cpuacctStat)
+
+			collector := collector{context: newCollectContext()}
+			if tt.lastBeCPUStat != nil {
+				collector.context.lastBECPUStat = *tt.lastBeCPUStat
+				collector.context.lastBECPUStat.ts = time.Now().Add(-1 * time.Second)
+			}
+
+			gotCPUUsedCores, gotErr := collector.getBECPUUsageCores()
+			assert.Equal(t, tt.expectError, gotErr != nil, "checkError")
+			if !tt.expectNil {
+				assert.Equal(t, tt.expectCPUUsedCores.Value(), gotCPUUsedCores.Value(), "checkCPU")
+			}
+			assert.Equal(t, tt.expectCurrentCPUTick, collector.context.lastBECPUStat.cpuTick, "checkCPUTick")
+		})
+	}
+}
+
+func Test_getBECPURealMilliLimit(t *testing.T) {
+
+	tests := []struct {
+		name     string
+		cpuset   string
+		cfsQuota string
+		expect   int
+	}{
+		{
+			name:     "test_suppress_by_cpuset",
+			cpuset:   "1-2",
+			cfsQuota: "-1",
+			expect:   2000,
+		},
+		{
+			name:     "test_suppress_by_cfsquota",
+			cpuset:   "1-15",
+			cfsQuota: "800000",
+			expect:   8000,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			helper := system.NewFileTestUtil(t)
+
+			helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CPUSet, tt.cpuset)
+			helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CPUCFSQuota, tt.cfsQuota)
+			helper.WriteCgroupFileContents(util.GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CPUCFSPeriod, "100000")
+
+			milliLimit, err := getBECPURealMilliLimit()
+			assert.NoError(t, err)
+			assert.Equal(t, tt.expect, milliLimit)
+		})
+	}
+}
+
+func Test_getBECPURequestSum(t *testing.T) {
+	ctl := gomock.NewController(t)
+	defer ctl.Finish()
+	mockStatesInformer := mock_statesinformer.NewMockStatesInformer(ctl)
+
+	bePod := mockBEPod()
+	lsPod := mockLSPod()
+	mockStatesInformer.EXPECT().GetAllPods().Return([]*statesinformer.PodMeta{{Pod: bePod}, {Pod: lsPod}}).AnyTimes()
+
+	c := &collector{statesInformer: mockStatesInformer}
+	beRequest := c.getBECPURequestSum()
+	assert.Equal(t, int64(1500), beRequest.MilliValue())
+}
+
+func mockBEPod() *corev1.Pod {
+	return &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "test-ns",
+			Name:      "test-name-be",
+			UID:       "test-pod-uid-be",
+			Labels: map[string]string{
+				apiext.LabelPodQoS: string(apiext.QoSBE),
+			},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Name: "test-container-1",
+					Resources: corev1.ResourceRequirements{
+						Limits: corev1.ResourceList{
+							apiext.BatchCPU: *resource.NewQuantity(500, resource.DecimalSI),
+						},
+						Requests: corev1.ResourceList{
+							apiext.BatchCPU: *resource.NewQuantity(500, resource.DecimalSI),
+						},
+					},
+				},
+				{
+					Name: "test-container-2",
+					Resources: corev1.ResourceRequirements{
+						Limits: corev1.ResourceList{
+							apiext.BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI),
+						},
+						Requests: corev1.ResourceList{
+							apiext.BatchCPU: *resource.NewQuantity(1000, resource.DecimalSI),
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+func mockLSPod() *corev1.Pod {
+	return &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Namespace: "test-ns",
+			Name:      "test-name-ls",
+			UID:       "test-pod-uid-ls",
+			Labels: map[string]string{
+				apiext.LabelPodQoS: string(apiext.QoSLS),
+			},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Name: "test-container-1",
+					Resources: corev1.ResourceRequirements{
+						Limits: corev1.ResourceList{
+							corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI),
+						},
+						Requests: corev1.ResourceList{
+							corev1.ResourceCPU: *resource.NewQuantity(1, resource.DecimalSI),
+						},
+					},
+				},
+				{
+					Name: "test-container-2",
+					Resources: corev1.ResourceRequirements{
+						Limits: corev1.ResourceList{
+							corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
+						},
+						Requests: corev1.ResourceList{
+							corev1.ResourceCPU: *resource.NewQuantity(2, resource.DecimalSI),
+						},
+					},
+				},
+			},
+		},
+	}
+}
diff --git a/pkg/koordlet/metricsadvisor/collector.go b/pkg/koordlet/metricsadvisor/collector.go
index 174a2ad0b..9162f0323 100644
--- a/pkg/koordlet/metricsadvisor/collector.go
+++ b/pkg/koordlet/metricsadvisor/collector.go
@@ -63,7 +63,7 @@ type contextRecord struct {
 
 type collectContext struct {
 	// record latest cpu stat for calculate resource used
-	// lastBeCPUStat        contextRecord
+	lastBECPUStat        contextRecord
 	lastNodeCPUStat      contextRecord
 	lastPodCPUStat       sync.Map
 	lastContainerCPUStat sync.Map
@@ -133,6 +133,7 @@ func (c *collector) Run(stopCh <-chan struct{}) error {
 			os.Exit(1)
 			return
 		}
+		c.collectBECPUResourceMetric()
 		c.collectPodResUsed()
 		c.collectPodThrottledInfo()
 	}, time.Duration(c.config.CollectResUsedIntervalSeconds)*time.Second, stopCh)
diff --git a/pkg/util/container.go b/pkg/util/container.go
index b644d7280..4be25a548 100644
--- a/pkg/util/container.go
+++ b/pkg/util/container.go
@@ -105,8 +105,8 @@ func GetContainerBEMilliCPURequest(c *corev1.Container) int64 {
 }
 
 func GetContainerBEMilliCPULimit(c *corev1.Container) int64 {
-	if cpuRequest, ok := c.Resources.Limits[extension.BatchCPU]; ok {
-		return cpuRequest.Value()
+	if cpuLimit, ok := c.Resources.Limits[extension.BatchCPU]; ok {
+		return cpuLimit.Value()
 	}
 	return -1
 }
@@ -228,16 +228,6 @@ func FindContainerIdAndStatusByName(status *corev1.PodStatus, name string) (stri
 	return "", nil, fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
 }
 
-func FindContainerStatusByID(pod *corev1.Pod, containerID string) *corev1.ContainerStatus {
-	for _, containerStatus := range pod.Status.ContainerStatuses {
-		_, cID, _ := ParseContainerId(containerStatus.ContainerID)
-		if containerID == cID {
-			return &containerStatus
-		}
-	}
-	return nil
-}
-
 func ParseContainerId(data string) (cType, cID string, err error) {
 	// Trim the quotes and split the type and ID.
 	parts := strings.Split(strings.Trim(data, "\""), "://")
diff --git a/pkg/util/node.go b/pkg/util/node.go
index 4c61e595e..9b61609d6 100644
--- a/pkg/util/node.go
+++ b/pkg/util/node.go
@@ -19,6 +19,8 @@ package util
 import (
 	"fmt"
 	"path"
+	"strconv"
+	"strings"
 
 	corev1 "k8s.io/api/core/v1"
 
@@ -57,3 +59,23 @@ func GetRootCgroupCurCPUSet(qosClass corev1.PodQOSClass) ([]int32, error) {
 
 	return ParseCPUSetStr(rawContent)
 }
+
+// GetRootCgroupCurCFSQuota reads the CPUCFSQuota cgroup file under the kube QoS relative path
+// of qosClass and parses its whitespace-trimmed content as a base-10 int64.
+func GetRootCgroupCurCFSQuota(qosClass corev1.PodQOSClass) (int64, error) {
+	rawContent, err := system.CgroupFileRead(GetKubeQosRelativePath(qosClass), system.CPUCFSQuota)
+	if err != nil {
+		return 0, err
+	}
+	return strconv.ParseInt(strings.TrimSpace(rawContent), 10, 64)
+}
+
+// GetRootCgroupCurCFSPeriod reads the CPUCFSPeriod cgroup file under the kube QoS relative path
+// of qosClass and parses its whitespace-trimmed content as a base-10 int64.
+func GetRootCgroupCurCFSPeriod(qosClass corev1.PodQOSClass) (int64, error) {
+	rawContent, err := system.CgroupFileRead(GetKubeQosRelativePath(qosClass), system.CPUCFSPeriod)
+	if err != nil {
+		return 0, err
+	}
+	return strconv.ParseInt(strings.TrimSpace(rawContent), 10, 64)
+}
diff --git a/pkg/util/stat.go b/pkg/util/stat.go
index d416872e5..7f139519a 100644
--- a/pkg/util/stat.go
+++ b/pkg/util/stat.go
@@ -100,3 +100,11 @@ func GetContainerCPUStatUsageTicks(podCgroupDir string, c *corev1.ContainerStatu
 	}
 	return readCPUAcctStatUsageTicks(containerStatPath)
 }
+
+// GetRootCgroupCPUStatUsageTicks resolves the CpuacctStat file under the kube QoS relative path
+// of qosClass and returns the usage ticks that readCPUAcctStatUsageTicks reads from it.
+func GetRootCgroupCPUStatUsageTicks(qosClass corev1.PodQOSClass) (uint64, error) {
+	rootCgroupParentDir := GetKubeQosRelativePath(qosClass)
+	statPath := system.GetCgroupFilePath(rootCgroupParentDir, system.CpuacctStat)
+	return readCPUAcctStatUsageTicks(statPath)
+}
diff --git a/pkg/util/stat_test.go b/pkg/util/stat_test.go
index 68acf575d..04864054a 100644
--- a/pkg/util/stat_test.go
+++ b/pkg/util/stat_test.go
@@ -151,6 +151,16 @@ func Test_GetPodCPUStatUsageTicks(t *testing.T) {
 	assert.NotNil(t, err)
 }
 
+// Test_GetRootCgroupCPUStatUsageTicks writes getStatContents() as the BestEffort cpuacct stat file
+// and expects 1356232 ticks, which equals its user + nice + system values (407232 + 60223 + 888777).
+func Test_GetRootCgroupCPUStatUsageTicks(t *testing.T) {
+	helper := system.NewFileTestUtil(t)
+	helper.WriteCgroupFileContents(GetKubeQosRelativePath(corev1.PodQOSBestEffort), system.CpuacctStat, getStatContents())
+	got, err := GetRootCgroupCPUStatUsageTicks(corev1.PodQOSBestEffort)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(1356232), got)
+}
+
 func getStatContents() string {
 	return "user 407232\nnice 60223\nsystem 888777\nidle 3710549851\niowait 7638\nirq 0\nsoftirq 0\n" +
 		"steal 1115801\nguest 0\nload average(1min) 5\nload average(5min) 1\nload average(15min) 0\n" +