From 82d00aabcea1237fd4f69f56565eabaaa7770358 Mon Sep 17 00:00:00 2001 From: l1b0k Date: Tue, 26 Dec 2023 15:19:08 +0800 Subject: [PATCH] koordlet: add net qos plugin Signed-off-by: l1b0k --- apis/extension/constants.go | 4 + apis/slo/v1alpha1/nodeslo_types.go | 2 + pkg/koordlet/runtimehooks/config.go | 8 + .../runtimehooks/hooks/terwayqos/terwayqos.go | 358 ++++++++++++++++++ .../hooks/terwayqos/terwayqos_test.go | 51 +++ .../runtimehooks/hooks/terwayqos/types.go | 96 +++++ pkg/koordlet/util/pod.go | 22 ++ pkg/util/sloconfig/nodeslo_config.go | 27 ++ 8 files changed, 568 insertions(+) create mode 100644 pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go create mode 100644 pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go create mode 100644 pkg/koordlet/runtimehooks/hooks/terwayqos/types.go diff --git a/apis/extension/constants.go b/apis/extension/constants.go index 17cce7954..aab69791b 100644 --- a/apis/extension/constants.go +++ b/apis/extension/constants.go @@ -40,6 +40,10 @@ const ( // LabelPodMutatingUpdate is a label key that pods with `pod.koordinator.sh/mutating-update=true` will // be mutated by Koordinator webhook when updating. LabelPodMutatingUpdate = PodDomainPrefix + "/mutating-update" + + // AnnotationNetworkQOS is used to set bandwidth for Pod. The unit is bps. + // For example, 10M means 10 megabits per second. 
+ AnnotationNetworkQOS = DomainPrefix + "networkQOS" ) type AggregationType string diff --git a/apis/slo/v1alpha1/nodeslo_types.go b/apis/slo/v1alpha1/nodeslo_types.go index 798f24394..9c7699683 100644 --- a/apis/slo/v1alpha1/nodeslo_types.go +++ b/apis/slo/v1alpha1/nodeslo_types.go @@ -243,6 +243,8 @@ type NetworkQOS struct { type ResourceQOSPolicies struct { // applied policy for the CPU QoS, default = "groupIdentity" CPUPolicy *CPUQOSPolicy `json:"cpuPolicy,omitempty"` + + NetworkQOS *NetworkQOSCfg `json:"networkQOS,omitempty"` } type ResourceQOSStrategy struct { diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index cd99502ee..f292fd177 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -31,6 +31,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -73,6 +74,11 @@ const ( // owner: @saintube @zwzhang0107 // alpha: v1.4 CoreSched featuregate.Feature = "CoreSched" + + // TerwayQoS enables net QoS feature of koordlet. 
+	// owner: @l1b0k
+	// alpha: v1.5
+	TerwayQoS featuregate.Feature = "TerwayQoS"
 )
 
 var (
@@ -83,6 +89,7 @@ var (
 		BatchResource:    {Default: true, PreRelease: featuregate.Beta},
 		CPUNormalization: {Default: false, PreRelease: featuregate.Alpha},
 		CoreSched:        {Default: false, PreRelease: featuregate.Alpha},
+		TerwayQoS:        {Default: false, PreRelease: featuregate.Alpha},
 	}
 
 	runtimeHookPlugins = map[featuregate.Feature]HookPlugin{
@@ -92,6 +99,7 @@ var (
 		BatchResource:    batchresource.Object(),
 		CPUNormalization: cpunormalization.Object(),
 		CoreSched:        coresched.Object(),
+		TerwayQoS:        terwayqos.Object(),
 	}
 )
 
diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go
new file mode 100644
index 000000000..634c58b3e
--- /dev/null
+++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go
@@ -0,0 +1,410 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package terwayqos
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"path/filepath"
+	"sync"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/klog/v2"
+
+	"github.com/koordinator-sh/koordinator/apis/extension"
+	slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule"
+	"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
+)
+
+// Locations of the generated QoS configuration files.
+const (
+	rootPath   = "/var/lib/terway/qos"
+	podConfig  = "pod.json"
+	nodeConfig = "global_bps_config"
+)
+
+// Plugin and rule identifiers.
+const (
+	name        = "TerwayQoS"
+	description = "network qos management"
+
+	ruleNameForNodeQoS = name + " (nodeQoS)"
+	ruleNameForAllPods = name + " (allPods)"
+)
+
+// Plugin renders the node-level and pod-level network QoS settings into the
+// files at nodeFilePath and podFilePath.
+type Plugin struct {
+	executor resourceexecutor.ResourceUpdateExecutor
+
+	// lock guards node and pods.
+	lock sync.RWMutex
+	node *Node
+	pods map[string]*Pod
+
+	podFilePath, nodeFilePath string
+}
+
+// Register hooks the plugin into the rule framework and prepares the config
+// directory and files. Both registered rules use update as their callback.
+// The process exits through klog.Fatalf when the directory or either file
+// cannot be created.
+func (p *Plugin) Register(op hooks.Options) {
+	klog.V(5).Infof("register hook %v", "terway qos configure generator")
+	rule.Register(ruleNameForNodeQoS, description,
+		rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO),
+		rule.WithUpdateCallback(p.update))
+	rule.Register(ruleNameForAllPods, description,
+		rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods),
+		rule.WithUpdateCallback(p.update))
+
+	p.executor = op.Executor
+
+	// os.ModeDir carries no permission bits, so an explicit mode is required
+	// for the directory to be accessible.
+	if err := os.MkdirAll(rootPath, 0755); err != nil {
+		klog.Fatalf("create terway qos dir failed, err: %v", err)
+	}
+	if err := ensureFile(p.podFilePath); err != nil {
+		klog.Fatalf("create terway qos pod file failed, err: %v", err)
+	}
+	if err := ensureFile(p.nodeFilePath); err != nil {
+		klog.Fatalf("create terway qos node file failed, err: %v", err)
+	}
+}
+
+// ensureFile creates an empty file at path unless os.Stat succeeds on it.
+// The handle returned by os.Create is closed so no descriptor is leaked.
+func ensureFile(path string) error {
+	if _, err := os.Stat(path); err == nil {
+		return nil
+	}
+	f, err := os.Create(path)
+	if err != nil {
+		return err
+	}
+	return f.Close()
+}
+
+// parseRuleForNodeSLO converts the merged NodeSLO spec into a Node and stores
+// it on the plugin. It returns false with an error when the input is not a
+// non-nil *NodeSLOSpec or when parseNetQoS fails.
+func (p *Plugin) parseRuleForNodeSLO(mergedNodeSLOIf interface{}) (bool, error) {
+	mergedNodeSLO, ok := mergedNodeSLOIf.(*slov1alpha1.NodeSLOSpec)
+	if !ok {
+		return false, fmt.Errorf("invalid rule type %T", mergedNodeSLOIf)
+	}
+	if mergedNodeSLO == nil {
+		return false, fmt.Errorf("node slo spec is nil")
+	}
+
+	n := &Node{}
+	if err := parseNetQoS(mergedNodeSLO, n); err != nil {
+		return false, err
+	}
+
+	p.lock.Lock()
+	p.node = n
+	p.lock.Unlock()
+	return true, nil
+}
+
+// parseForAllPods only validates the payload type of the all-pods rule; the
+// pod data itself is read from the callback target in update.
+func (p *Plugin) parseForAllPods(e interface{}) (bool, error) {
+	if _, ok := e.(*struct{}); !ok {
+		return false, fmt.Errorf("invalid rule type %T", e)
+	}
+	return true, nil
+}
+
+// update rebuilds the pod table from target.Pods and then writes the node
+// and pod configuration files. Pods whose QoS annotation cannot be parsed are
+// logged and left out.
+func (p *Plugin) update(target *statesinformer.CallbackTarget) error {
+	if target == nil {
+		return fmt.Errorf("callback target is nil")
+	}
+
+	pods := make(map[string]*Pod, len(target.Pods))
+	for _, meta := range target.Pods {
+		if meta.Pod == nil {
+			continue
+		}
+
+		ing, egress, err := getPodQoS(meta.Pod.Annotations)
+		if err != nil {
+			klog.Errorf("get pod qos failed, pod: %s/%s, err: %v", meta.Pod.Namespace, meta.Pod.Name, err)
+			continue
+		}
+		pods[string(meta.Pod.UID)] = &Pod{
+			PodName:      meta.Pod.Name,
+			PodNamespace: meta.Pod.Namespace,
+			PodUID:       string(meta.Pod.UID),
+			Prio:         getPodPrio(meta.Pod),
+			CgroupDir:    filepath.Join("/sys/fs/cgroup/net_cls", meta.CgroupDir),
+			QoSConfig: QoSConfig{
+				IngressBandwidth: ing,
+				EgressBandwidth:  egress,
+			},
+		}
+	}
+
+	p.lock.Lock()
+	p.pods = pods
+	p.lock.Unlock()
+
+	if err := p.syncNodeConfig(); err != nil {
+		return err
+	}
+	return p.syncPodConfig()
+}
+
+// syncPodConfig serializes the pod table as JSON and submits it to the
+// executor as the new content of podFilePath.
+func (p *Plugin) syncPodConfig() error {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	outPods, err := json.Marshal(p.pods)
+	if err != nil {
+		return err
+	}
+
+	pEvent := audit.V(3).Node().Reason("netqos reconcile").Message("update pod to : %v", string(outPods))
+	pRes, err := resourceexecutor.NewCommonDefaultUpdater(p.podFilePath, p.podFilePath, string(outPods), pEvent)
+	if err != nil {
+		return err
+	}
+
+	p.executor.UpdateBatch(true, pRes)
+	return nil
+}
+
+// syncNodeConfig renders the node settings with Node.MarshalText and submits
+// them to the executor as the new content of nodeFilePath.
+func (p *Plugin) syncNodeConfig() error {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+
+	outNode, err := p.node.MarshalText()
+	if err != nil {
+		return err
+	}
+
+	nEvent := audit.V(3).Node().Reason("netqos reconcile").Message("update node to : %v", string(outNode))
+	nRes, err := resourceexecutor.NewCommonDefaultUpdater(p.nodeFilePath, p.nodeFilePath, string(outNode), nEvent)
+	if err != nil {
+		return err
+	}
+
+	p.executor.UpdateBatch(true, nRes)
+	return nil
+}
+
+// parseNetQoS fills node from the NodeSLO spec. Only the LS and BE classes are
+// taken into account: LS populates the L1 fields and BE the L2 fields. It is a
+// no-op when SystemStrategy is nil.
+//
+// NOTE(review): total is converted to bytes/s below, while string quantities
+// handled by parseQuantity are compared against it without a /8 conversion;
+// confirm the intended unit of NetworkQOSCfg values.
+func parseNetQoS(slo *slov1alpha1.NodeSLOSpec, node *Node) error {
+	if slo.SystemStrategy == nil {
+		return nil
+	}
+	if slo.SystemStrategy.TotalNetworkBandwidth.Value() < 0 {
+		return fmt.Errorf("invalid total network bandwidth %d", slo.SystemStrategy.TotalNetworkBandwidth.Value())
+	}
+	total := uint64(slo.SystemStrategy.TotalNetworkBandwidth.Value())
+	// to Byte/s
+	total = total * 1000 * 1000 / 8
+
+	node.HwTxBpsMax = total
+	node.HwRxBpsMax = total
+
+	qos := slo.ResourceQOSStrategy
+	if qos == nil {
+		return nil
+	}
+
+	if qos.LSClass != nil && qos.LSClass.NetworkQOS != nil && qos.LSClass.NetworkQOS.Enable != nil && *qos.LSClass.NetworkQOS.Enable {
+		q, err := parseQoS(qos.LSClass.NetworkQOS, total)
+		if err != nil {
+			return err
+		}
+		node.L1RxBpsMin = q.IngressRequestBps
+		node.L1RxBpsMax = q.IngressLimitBps
+		node.L1TxBpsMin = q.EgressRequestBps
+		node.L1TxBpsMax = q.EgressLimitBps
+	}
+
+	if qos.BEClass != nil && qos.BEClass.NetworkQOS != nil && qos.BEClass.NetworkQOS.Enable != nil && *qos.BEClass.NetworkQOS.Enable {
+		q, err := parseQoS(qos.BEClass.NetworkQOS, total)
+		if err != nil {
+			return err
+		}
+		node.L2RxBpsMin = q.IngressRequestBps
+		node.L2RxBpsMax = q.IngressLimitBps
+		node.L2TxBpsMin = q.EgressRequestBps
+		node.L2TxBpsMax = q.EgressLimitBps
+	}
+
+	return nil
+}
+
+// parseQoS resolves the four request/limit values of a class against total.
+// A nil or false Enable yields a zero QoS.
+func parseQoS(qos *slov1alpha1.NetworkQOSCfg, total uint64) (QoS, error) {
+	var q QoS
+	if qos.Enable == nil || !*qos.Enable {
+		return q, nil
+	}
+
+	var err error
+	if q.IngressRequestBps, err = parseQuantity(qos.IngressRequest, total); err != nil {
+		return QoS{}, err
+	}
+	if q.IngressLimitBps, err = parseQuantity(qos.IngressLimit, total); err != nil {
+		return QoS{}, err
+	}
+	if q.EgressRequestBps, err = parseQuantity(qos.EgressRequest, total); err != nil {
+		return QoS{}, err
+	}
+	if q.EgressLimitBps, err = parseQuantity(qos.EgressLimit, total); err != nil {
+		return QoS{}, err
+	}
+	return q, nil
+}
+
+// parseQuantity resolves v against total. nil yields 0. A string value is
+// parsed as a resource quantity and must lie in [0, total]; an int value is a
+// percentage of total and must lie in [0, 100]. Negative inputs are rejected
+// because converting them to uint64 would wrap to a huge value.
+func parseQuantity(v *intstr.IntOrString, total uint64) (uint64, error) {
+	if v == nil {
+		return 0, nil
+	}
+	if v.Type == intstr.String {
+		val, err := resource.ParseQuantity(v.String())
+		if err != nil {
+			return 0, err
+		}
+		if val.Value() < 0 {
+			return 0, fmt.Errorf("quantity %s is negative", v.String())
+		}
+		r := uint64(val.Value())
+		if r > total {
+			return 0, fmt.Errorf("quantity %s is larger than total %d", v.String(), total)
+		}
+		return r, nil
+	}
+
+	percent := v.IntValue()
+	if percent < 0 || percent > 100 {
+		return 0, fmt.Errorf("percentage %d is out of range [0, 100]", percent)
+	}
+	return uint64(percent) * total / 100, nil
+}
+
+// getPodQoS reads the ingress and egress limits from the network QoS
+// annotation and returns them divided by 8 (the annotation is documented in
+// bits per second). A missing annotation or an empty limit yields 0.
+func getPodQoS(anno map[string]string) (uint64, uint64, error) {
+	raw := anno[extension.AnnotationNetworkQOS]
+	if raw == "" {
+		return 0, 0, nil
+	}
+
+	nqos := &NetworkQoS{}
+	if err := json.Unmarshal([]byte(raw), nqos); err != nil {
+		return 0, 0, err
+	}
+	ingress, err := parseBandwidthLimit(nqos.IngressLimit)
+	if err != nil {
+		return 0, 0, fmt.Errorf("invalid ingressLimit %q: %w", nqos.IngressLimit, err)
+	}
+	egress, err := parseBandwidthLimit(nqos.EgressLimit)
+	if err != nil {
+		return 0, 0, fmt.Errorf("invalid egressLimit %q: %w", nqos.EgressLimit, err)
+	}
+	return ingress, egress, nil
+}
+
+// parseBandwidthLimit converts one annotation limit into its value divided by
+// 8. An empty string means the direction was not set and yields 0 instead of
+// a parse error; negative quantities are rejected.
+func parseBandwidthLimit(limit string) (uint64, error) {
+	if limit == "" {
+		return 0, nil
+	}
+	q, err := resource.ParseQuantity(limit)
+	if err != nil {
+		return 0, err
+	}
+	if q.Value() < 0 {
+		return 0, fmt.Errorf("bandwidth limit %s is negative", limit)
+	}
+	return uint64(q.Value()) / 8, nil
+}
+
+// getPodPrio maps a pod to a priority level: the koordinator QoS label is
+// looked up in prioMapping first, then the Kubernetes QoS class is used
+// (Guaranteed/Burstable -> 1, BestEffort -> 2). Anything else yields 0.
+func getPodPrio(pod *corev1.Pod) int {
+	prio, ok := prioMapping[pod.Labels[extension.LabelPodQoS]]
+	if ok {
+		return prio
+	}
+	switch pod.Status.QOSClass {
+	case corev1.PodQOSGuaranteed, corev1.PodQOSBurstable:
+		return 1
+	case corev1.PodQOSBestEffort:
+		return 2
+	}
+	return 0
+}
+
+// newPlugin builds a Plugin with a fresh executor, empty node/pod state and
+// file paths under rootPath.
+func newPlugin() *Plugin {
+	return &Plugin{
+		executor:     resourceexecutor.NewResourceUpdateExecutor(),
+		podFilePath:  filepath.Join(rootPath, podConfig),
+		nodeFilePath: filepath.Join(rootPath, nodeConfig),
+		node:         &Node{},
+		pods:         make(map[string]*Pod),
+	}
+}
+
+var singleton *Plugin
+var once sync.Once
+
+// Object returns the process-wide Plugin, creating it on first use.
+func Object() *Plugin {
+	once.Do(func() {
+		singleton = newPlugin()
+	})
+	return singleton
+}
diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go
new file mode 100644
index 000000000..fb9caa0cd
--- /dev/null
+++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package terwayqos
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"k8s.io/apimachinery/pkg/util/intstr"
+)
+
+func TestParseQuantityWithNilValue(t *testing.T) {
+	result, err := parseQuantity(nil, 100)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(0), result)
+}
+
+func TestParseQuantityWithStringTypeAndValidValue(t *testing.T) {
+	val := intstr.FromString("50")
+	result, err := parseQuantity(&val, 100)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(50), result)
+}
+
+func TestParseQuantityWithStringTypeAndValueExceedingTotal(t *testing.T) {
+	val := intstr.FromString("200")
+	result, err := parseQuantity(&val, 100)
+	assert.Error(t, err)
+	assert.Equal(t, uint64(0), result)
+}
+
+func TestParseQuantityWithIntType(t *testing.T) {
+	val := intstr.FromInt(50)
+	result, err := parseQuantity(&val, 100)
+	assert.NoError(t, err)
+	assert.Equal(t, uint64(50), result)
+}
diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go
new file mode 100644
index 000000000..3060601ee
--- /dev/null
+++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go
@@ -0,0 +1,101 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package terwayqos
+
+import (
+	"bytes"
+	"fmt"
+	"reflect"
+
+	"github.com/koordinator-sh/koordinator/apis/extension"
+)
+
+// NetworkQoS is the JSON payload of the pod network QoS annotation.
+type NetworkQoS struct {
+	// IngressLimit and EgressLimit set the bandwidth limits of the Pod.
+	// The unit is bps. For example, 10M means 10 megabits per second.
+	IngressLimit string `json:"ingressLimit"`
+	EgressLimit  string `json:"egressLimit"`
+}
+
+// QoS holds the resolved request and limit values of one QoS class.
+type QoS struct {
+	IngressRequestBps uint64 `json:"ingressRequestBps"`
+	IngressLimitBps   uint64 `json:"ingressLimitBps"`
+	EgressRequestBps  uint64 `json:"egressRequestBps"`
+	EgressLimitBps    uint64 `json:"egressLimitBps"`
+}
+
+// Node is the node-level bandwidth configuration. The `text` tag of each
+// field is the key emitted for it by MarshalText.
+type Node struct {
+	HwTxBpsMax uint64 `text:"hw_tx_bps_max"`
+	HwRxBpsMax uint64 `text:"hw_rx_bps_max"`
+	L1TxBpsMin uint64 `text:"offline_l1_tx_bps_min"`
+	L1TxBpsMax uint64 `text:"offline_l1_tx_bps_max"`
+	L1RxBpsMin uint64 `text:"offline_l1_rx_bps_min"`
+	L1RxBpsMax uint64 `text:"offline_l1_rx_bps_max"`
+	L2TxBpsMin uint64 `text:"offline_l2_tx_bps_min"`
+	L2TxBpsMax uint64 `text:"offline_l2_tx_bps_max"`
+	L2RxBpsMin uint64 `text:"offline_l2_rx_bps_min"`
+	L2RxBpsMax uint64 `text:"offline_l2_rx_bps_max"`
+}
+
+// MarshalText renders every field carrying a `text` tag as a "<tag> <value>"
+// line, in struct declaration order. Fields without the tag are skipped. The
+// returned error is always nil.
+func (n Node) MarshalText() (text []byte, err error) {
+	val := reflect.ValueOf(n)
+	typ := val.Type()
+
+	var buffer bytes.Buffer
+	for i := 0; i < val.NumField(); i++ {
+		tagValue := typ.Field(i).Tag.Get("text")
+		if tagValue == "" {
+			continue // Skip fields without text tag
+		}
+		fmt.Fprintf(&buffer, "%s %v\n", tagValue, val.Field(i).Interface())
+	}
+
+	return buffer.Bytes(), nil
+}
+
+// Pod is the per-pod entry serialized into the pod configuration.
+type Pod struct {
+	PodName      string    `json:"podName"`
+	PodNamespace string    `json:"podNamespace"`
+	PodUID       string    `json:"podUID"`
+	Prio         int       `json:"prio"`
+	CgroupDir    string    `json:"cgroupDir"`
+	QoSConfig    QoSConfig `json:"qosConfig"`
+}
+
+// QoSConfig carries the ingress and egress bandwidth of a pod.
+type QoSConfig struct {
+	IngressBandwidth uint64 `json:"ingressBandwidth"`
+	EgressBandwidth  uint64 `json:"egressBandwidth"`
+}
+
+// prioMapping maps a koordinator QoS class name to the priority level used
+// in Pod.Prio.
+var prioMapping = map[string]int{
+	string(extension.QoSSystem): 0,
+	string(extension.QoSLSE):    1,
+	string(extension.QoSLSR):    1,
+	string(extension.QoSLS):     1,
+	string(extension.QoSBE):     2,
+}
diff --git a/pkg/koordlet/util/pod.go b/pkg/koordlet/util/pod.go
index fc7343b14..a2294e22b 100644
--- a/pkg/koordlet/util/pod.go
+++ b/pkg/koordlet/util/pod.go
@@ -18,10 +18,12 @@ package util
 
 import (
 	"fmt"
+	"net/netip"
 	"os"
 	"path/filepath"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 
 	"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
 )
