From 339043388a9d350150f8b45a6ef5e3b83801787d Mon Sep 17 00:00:00 2001 From: l1b0k Date: Mon, 4 Mar 2024 10:14:59 +0800 Subject: [PATCH] koordlet: support NetworkQoS plugin (#1843) Signed-off-by: l1b0k --- apis/extension/constants.go | 4 + apis/slo/v1alpha1/nodeslo_types.go | 12 + apis/slo/v1alpha1/zz_generated.deepcopy.go | 5 + .../bases/slo.koordinator.sh_nodeslos.yaml | 3 + pkg/koordlet/runtimehooks/config.go | 8 + .../runtimehooks/hooks/terwayqos/terwayqos.go | 433 ++++++++++++++++++ .../hooks/terwayqos/terwayqos_test.go | 256 +++++++++++ .../runtimehooks/hooks/terwayqos/types.go | 96 ++++ .../hooks/terwayqos/types_test.go | 71 +++ pkg/util/sloconfig/nodeslo_config.go | 39 +- 10 files changed, 925 insertions(+), 2 deletions(-) create mode 100644 pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go create mode 100644 pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go create mode 100644 pkg/koordlet/runtimehooks/hooks/terwayqos/types.go create mode 100644 pkg/koordlet/runtimehooks/hooks/terwayqos/types_test.go diff --git a/apis/extension/constants.go b/apis/extension/constants.go index 17cce7954..aab69791b 100644 --- a/apis/extension/constants.go +++ b/apis/extension/constants.go @@ -40,6 +40,10 @@ const ( // LabelPodMutatingUpdate is a label key that pods with `pod.koordinator.sh/mutating-update=true` will // be mutated by Koordinator webhook when updating. LabelPodMutatingUpdate = PodDomainPrefix + "/mutating-update" + + // AnnotationNetworkQOS are used to set bandwidth for Pod. The unit is bps. + // For example, 10M means 10 megabits per second. + AnnotationNetworkQOS = DomainPrefix + "networkQOS" ) type AggregationType string diff --git a/apis/slo/v1alpha1/nodeslo_types.go b/apis/slo/v1alpha1/nodeslo_types.go index 798f24394..89d93fcc5 100644 --- a/apis/slo/v1alpha1/nodeslo_types.go +++ b/apis/slo/v1alpha1/nodeslo_types.go @@ -50,6 +50,15 @@ const ( CPUQOSPolicyCoreSched CPUQOSPolicy = "coreSched" ) +type NETQOSPolicy string + +const ( + // NETQOSPolicyTC indicates implement netqos by tc. + NETQOSPolicyTC NETQOSPolicy = "tc" + // NETQOSPolicyTerwayQos indicates implement netqos by terway-qos. + NETQOSPolicyTerwayQos NETQOSPolicy = "terway-qos" +) + // MemoryQOS enables memory qos features. type MemoryQOS struct { // memcg qos @@ -243,6 +252,9 @@ type NetworkQOS struct { type ResourceQOSPolicies struct { // applied policy for the CPU QoS, default = "groupIdentity" CPUPolicy *CPUQOSPolicy `json:"cpuPolicy,omitempty"` + + // applied policy for the Net QoS, default = "tc" + NETQOSPolicy *NETQOSPolicy `json:"netQOSPolicy,omitempty"` } type ResourceQOSStrategy struct { diff --git a/apis/slo/v1alpha1/zz_generated.deepcopy.go b/apis/slo/v1alpha1/zz_generated.deepcopy.go index 76a74cb67..7ef6eeefd 100644 --- a/apis/slo/v1alpha1/zz_generated.deepcopy.go +++ b/apis/slo/v1alpha1/zz_generated.deepcopy.go @@ -1014,6 +1014,11 @@ func (in *ResourceQOSPolicies) DeepCopyInto(out *ResourceQOSPolicies) { *out = new(CPUQOSPolicy) **out = **in } + if in.NETQOSPolicy != nil { + in, out := &in.NETQOSPolicy, &out.NETQOSPolicy + *out = new(NETQOSPolicy) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ResourceQOSPolicies. diff --git a/config/crd/bases/slo.koordinator.sh_nodeslos.yaml b/config/crd/bases/slo.koordinator.sh_nodeslos.yaml index a281dcad7..87102d995 100644 --- a/config/crd/bases/slo.koordinator.sh_nodeslos.yaml +++ b/config/crd/bases/slo.koordinator.sh_nodeslos.yaml @@ -1222,6 +1222,9 @@ spec: cpuPolicy: description: applied policy for the CPU QoS, default = "groupIdentity" type: string + netQOSPolicy: + description: applied policy for the Net QoS, default = "tc" + type: string type: object systemClass: description: ResourceQOS for system pods diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index dc8a747b0..294a92b7f 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -31,6 +31,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -74,6 +75,11 @@ const ( // owner: @saintube @zwzhang0107 // alpha: v1.4 CoreSched featuregate.Feature = "CoreSched" + + // TerwayQoS enables net QoS feature of koordlet. + // owner: @l1b0k + // alpha: v1.5 + TerwayQoS featuregate.Feature = "TerwayQoS" ) var ( @@ -84,6 +90,7 @@ var ( BatchResource: {Default: true, PreRelease: featuregate.Beta}, CPUNormalization: {Default: false, PreRelease: featuregate.Alpha}, CoreSched: {Default: false, PreRelease: featuregate.Alpha}, + TerwayQoS: {Default: false, PreRelease: featuregate.Alpha}, } runtimeHookPlugins = map[featuregate.Feature]HookPlugin{ @@ -93,6 +100,7 @@ var ( BatchResource: batchresource.Object(), CPUNormalization: cpunormalization.Object(), CoreSched: coresched.Object(), + TerwayQoS: terwayqos.Object(), } ) diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go new file mode 100644 index 000000000..765865019 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go @@ -0,0 +1,433 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package terwayqos + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/koordlet/audit" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" +) + +const ( + rootPath = "/host-var-lib/terway/qos" + podConfig = "pod.json" + nodeConfig = "global_bps_config" +) + +const ( + name = "TerwayQoS" + description = "network qos management" + + ruleNameForNodeQoS = name + " (nodeQoS)" + ruleNameForAllPods = name + " (allPods)" +) + +type Plugin struct { + executor resourceexecutor.ResourceUpdateExecutor + + lock sync.RWMutex + enabled *bool + node *Node + pods map[string]*Pod + + podFilePath, nodeFilePath string + + syncChan chan struct{} +} + +func (p *Plugin) Register(op hooks.Options) { + klog.V(5).Infof("register hook %v", "terwqy qos configure generator") + rule.Register(ruleNameForNodeQoS, description, + rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO), + rule.WithUpdateCallback(p.update)) + rule.Register(ruleNameForAllPods, description, + rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods), + rule.WithUpdateCallback(p.update)) + + p.executor = op.Executor + + err := os.MkdirAll(rootPath, os.ModeDir) + if err != nil { + klog.Fatal("create terway qos dir failed, err: %v", err) + } + + go p.run() +} + +func (p *Plugin) parseRuleForNodeSLO(mergedNodeSLOIf interface{}) (bool, error) { + mergedNodeSLO := mergedNodeSLOIf.(*slov1alpha1.NodeSLOSpec) + enabled := false + + p.lock.Lock() + defer p.lock.Unlock() + + if mergedNodeSLO.ResourceQOSStrategy != nil && + mergedNodeSLO.ResourceQOSStrategy.Policies != nil && + mergedNodeSLO.ResourceQOSStrategy.Policies.NETQOSPolicy != nil && + *mergedNodeSLO.ResourceQOSStrategy.Policies.NETQOSPolicy == slov1alpha1.NETQOSPolicyTerwayQos { + enabled = true + + n := &Node{} + err := parseNetQoS(mergedNodeSLO, n) + if err != nil { + klog.Errorf("parse net qos failed, err: %v", err) + return false, err + } + + p.node = n + p.enabled = &enabled + } else { + p.enabled = &enabled + } + klog.Infof("terway qos enabled %v", enabled) + + select { + case p.syncChan <- struct{}{}: + default: + } + + return true, nil +} + +func (p *Plugin) parseForAllPods(e interface{}) (bool, error) { + _, ok := e.(*struct{}) + if !ok { + return false, fmt.Errorf("invalid rule type %T", e) + } + + return true, nil +} + +func (p *Plugin) run() { + for { + select { + case <-p.syncChan: + p.syncAll() + } + } +} + +func (p *Plugin) syncAll() { + err := p.syncNodeConfig() + if err != nil { + klog.Errorf("sync node config failed, err: %v", err) + return + } + + err = p.syncPodConfig() + if err != nil { + klog.Errorf("sync pod config failed, err: %v", err) + return + } +} + +func (p *Plugin) update(target *statesinformer.CallbackTarget) error { + if target == nil { + return fmt.Errorf("callback target is nil") + } + + pods := make(map[string]*Pod) + for _, meta := range target.Pods { + if meta.Pod == nil { + continue + } + + ing, egress, err := getPodQoS(meta.Pod.Annotations) + if err != nil { + klog.Errorf("get pod qos failed, err: %v", err) + continue + } + pods[string(meta.Pod.UID)] = &Pod{ + PodName: meta.Pod.Name, + PodNamespace: meta.Pod.Namespace, + PodUID: string(meta.Pod.UID), + Prio: getPodPrio(meta.Pod), + CgroupDir: filepath.Join("/sys/fs/cgroup/net_cls", meta.CgroupDir), + QoSConfig: QoSConfig{ + IngressBandwidth: ing, + EgressBandwidth: egress, + }, + } + } + + p.lock.Lock() + p.pods = pods + p.lock.Unlock() + + select { + case p.syncChan <- struct{}{}: + default: + } + return nil +} + +func (p *Plugin) syncPodConfig() error { + p.lock.RLock() + defer p.lock.RUnlock() + + if p.enabled == nil { + return nil + } + + if !*p.enabled { + _ = os.Remove(p.podFilePath) + return nil + } + + err := ensureConfig(p.podFilePath) + if err != nil { + return err + } + + outPods, err := json.Marshal(p.pods) + if err != nil { + return err + } + + pEvent := audit.V(3).Node().Reason("netqos reconcile").Message("update pod to : %v", string(outPods)) + pRes, err := resourceexecutor.NewCommonDefaultUpdater(p.podFilePath, p.podFilePath, string(outPods), pEvent) + if err != nil { + return err + } + + p.executor.UpdateBatch(true, pRes) + return nil +} + +func (p *Plugin) syncNodeConfig() error { + p.lock.RLock() + defer p.lock.RUnlock() + + if p.enabled == nil { + return nil + } + + if !*p.enabled { + _ = os.Remove(p.nodeFilePath) + return nil + } + + err := ensureConfig(p.nodeFilePath) + if err != nil { + return err + } + + outNode, err := p.node.MarshalText() + if err != nil { + return err + } + + nEvent := audit.V(3).Node().Reason("netqos reconcile").Message("update node to : %v", string(outNode)) + nRes, err := resourceexecutor.NewCommonDefaultUpdater(p.nodeFilePath, p.nodeFilePath, string(outNode), nEvent) + if err != nil { + return err + } + + p.executor.UpdateBatch(true, nRes) + return nil +} + +func ensureConfig(file string) error { + _, err := os.Stat(file) + if err != nil { + _, err = os.Create(file) + if err != nil { + return fmt.Errorf("create terway qos file %s failed, err: %v", file, err) + } + } + return nil +} + +// parseNetQoS only LS and BE is taken into account +func parseNetQoS(slo *slov1alpha1.NodeSLOSpec, node *Node) error { + if slo.SystemStrategy == nil { + return nil + } + if slo.SystemStrategy.TotalNetworkBandwidth.Value() < 0 { + return fmt.Errorf("invalid total network bandwidth %d", slo.SystemStrategy.TotalNetworkBandwidth.Value()) + } + total := uint64(slo.SystemStrategy.TotalNetworkBandwidth.Value()) + + // to Byte/s + node.HwTxBpsMax = BitsToBytes(total) + node.HwRxBpsMax = BitsToBytes(total) + + qos := slo.ResourceQOSStrategy + if qos == nil { + return nil + } + + if qos.LSClass != nil && qos.LSClass.NetworkQOS != nil && qos.LSClass.NetworkQOS.Enable != nil && *qos.LSClass.NetworkQOS.Enable { + q, err := parseQoS(qos.LSClass.NetworkQOS, total) + if err != nil { + return err + } + node.L1RxBpsMin = q.IngressRequestBps + node.L1RxBpsMax = q.IngressLimitBps + node.L1TxBpsMin = q.EgressRequestBps + node.L1TxBpsMax = q.EgressLimitBps + } + + if qos.BEClass != nil && qos.BEClass.NetworkQOS != nil && qos.BEClass.NetworkQOS.Enable != nil && *qos.BEClass.NetworkQOS.Enable { + q, err := parseQoS(qos.BEClass.NetworkQOS, total) + if err != nil { + return err + } + node.L2RxBpsMin = q.IngressRequestBps + node.L2RxBpsMax = q.IngressLimitBps + node.L2TxBpsMin = q.EgressRequestBps + node.L2TxBpsMax = q.EgressLimitBps + } + + return nil +} + +func parseQoS(qos *slov1alpha1.NetworkQOSCfg, total uint64) (QoS, error) { + q := QoS{ + IngressRequestBps: 0, + IngressLimitBps: 0, + EgressRequestBps: 0, + EgressLimitBps: 0, + } + if qos.Enable == nil { + return q, nil + } + if !*qos.Enable { + return q, nil + } + + var err error + q.IngressRequestBps, err = parseQuantity(qos.IngressRequest, total) + if err != nil { + return QoS{}, err + } + + q.IngressLimitBps, err = parseQuantity(qos.IngressLimit, total) + if err != nil { + return QoS{}, err + } + + q.EgressRequestBps, err = parseQuantity(qos.EgressRequest, total) + if err != nil { + return QoS{}, err + } + q.EgressLimitBps, err = parseQuantity(qos.EgressLimit, total) + if err != nil { + return QoS{}, err + } + + return q, nil +} + +func parseQuantity(v *intstr.IntOrString, total uint64) (uint64, error) { + if v == nil { + return 0, nil + } + if v.Type == intstr.String { + val, err := resource.ParseQuantity(v.String()) + if err != nil { + return 0, err + } + r := BitsToBytes(uint64(val.Value())) + if r > total { + return 0, fmt.Errorf("quantity %s is larger than total %d", v.String(), total) + } + + return r, nil + } else { + return uint64(v.IntValue()) * total / 100, nil + } +} + +func getPodQoS(anno map[string]string) (uint64, uint64, error) { + var ingress, egress uint64 + + if anno[extension.AnnotationNetworkQOS] != "" { + nqos := &NetworkQoS{} + err := json.Unmarshal([]byte(anno[extension.AnnotationNetworkQOS]), nqos) + if err != nil { + return 0, 0, err + } + ing, err := resource.ParseQuantity(nqos.IngressLimit) + if err != nil { + return 0, 0, err + } + ingress = BitsToBytes(uint64(ing.Value())) + + eg, err := resource.ParseQuantity(nqos.EgressLimit) + if err != nil { + return 0, 0, err + } + egress = BitsToBytes(uint64(eg.Value())) + } + + return ingress, egress, nil +} + +func getPodPrio(pod *corev1.Pod) int { + prio, ok := prioMapping[pod.Labels[extension.LabelPodQoS]] + if ok { + return prio + } + switch pod.Status.QOSClass { + case corev1.PodQOSGuaranteed, corev1.PodQOSBurstable: + return 1 + case corev1.PodQOSBestEffort: + return 2 + } + return 0 +} + +func newPlugin() *Plugin { + return &Plugin{ + executor: resourceexecutor.NewResourceUpdateExecutor(), + podFilePath: filepath.Join(rootPath, podConfig), + nodeFilePath: filepath.Join(rootPath, nodeConfig), + node: &Node{}, + pods: make(map[string]*Pod), + syncChan: make(chan struct{}, 1), + } +} + +var singleton *Plugin +var once sync.Once + +func Object() *Plugin { + once.Do(func() { + singleton = newPlugin() + }) + return singleton +} + +func BitsToBytes[T uint64 | float64 | int](bits T) T { + return bits / 8 +} diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go new file mode 100644 index 000000000..eadf4090e --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go @@ -0,0 +1,256 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package terwayqos + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" +) + +func TestParseQuantityWithNilValue(t *testing.T) { + result, err := parseQuantity(nil, 100) + assert.NoError(t, err) + assert.Equal(t, uint64(0), result) +} + +func TestParseQuantityWithStringTypeAndValidValue(t *testing.T) { + val := intstr.FromString("40") + result, err := parseQuantity(&val, 100) + assert.NoError(t, err) + assert.Equal(t, uint64(5), result) +} + +func TestParseQuantityWithStringTypeAndValueExceedingTotal(t *testing.T) { + val := intstr.FromString("808") + result, err := parseQuantity(&val, 100) + assert.Error(t, err) + assert.Equal(t, uint64(0), result) +} + +func TestParseQuantityWithIntType(t *testing.T) { + val := intstr.FromInt(50) + result, err := parseQuantity(&val, 100) + assert.NoError(t, err) + assert.Equal(t, uint64(50), result) +} + +func TestGetPodQoSWithValidAnnotation(t *testing.T) { + anno := map[string]string{ + extension.AnnotationNetworkQOS: `{"IngressLimit": "1000", "EgressLimit": "2000"}`, + } + + ingress, egress, err := getPodQoS(anno) + + assert.NoError(t, err) + assert.Equal(t, uint64(125), ingress) + assert.Equal(t, uint64(250), egress) +} + +func TestGetPodQoSWithInvalidAnnotation(t *testing.T) { + anno := map[string]string{ + extension.AnnotationNetworkQOS: `{"IngressLimit": "invalid", "EgressLimit": "2000"}`, + } + + _, _, err := getPodQoS(anno) + + assert.Error(t, err) +} + +func TestGetPodQoSWithNoAnnotation(t *testing.T) { + anno := map[string]string{} + + ingress, egress, err := getPodQoS(anno) + + assert.NoError(t, err) + assert.Equal(t, uint64(0), ingress) + assert.Equal(t, uint64(0), egress) +} + +func TestPodPriorityWithExistingLabel(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + extension.LabelPodQoS: "BE", + }, + }, + } + + prio := getPodPrio(pod) + + assert.Equal(t, 2, prio) +} + +func TestPodPriorityWithQOSGuaranteed(t *testing.T) { + pod := &corev1.Pod{ + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSGuaranteed, + }, + } + + prio := getPodPrio(pod) + + assert.Equal(t, 1, prio) +} + +func TestPodPriorityWithQOSBurstable(t *testing.T) { + pod := &corev1.Pod{ + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSBurstable, + }, + } + + prio := getPodPrio(pod) + + assert.Equal(t, 1, prio) +} + +func TestPodPriorityWithQOSBestEffort(t *testing.T) { + pod := &corev1.Pod{ + Status: corev1.PodStatus{ + QOSClass: corev1.PodQOSBestEffort, + }, + } + + prio := getPodPrio(pod) + + assert.Equal(t, 2, prio) +} + +func TestPodPriorityWithNoQOSClass(t *testing.T) { + pod := &corev1.Pod{} + + prio := getPodPrio(pod) + + assert.Equal(t, 0, prio) +} + +func TestParseQoSWithNone(t *testing.T) { + qosConf := &slov1alpha1.NetworkQOSCfg{} + qos, err := parseQoS(qosConf, 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), qos.IngressLimitBps) +} + +func TestParseQoSWithDisabled(t *testing.T) { + enable := false + qosConf := &slov1alpha1.NetworkQOSCfg{Enable: &enable} + qos, err := parseQoS(qosConf, 0) + assert.NoError(t, err) + assert.Equal(t, uint64(0), qos.IngressLimitBps) +} + +func TestParseQoSWithEnabled(t *testing.T) { + enable := true + v := intstr.FromString("10M") + qosConf := &slov1alpha1.NetworkQOSCfg{ + Enable: &enable, + NetworkQOS: slov1alpha1.NetworkQOS{ + IngressRequest: &v, + IngressLimit: &v, + EgressRequest: &v, + EgressLimit: &v, + }, + } + qos, err := parseQoS(qosConf, 100*1000*1000) + assert.NoError(t, err) + assert.Equal(t, uint64(10*1000*1000/8), qos.IngressLimitBps) +} + +func TestParseRuleForNodeSLOWithValidQoSPolicy(t *testing.T) { + plugin := newPlugin() + + terway := slov1alpha1.NETQOSPolicyTerwayQos + mergedNodeSLO := &slov1alpha1.NodeSLOSpec{ + ResourceQOSStrategy: &slov1alpha1.ResourceQOSStrategy{ + Policies: &slov1alpha1.ResourceQOSPolicies{ + NETQOSPolicy: &terway, + }, + }, + } + + result, err := plugin.parseRuleForNodeSLO(mergedNodeSLO) + + assert.NoError(t, err) + assert.True(t, result) + assert.NotNil(t, plugin.node) + assert.True(t, *plugin.enabled) +} + +func TestParseRuleForNodeSLOWithNoQoSPolicy(t *testing.T) { + plugin := newPlugin() + mergedNodeSLO := &slov1alpha1.NodeSLOSpec{} + + result, err := plugin.parseRuleForNodeSLO(mergedNodeSLO) + + assert.NoError(t, err) + assert.True(t, result) + assert.False(t, *plugin.enabled) +} + +func TestParseNetQoS(t *testing.T) { + enable := true + v1 := intstr.FromString("10M") + v2 := intstr.FromString("20M") + + slo := &slov1alpha1.NodeSLOSpec{ + ResourceQOSStrategy: &slov1alpha1.ResourceQOSStrategy{ + LSClass: &slov1alpha1.ResourceQOS{ + NetworkQOS: &slov1alpha1.NetworkQOSCfg{ + Enable: &enable, + NetworkQOS: slov1alpha1.NetworkQOS{ + IngressRequest: &v1, + IngressLimit: &v1, + EgressRequest: &v1, + EgressLimit: &v1, + }, + }, + }, + BEClass: &slov1alpha1.ResourceQOS{ + NetworkQOS: &slov1alpha1.NetworkQOSCfg{ + Enable: &enable, + NetworkQOS: slov1alpha1.NetworkQOS{ + IngressRequest: &v2, + IngressLimit: &v2, + EgressRequest: &v2, + EgressLimit: &v2, + }, + }, + }, + }, + SystemStrategy: &slov1alpha1.SystemStrategy{ + TotalNetworkBandwidth: resource.MustParse("100M"), + }, + } + node := &Node{} + + err := parseNetQoS(slo, node) + + assert.NoError(t, err) + assert.Equal(t, uint64(100*1000*1000/8), node.HwRxBpsMax) + assert.Equal(t, uint64(10*1000*1000/8), node.L1TxBpsMin) + assert.Equal(t, uint64(20*1000*1000/8), node.L2TxBpsMin) + assert.Equal(t, uint64(10*1000*1000/8), node.L1TxBpsMax) + assert.Equal(t, uint64(20*1000*1000/8), node.L2TxBpsMax) +} diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go new file mode 100644 index 000000000..3060601ee --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go @@ -0,0 +1,96 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package terwayqos + +import ( + "bytes" + "fmt" + "reflect" + + "github.com/koordinator-sh/koordinator/apis/extension" +) + +type NetworkQoS struct { + // IngressLimit and EgressLimit is the bandwidth in bps + // are used to set bandwidth for Pod. The unit is bps. + // For example, 10M means 10 megabits per second. + IngressLimit string `json:"ingressLimit"` + EgressLimit string `json:"egressLimit"` +} + +type QoS struct { + IngressRequestBps uint64 `json:"ingressRequestBps"` + IngressLimitBps uint64 `json:"ingressLimitBps"` + EgressRequestBps uint64 `json:"egressRequestBps"` + EgressLimitBps uint64 `json:"egressLimitBps"` +} + +type Node struct { + HwTxBpsMax uint64 `text:"hw_tx_bps_max"` + HwRxBpsMax uint64 `text:"hw_rx_bps_max"` + L1TxBpsMin uint64 `text:"offline_l1_tx_bps_min"` + L1TxBpsMax uint64 `text:"offline_l1_tx_bps_max"` + L1RxBpsMin uint64 `text:"offline_l1_rx_bps_min"` + L1RxBpsMax uint64 `text:"offline_l1_rx_bps_max"` + L2TxBpsMin uint64 `text:"offline_l2_tx_bps_min"` + L2TxBpsMax uint64 `text:"offline_l2_tx_bps_max"` + L2RxBpsMin uint64 `text:"offline_l2_rx_bps_min"` + L2RxBpsMax uint64 `text:"offline_l2_rx_bps_max"` +} + +func (n Node) MarshalText() (text []byte, err error) { + val := reflect.ValueOf(n) + typ := val.Type() + + var buffer bytes.Buffer + + for i := 0; i < val.NumField(); i++ { + field := typ.Field(i) + tagValue := field.Tag.Get("text") + if tagValue == "" { + continue // Skip fields without text tag + } + + fieldValue := val.Field(i) + line := fmt.Sprintf("%s %v\n", tagValue, fieldValue.Interface()) + buffer.WriteString(line) + } + + return buffer.Bytes(), nil +} + +type Pod struct { + PodName string `json:"podName"` + PodNamespace string `json:"podNamespace"` + PodUID string `json:"podUID"` + Prio int `json:"prio"` + CgroupDir string `json:"cgroupDir"` + QoSConfig QoSConfig `json:"qosConfig"` +} + +type QoSConfig struct { + IngressBandwidth uint64 `json:"ingressBandwidth"` + EgressBandwidth uint64 `json:"egressBandwidth"` +} + +var prioMapping = map[string]int{ + string(extension.QoSSystem): 0, + string(extension.QoSLSE): 1, + string(extension.QoSLSR): 1, + string(extension.QoSLS): 1, + string(extension.QoSBE): 2, +} diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/types_test.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/types_test.go new file mode 100644 index 000000000..93ccbef9b --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/types_test.go @@ -0,0 +1,71 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package terwayqos + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMarshalTextWithAllFieldsSet(t *testing.T) { + node := Node{ + HwTxBpsMax: 100, + HwRxBpsMax: 200, + L1TxBpsMin: 300, + L1TxBpsMax: 400, + L1RxBpsMin: 500, + L1RxBpsMax: 600, + L2TxBpsMin: 700, + L2TxBpsMax: 800, + L2RxBpsMin: 900, + L2RxBpsMax: 1000, + } + + expected := "hw_tx_bps_max 100\nhw_rx_bps_max 200\noffline_l1_tx_bps_min 300\noffline_l1_tx_bps_max 400\noffline_l1_rx_bps_min 500\noffline_l1_rx_bps_max 600\noffline_l2_tx_bps_min 700\noffline_l2_tx_bps_max 800\noffline_l2_rx_bps_min 900\noffline_l2_rx_bps_max 1000\n" + + result, err := node.MarshalText() + + assert.NoError(t, err) + assert.Equal(t, expected, string(result)) +} + +func TestMarshalTextWithNoFieldsSet(t *testing.T) { + node := Node{} + + expected := "hw_tx_bps_max 0\nhw_rx_bps_max 0\noffline_l1_tx_bps_min 0\noffline_l1_tx_bps_max 0\noffline_l1_rx_bps_min 0\noffline_l1_rx_bps_max 0\noffline_l2_tx_bps_min 0\noffline_l2_tx_bps_max 0\noffline_l2_rx_bps_min 0\noffline_l2_rx_bps_max 0\n" + + result, err := node.MarshalText() + + assert.NoError(t, err) + assert.Equal(t, expected, string(result)) +} + +func TestMarshalTextWithSomeFieldsSet(t *testing.T) { + node := Node{ + HwTxBpsMax: 100, + L1TxBpsMin: 300, + L2RxBpsMax: 1000, + } + + expected := "hw_tx_bps_max 100\nhw_rx_bps_max 0\noffline_l1_tx_bps_min 300\noffline_l1_tx_bps_max 0\noffline_l1_rx_bps_min 0\noffline_l1_rx_bps_max 0\noffline_l2_tx_bps_min 0\noffline_l2_tx_bps_max 0\noffline_l2_rx_bps_min 0\noffline_l2_rx_bps_max 1000\n" + + result, err := node.MarshalText() + + assert.NoError(t, err) + assert.Equal(t, expected, string(result)) +} diff --git a/pkg/util/sloconfig/nodeslo_config.go b/pkg/util/sloconfig/nodeslo_config.go index e4489e162..da21df707 100644 --- a/pkg/util/sloconfig/nodeslo_config.go +++ b/pkg/util/sloconfig/nodeslo_config.go @@ -18,6 +18,7 @@ package sloconfig import ( "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/klog/v2" "k8s.io/utils/pointer" @@ -199,8 +200,10 @@ func DefaultMemoryQOS(qos apiext.QoSClass) *slov1alpha1.MemoryQOS { func DefaultResourceQOSPolicies() *slov1alpha1.ResourceQOSPolicies { defaultCPUPolicy := slov1alpha1.CPUQOSPolicyGroupIdentity + defaultNetQoSPolicy := slov1alpha1.NETQOSPolicyTC return &slov1alpha1.ResourceQOSPolicies{ - CPUPolicy: &defaultCPUPolicy, + CPUPolicy: &defaultCPUPolicy, + NETQOSPolicy: &defaultNetQoSPolicy, } } @@ -220,6 +223,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy { Enable: pointer.Bool(false), MemoryQOS: *DefaultMemoryQOS(apiext.QoSLSR), }, + NetworkQOS: &slov1alpha1.NetworkQOSCfg{ + Enable: pointer.Bool(false), + NetworkQOS: *NoneNetworkQOS(), + }, }, LSClass: &slov1alpha1.ResourceQOS{ CPUQOS: &slov1alpha1.CPUQOSCfg{ @@ -234,6 +241,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy { Enable: pointer.Bool(false), MemoryQOS: *DefaultMemoryQOS(apiext.QoSLS), }, + NetworkQOS: &slov1alpha1.NetworkQOSCfg{ + Enable: pointer.Bool(false), + NetworkQOS: *NoneNetworkQOS(), + }, }, BEClass: &slov1alpha1.ResourceQOS{ CPUQOS: &slov1alpha1.CPUQOSCfg{ @@ -248,6 +259,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy { Enable: pointer.Bool(false), MemoryQOS: *DefaultMemoryQOS(apiext.QoSBE), }, + NetworkQOS: &slov1alpha1.NetworkQOSCfg{ + Enable: pointer.Bool(false), + NetworkQOS: *NoneNetworkQOS(), + }, }, SystemClass: &slov1alpha1.ResourceQOS{ CPUQOS: &slov1alpha1.CPUQOSCfg{ @@ -262,6 +277,10 @@ func DefaultResourceQOSStrategy() *slov1alpha1.ResourceQOSStrategy { Enable: pointer.Bool(false), MemoryQOS: *DefaultMemoryQOS(apiext.QoSSystem), }, + NetworkQOS: &slov1alpha1.NetworkQOSCfg{ + Enable: pointer.Bool(false), + NetworkQOS: *NoneNetworkQOS(), + }, }, } } @@ -280,6 +299,10 @@ func NoneResourceQOS(qos apiext.QoSClass) *slov1alpha1.ResourceQOS { Enable: pointer.Bool(false), MemoryQOS: *NoneMemoryQOS(), }, + NetworkQOS: &slov1alpha1.NetworkQOSCfg{ + Enable: pointer.Bool(false), + NetworkQOS: *NoneNetworkQOS(), + }, } } @@ -316,8 +339,10 @@ func NoneMemoryQOS() *slov1alpha1.MemoryQOS { func NoneResourceQOSPolicies() *slov1alpha1.ResourceQOSPolicies { noneCPUPolicy := slov1alpha1.CPUQOSPolicyGroupIdentity + defaultNetQoSPolicy := slov1alpha1.NETQOSPolicyTC return &slov1alpha1.ResourceQOSPolicies{ - CPUPolicy: &noneCPUPolicy, + CPUPolicy: &noneCPUPolicy, + NETQOSPolicy: &defaultNetQoSPolicy, } } @@ -360,3 +385,13 @@ func DefaultSystemStrategy() *slov1alpha1.SystemStrategy { func DefaultExtensions() *slov1alpha1.ExtensionsMap { return getDefaultExtensionsMap() } + +func NoneNetworkQOS() *slov1alpha1.NetworkQOS { + zero := intstr.FromInt(0) + return &slov1alpha1.NetworkQOS{ + IngressRequest: &zero, + IngressLimit: &zero, + EgressRequest: &zero, + EgressLimit: &zero, + } +}