Skip to content

Commit

Permalink
reconcile memory limit
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Liu <jasonliu747@gmail.com>
  • Loading branch information
jasonliu747 committed Apr 17, 2022
1 parent 7399d59 commit 344ed21
Show file tree
Hide file tree
Showing 14 changed files with 692 additions and 42 deletions.
4 changes: 2 additions & 2 deletions pkg/koordlet/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/reporter"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
sysutil "github.com/koordinator-sh/koordinator/pkg/util/system"
"github.com/koordinator-sh/koordinator/pkg/util/system"
)

type Configuration struct {
Expand All @@ -57,7 +57,7 @@ func NewConfiguration() *Configuration {
}

func (c *Configuration) InitFlags(fs *flag.FlagSet) {
sysutil.Conf.InitFlags(fs)
system.Conf.InitFlags(fs)
c.StatesInformerConf.InitFlags(fs)
c.ReporterConf.InitFlags(fs)
c.CollectorConf.InitFlags(fs)
Expand Down
34 changes: 17 additions & 17 deletions pkg/koordlet/metricsadvisor/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
util2 "github.com/koordinator-sh/koordinator/pkg/util"
sysutil "github.com/koordinator-sh/koordinator/pkg/util/system"
"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/system"
)

const (
Expand All @@ -48,7 +48,7 @@ var (
// jiffies is the duration unit of CPU stats
jiffies = float64(10 * time.Millisecond)

localCPUInfoGetter = util2.GetLocalCPUInfo
localCPUInfoGetter = util.GetLocalCPUInfo
)

type Collector interface {
Expand Down Expand Up @@ -172,8 +172,8 @@ func initJiffies() error {
func (c *collector) collectNodeResUsed() {
klog.V(6).Info("collectNodeResUsed start")
collectTime := time.Now()
currentCPUTick, err0 := util2.GetCPUStatUsageTicks()
memUsageValue, err1 := util2.GetMemInfoUsageKB()
currentCPUTick, err0 := util.GetCPUStatUsageTicks()
memUsageValue, err1 := util.GetMemInfoUsageKB()
if err0 != nil || err1 != nil {
klog.Warningf("failed to collect node usage, CPU err: %s, Memory err: %s", err0, err1)
return
Expand Down Expand Up @@ -218,8 +218,8 @@ func (c *collector) collectPodResUsed() {
pod := meta.Pod
uid := string(pod.UID) // types.UID
collectTime := time.Now()
currentCPUTick, err0 := util2.GetPodCPUStatUsageTicks(meta.CgroupDir)
memUsageValue, err1 := util2.GetPodMemStatUsageBytes(meta.CgroupDir)
currentCPUTick, err0 := util.GetPodCPUStatUsageTicks(meta.CgroupDir)
memUsageValue, err1 := util.GetPodMemStatUsageBytes(meta.CgroupDir)
if err0 != nil || err1 != nil {
// higher verbosity for probably non-running pods
if pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending {
Expand Down Expand Up @@ -275,8 +275,8 @@ func (c *collector) collectContainerResUsed(meta *statesinformer.PodMeta) {
for i := range pod.Status.ContainerStatuses {
containerStat := &pod.Status.ContainerStatuses[i]
collectTime := time.Now()
currentCPUTick, err0 := util2.GetContainerCPUStatUsageTicks(meta.CgroupDir, containerStat)
memUsageValue, err1 := util2.GetContainerMemStatUsageBytes(meta.CgroupDir, containerStat)
currentCPUTick, err0 := util.GetContainerCPUStatUsageTicks(meta.CgroupDir, containerStat)
memUsageValue, err1 := util.GetContainerMemStatUsageBytes(meta.CgroupDir, containerStat)
if err0 != nil || err1 != nil {
// higher verbosity for probably non-running pods
if containerStat.State.Running == nil {
Expand Down Expand Up @@ -355,8 +355,8 @@ func (c *collector) collectPodThrottledInfo() {
pod := meta.Pod
uid := string(pod.UID) // types.UID
collectTime := time.Now()
cgroupStatPath := util2.GetPodCgroupCPUStatPath(meta.CgroupDir)
currentCPUStat, err := sysutil.GetCPUStatRaw(cgroupStatPath)
cgroupStatPath := util.GetPodCgroupCPUStatPath(meta.CgroupDir)
currentCPUStat, err := system.GetCPUStatRaw(cgroupStatPath)
if err != nil || currentCPUStat == nil {
if pod.Status.Phase == corev1.PodRunning {
// print running pod collection error
Expand All @@ -372,8 +372,8 @@ func (c *collector) collectPodThrottledInfo() {
meta.Pod.Namespace, meta.Pod.Name, meta.Pod.UID)
continue
}
lastCPUThrottled := lastCPUThrottledValue.(*sysutil.CPUStatRaw)
cpuThrottledRatio := sysutil.CalcCPUThrottledRatio(currentCPUStat, lastCPUThrottled)
lastCPUThrottled := lastCPUThrottledValue.(*system.CPUStatRaw)
cpuThrottledRatio := system.CalcCPUThrottledRatio(currentCPUStat, lastCPUThrottled)

klog.V(6).Infof("collect pod %s/%s, uid %s throttled finished, metric %v",
meta.Pod.Namespace, meta.Pod.Name, meta.Pod.UID, cpuThrottledRatio)
Expand Down Expand Up @@ -404,13 +404,13 @@ func (c *collector) collectContainerThrottledInfo(podMeta *statesinformer.PodMet
pod.Namespace, pod.Name, containerStat.Name)
continue
}
containerCgroupPath, err := util2.GetContainerCgroupCPUStatPath(podMeta.CgroupDir, containerStat)
containerCgroupPath, err := util.GetContainerCgroupCPUStatPath(podMeta.CgroupDir, containerStat)
if err != nil {
klog.Warningf("generate container %s/%s/%s cgroup path failed, err %v",
pod.Namespace, pod.Name, containerStat.Name, err)
continue
}
currentCPUStat, err := sysutil.GetCPUStatRaw(containerCgroupPath)
currentCPUStat, err := system.GetCPUStatRaw(containerCgroupPath)
if err != nil {
klog.Infof("collect container %s/%s/%s cpu throttled failed, err %v, metric %v",
pod.Namespace, pod.Name, containerStat.Name, err, currentCPUStat)
Expand All @@ -423,8 +423,8 @@ func (c *collector) collectContainerThrottledInfo(podMeta *statesinformer.PodMet
pod.Namespace, pod.Name, containerStat.Name)
continue
}
lastCPUThrottled := lastCPUThrottledValue.(*sysutil.CPUStatRaw)
cpuThrottledRatio := sysutil.CalcCPUThrottledRatio(currentCPUStat, lastCPUThrottled)
lastCPUThrottled := lastCPUThrottledValue.(*system.CPUStatRaw)
cpuThrottledRatio := system.CalcCPUThrottledRatio(currentCPUStat, lastCPUThrottled)

containerMetric := &metriccache.ContainerThrottledMetric{
ContainerID: containerStat.ContainerID,
Expand Down
6 changes: 3 additions & 3 deletions pkg/koordlet/pleg/pleg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/utils/inotify"

sysutil "github.com/koordinator-sh/koordinator/pkg/util/system"
"github.com/koordinator-sh/koordinator/pkg/util/system"
)

func NewTestWatcher() (Watcher, error) {
Expand Down Expand Up @@ -81,7 +81,7 @@ func (h *testHandler) OnContainerDeleted(podID, containerID string) {
}

func TestPlegHandlePodEvents(t *testing.T) {
sysutil.SetupCgroupPathFormatter(sysutil.Cgroupfs)
system.SetupCgroupPathFormatter(system.Cgroupfs)
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestPlegHandlePodEvents(t *testing.T) {
}

func TestPlegHandleContainerEvents(t *testing.T) {
sysutil.SetupCgroupPathFormatter(sysutil.Cgroupfs)
system.SetupCgroupPathFormatter(system.Cgroupfs)
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down
107 changes: 105 additions & 2 deletions pkg/koordlet/resmanager/be_reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (r *resmanager) reconcileBECgroup() {
reconcileBECPULimit(podMeta)
}
reconcileBECPUShare(podMeta)
reconcileBEMemLimit(podMeta)
}
}

Expand Down Expand Up @@ -151,14 +152,67 @@ func reconcileBECPUShare(podMeta *statesinformer.PodMeta) {
}
}

// reconcileBEMemLimit aligns the cgroup memory limit of a best-effort (BE) pod
// and of each of its containers with the BE memory byte limit declared in the
// pod spec. The pod-level limit is reconciled first, then every container that
// still has a matching spec entry. Failures are logged and skipped so one bad
// container does not block the rest.
func reconcileBEMemLimit(podMeta *statesinformer.PodMeta) {
	needReconcilePod, err := needReconcilePodBEMemLimit(podMeta)
	if err != nil {
		klog.Warningf("failed to check need reconcile memory limit for pod %v/%v %v, error: %v",
			podMeta.Pod.Namespace, podMeta.Pod.Name, podMeta.Pod.UID, err)
		return
	}
	if needReconcilePod {
		err = applyPodBEMemLimitIfSpecified(podMeta)
		if err != nil {
			klog.Warningf("failed to apply memory limit for pod %v/%v %v, error: %v",
				podMeta.Pod.Namespace, podMeta.Pod.Name, podMeta.Pod.UID, err)
		} else {
			// Read the limit back so the log shows the value actually in effect.
			curLimit, err := util.GetPodCurMemLimitBytes(podMeta.CgroupDir)
			klog.Infof("apply memory limit for pod %v/%v %v succeed, current value %d, error: %v",
				podMeta.Pod.Namespace, podMeta.Pod.Name, podMeta.Pod.UID, curLimit, err)
		}
	}

	// Index spec containers by name so each status entry can be matched back to
	// the spec container that carries the BE memory limit.
	containerMap := make(map[string]*corev1.Container, len(podMeta.Pod.Spec.Containers))
	for i := range podMeta.Pod.Spec.Containers {
		container := &podMeta.Pod.Spec.Containers[i]
		containerMap[container.Name] = container
	}

	for _, containerStat := range podMeta.Pod.Status.ContainerStatuses {
		container, exist := containerMap[containerStat.Name]
		if !exist {
			klog.Warningf("container %v/%v/%v lost during memory limit reconcile",
				podMeta.Pod.Namespace, podMeta.Pod.Name, containerStat.Name)
			continue
		}
		needReconcileContainer, err := needReconcileContainerBEMemLimit(podMeta, container, &containerStat)
		if err != nil {
			klog.Warningf("failed to check need reconcile memory limit for container %v/%v/%v, error: %v",
				podMeta.Pod.Namespace, podMeta.Pod.Name, containerStat.Name, err)
			continue
		}
		if !needReconcileContainer {
			continue
		}

		if err := applyContainerBEMemLimitIfSpecified(podMeta, container, &containerStat); err != nil {
			klog.Warningf("failed to apply memory limit for container %v/%v/%v, error: %v",
				podMeta.Pod.Namespace, podMeta.Pod.Name, containerStat.Name, err)
		} else {
			// Read back the applied value; log container name (not pod UID) to
			// match the warning message above.
			curLimit, err := util.GetContainerCurMemLimitBytes(podMeta.CgroupDir, &containerStat)
			klog.Infof("apply memory limit for container %v/%v/%v succeed, current value %v, error: %v",
				podMeta.Pod.Namespace, podMeta.Pod.Name, containerStat.Name, curLimit, err)
		}
	}
}

// needReconcilePodBECPULimit reports whether the pod-level cfs_quota of a
// best-effort pod still needs to be reconciled: the pod declares a BE milli-CPU
// limit but its current cgroup cfs_quota is still unlimited.
func needReconcilePodBECPULimit(podMeta *statesinformer.PodMeta) (bool, error) {
	if util.GetPodBEMilliCPULimit(podMeta.Pod) <= 0 {
		// No BE CPU limit declared; nothing to reconcile.
		return false, nil
	}

	podCurCFSQuota, err := util.GetPodCurCFSQuota(podMeta.CgroupDir)
	if err != nil {
		// Propagate the read error instead of silently reporting "no reconcile
		// needed" — the caller logs and skips on error.
		return false, err
	}
	return podCurCFSQuota == system.CFSQuotaUnlimitedValue, nil
}
Expand Down Expand Up @@ -199,6 +253,32 @@ func needReconcileContainerBECPUShare(podMeta *statesinformer.PodMeta, container
return containerCurCPUShare == system.CPUShareKubeBEValue, nil
}

// needReconcilePodBEMemLimit reports whether the pod-level memory limit of a
// best-effort pod still needs to be reconciled: the pod declares a BE memory
// byte limit but its current cgroup memory limit is effectively unlimited.
func needReconcilePodBEMemLimit(podMeta *statesinformer.PodMeta) (bool, error) {
	if util.GetPodBEMemoryByteLimit(podMeta.Pod) <= 0 {
		// No BE memory limit declared; nothing to reconcile.
		return false, nil
	}

	podCurMemLimit, err := util.GetPodCurMemLimitBytes(podMeta.CgroupDir)
	if err != nil {
		return false, err
	}
	// An unset memory.limit_in_bytes reads back as 9223372036854771712
	// (page-aligned MaxInt64); compare against 1024 TiB so any value in that
	// "effectively unlimited" range is treated as not yet reconciled.
	const unlimitedMemThresholdBytes = 1024 * 1024 * 1024 * 1024 * 1024 // 1024 TiB
	return podCurMemLimit > unlimitedMemThresholdBytes, nil
}

// needReconcileContainerBEMemLimit reports whether the container-level memory
// limit still needs to be reconciled: the container declares a BE memory byte
// limit but its current cgroup memory limit is effectively unlimited.
func needReconcileContainerBEMemLimit(podMeta *statesinformer.PodMeta, container *corev1.Container, containerStatus *corev1.ContainerStatus) (bool, error) {
	if util.GetContainerBEMemoryByteLimit(container) <= 0 {
		// No BE memory limit declared; nothing to reconcile.
		return false, nil
	}

	// NOTE: renamed from containerCurCFSQuota — this value is the current
	// memory limit in bytes, not a CFS quota.
	containerCurMemLimit, err := util.GetContainerCurMemLimitBytes(podMeta.CgroupDir, containerStatus)
	if err != nil {
		return false, err
	}
	// An unset memory.limit_in_bytes reads back as 9223372036854771712
	// (page-aligned MaxInt64); compare against 1024 TiB so any value in that
	// "effectively unlimited" range is treated as not yet reconciled.
	const unlimitedMemThresholdBytes = 1024 * 1024 * 1024 * 1024 * 1024 // 1024 TiB
	return containerCurMemLimit > unlimitedMemThresholdBytes, nil
}

func applyPodBECPULimitIfSpecified(podMeta *statesinformer.PodMeta) error {
milliCPULimit := util.GetPodBEMilliCPULimit(podMeta.Pod)
if milliCPULimit <= 0 {
Expand Down Expand Up @@ -253,6 +333,29 @@ func applyContainerBECPUShareIfSpecified(podMeta *statesinformer.PodMeta, contai
if err != nil {
return err
}
audit.V(2).Pod(podMeta.Pod.Namespace, podMeta.Pod.Name).Container(container.Name).Reason(updateCPU).Message("set cfs_shares to %v", targetCPUShare).Do()
_ = audit.V(2).Pod(podMeta.Pod.Namespace, podMeta.Pod.Name).Container(container.Name).Reason(updateCPU).Message("set cfs_shares to %v", targetCPUShare).Do()
return ioutil.WriteFile(containerCPUSharePath, []byte(strconv.Itoa(targetCPUShare)), 0644)
}

// applyPodBEMemLimitIfSpecified writes the pod's BE memory byte limit into the
// pod-level cgroup memory limit file. It is a no-op when the pod declares no
// positive BE memory limit. An audit event is recorded before the write.
func applyPodBEMemLimitIfSpecified(podMeta *statesinformer.PodMeta) error {
	memoryLimit := util.GetPodBEMemoryByteLimit(podMeta.Pod)
	if memoryLimit <= 0 {
		return nil
	}
	podMemLimitPath := util.GetPodCgroupMemLimitPath(podMeta.CgroupDir)
	// Audit failures are best-effort and intentionally ignored.
	_ = audit.V(2).Pod(podMeta.Pod.Namespace, podMeta.Pod.Name).Reason(updateMemory).Message("set memory.limits to %v", memoryLimit).Do()
	// FormatInt avoids truncating a 64-bit limit through int on 32-bit builds.
	return ioutil.WriteFile(podMemLimitPath, []byte(strconv.FormatInt(int64(memoryLimit), 10)), 0644)
}

// applyContainerBEMemLimitIfSpecified writes the container's BE memory byte
// limit into the container-level cgroup memory limit file. It is a no-op when
// the container declares no positive BE memory limit. An audit event is
// recorded before the write.
func applyContainerBEMemLimitIfSpecified(podMeta *statesinformer.PodMeta, container *corev1.Container, containerStatus *corev1.ContainerStatus) error {
	memoryLimit := util.GetContainerBEMemoryByteLimit(container)
	if memoryLimit <= 0 {
		return nil
	}
	containerMemLimitPath, err := util.GetContainerCgroupMemLimitPath(podMeta.CgroupDir, containerStatus)
	if err != nil {
		return err
	}
	// Audit failures are best-effort and intentionally ignored.
	_ = audit.V(2).Pod(podMeta.Pod.Namespace, podMeta.Pod.Name).Container(container.Name).Reason(updateMemory).Message("set memory.limits to %v", memoryLimit).Do()
	// FormatInt avoids truncating a 64-bit limit through int on 32-bit builds.
	return ioutil.WriteFile(containerMemLimitPath, []byte(strconv.FormatInt(int64(memoryLimit), 10)), 0644)
}
Loading

0 comments on commit 344ed21

Please sign in to comment.