diff --git a/apis/extension/constants.go b/apis/extension/constants.go index 17cce7954..5e8f6c636 100644 --- a/apis/extension/constants.go +++ b/apis/extension/constants.go @@ -40,6 +40,11 @@ const ( // LabelPodMutatingUpdate is a label key that pods with `pod.koordinator.sh/mutating-update=true` will // be mutated by Koordinator webhook when updating. LabelPodMutatingUpdate = PodDomainPrefix + "/mutating-update" + + // AnnotationIngressBandwidth and AnnotationEgressBandwidth are used to set bandwidth for Pod. The unit is bps. + // For example, 10M means 10 megabits per second. + AnnotationIngressBandwidth = DomainPrefix + "ingress-bandwidth" + AnnotationEgressBandwidth = DomainPrefix + "egress-bandwidth" ) type AggregationType string diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index cd99502ee..f292fd177 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -31,6 +31,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -73,6 +74,11 @@ const ( // owner: @saintube @zwzhang0107 // alpha: v1.4 CoreSched featuregate.Feature = "CoreSched" + + // TerwayQoS enables net QoS feature of koordlet. + // owner: @l1b0k + // alpha: v1.5 + TerwayQoS featuregate.Feature = "TerwayQoS" ) var ( @@ -83,6 +89,7 @@ var ( BatchResource: {Default: true, PreRelease: featuregate.Beta}, CPUNormalization: {Default: false, PreRelease: featuregate.Alpha}, CoreSched: {Default: false, PreRelease: featuregate.Alpha}, + TerwayQoS: {Default: false, PreRelease: featuregate.Alpha}, } runtimeHookPlugins = map[featuregate.Feature]HookPlugin{ @@ -92,6 +99,7 @@ var ( BatchResource: batchresource.Object(), CPUNormalization: cpunormalization.Object(), CoreSched: coresched.Object(), + TerwayQoS: terwayqos.Object(), } ) diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go new file mode 100644 index 000000000..abc46f518 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go @@ -0,0 +1,273 @@ +package terwayqos + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + "github.com/koordinator-sh/koordinator/pkg/koordlet/util" +) + +const ( + rootPath = "/var/lib/terway/qos" + podConfig = "pod.json" + nodeConfig = "node.json" +) + +const ( + name = "TerwayQoS" + description = "network qos management" + + ruleNameForNodeQoS = name + " (nodeQoS)" + ruleNameForAllPods = name + " (allPods)" +) + +type Plugin struct { + executor resourceexecutor.ResourceUpdateExecutor + + lock sync.RWMutex + node *Node +} + +func (p *Plugin) Register(op hooks.Options) { + klog.V(5).Infof("register hook %v", "terwqy qos configure generator") + rule.Register(ruleNameForNodeQoS, description, + rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO), + rule.WithUpdateCallback(p.update)) + rule.Register(ruleNameForAllPods, description, + rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods), + rule.WithUpdateCallback(p.update)) + + p.executor = op.Executor +} + +func (p *Plugin) parseRuleForNodeSLO(mergedNodeSLOIf interface{}) (bool, error) { + mergedNodeSLO := mergedNodeSLOIf.(*slov1alpha1.NodeSLOSpec) + + n := &Node{} + err := parseNetQoS(mergedNodeSLO, n) + if err != nil { + return false, err + } + + p.lock.Lock() + p.node = n + p.lock.Unlock() + return true, nil +} + +func (p *Plugin) parseForAllPods(e interface{}) (bool, error) { + _, ok := e.(*struct{}) + if !ok { + return false, fmt.Errorf("invalid rule type %T", e) + } + + return true, nil +} + +func (p *Plugin) update(target *statesinformer.CallbackTarget) error { + if target == nil { + return fmt.Errorf("callback target is nil") + } + + err := os.MkdirAll(rootPath, os.ModeDir) + if err != nil { + return err + } + + pods := make([]Pod, 0, len(target.Pods)) + for _, meta := range target.Pods { + if meta.Pod == nil { + continue + } + + v4, v6 := util.GetIPs(meta.Pod) + + ing, egress, err := getPodQoS(meta.Pod.Annotations) + if err != nil { + klog.Errorf("get pod qos failed, err: %v", err) + continue + } + pods = append(pods, Pod{ + PodName: meta.Pod.Name, + PodNamespace: meta.Pod.Namespace, + PodUID: string(meta.Pod.UID), + Prio: prioMapping[meta.Pod.Labels[extension.LabelPodQoS]], + IPv4: v4, + IPv6: v6, + HostNetwork: meta.Pod.Spec.HostNetwork, + CgroupDir: meta.CgroupDir, + QoSConfig: QoSConfig{ + IngressBandwidth: ing, + EgressBandwidth: egress, + }, + }) + } + outPods, err := json.Marshal(pods) + if err != nil { + return err + } + + err = os.WriteFile(filepath.Join(rootPath, podConfig), outPods, 0644) + if err != nil { + return err + } + + p.lock.RLock() + defer p.lock.RUnlock() + outNode, err := json.Marshal(p.node) + if err != nil { + return err + } + + err = os.WriteFile(filepath.Join(rootPath, nodeConfig), outNode, 0644) + return err +} + +// parseNetQoS only LS and BE is taken into account +func parseNetQoS(slo *slov1alpha1.NodeSLOSpec, node *Node) error { + if slo.SystemStrategy == nil { + return nil + } + if slo.SystemStrategy.TotalNetworkBandwidth.Value() < 0 { + return fmt.Errorf("invalid total network bandwidth %d", slo.SystemStrategy.TotalNetworkBandwidth.Value()) + } + total := uint64(slo.SystemStrategy.TotalNetworkBandwidth.Value()) + // to Byte/s + total = total * 1000 * 1000 / 8 + + node.HwTxBpsMax = total + node.HwRxBpsMax = total + + qos := slo.ResourceQOSStrategy + if qos == nil { + return nil + } + + if qos.LSClass != nil && qos.LSClass.NetworkQOS != nil && qos.LSClass.NetworkQOS.Enable != nil && *qos.LSClass.NetworkQOS.Enable { + q, err := parseQoS(qos.LSClass.NetworkQOS, total) + if err != nil { + return err + } + node.L1RxBpsMin = q.IngressRequestBps + node.L1RxBpsMax = q.IngressLimitBps + node.L1TxBpsMin = q.EgressRequestBps + node.L1TxBpsMax = q.EgressLimitBps + } + + if qos.BEClass != nil && qos.BEClass.NetworkQOS != nil && qos.BEClass.NetworkQOS.Enable != nil && *qos.BEClass.NetworkQOS.Enable { + q, err := parseQoS(qos.BEClass.NetworkQOS, total) + if err != nil { + return err + } + node.L2RxBpsMin = q.IngressRequestBps + node.L2RxBpsMax = q.IngressLimitBps + node.L2TxBpsMin = q.EgressRequestBps + node.L2TxBpsMax = q.EgressLimitBps + } + + return nil +} + +func parseQoS(qos *slov1alpha1.NetworkQOSCfg, total uint64) (QoS, error) { + q := QoS{ + IngressRequestBps: 0, + IngressLimitBps: 0, + EgressRequestBps: 0, + EgressLimitBps: 0, + } + if qos.Enable == nil { + return q, nil + } + if !*qos.Enable { + return q, nil + } + + var err error + q.IngressRequestBps, err = parseQuantity(qos.IngressRequest, total) + if err != nil { + return QoS{}, err + } + + q.IngressLimitBps, err = parseQuantity(qos.IngressLimit, total) + if err != nil { + return QoS{}, err + } + + q.EgressRequestBps, err = parseQuantity(qos.EgressRequest, total) + if err != nil { + return QoS{}, err + } + q.EgressLimitBps, err = parseQuantity(qos.EgressLimit, total) + if err != nil { + return QoS{}, err + } + + return q, nil +} + +func parseQuantity(v *intstr.IntOrString, total uint64) (uint64, error) { + if v == nil { + return 0, nil + } + if v.Type == intstr.String { + val, err := resource.ParseQuantity(v.String()) + if err != nil { + return 0, err + } + r := uint64(val.Value()) + if r > total { + return 0, fmt.Errorf("quantity %s is larger than total %d", v.String(), total) + } + + return r, nil + } else { + return uint64(v.IntValue()) * total / 100, nil + } +} + +func getPodQoS(anno map[string]string) (uint64, uint64, error) { + var ingress, egress uint64 + + if anno[extension.AnnotationIngressBandwidth] != "" { + ing, err := resource.ParseQuantity(anno[extension.AnnotationIngressBandwidth]) + if err != nil { + return 0, 0, err + } + ingress = uint64(ing.Value()) / 8 + } + if anno[extension.AnnotationEgressBandwidth] != "" { + eg, err := resource.ParseQuantity(anno[extension.AnnotationEgressBandwidth]) + if err != nil { + return 0, 0, err + } + egress = uint64(eg.Value()) / 8 + } + return ingress, egress, nil +} + +func newPlugin() *Plugin { + return &Plugin{} +} + +var singleton *Plugin +var once sync.Once + +func Object() *Plugin { + once.Do(func() { + singleton = newPlugin() + }) + return singleton +} diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go new file mode 100644 index 000000000..732d42563 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go @@ -0,0 +1,35 @@ +package terwayqos + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/util/intstr" +) + +func TestParseQuantityWithNilValue(t *testing.T) { + result, err := parseQuantity(nil, 100) + assert.NoError(t, err) + assert.Equal(t, uint64(0), result) +} + +func TestParseQuantityWithStringTypeAndValidValue(t *testing.T) { + val := intstr.FromString("50") + result, err := parseQuantity(&val, 100) + assert.NoError(t, err) + assert.Equal(t, uint64(50), result) +} + +func TestParseQuantityWithStringTypeAndValueExceedingTotal(t *testing.T) { + val := intstr.FromString("200") + result, err := parseQuantity(&val, 100) + assert.Error(t, err) + assert.Equal(t, uint64(0), result) +} + +func TestParseQuantityWithIntType(t *testing.T) { + val := intstr.FromInt(50) + result, err := parseQuantity(&val, 100) + assert.NoError(t, err) + assert.Equal(t, uint64(50), result) +} diff --git a/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go b/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go new file mode 100644 index 000000000..cd889d021 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/terwayqos/types.go @@ -0,0 +1,53 @@ +package terwayqos + +import ( + "net/netip" + + "github.com/koordinator-sh/koordinator/apis/extension" +) + +type QoS struct { + IngressRequestBps uint64 `json:"ingressRequestBps"` + IngressLimitBps uint64 `json:"ingressLimitBps"` + EgressRequestBps uint64 `json:"egressRequestBps"` + EgressLimitBps uint64 `json:"egressLimitBps"` +} + +type Node struct { + HwTxBpsMax uint64 `json:"hw_tx_bps_max"` + HwRxBpsMax uint64 `json:"hw_rx_bps_max"` + L1TxBpsMin uint64 `json:"l1_tx_bps_min"` + L1TxBpsMax uint64 `json:"l1_tx_bps_max"` + L1RxBpsMin uint64 `json:"l1_rx_bps_min"` + L1RxBpsMax uint64 `json:"l1_rx_bps_max"` + L2TxBpsMin uint64 `json:"l2_tx_bps_min"` + L2TxBpsMax uint64 `json:"l2_tx_bps_max"` + L2RxBpsMin uint64 `json:"l2_rx_bps_min"` + L2RxBpsMax uint64 `json:"l2_rx_bps_max"` +} + +type Pod struct { + PodName string `json:"podName"` + PodNamespace string `json:"podNamespace"` + PodUID string `json:"podUID"` + Prio int `json:"prio"` + IPv4 netip.Addr `json:"ipv4"` + IPv6 netip.Addr `json:"ipv6"` + HostNetwork bool `json:"hostNetwork"` + CgroupDir string `json:"cgroupDir"` + QoSConfig QoSConfig `json:"qosConfig"` +} + +type QoSConfig struct { + IngressBandwidth uint64 `json:"ingressBandwidth"` + EgressBandwidth uint64 `json:"egressBandwidth"` +} + +var prioMapping = map[string]int{ + string(extension.QoSNone): 0, + string(extension.QoSSystem): 0, + string(extension.QoSLSE): 1, + string(extension.QoSLSR): 1, + string(extension.QoSLS): 1, + string(extension.QoSBE): 2, +} diff --git a/pkg/koordlet/util/pod.go b/pkg/koordlet/util/pod.go index fc7343b14..a2294e22b 100644 --- a/pkg/koordlet/util/pod.go +++ b/pkg/koordlet/util/pod.go @@ -18,10 +18,12 @@ package util import ( "fmt" + "net/netip" "os" "path/filepath" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -94,3 +96,23 @@ func GetPodSandboxContainerID(pod *corev1.Pod) (string, error) { } return fmt.Sprintf("%s://%s", containerRuntime, containerHashID), nil } + +func GetIPs(pod *corev1.Pod) (v4 netip.Addr, v6 netip.Addr) { + ips := sets.NewString(pod.Status.PodIP) + for _, ip := range pod.Status.PodIPs { + ips.Insert(ip.IP) + } + for ip := range ips { + addr, err := netip.ParseAddr(ip) + if err != nil { + continue + } + if addr.Is4() { + v4 = addr + } else { + v6 = addr + } + } + + return +}