diff --git a/pkg/koordlet/qosmanager/framework/config.go b/pkg/koordlet/qosmanager/framework/config.go index f6ac0ca6e..7fa8e4583 100644 --- a/pkg/koordlet/qosmanager/framework/config.go +++ b/pkg/koordlet/qosmanager/framework/config.go @@ -28,6 +28,7 @@ type Config struct { MemoryEvictCoolTimeSeconds int CPUEvictCoolTimeSeconds int QOSExtensionCfg *QOSExtensionConfig + NetQOSProvider string } func NewDefaultConfig() *Config { @@ -39,6 +40,7 @@ func NewDefaultConfig() *Config { MemoryEvictCoolTimeSeconds: 4, CPUEvictCoolTimeSeconds: 20, QOSExtensionCfg: &QOSExtensionConfig{FeatureGates: map[string]bool{}}, + NetQOSProvider: "koordlet", } } @@ -49,5 +51,7 @@ func (c *Config) InitFlags(fs *flag.FlagSet) { fs.IntVar(&c.MemoryEvictIntervalSeconds, "memory-evict-interval-seconds", c.MemoryEvictIntervalSeconds, "evict be pod(memory) interval by seconds") fs.IntVar(&c.MemoryEvictCoolTimeSeconds, "memory-evict-cool-time-seconds", c.MemoryEvictCoolTimeSeconds, "cooling time: memory next evict time should after lastEvictTime + MemoryEvictCoolTimeSeconds") fs.IntVar(&c.CPUEvictCoolTimeSeconds, "cpu-evict-cool-time-seconds", c.CPUEvictCoolTimeSeconds, "cooltime: CPU next evict time should after lastEvictTime + CPUEvictCoolTimeSeconds") + fs.StringVar(&c.NetQOSProvider, "net-qos-provider", c.NetQOSProvider, "net qos provider, support: koordlet,terway default: koordlet") + c.QOSExtensionCfg.InitFlags(fs) } diff --git a/pkg/koordlet/qosmanager/plugins/netqos/koordlet/default.go b/pkg/koordlet/qosmanager/plugins/netqos/koordlet/default.go new file mode 100644 index 000000000..80319885c --- /dev/null +++ b/pkg/koordlet/qosmanager/plugins/netqos/koordlet/default.go @@ -0,0 +1,76 @@ +package koordlet + +import ( + "encoding/json" + "net/netip" + "os" + "path/filepath" + + "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/types" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + "github.com/koordinator-sh/koordinator/pkg/koordlet/util" +) + +const rootConfigPath = "/etc/koordlet/net-qos" + +type Pod struct { + PodName string `json:"podName"` + PodNamespace string `json:"podNamespace"` + PodUID string `json:"podUID"` + QoSClass string `json:"qosClass"` + + IPv4 netip.Addr `json:"ipv4"` + IPv6 netip.Addr `json:"ipv6"` + + HostNetwork bool `json:"hostNetwork"` + + CgroupDir string `json:"cgroupDir"` +} + +type Koordlet struct { +} + +func (t *Koordlet) Sync(node *types.Node, podsMeta []*statesinformer.PodMeta) error { + // write the node config + out, err := json.Marshal(node) + if err != nil { + return err + } + + err = os.MkdirAll(rootConfigPath, os.ModeDir) + if err != nil { + return err + } + + err = os.WriteFile(filepath.Join(rootConfigPath, "node.json"), out, 0644) + if err != nil { + return err + } + + pods := make([]Pod, 0, len(podsMeta)) + for _, meta := range podsMeta { + if meta.Pod == nil { + continue + } + + v4, v6 := util.GetIPs(meta.Pod) + + pods = append(pods, Pod{ + PodName: meta.Pod.Name, + PodNamespace: meta.Pod.Namespace, + PodUID: string(meta.Pod.UID), + QoSClass: meta.Pod.Labels[extension.LabelPodQoS], + IPv4: v4, + IPv6: v6, + HostNetwork: meta.Pod.Spec.HostNetwork, + CgroupDir: meta.CgroupDir, + }) + } + outPods, err := json.Marshal(pods) + if err != nil { + return err + } + + return os.WriteFile(filepath.Join(rootConfigPath, "pods.json"), outPods, 0644) +} diff --git a/pkg/koordlet/qosmanager/plugins/netqos/net_qos.go b/pkg/koordlet/qosmanager/plugins/netqos/net_qos.go new file mode 100644 index 000000000..308012e59 --- /dev/null +++ b/pkg/koordlet/qosmanager/plugins/netqos/net_qos.go @@ -0,0 +1,207 @@ +package netqos + +import ( + "fmt" + "time" + + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/features" + "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/framework" + "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/koordlet" + "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/terway" + "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/types" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" +) + +const ( + NetQoSName = "NetQoS" +) + +var providers = map[string]Interface{ + "terway": &terway.Terway{}, + "koordlet": &koordlet.Koordlet{}, +} + +type Interface interface { + Sync(node *types.Node, podsMeta []*statesinformer.PodMeta) error +} + +type nodeConfig struct { + provider Interface + + reconcileInterval time.Duration + statesInformer statesinformer.StatesInformer +} + +func (n *nodeConfig) Enabled() bool { + return features.DefaultKoordletFeatureGate.Enabled(features.NetQoS) && n.reconcileInterval > 0 +} + +func (n *nodeConfig) Setup(context *framework.Context) {} + +func (n *nodeConfig) Run(stopCh <-chan struct{}) { + go wait.Until(n.reconcile, n.reconcileInterval, stopCh) +} + +var _ framework.QOSStrategy = &nodeConfig{} + +func New(opt *framework.Options) framework.QOSStrategy { + klog.Infof("enable net qos plugin %s", opt.Config.NetQOSProvider) + return &nodeConfig{ + statesInformer: opt.StatesInformer, + reconcileInterval: time.Duration(opt.Config.ReconcileIntervalSeconds) * time.Second, + provider: providers[opt.Config.NetQOSProvider], + } +} + +func (n *nodeConfig) reconcile() { + // lookup the node config + slo := n.statesInformer.GetNodeSLO() + if slo == nil { + return + } + + err := n.sync(slo) + if err != nil { + klog.Error(err, "error sync netqos") + } +} + +func (n *nodeConfig) sync(slo *v1alpha1.NodeSLO) error { + node := &types.Node{ + Leveled: make(map[string]types.QoS), + } + + err := parseNodeSLO(slo, node) + if err != nil { + return err + } + + err = parseNetQoS(slo, node) + if err != nil { + return err + } + + podsMeta := n.statesInformer.GetAllPods() + + return n.provider.Sync(node, podsMeta) +} + +func parseNodeSLO(slo *v1alpha1.NodeSLO, node *types.Node) error { + if slo.Spec.SystemStrategy == nil { + return nil + } + node.TotalNetworkBandwidth = uint64(slo.Spec.SystemStrategy.TotalNetworkBandwidth.Value()) + return nil +} + +func parseNetQoS(slo *v1alpha1.NodeSLO, node *types.Node) error { + qos := slo.Spec.ResourceQOSStrategy + if qos == nil { + return nil + } + + if qos.LSRClass != nil && qos.LSRClass.NetworkQOS != nil { + q, err := parseQoS(qos.LSRClass.NetworkQOS, node.TotalNetworkBandwidth) + if err != nil { + return err + } + node.Leveled[string(extension.QoSLSR)] = q + } + if qos.LSClass != nil && qos.LSClass.NetworkQOS != nil { + q, err := parseQoS(qos.LSClass.NetworkQOS, node.TotalNetworkBandwidth) + if err != nil { + return err + } + node.Leveled[string(extension.QoSLS)] = q + } + + if qos.BEClass != nil && qos.BEClass.NetworkQOS != nil { + q, err := parseQoS(qos.BEClass.NetworkQOS, node.TotalNetworkBandwidth) + if err != nil { + return err + } + node.Leveled[string(extension.QoSBE)] = q + } + + if qos.SystemClass != nil && qos.SystemClass.NetworkQOS != nil { + q, err := parseQoS(qos.SystemClass.NetworkQOS, node.TotalNetworkBandwidth) + if err != nil { + return err + } + node.Leveled[string(extension.QoSSystem)] = q + } + + if qos.CgroupRoot != nil && qos.CgroupRoot.NetworkQOS != nil { + q, err := parseQoS(qos.CgroupRoot.NetworkQOS, node.TotalNetworkBandwidth) + if err != nil { + return err + } + node.Leveled[string(extension.QoSNone)] = q + } + + return nil +} + +func parseQoS(qos *v1alpha1.NetworkQOSCfg, total uint64) (types.QoS, error) { + q := types.QoS{ + IngressRequestBps: 0, + IngressLimitBps: 0, + EgressRequestBps: 0, + EgressLimitBps: 0, + } + if qos.Enable == nil { + return q, nil + } + if !*qos.Enable { + return q, nil + } + + var err error + q.IngressRequestBps, err = parseQuantity(qos.IngressRequest, total) + if err != nil { + return types.QoS{}, err + } + + q.IngressLimitBps, err = parseQuantity(qos.IngressLimit, total) + if err != nil { + return types.QoS{}, err + } + + q.EgressRequestBps, err = parseQuantity(qos.EgressRequest, total) + if err != nil { + return types.QoS{}, err + } + q.EgressLimitBps, err = parseQuantity(qos.EgressLimit, total) + if err != nil { + return types.QoS{}, err + } + + return q, nil +} + +func parseQuantity(v *intstr.IntOrString, total uint64) (uint64, error) { + if v == nil { + return 0, nil + } + if v.Type == intstr.String { + val, err := resource.ParseQuantity(v.String()) + if err != nil { + return 0, err + } + r := uint64(val.Value()) + if r > total { + return 0, fmt.Errorf("quantity %s is larger than total %d", v.String(), total) + } + + return r, nil + } else { + return uint64(v.IntValue()) * total / 100, nil + } +} diff --git a/pkg/koordlet/qosmanager/plugins/netqos/net_qos_test.go b/pkg/koordlet/qosmanager/plugins/netqos/net_qos_test.go new file mode 100644 index 000000000..35a59ef49 --- /dev/null +++ b/pkg/koordlet/qosmanager/plugins/netqos/net_qos_test.go @@ -0,0 +1,78 @@ +package netqos + +import ( + "testing" + + "k8s.io/apimachinery/pkg/util/intstr" +) + +func Test_parseQuantity(t *testing.T) { + type args struct { + v *intstr.IntOrString + total uint64 + } + tests := []struct { + name string + args args + want uint64 + wantErr bool + }{ + { + name: "null", + args: args{ + v: nil, + total: 0, + }, + want: 0, + wantErr: false, + }, + { + name: "percentage", + args: args{ + v: func() *intstr.IntOrString { + v := intstr.FromInt(10) + return &v + }(), + total: 100, + }, + want: 10, + wantErr: false, + }, + { + name: "quantity", + args: args{ + v: func() *intstr.IntOrString { + v := intstr.FromString("100M") + return &v + }(), + total: 100000000, + }, + want: 100000000, + wantErr: false, + }, + { + name: "too large", + args: args{ + v: func() *intstr.IntOrString { + v := intstr.FromString("900M") + return &v + }(), + total: 100000000, + }, + want: 0, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseQuantity(tt.args.v, tt.args.total) + if (err != nil) != tt.wantErr { + t.Errorf("parseQuantity() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("parseQuantity() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/koordlet/qosmanager/plugins/netqos/terway/terway.go b/pkg/koordlet/qosmanager/plugins/netqos/terway/terway.go new file mode 100644 index 000000000..b9470deab --- /dev/null +++ b/pkg/koordlet/qosmanager/plugins/netqos/terway/terway.go @@ -0,0 +1,132 @@ +package terway + +import ( + "encoding/json" + "net/netip" + "os" + "path/filepath" + + "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/types" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + "github.com/koordinator-sh/koordinator/pkg/koordlet/util" +) + +var prioMapping = map[string]int{ + string(extension.QoSNone): 0, + string(extension.QoSSystem): 0, + string(extension.QoSLSE): 1, + string(extension.QoSLSR): 1, + string(extension.QoSLS): 1, + string(extension.QoSBE): 2, +} + +const ( + rootPath = "/var/lib/terway/qos" + podConfig = "pod.json" + nodeConfig = "node.json" +) + +type Node struct { + HwTxBpsMax uint64 `json:"hw_tx_bps_max"` + HwRxBpsMax uint64 `json:"hw_rx_bps_max"` + L0TxBpsMin uint64 `json:"l0_tx_bps_min"` + L0TxBpsMax uint64 `json:"l0_tx_bps_max"` + L0RxBpsMin uint64 `json:"l0_rx_bps_min"` + L0RxBpsMax uint64 `json:"l0_rx_bps_max"` + L1TxBpsMin uint64 `json:"l1_tx_bps_min"` + L1TxBpsMax uint64 `json:"l1_tx_bps_max"` + L1RxBpsMin uint64 `json:"l1_rx_bps_min"` + L1RxBpsMax uint64 `json:"l1_rx_bps_max"` + L2TxBpsMin uint64 `json:"l2_tx_bps_min"` + L2TxBpsMax uint64 `json:"l2_tx_bps_max"` + L2RxBpsMin uint64 `json:"l2_rx_bps_min"` + L2RxBpsMax uint64 `json:"l2_rx_bps_max"` +} + +type Pod struct { + PodName string `json:"podName"` + PodNamespace string `json:"podNamespace"` + PodUID string `json:"podUID"` + Prio int `json:"prio"` + IPv4 netip.Addr `json:"ipv4"` + IPv6 netip.Addr `json:"ipv6"` + HostNetwork bool `json:"hostNetwork"` + CgroupDir string `json:"cgroupDir"` +} + +type Terway struct { +} + +func (t *Terway) Sync(node *types.Node, podsMeta []*statesinformer.PodMeta) error { + err := os.MkdirAll(rootPath, os.ModeDir) + if err != nil { + return err + } + pods := make([]Pod, 0, len(podsMeta)) + for _, meta := range podsMeta { + if meta.Pod == nil { + continue + } + + v4, v6 := util.GetIPs(meta.Pod) + + pods = append(pods, Pod{ + PodName: meta.Pod.Name, + PodNamespace: meta.Pod.Namespace, + PodUID: string(meta.Pod.UID), + Prio: prioMapping[meta.Pod.Labels[extension.LabelPodQoS]], + IPv4: v4, + IPv6: v6, + HostNetwork: meta.Pod.Spec.HostNetwork, + CgroupDir: meta.CgroupDir, + }) + } + outPods, err := json.Marshal(pods) + if err != nil { + return err + } + + err = os.WriteFile(filepath.Join(rootPath, podConfig), outPods, 0644) + if err != nil { + return err + } + + n := &Node{ + HwRxBpsMax: node.TotalNetworkBandwidth, + HwTxBpsMax: node.TotalNetworkBandwidth, + } + + // mirror three level qos to terway + + l0, ok := node.Leveled[string(extension.QoSSystem)] + if ok { + n.L0RxBpsMin = l0.IngressRequestBps + n.L0RxBpsMax = l0.IngressLimitBps + n.L0TxBpsMin = l0.EgressRequestBps + n.L0TxBpsMax = l0.EgressLimitBps + } + + l1, ok := node.Leveled[string(extension.QoSLS)] + if ok { + n.L1RxBpsMin = l1.IngressRequestBps + n.L1RxBpsMax = l1.IngressLimitBps + n.L1TxBpsMin = l1.EgressRequestBps + n.L1TxBpsMax = l1.EgressLimitBps + } + + l2, ok := node.Leveled[string(extension.QoSBE)] + if ok { + n.L2RxBpsMin = l2.IngressRequestBps + n.L2RxBpsMax = l2.IngressLimitBps + n.L2TxBpsMin = l2.EgressRequestBps + n.L2TxBpsMax = l2.EgressLimitBps + } + + outNode, err := json.Marshal(n) + if err != nil { + return err + } + + return os.WriteFile(filepath.Join(rootPath, nodeConfig), outNode, 0644) +} diff --git a/pkg/koordlet/qosmanager/plugins/netqos/types/types.go b/pkg/koordlet/qosmanager/plugins/netqos/types/types.go new file mode 100644 index 000000000..9c37cdf23 --- /dev/null +++ b/pkg/koordlet/qosmanager/plugins/netqos/types/types.go @@ -0,0 +1,17 @@ +package types + +// Node represents the qos config for a node +// all the unit in the struct is bit per second +type Node struct { + TotalNetworkBandwidth uint64 `json:"totalNetworkBandwidth"` + + // Leveled is the qos config for each level, which is indexed by koordinator level name extension.QoSClass + Leveled map[string]QoS `json:"leveled"` +} + +type QoS struct { + IngressRequestBps uint64 `json:"ingressRequestBps"` + IngressLimitBps uint64 `json:"ingressLimitBps"` + EgressRequestBps uint64 `json:"egressRequestBps"` + EgressLimitBps uint64 `json:"egressLimitBps"` +} diff --git a/pkg/koordlet/util/pod.go b/pkg/koordlet/util/pod.go index fc7343b14..a2294e22b 100644 --- a/pkg/koordlet/util/pod.go +++ b/pkg/koordlet/util/pod.go @@ -18,10 +18,12 @@ package util import ( "fmt" + "net/netip" "os" "path/filepath" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -94,3 +96,23 @@ func GetPodSandboxContainerID(pod *corev1.Pod) (string, error) { } return fmt.Sprintf("%s://%s", containerRuntime, containerHashID), nil } + +func GetIPs(pod *corev1.Pod) (v4 netip.Addr, v6 netip.Addr) { + ips := sets.NewString(pod.Status.PodIP) + for _, ip := range pod.Status.PodIPs { + ips.Insert(ip.IP) + } + for ip := range ips { + addr, err := netip.ParseAddr(ip) + if err != nil { + continue + } + if addr.Is4() { + v4 = addr + } else { + v6 = addr + } + } + + return +}