@@ -94,3 +96,26 @@ func GetPodSandboxContainerID(pod *corev1.Pod) (string, error) {
 	}
 	return fmt.Sprintf("%s://%s", containerRuntime, containerHashID), nil
 }
+
+// GetIPs collects the addresses in pod.Status.PodIP and pod.Status.PodIPs.
+// An address for which Is4 reports true is returned as v4, any other as v6.
+// Entries that fail to parse are skipped; a result with no match is zero.
+func GetIPs(pod *corev1.Pod) (v4 netip.Addr, v6 netip.Addr) {
+	ips := sets.NewString(pod.Status.PodIP)
+	for _, ip := range pod.Status.PodIPs {
+		ips.Insert(ip.IP)
+	}
+	for ip := range ips {
+		addr, err := netip.ParseAddr(ip)
+		if err != nil {
+			continue
+		}
+		if addr.Is4() {
+			v4 = addr
+		} else {
+			v6 = addr
+		}
+	}
+
+	return v4, v6
+}
diff --git a/pkg/util/sloconfig/nodeslo_config.go b/pkg/util/sloconfig/nodeslo_config.go
index e4489e162..116a69039 100644
--- a/pkg/util/sloconfig/nodeslo_config.go
+++ b/pkg/util/sloconfig/nodeslo_config.go
@@ -18,6 +18,7 @@ package sloconfig
 
 import (
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/pointer"
 
@@ -220,6 +221,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy {
 				Enable:    pointer.Bool(false),
 				MemoryQOS: *DefaultMemoryQOS(apiext.QoSLSR),
 			},
+			NetworkQOS: &slov1alpha1.NetworkQOSCfg{
+				Enable:     pointer.Bool(false),
+				NetworkQOS: NoneNetworkQOS(),
+			},
 		},
 		LSClass: &slov1alpha1.ResourceQOS{
 			CPUQOS: &slov1alpha1.CPUQOSCfg{
@@ -234,6 +239,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy {
 				Enable:    pointer.Bool(false),
 				MemoryQOS: *DefaultMemoryQOS(apiext.QoSLS),
 			},
+			NetworkQOS: &slov1alpha1.NetworkQOSCfg{
+				Enable:     pointer.Bool(false),
+				NetworkQOS: NoneNetworkQOS(),
+			},
 		},
 		BEClass: &slov1alpha1.ResourceQOS{
 			CPUQOS: &slov1alpha1.CPUQOSCfg{
@@ -248,6 +257,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy {
 				Enable:    pointer.Bool(false),
 				MemoryQOS: *DefaultMemoryQOS(apiext.QoSBE),
 			},
+			NetworkQOS: &slov1alpha1.NetworkQOSCfg{
+				Enable:     pointer.Bool(false),
+				NetworkQOS: NoneNetworkQOS(),
+			},
 		},
 		SystemClass: &slov1alpha1.ResourceQOS{
 			CPUQOS: &slov1alpha1.CPUQOSCfg{
@@ -262,6 +275,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy {
 				Enable:    pointer.Bool(false),
 				MemoryQOS: *DefaultMemoryQOS(apiext.QoSSystem),
 			},
+			NetworkQOS: &slov1alpha1.NetworkQOSCfg{
+				Enable:     pointer.Bool(false),
+				NetworkQOS: NoneNetworkQOS(),
+			},
 		},
 	}
 }
@@ -360,3 +377,19 @@ func DefaultSystemStrategy() *slov1alpha1.SystemStrategy {
 func DefaultExtensions() *slov1alpha1.ExtensionsMap {
 	return getDefaultExtensionsMap()
 }
+
+// NoneNetworkQOS returns a NetworkQOS whose four request/limit fields are all
+// the integer 0. Every field points at its own value so that changing one
+// through its pointer cannot affect the others.
+func NoneNetworkQOS() slov1alpha1.NetworkQOS {
+	zero := func() *intstr.IntOrString {
+		v := intstr.FromInt(0)
+		return &v
+	}
+	return slov1alpha1.NetworkQOS{
+		IngressRequest: zero(),
+		IngressLimit:   zero(),
+		EgressRequest:  zero(),
+		EgressLimit:    zero(),
+	}
+}