From d190e521726a18d466642f296c7f55ea39f8c51b Mon Sep 17 00:00:00 2001 From: lucming <2876757716@qq.com> Date: Thu, 22 Feb 2024 18:44:17 +0800 Subject: [PATCH] koordlet: tc plugin for netqos --- docker/koordlet.dockerfile | 2 + go.mod | 3 +- go.sum | 1 + pkg/koordlet/resourceexecutor/updater.go | 1 + pkg/koordlet/runtimehooks/config.go | 8 + pkg/koordlet/runtimehooks/hooks/tc/helper.go | 99 +++ .../runtimehooks/hooks/tc/helper_test.go | 42 ++ pkg/koordlet/runtimehooks/hooks/tc/ipset.go | 78 +++ .../runtimehooks/hooks/tc/iptables.go | 141 +++++ .../runtimehooks/hooks/tc/netqos_tc.go | 85 +++ pkg/koordlet/runtimehooks/hooks/tc/rule.go | 47 ++ pkg/koordlet/runtimehooks/hooks/tc/tc.go | 46 ++ .../runtimehooks/hooks/tc/tc_linux.go | 578 ++++++++++++++++++ .../runtimehooks/hooks/tc/tc_linux_test.go | 281 +++++++++ .../runtimehooks/protocol/pod_context.go | 14 + .../runtimehooks/protocol/protocol.go | 17 +- .../runtimehooks/reconciler/reconciler.go | 22 + pkg/koordlet/runtimehooks/rule/rule.go | 2 +- pkg/koordlet/util/system/cgroup_resource.go | 9 + pkg/koordlet/util/system/common_linux.go | 19 + pkg/koordlet/util/system/common_linux_test.go | 26 + pkg/koordlet/util/system/util_test_tool.go | 1 + pkg/koordlet/util/system/validator.go | 48 ++ pkg/koordlet/util/system/validator_test.go | 96 +++ 24 files changed, 1660 insertions(+), 6 deletions(-) create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/helper.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/helper_test.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/ipset.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/iptables.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/netqos_tc.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/rule.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/tc.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/tc_linux.go create mode 100644 pkg/koordlet/runtimehooks/hooks/tc/tc_linux_test.go diff --git a/docker/koordlet.dockerfile b/docker/koordlet.dockerfile index f1362530b..e1ee75f00 100644 --- a/docker/koordlet.dockerfile +++ b/docker/koordlet.dockerfile @@ -6,6 +6,7 @@ ARG TARGETARCH ENV VERSION $VERSION ENV GOOS linux ENV GOARCH $TARGETARCH +ENV GOPROXY https://goproxy.cn,direct COPY go.mod go.mod COPY go.sum go.sum @@ -35,6 +36,7 @@ RUN go build -a -o koordlet cmd/koordlet/main.go FROM --platform=$TARGETPLATFORM nvidia/cuda:11.6.2-base-ubuntu20.04 WORKDIR / RUN apt-get update && apt-get install -y lvm2 && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y iptables COPY --from=builder /go/src/github.com/koordinator-sh/koordinator/koordlet . COPY --from=builder /usr/local/lib /usr/lib ENTRYPOINT ["/koordlet"] diff --git a/go.mod b/go.mod index acf40aa15..625cf4f3b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/NVIDIA/go-nvml v0.11.6-0.0.20220823120812-7e2082095e82 github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 github.com/containerd/nri v0.3.0 + github.com/coreos/go-iptables v0.5.0 github.com/docker/docker v20.10.21+incompatible github.com/evanphx/json-patch v5.6.0+incompatible github.com/fsnotify/fsnotify v1.6.0 @@ -185,7 +186,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect github.com/ugorji/go/codec v1.2.7 // indirect - github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect + github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect github.com/vmware/govmomi v0.30.0 // indirect go.etcd.io/etcd/api/v3 v3.5.5 // indirect diff --git a/go.sum b/go.sum index 4a5a0ad72..986aa949a 100644 --- a/go.sum +++ b/go.sum @@ -344,6 +344,7 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-iptables v0.4.5/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU= +github.com/coreos/go-iptables v0.5.0 h1:mw6SAibtHKZcNzAsOxjoHIG0gy5YFHhypWSSNc6EjbQ= github.com/coreos/go-iptables v0.5.0/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/pkg/koordlet/resourceexecutor/updater.go b/pkg/koordlet/resourceexecutor/updater.go index 896c6024c..cc82e92da 100644 --- a/pkg/koordlet/resourceexecutor/updater.go +++ b/pkg/koordlet/resourceexecutor/updater.go @@ -55,6 +55,7 @@ func init() { sysutil.MemoryPriorityName, sysutil.MemoryUsePriorityOomName, sysutil.MemoryOomGroupName, + sysutil.NetClsClassIdName, ) // special cases DefaultCgroupUpdaterFactory.Register(NewCgroupUpdaterWithUpdateFunc(CgroupUpdateCPUSharesFunc), sysutil.CPUSharesName) diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index 294a92b7f..ad318877f 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -31,6 +31,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/tc" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -80,6 +81,11 @@ const ( // owner: @l1b0k // alpha: v1.5 TerwayQoS featuregate.Feature = "TerwayQoS" + + // TCNetworkQoS indicates a network qos implementation based on tc. + // owner: @lucming + // alpha: v1.5 + TCNetworkQoS featuregate.Feature = "TCNetworkQoS" ) var ( @@ -91,6 +97,7 @@ var ( CPUNormalization: {Default: false, PreRelease: featuregate.Alpha}, CoreSched: {Default: false, PreRelease: featuregate.Alpha}, TerwayQoS: {Default: false, PreRelease: featuregate.Alpha}, + TCNetworkQoS: {Default: false, PreRelease: featuregate.Alpha}, } runtimeHookPlugins = map[featuregate.Feature]HookPlugin{ @@ -101,6 +108,7 @@ var ( CPUNormalization: cpunormalization.Object(), CoreSched: coresched.Object(), TerwayQoS: terwayqos.Object(), + TCNetworkQoS: tc.Object(), } ) diff --git a/pkg/koordlet/runtimehooks/hooks/tc/helper.go b/pkg/koordlet/runtimehooks/hooks/tc/helper.go new file mode 100644 index 000000000..bd8512e65 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/helper.go @@ -0,0 +1,99 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "fmt" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/intstr" + + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" +) + +func loadConfigFromNodeSlo(nodesloSpec *slov1alpha1.NodeSLOSpec) *NetQosGlobalConfig { + res := NetQosGlobalConfig{} + var total uint64 = 0 + if nodesloSpec != nil && nodesloSpec.SystemStrategy != nil { + total = uint64(nodesloSpec.SystemStrategy.TotalNetworkBandwidth.Value()) + res.HwRxBpsMax = total + res.HwTxBpsMax = total + } + + if nodesloSpec.ResourceQOSStrategy == nil { + return &res + } + + strategy := nodesloSpec.ResourceQOSStrategy + if strategy.LSClass != nil && + strategy.LSClass.NetworkQOS != nil && + *strategy.LSClass.NetworkQOS.Enable { + cur := strategy.LSClass.NetworkQOS + res.L1RxBpsMin = getBandwidthVal(total, cur.IngressRequest) + res.L1RxBpsMax = getBandwidthVal(total, cur.IngressLimit) + res.L1TxBpsMin = getBandwidthVal(total, cur.EgressRequest) + res.L1TxBpsMax = getBandwidthVal(total, cur.EgressLimit) + } + + if strategy.BEClass != nil && + strategy.BEClass.NetworkQOS != nil && + *strategy.BEClass.NetworkQOS.Enable { + cur := strategy.BEClass.NetworkQOS + res.L2RxBpsMin = getBandwidthVal(total, cur.IngressRequest) + res.L2RxBpsMax = getBandwidthVal(total, cur.IngressLimit) + res.L2TxBpsMin = getBandwidthVal(total, cur.EgressRequest) + res.L2TxBpsMax = getBandwidthVal(total, cur.EgressLimit) + } + + return &res +} + +func getBandwidthVal(total uint64, intOrPercent *intstr.IntOrString) uint64 { + if intOrPercent == nil { + return 0 + } + + switch intOrPercent.Type { + case intstr.String: + return getBandwidthByQuantityFormat(intOrPercent.StrVal) + case intstr.Int: + return getBandwidthByPercentageFormat(total, intOrPercent.IntValue()) + default: + return 0 + } +} + +func getBandwidthByQuantityFormat(quanityStr string) uint64 { + val, err := resource.ParseQuantity(quanityStr) + if err != nil { + return 0 + } + + return uint64(val.Value()) +} + +func getBandwidthByPercentageFormat(total uint64, percentage int) uint64 { + return total * uint64(percentage) / 100 +} + +func convertToClassId(major, minor int) string { + return fmt.Sprintf("%d:%d", major, minor) +} + +// convertToHexClassId get class id in hex. +func convertToHexClassId(major, minor int) string { + return fmt.Sprintf("0x%d%04d", major, minor) +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/helper_test.go b/pkg/koordlet/runtimehooks/hooks/tc/helper_test.go new file mode 100644 index 000000000..48538b5ba --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/helper_test.go @@ -0,0 +1,42 @@ +package tc + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_convertToHexClassId(t *testing.T) { + type args struct { + major int + minor int + } + tests := []struct { + name string + args args + want string + }{ + // TODO: Add test cases. + { + name: "", + args: args{ + major: 11, + minor: 2, + }, + want: "0x110002", + }, + { + name: "", + args: args{ + major: 1, + minor: 2222, + }, + want: "0x12222", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, convertToHexClassId(tt.args.major, tt.args.minor), "convertToHexClassId(%v, %v)", tt.args.major, tt.args.minor) + }) + } +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/ipset.go b/pkg/koordlet/runtimehooks/hooks/tc/ipset.go new file mode 100644 index 000000000..7197cffc3 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/ipset.go @@ -0,0 +1,78 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "fmt" + + "github.com/vishvananda/netlink" + apierror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" +) + +func (p *tcPlugin) ipsetExisted() (bool, error) { + var errs []error + for _, cur := range ipsets { + if _, err := netlink.IpsetList(cur); err != nil { + errs = append(errs, err) + } + } + + if apierror.NewAggregate(errs) != nil { + return false, apierror.NewAggregate(errs) + } + + return true, nil +} + +func (p *tcPlugin) EnsureIpset() error { + klog.V(5).Infof("start to create ipset.") + var errs []error + for _, cur := range ipsets { + result, err := netlink.IpsetList(cur) + if err == nil && result != nil { + continue + } + + err = netlink.IpsetCreate(cur, "hash:ip", netlink.IpsetCreateOptions{}) + if err != nil { + err = fmt.Errorf("failed to create ipset. err=%v", err) + errs = append(errs, err) + } + } + + return apierror.NewAggregate(errs) +} + +func (p *tcPlugin) DestoryIpset() error { + klog.V(5).Infof("start to delete ipset rules crated by tc plugin.") + var errs []error + for _, cur := range ipsets { + result, err := netlink.IpsetList(cur) + if err == nil && result != nil { + if err := netlink.IpsetDestroy(cur); err != nil { + err = fmt.Errorf("failed to destroy ipset. err=%v", err) + errs = append(errs, err) + } + } + } + + return apierror.NewAggregate(errs) +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/iptables.go b/pkg/koordlet/runtimehooks/hooks/tc/iptables.go new file mode 100644 index 000000000..cf3fafa7f --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/iptables.go @@ -0,0 +1,141 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "fmt" + + apierror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" +) + +func (p *tcPlugin) EnsureIptables() error { + klog.V(5).Infof("start to create iptables.") + if p.iptablesHandler == nil { + return fmt.Errorf("can't create tc iptables rulues,because qos manager is nil") + } + + ipsetToClassid := map[NetQoSClass]string{ + NETQoSSystem: convertToClassId(ROOT_CLASS_MINOR_ID, SYSTEM_CLASS_MINOR_ID), + NETQoSLS: convertToClassId(ROOT_CLASS_MINOR_ID, LS_CLASS_MINOR_ID), + NETQoSBE: convertToClassId(ROOT_CLASS_MINOR_ID, BE_CLASS_MINOR_ID), + } + + for ipsetName, classid := range ipsetToClassid { + exp := fmt.Sprintf("-A POSTROUTING -m set --match-set %s src -j CLASSIFY --set-class %s", string(ipsetName), classid) + existed := false + + // looks like this one: + // -A POSTROUTING -m set --match-set mid_class src -j CLASSIFY --set-class 0001:0003 + rules, err := p.iptablesHandler.List("mangle", "POSTROUTING") + if err == nil { + for _, rule := range rules { + if rule == exp { + existed = true + break + } + } + } + + if existed { + continue + } + err = p.iptablesHandler.Append("mangle", "POSTROUTING", + "-m", "set", "--match-set", string(ipsetName), "src", + "-j", "CLASSIFY", "--set-class", classid) + if err != nil { + klog.Errorf("ipt append err=%v", err) + return err + } + } + + return nil +} + +func (p *tcPlugin) DelIptables() error { + klog.V(5).Infof("start to delete iptables rules crated by tc plugin.") + if p.iptablesHandler == nil { + return fmt.Errorf("can't create tc iptables rulues,because qos manager is nil") + } + ipsetToClassid := map[NetQoSClass]string{ + NETQoSSystem: convertToClassId(ROOT_CLASS_MINOR_ID, SYSTEM_CLASS_MINOR_ID), + NETQoSLS: convertToClassId(ROOT_CLASS_MINOR_ID, LS_CLASS_MINOR_ID), + NETQoSBE: convertToClassId(ROOT_CLASS_MINOR_ID, BE_CLASS_MINOR_ID), + } + var errs []error + + for ipsetName, classid := range ipsetToClassid { + exp := fmt.Sprintf("-A POSTROUTING -m set --match-set %s src -j CLASSIFY --set-class %s", string(ipsetName), classid) + existed := false + + // looks like this one: + // -A POSTROUTING -m set --match-set mid_class src -j CLASSIFY --set-class 0001:0003 + rules, err := p.iptablesHandler.List("mangle", "POSTROUTING") + if err != nil || rules == nil { + continue + } + for _, rule := range rules { + if rule == exp { + existed = true + break + } + } + + if existed { + err = p.iptablesHandler.Delete("mangle", "POSTROUTING", + "-m", "set", "--match-set", string(ipsetName), "src", + "-j", "CLASSIFY", "--set-class", classid) + errs = append(errs, err) + } + } + + return apierror.NewAggregate(errs) +} + +func (p *tcPlugin) iptablesExisted() (bool, error) { + ipsetToClassid := map[NetQoSClass]string{ + NETQoSSystem: convertToClassId(ROOT_CLASS_MINOR_ID, SYSTEM_CLASS_MINOR_ID), + NETQoSLS: convertToClassId(ROOT_CLASS_MINOR_ID, LS_CLASS_MINOR_ID), + NETQoSBE: convertToClassId(ROOT_CLASS_MINOR_ID, BE_CLASS_MINOR_ID), + } + + for ipsetName, classid := range ipsetToClassid { + exp := fmt.Sprintf("-A POSTROUTING -m set --match-set %s src -j CLASSIFY --set-class %s", string(ipsetName), classid) + existed := false + + // looks like this one: + // -A POSTROUTING -m set --match-set mid_class src -j CLASSIFY --set-class 0001:0003 + rules, err := p.iptablesHandler.List("mangle", "POSTROUTING") + if err == nil { + for _, rule := range rules { + if rule == exp { + existed = true + break + } + } + } + + if !existed { + return false, fmt.Errorf("iptables for matching ipset(%s) not found", ipsetName) + } + } + + return true, nil +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/netqos_tc.go b/pkg/koordlet/runtimehooks/hooks/tc/netqos_tc.go new file mode 100644 index 000000000..846f83d2c --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/netqos_tc.go @@ -0,0 +1,85 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "github.com/koordinator-sh/koordinator/apis/extension" + corev1 "k8s.io/api/core/v1" +) + +type NetQosGlobalConfig struct { + HwTxBpsMax uint64 `json:"hw_tx_bps_max"` + HwRxBpsMax uint64 `json:"hw_rx_bps_max"` + L1TxBpsMin uint64 `json:"l1_tx_bps_min"` + L1TxBpsMax uint64 `json:"l1_tx_bps_max"` + L2TxBpsMin uint64 `json:"l2_tx_bps_min"` + L2TxBpsMax uint64 `json:"l2_tx_bps_max"` + L1RxBpsMin uint64 `json:"l1_rx_bps_min"` + L1RxBpsMax uint64 `json:"l1_rx_bps_max"` + L2RxBpsMin uint64 `json:"l2_rx_bps_min"` + L2RxBpsMax uint64 `json:"l2_rx_bps_max"` +} + +type NetQoSClass string + +const ( + NETQoSSystem NetQoSClass = "system_class" + NETQoSLS NetQoSClass = "ls_class" + NETQoSBE NetQoSClass = "be_class" + NETQoSNone NetQoSClass = "" +) + +func GetPodNetQoSClassByName(qos string) NetQoSClass { + q := extension.QoSClass(qos) + + switch q { + case extension.QoSSystem: + return NETQoSSystem + case extension.QoSLSE, extension.QoSLSR, extension.QoSLS: + return NETQoSLS + case extension.QoSBE: + return NETQoSBE + } + + return NETQoSNone +} + +func GetClassIdByNetQos(qos NetQoSClass) string { + m := map[NetQoSClass]string{ + NETQoSSystem: convertToHexClassId(MAJOR_ID, SYSTEM_CLASS_MINOR_ID), + NETQoSLS: convertToHexClassId(MAJOR_ID, LS_CLASS_MINOR_ID), + NETQoSBE: convertToHexClassId(MAJOR_ID, BE_CLASS_MINOR_ID), + NETQoSNone: convertToHexClassId(MAJOR_ID, SYSTEM_CLASS_MINOR_ID), + } + + return m[qos] +} + +func GetPodNetQoSClass(pod *corev1.Pod) NetQoSClass { + if pod == nil || pod.Labels == nil { + return NETQoSNone + } + return GetNetQoSClassByAttrs(pod.Labels, pod.Annotations) +} + +func GetNetQoSClassByAttrs(labels, annotations map[string]string) NetQoSClass { + // annotations are for old format adaption reason + if q, exist := labels[extension.LabelPodQoS]; exist { + return GetPodNetQoSClassByName(q) + } + return NETQoSNone +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/rule.go b/pkg/koordlet/runtimehooks/hooks/tc/rule.go new file mode 100644 index 000000000..4b6b48b2c --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/rule.go @@ -0,0 +1,47 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "sync" +) + +type tcRule struct { + lock sync.RWMutex + enable bool + netCfg *NetQosGlobalConfig + speed uint64 +} + +func newRule() *tcRule { + return &tcRule{ + lock: sync.RWMutex{}, + enable: false, + netCfg: nil, + speed: 0, + } +} + +func (r *tcRule) IsEnabled() bool { + r.lock.RLock() + defer r.lock.RUnlock() + return r.enable +} + +func (r *tcRule) GetNetCfg() *NetQosGlobalConfig { + return r.netCfg +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/tc.go b/pkg/koordlet/runtimehooks/hooks/tc/tc.go new file mode 100644 index 000000000..43aa00f10 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/tc.go @@ -0,0 +1,46 @@ +//go:build !linux +// +build !linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" +) + +const ( + name = "TCInjection" + description = "set tc rules for nodes" +) + +type tcPlugin struct{} + +func Object() *tcPlugin { + return nil +} + +func (n *tcPlugin) Reconcile() { + klog.V(5).Info("net qos plugin start to reconcile in !linux os") + return +} + +func (n *tcPlugin) Register(op hooks.Options) { + klog.V(5).Infof("register hook %v", name) +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/tc_linux.go b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux.go new file mode 100644 index 000000000..ba8d78dcf --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux.go @@ -0,0 +1,578 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "errors" + "fmt" + "net" + "os" + "sync" + + "github.com/coreos/go-iptables/iptables" + "github.com/vishvananda/netlink" + "go.uber.org/atomic" + apierror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + "k8s.io/utils/pointer" + + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/reconciler" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" + sysutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" +) + +const ( + name = "tcPlugin" + description = "setup tc rules for node" + + ruleNameForNodeSLO = name + " (nodeSLO)" + ruleNameForAllPods = name + " (allPods)" +) + +const ( + MAJOR_ID = 1 + QDISC_MINOR_ID = 0 + ROOT_CLASS_MINOR_ID = 1 + SYSTEM_CLASS_MINOR_ID = 2 + LS_CLASS_MINOR_ID = 3 + BE_CLASS_MINOR_ID = 4 + + // 0-7, In the round-robin process, classes with the lowest priority field are tried for packets first. + SYSTEM_CLASS_PRIO = 0 + LS_CLASS_PRIO = 1 + BE_CLASS_PRIO = 2 + + // Maximum rate this class and all its children are guaranteed. Mandatory. + // attention: the values below only represent the percentage of bandwidth can be used by different tc classes on the host, + // the real values need to be calculated based on the physical network bandwidth. + // eg: eth0: speed:200Mbit => high_clss.rate = 200Mbit * 40 / 100 = 80Mbit + BE_CLASS_RATE_PERCENTAGE = 30 + LS_CLASS_RATE_PERCENTAGE = 30 + SYSTEM_CLASS_RATE_PERCENTAGE = 40 + + // Maximum rate at which a class can send, if its parent has bandwidth to spare. Defaults to the configured rate, + // which implies no borrowing + CEIL_PERCENTAGE = 100 + + DEFAULT_INTERFACE_NAME = "eth0" +) + +var ( + rootClass = netlink.MakeHandle(MAJOR_ID, ROOT_CLASS_MINOR_ID) + systemClass = netlink.MakeHandle(MAJOR_ID, SYSTEM_CLASS_MINOR_ID) + lsClass = netlink.MakeHandle(MAJOR_ID, LS_CLASS_MINOR_ID) + beClass = netlink.MakeHandle(MAJOR_ID, BE_CLASS_MINOR_ID) + + ipsets = []string{string(NETQoSSystem), string(NETQoSLS), string(NETQoSBE)} +) + +type tcPlugin struct { + rule *tcRule + + initialized *atomic.Bool // whether the cache has been initialized + allPodsSyncOnce sync.Once // sync once for AllPods + + // this is the physical NIC on host, default eth0 + interfLink netlink.Link + + // for executing the iptables command. + iptablesHandler *iptables.IPTables + // for executing the tc and ipset command. + netLinkHandler netlink.Handle + + executor resourceexecutor.ResourceUpdateExecutor +} + +var singleton *tcPlugin + +func Object() *tcPlugin { + if singleton == nil { + singleton = newPlugin() + } + return singleton +} + +func newPlugin() *tcPlugin { + return &tcPlugin{ + initialized: atomic.NewBool(false), + netLinkHandler: netlink.Handle{}, + allPodsSyncOnce: sync.Once{}, + rule: newRule(), + } +} + +func (p *tcPlugin) Register(op hooks.Options) { + klog.V(5).Infof("register hook %v", name) + + rule.Register(ruleNameForNodeSLO, description, + rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO), + rule.WithUpdateCallback(p.ruleUpdateCb), + ) + + rule.Register(ruleNameForAllPods, description, + rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods), + rule.WithUpdateCallback(p.ruleUpdateCb)) + + reconciler.RegisterCgroupReconciler(reconciler.PodLevel, sysutil.NetClsClassId, description+" (pod net class id)", + p.SetPodNetCls, reconciler.PodHostNetworkFilter(), "true") + + p.executor = op.Executor +} + +func (p *tcPlugin) SetPodNetCls(proto protocol.HooksProtocol) error { + podCtx := proto.(*protocol.PodContext) + if podCtx == nil { + return fmt.Errorf("pod protocol is nil for plugin %v", name) + } + + netQos := NETQoSNone + if podCtx.Request.Labels != nil { + netQos = GetNetQoSClassByAttrs(podCtx.Request.Labels, podCtx.Request.Annotations) + } + classId := GetClassIdByNetQos(netQos) + podCtx.Response.Resources.NetClsClassId = pointer.String(classId) + + return nil +} + +func (p *tcPlugin) parseRuleForNodeSLO(mergedNodeSLOIf interface{}) (bool, error) { + mergedNodeSLO := mergedNodeSLOIf.(*slov1alpha1.NodeSLOSpec) + if mergedNodeSLO == nil { + return false, nil + } + qosStrategy := mergedNodeSLO.ResourceQOSStrategy + + // default policy enables + isNETQOSPolicyTC := qosStrategy == nil || qosStrategy.Policies == nil || qosStrategy.Policies.NETQOSPolicy == nil || + *qosStrategy.Policies.NETQOSPolicy == slov1alpha1.NETQOSPolicyTC + + if isNETQOSPolicyTC { + if mergedNodeSLO.SystemStrategy == nil { + return false, nil + } + p.rule.enable = true + p.rule.speed = uint64(mergedNodeSLO.SystemStrategy.TotalNetworkBandwidth.Value()) + p.rule.netCfg = loadConfigFromNodeSlo(mergedNodeSLO) + } else { + p.rule.enable = false + } + + return true, nil +} + +func (p *tcPlugin) parseForAllPods(e interface{}) (bool, error) { + _, ok := e.(*struct{}) + if !ok { + return false, fmt.Errorf("invalid rule type %T", e) + } + + needSync := false + p.allPodsSyncOnce.Do(func() { + needSync = true + klog.V(5).Infof("plugin %s callback the first all pods update", name) + }) + return needSync, nil +} + +func (p *tcPlugin) prepare() error { + linkInfo, err := system.GetLinkInfoByDefaultRoute() + if err != nil { + klog.Errorf("failed to get link info by default route. err=%v\n", err) + return err + } + if linkInfo == nil { + klog.Errorf("link info is nil") + return errors.New("link info is nil") + } + + p.interfLink = linkInfo + + ipt, err := iptables.New() + if err != nil { + klog.Errorf("failed to get iptables handler in those dir(%s). err=%v\n", os.Getenv("PATH"), err) + return err + } + p.iptablesHandler = ipt + p.initialized = atomic.NewBool(true) + + if err := p.InitRelatedRules(); err != nil { + klog.Errorf("failed to init some necessary rules. err=%v", err) + return err + } + + return nil +} + +func (p *tcPlugin) ruleUpdateCb(target *statesinformer.CallbackTarget) error { + if !p.initialized.Load() { + if err := p.prepare(); err != nil { + return err + } + } + + if !p.rule.IsEnabled() { + klog.V(5).Infof("tc plugin is not enabled, ready to cleanup related rules.") + return p.CleanUp() + } + + if target == nil { + return errors.New("callback target is nil") + } + + podMetas := target.Pods + if len(podMetas) <= 0 { + klog.V(5).Infof("plugin %s skipped for rule update, no pod passed from callback", name) + return nil + } + + return p.refreshForAllPods(podMetas) +} + +func (p *tcPlugin) refreshForAllPods(pods []*statesinformer.PodMeta) error { + ipInK8S := make(map[string]sets.String) + ipInIpset := make(map[string]sets.String) + + for _, pod := range pods { + netqos := GetPodNetQoSClass(pod.Pod) + if netqos == NETQoSNone { + continue + } + + if ipInK8S[string(netqos)] == nil { + ipInK8S[string(netqos)] = sets.NewString() + } + ipInK8S[string(netqos)].Insert(pod.Pod.Status.PodIP) + } + + for _, setName := range ipsets { + result, err := netlink.IpsetList(setName) + if err != nil || result == nil { + klog.Errorf("failed to get ipset.err=%v", err) + continue + } + + for _, entry := range result.Entries { + if ipInIpset[setName] == nil { + ipInIpset[setName] = sets.NewString() + } + ipInIpset[setName].Insert(entry.IP.String()) + } + } + + for _, setName := range ipsets { + for ip := range ipInK8S[setName].Difference(ipInIpset[setName]) { + klog.V(5).Infof("ready to add %s to ipset %s.\n", ip, setName) + if err := netlink.IpsetAdd(setName, &netlink.IPSetEntry{IP: net.ParseIP(ip).To4()}); err != nil { + klog.Warningf("failed to write ip %s to ipset %s, err=%v", ip, setName, err) + return err + } + } + + for ip := range ipInIpset[setName].Difference(ipInK8S[setName]) { + klog.V(5).Infof("ready to del ip %s from ipset %s.\n", ip, setName) + if err := netlink.IpsetDel(setName, &netlink.IPSetEntry{IP: net.ParseIP(ip).To4()}); err != nil { + klog.Warningf("failed to write ip %s to ipset %s, err=%v", ip, setName, err) + return err + } + } + } + + return nil +} + +func (p *tcPlugin) InitRelatedRules() error { + return apierror.NewAggregate([]error{ + p.EnsureQdisc(), + p.EnsureClasses(), + p.EnsureIpset(), + p.EnsureIptables(), + }) +} + +func (p *tcPlugin) CleanUp() error { + return apierror.NewAggregate([]error{ + p.DelQdisc(), + p.DelIptables(), + p.DestoryIpset(), + }) +} + +func (p *tcPlugin) EnsureQdisc() error { + klog.V(5).Infoln("start to create qdisc for default net interface") + attrs := netlink.QdiscAttrs{ + LinkIndex: p.interfLink.Attrs().Index, + Handle: netlink.MakeHandle(MAJOR_ID, QDISC_MINOR_ID), + Parent: netlink.HANDLE_ROOT, + } + htb := netlink.NewHtb(attrs) + htb.Defcls = SYSTEM_CLASS_MINOR_ID + + qdiscs, err := p.netLinkHandler.QdiscList(p.interfLink) + if err != nil { + return fmt.Errorf("failed to get qdisc. err=%v", err) + } + + for _, qdisc := range qdiscs { + if qdisc.Type() != "htb" { + continue + } + if qdisc.Attrs().Handle == htb.Handle { + return nil + } + if err := netlink.QdiscDel(htb); err != nil { + return fmt.Errorf("failed to delete old qidsc on %s, err=%v", p.interfLink.Attrs().Name, err) + } + } + + return p.netLinkHandler.QdiscAdd(htb) +} + +func (p *tcPlugin) DelQdisc() error { + klog.V(5).Infof("start to delete qdisc crated by tc plugin.") + attrs := netlink.QdiscAttrs{ + LinkIndex: p.interfLink.Attrs().Index, + Handle: netlink.MakeHandle(MAJOR_ID, QDISC_MINOR_ID), + Parent: netlink.HANDLE_ROOT, + } + htb := netlink.NewHtb(attrs) + + qdiscs, err := p.netLinkHandler.QdiscList(p.interfLink) + if err != nil { + return err + } + + for _, qdisc := range qdiscs { + if qdisc.Type() == "htb" && qdisc.Attrs().Handle == htb.Handle { + if err := netlink.QdiscDel(htb); err != nil { + return fmt.Errorf("failed to delete old qidsc on %s, err=%v", p.interfLink.Attrs().Name, err) + } + } + } + + return nil +} + +func (p *tcPlugin) EnsureClasses() error { + klog.V(5).Infof("start to create tc class rules.") + cfg := p.rule.GetNetCfg() + if cfg == nil { + return errors.New("net config is nil") + } + + return apierror.NewAggregate([]error{ + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, netlink.HANDLE_ROOT, rootClass, cfg.HwTxBpsMax, cfg.HwTxBpsMax, 0)), + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, rootClass, systemClass, cfg.HwTxBpsMax-cfg.L1TxBpsMin-cfg.L2TxBpsMin, cfg.HwTxBpsMax, SYSTEM_CLASS_PRIO)), + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, rootClass, lsClass, cfg.L1TxBpsMin, cfg.L1TxBpsMax, LS_CLASS_PRIO)), + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, rootClass, beClass, cfg.L2TxBpsMin, cfg.L2TxBpsMax, BE_CLASS_PRIO)), + }) +} + +func newClass(index int, parent, minor uint32, rate, ceil uint64, prio uint32) *netlink.HtbClass { + attr := netlink.ClassAttrs{ + LinkIndex: index, + Parent: parent, + Handle: minor, + } + classAttr := netlink.HtbClassAttrs{ + Rate: rate, + Ceil: ceil, + Prio: prio, + } + htbClass := NewHtbClass(attr, classAttr) + if htbClass.Cbuffer < 200 { + htbClass.Cbuffer = 200 + } + if htbClass.Buffer < 200 { + htbClass.Buffer = 200 + } + + return htbClass +} + +// NewHtbClass NOTE: function is in here because it uses other linux functions +func NewHtbClass(attrs netlink.ClassAttrs, cattrs netlink.HtbClassAttrs) *netlink.HtbClass { + mtu := 1600 + rate := cattrs.Rate / 8 + ceil := cattrs.Ceil / 8 + buffer := cattrs.Buffer + cbuffer := cattrs.Cbuffer + + if ceil == 0 { + ceil = rate + } + + if buffer == 0 { + buffer = uint32(float64(rate)/netlink.Hz() + float64(mtu)) + klog.V(2).Infof("buffer[%v]=rate[%v]/hz[%v]+mtu[%v]\n", buffer, rate, netlink.Hz(), mtu) + } + dstBuffer := netlink.Xmittime(rate, buffer) + klog.V(2).Infof("buffer[%v]=(1000000*(srcBuffer[%v]/rate[%v]))/tick[%v]\n", dstBuffer, buffer, rate, netlink.TickInUsec()) + + if cbuffer == 0 { + cbuffer = uint32(float64(ceil)/netlink.Hz() + float64(mtu)) + klog.V(2).Infof("cbuffer[%v]=ceil[%v]/hz[%v]+mtu[%v]\n", cbuffer, ceil, netlink.Hz(), mtu) + } + dstCbuffer := netlink.Xmittime(ceil, cbuffer) + klog.V(2).Infof("cbuffer[%v]=(1000000*(srcCbuffer[%v]/ceil[%v]))/tick[%v]", dstCbuffer, cbuffer, ceil, netlink.TickInUsec()) + + return &netlink.HtbClass{ + ClassAttrs: attrs, + Rate: rate, + Ceil: ceil, + Buffer: buffer, + Cbuffer: cbuffer, + Level: 0, + Prio: cattrs.Prio, + Quantum: cattrs.Quantum, + } +} + +func (p *tcPlugin) ensureClass(nic netlink.Link, expect *netlink.HtbClass) error { + classes, err := netlink.ClassList(nic, 0) + if err != nil { + return fmt.Errorf("failed to get tc class. err=%v", err) + } + var existing *netlink.HtbClass + for _, class := range classes { + switch class.(type) { + case *netlink.HtbClass: + htbClass := class.(*netlink.HtbClass) + if htbClass != nil && + htbClass.Handle == expect.Handle && + htbClass.Parent == expect.Parent { + existing = htbClass + break + } + } + } + + if existing != nil { + if expect.Rate == existing.Rate && + expect.Ceil == existing.Ceil && + expect.Prio == existing.Prio && + expect.Buffer == existing.Buffer && + expect.Cbuffer == existing.Cbuffer { + return nil + } + if err := netlink.ClassChange(expect); err != nil { + return fmt.Errorf("failed to change class from %v to %v on interface %s. err=: %v", existing, expect, p.interfLink.Attrs().Name, err) + } + klog.Infof("succeed to changed htb class from %v to %v on interface %s.", existing, expect, p.interfLink.Attrs().Name) + return nil + } + if err := netlink.ClassAdd(expect); err != nil { + return fmt.Errorf("failed to create htb class %v: %v on interface %s. err=%v", expect, err, p.interfLink.Attrs().Name, err) + } + + klog.V(2).Infof("succed to creat htb class: %v on interface %s\n", expect, p.interfLink.Attrs().Name) + return nil +} + +func (p *tcPlugin) checkAllRulesExisted() (bool, error) { + if _, err := p.QdiscExisted(); err != nil { + return false, err + } + + if _, err := p.classesExisted(); err != nil { + return false, err + } + + if _, err := p.ipsetExisted(); err != nil { + return false, err + } + + if _, err := p.iptablesExisted(); err != nil { + return false, err + } + + return true, nil +} + +func (p *tcPlugin) QdiscExisted() (bool, error) { + attrs := netlink.QdiscAttrs{ + LinkIndex: p.interfLink.Attrs().Index, + Handle: netlink.MakeHandle(MAJOR_ID, QDISC_MINOR_ID), + Parent: netlink.HANDLE_ROOT, + } + htb := netlink.NewHtb(attrs) + + qdiscs, err := p.netLinkHandler.QdiscList(p.interfLink) + if err != nil || qdiscs == nil { + return false, err + } + + if len(qdiscs) == 1 && qdiscs[0].Type() == "htb" && + qdiscs[0].Attrs().Handle == htb.Handle { + return true, nil + } + + return false, fmt.Errorf("qdisc not found") +} + +func (p *tcPlugin) classesExisted() (bool, error) { + link, err := netlink.LinkByIndex(p.interfLink.Attrs().Index) + if err != nil { + return false, err + } + + maxCeil := p.rule.speed * CEIL_PERCENTAGE / 100 + // other leaf class + highClassRate := p.rule.speed * SYSTEM_CLASS_RATE_PERCENTAGE / 100 + midClassRate := p.rule.speed * LS_CLASS_RATE_PERCENTAGE / 100 + lowClassRate := p.rule.speed * BE_CLASS_RATE_PERCENTAGE / 100 + + errs := apierror.NewAggregate([]error{ + p.classExisted(link, newClass(p.interfLink.Attrs().Index, netlink.HANDLE_ROOT, rootClass, maxCeil, maxCeil, 0)), + p.classExisted(link, newClass(p.interfLink.Attrs().Index, rootClass, systemClass, highClassRate, maxCeil, SYSTEM_CLASS_PRIO)), + p.classExisted(link, newClass(p.interfLink.Attrs().Index, rootClass, lsClass, midClassRate, maxCeil, LS_CLASS_PRIO)), + p.classExisted(link, newClass(p.interfLink.Attrs().Index, rootClass, beClass, lowClassRate, maxCeil, BE_CLASS_PRIO)), + }) + + if errs != nil { + return false, errs + } + + return true, nil +} + +func (p *tcPlugin) classExisted(nic netlink.Link, expect *netlink.HtbClass) error { + classes, err := netlink.ClassList(nic, 0) + if err != nil { + return err + } + + for _, class := range classes { + htbClass := class.(*netlink.HtbClass) + if class.Type() == "htb" && + htbClass.Handle == expect.Handle && + htbClass.Parent == expect.Parent { + return nil + } + } + + return fmt.Errorf("class(classid:%d) not find", expect.Handle) +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/tc_linux_test.go b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux_test.go new file mode 100644 index 000000000..8d84a27b0 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux_test.go @@ -0,0 +1,281 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "fmt" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/vishvananda/netlink" + "go.uber.org/atomic" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" +) + +func TestObject(t *testing.T) { + t.Run("test", func(t *testing.T) { + b := Object() + assert.NotNil(t, b) + b1 := Object() + assert.Equal(t, b, b1) + }) +} + +func Test_bvtPlugin_Register(t *testing.T) { + t.Run("register tc plugin", func(t *testing.T) { + r := &tcPlugin{} + r.Register(hooks.Options{}) + }) +} + +func newTestTCPlugin() *tcPlugin { + klog.Info("start to init net qos manager") + n := tcPlugin{ + netLinkHandler: netlink.Handle{}, + rule: &tcRule{ + enable: true, + netCfg: &NetQosGlobalConfig{ + HwTxBpsMax: 1000000000, + HwRxBpsMax: 1000000000, + L1TxBpsMin: 500000000, + L1TxBpsMax: 1000000000, + L2TxBpsMin: 500000000, + L2TxBpsMax: 1000000000, + L1RxBpsMin: 500000000, + L1RxBpsMax: 1000000000, + L2RxBpsMin: 500000000, + L2RxBpsMax: 1000000000, + }, + lock: sync.RWMutex{}, + }, + initialized: atomic.NewBool(true), + } + + return &n +} + +func TestTCPlugin_Init(t *testing.T) { + plugin := newTestTCPlugin() + if err := plugin.prepare(); err != nil { + return + } + + tests := []struct { + name string + preHandle func() error + wantErr bool + endHandle func() error + }{ + { + name: "tc qdisc rules already existed", + preHandle: plugin.EnsureQdisc, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "tc class rules already existed", + preHandle: func() error { + return errors.NewAggregate([]error{ + plugin.EnsureQdisc(), + plugin.EnsureClasses(), + }) + }, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "ipset rules already existed", + preHandle: plugin.EnsureIpset, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "iptables rules already existed", + preHandle: func() error { + return errors.NewAggregate([]error{ + plugin.EnsureIpset(), + plugin.EnsureIptables(), + }) + }, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "all rulues have already been inited", + preHandle: plugin.InitRelatedRules, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "cleanup all rules will be used in advance", + preHandle: plugin.CleanUp, + wantErr: false, + endHandle: plugin.CleanUp, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := tt.preHandle(); err != nil { + t.Errorf("failed to run preHandle.err=%v", err) + return + } + if err := plugin.InitRelatedRules(); (err != nil) != tt.wantErr { + t.Errorf("Init() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err := tt.endHandle(); err != nil { + t.Errorf("failed to run endHandle.err=%v", err) + return + } + }) + } +} + +func genPod(podName, netqos, ip string) *statesinformer.PodMeta { + return &statesinformer.PodMeta{ + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Labels: map[string]string{ + "koordinator.sh/netQoSClass": netqos, + }, + }, + Status: corev1.PodStatus{ + PodIP: ip, + }, + }, + } +} + +func TestTCPlugin_Callback(t *testing.T) { + pod1 := genPod("pod1", "high_class", "192.168.0.1") + pod2 := genPod("pod2", "high_class", "192.168.0.2") + pod3 := genPod("pod3", "mid_class", "192.168.0.3") + pod4 := genPod("pod4", "low_class", "192.168.0.4") + pod5 := genPod("pod5", "", "192.168.0.5") + pod6 := genPod("pod6", "low_class", "192.168.0.6") + + plugin := newTestTCPlugin() + if err := plugin.prepare(); err != nil { + klog.Errorf("failed to init some necessary info tc plugin.") + return + } + defer plugin.CleanUp() + + type args struct { + targets *statesinformer.CallbackTarget + } + + tests := []struct { + name string + args args + wantFields *int64 + ipsetExpected map[string][]string + }{ + { + name: "", + args: args{ + targets: &statesinformer.CallbackTarget{ + Pods: []*statesinformer.PodMeta{ + pod1, pod2, pod3, pod4, pod5, + }, + }, + }, + wantFields: nil, + ipsetExpected: map[string][]string{ + "high_class": {"192.168.0.1", "192.168.0.2"}, + "mid_class": {"192.168.0.3"}, + "low_class": {"192.168.0.4"}, + }, + }, + { + name: "", + args: args{ + targets: &statesinformer.CallbackTarget{ + Pods: []*statesinformer.PodMeta{ + pod2, pod3, pod4, pod5, pod6, + }, + }, + }, + wantFields: nil, + ipsetExpected: map[string][]string{ + "high_class": {"192.168.0.2"}, + "mid_class": {"192.168.0.3"}, + "low_class": {"192.168.0.4", "192.168.0.6"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + if err := plugin.ruleUpdateCb(tt.args.targets); err != nil { + klog.Errorf("failed to process ruleUpdateCb, err=%v", err) + return + } + if _, err := plugin.checkAllRulesExisted(); err != nil { + t.Errorf("some necessary rules not existed. err=%v", err) + return + } + if !checkIpsetIsRight(tt.ipsetExpected) { + t.Errorf("ipset rules not the same as expected") + return + } + }) + } +} + +func checkIpsetIsRight(rules map[string][]string) bool { + for setName, ips := range rules { + for _, ip := range ips { + if !ipsetEntryExisted(setName, ip) { + fmt.Printf("%s:%s ipset rules not the same as expected\n", setName, ip) + return false + } + } + } + + return true +} + +func ipsetEntryExisted(setName, ip string) bool { + result, err := netlink.IpsetList(setName) + if err != nil || result == nil { + return false + } + + for _, entry := range result.Entries { + if entry.IP.String() == ip { + return true + } + } + + return false +} diff --git a/pkg/koordlet/runtimehooks/protocol/pod_context.go b/pkg/koordlet/runtimehooks/protocol/pod_context.go index f81a160d4..10cc9a3aa 100644 --- a/pkg/koordlet/runtimehooks/protocol/pod_context.go +++ b/pkg/koordlet/runtimehooks/protocol/pod_context.go @@ -277,4 +277,18 @@ func (p *PodContext) injectForExt() { p.Request.PodMeta.Namespace, p.Request.PodMeta.Name, *p.Response.Resources.MemoryLimit, p.Request.CgroupParent) } } + + if p.Response.Resources.NetClsClassId != nil { + eventHelper := audit.V(3).Pod(p.Request.PodMeta.Namespace, p.Request.PodMeta.Name).Reason("runtime-hooks").Message( + "set pod net class id to %v", *p.Response.Resources.NetClsClassId) + updater, err := injectNetClsClassId(p.Request.CgroupParent, *p.Response.Resources.NetClsClassId, eventHelper, p.executor) + if err != nil { + klog.Infof("set pod %v/%v net class id %v on cgroup parent %v failed, error %v", p.Request.PodMeta.Namespace, + p.Request.PodMeta.Name, *p.Response.Resources.NetClsClassId, p.Request.CgroupParent, err) + } else { + p.updaters = append(p.updaters, updater) + klog.V(5).Infof("set pod %v/%v net class id %v on cgroup parent %v", + p.Request.PodMeta.Namespace, p.Request.PodMeta.Name, *p.Response.Resources.NetClsClassId, p.Request.CgroupParent) + } + } } diff --git a/pkg/koordlet/runtimehooks/protocol/protocol.go b/pkg/koordlet/runtimehooks/protocol/protocol.go index 0688e2bfe..1fe3a3f28 100644 --- a/pkg/koordlet/runtimehooks/protocol/protocol.go +++ b/pkg/koordlet/runtimehooks/protocol/protocol.go @@ -73,10 +73,11 @@ var HooksProtocolBuilder = hooksProtocolBuilder{ type Resources struct { // origin resources - CPUShares *int64 - CFSQuota *int64 - CPUSet *string - MemoryLimit *int64 + CPUShares *int64 + CFSQuota *int64 + CPUSet *string + MemoryLimit *int64 + NetClsClassId *string // extended resources CPUBvt *int64 @@ -176,3 +177,11 @@ func injectCPUIdle(cgroupParent string, idleValue int64, a *audit.EventHelper, e } return updater, nil } + +func injectNetClsClassId(cgroupParent string, classId string, a *audit.EventHelper, e resourceexecutor.ResourceUpdateExecutor) (resourceexecutor.ResourceUpdater, error) { + updater, err := resourceexecutor.DefaultCgroupUpdaterFactory.New(sysutil.NetClsClassIdName, cgroupParent, classId, a) + if err != nil { + return nil, err + } + return updater, nil +} diff --git a/pkg/koordlet/runtimehooks/reconciler/reconciler.go b/pkg/koordlet/runtimehooks/reconciler/reconciler.go index cef2f8656..1a5347ecb 100644 --- a/pkg/koordlet/runtimehooks/reconciler/reconciler.go +++ b/pkg/koordlet/runtimehooks/reconciler/reconciler.go @@ -17,6 +17,7 @@ limitations under the License. package reconciler import ( + "strconv" "sync" "time" @@ -102,6 +103,7 @@ type podQOSFilter struct{} const ( PodQOSFilterName = "podQOS" + HostNetWork = "hostNetwork" ) func (p *podQOSFilter) Name() string { @@ -122,6 +124,16 @@ func (p *podQOSFilter) Filter(podMeta *statesinformer.PodMeta) string { return string(qosClass) } +type podHostNetworkFilter struct{} + +func (p *podHostNetworkFilter) Name() string { + return HostNetWork +} + +func (p *podHostNetworkFilter) Filter(podMeta *statesinformer.PodMeta) string { + return strconv.FormatBool(podMeta.Pod.Spec.HostNetwork) +} + var singletonPodQOSFilter *podQOSFilter // PodQOSFilter returns a Filter which filters pod qos class @@ -132,6 +144,16 @@ func PodQOSFilter() *podQOSFilter { return singletonPodQOSFilter } +var singletonPodHostNetworkFilter *podHostNetworkFilter + +// PodHostNetworkFilter returns a Filter which filters pod hostnetwork is true +func PodHostNetworkFilter() *podHostNetworkFilter { + if singletonPodQOSFilter == nil { + singletonPodHostNetworkFilter = &podHostNetworkFilter{} + } + return singletonPodHostNetworkFilter +} + type reconcileFunc func(protocol.HooksProtocol) error // RegisterCgroupReconciler registers a cgroup reconciler according to the cgroup file, reconcile function and filter diff --git a/pkg/koordlet/runtimehooks/rule/rule.go b/pkg/koordlet/runtimehooks/rule/rule.go index de680010d..b7794c896 100644 --- a/pkg/koordlet/runtimehooks/rule/rule.go +++ b/pkg/koordlet/runtimehooks/rule/rule.go @@ -65,7 +65,7 @@ func (r *Rule) runUpdateCallbacks(target *statesinformer.CallbackTarget) { for _, callbackFn := range r.callbacks { if err := callbackFn(target); err != nil { cbName := runtime.FuncForPC(reflect.ValueOf(callbackFn).Pointer()).Name() - klog.Warningf("executing %s callback function %s failed, error %v", r.name, cbName) + klog.Warningf("executing %s callback function %s failed, error %v", r.name, cbName, err) } } } diff --git a/pkg/koordlet/util/system/cgroup_resource.go b/pkg/koordlet/util/system/cgroup_resource.go index 5ad67f18a..41e4a0e57 100644 --- a/pkg/koordlet/util/system/cgroup_resource.go +++ b/pkg/koordlet/util/system/cgroup_resource.go @@ -111,6 +111,7 @@ const ( // subsystems CgroupCPUAcctDir string = "cpuacct/" CgroupMemDir string = "memory/" CgroupBlkioDir string = "blkio/" + CgroupNetClsDir string = "net_cls/" CgroupV2Dir = "" ) @@ -169,6 +170,8 @@ const ( BlkioTWBpsName = "blkio.throttle.write_bps_device" BlkioIOWeightName = "blkio.cost.weight" BlkioIOQoSName = "blkio.cost.qos" + + NetClsClassIdName = "net_cls.classid" ) var ( @@ -193,6 +196,8 @@ var ( BlkioIOWeightValidator = &BlkIORangeValidator{min: 1, max: 100, resource: BlkioIOWeightName} BlkioIOQoSValidator = &BlkIORangeValidator{min: 0, max: math.MaxInt64, resource: BlkioIOQoSName} + NetClsClassIdValidator = &NetClsRangeValidator{resource: NetClsClassIdName} + CPUSetCPUSValidator = &CPUSetStrValidator{} ) @@ -240,6 +245,8 @@ var ( BlkioIOWeight = DefaultFactory.New(BlkioIOWeightName, CgroupBlkioDir).WithValidator(BlkioIOWeightValidator).WithCheckSupported(SupportedIfFileExistsInKubepods).WithCheckOnce(true) BlkioIOQoS = DefaultFactory.New(BlkioIOQoSName, CgroupBlkioDir).WithValidator(BlkioIOQoSValidator).WithSupported(SupportedIfFileExistsInRootCgroup(BlkioIOQoSName, CgroupBlkioDir)) + NetClsClassId = DefaultFactory.New(NetClsClassIdName, CgroupNetClsDir).WithValidator(NetClsClassIdValidator).WithCheckSupported(SupportedIfFileExistsInKubepods).WithCheckOnce(true) + knownCgroupResources = []Resource{ CPUStat, CPUShares, @@ -276,6 +283,7 @@ var ( BlkioWriteBps, BlkioIOWeight, BlkioIOQoS, + NetClsClassId, } CPUCFSQuotaV2 = DefaultFactory.NewV2(CPUCFSQuotaName, CPUMaxName) @@ -342,6 +350,7 @@ var ( MemoryOomGroupV2, BlkioIOWeight, BlkioIOQoS, + NetClsClassId, } ) diff --git a/pkg/koordlet/util/system/common_linux.go b/pkg/koordlet/util/system/common_linux.go index 8272d4eba..5854d8e49 100644 --- a/pkg/koordlet/util/system/common_linux.go +++ b/pkg/koordlet/util/system/common_linux.go @@ -23,6 +23,7 @@ import ( "bytes" "fmt" "io" + "net" "os" "os/exec" "path" @@ -36,6 +37,7 @@ import ( "unicode" "github.com/cakturk/go-netstat/netstat" + "github.com/vishvananda/netlink" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" ) @@ -222,3 +224,20 @@ func WorkingDirOf(pid int) (string, error) { return strings.TrimSpace(tokens[1]), nil } } + +func GetLinkInfoByDefaultRoute() (netlink.Link, error) { + routes, err := netlink.RouteListFiltered(netlink.FAMILY_V4, &netlink.Route{}, netlink.RT_FILTER_DST) + if err != nil { + return nil, err + } + if len(routes) == 0 { + return nil, fmt.Errorf("not find route info by dst ip=%s", net.IPv4zero.String()) + } + + linkInfo, err := netlink.LinkByIndex(routes[0].LinkIndex) + if err != nil { + return nil, err + } + + return linkInfo, nil +} diff --git a/pkg/koordlet/util/system/common_linux_test.go b/pkg/koordlet/util/system/common_linux_test.go index 03390d166..cf84b4aa2 100644 --- a/pkg/koordlet/util/system/common_linux_test.go +++ b/pkg/koordlet/util/system/common_linux_test.go @@ -20,6 +20,7 @@ limitations under the License. package system import ( + "fmt" "net" "os" "path/filepath" @@ -29,6 +30,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/vishvananda/netlink" ) func Test_KubeletPortToPid(t *testing.T) { @@ -115,3 +117,27 @@ func Test_WorkingDirOf(t *testing.T) { assert.NotEmpty(t, err) }) } + +func TestGetLinkInfoByDefaultRoute(t *testing.T) { + tests := []struct { + name string + want netlink.Link + wantErr assert.ErrorAssertionFunc + }{ + // TODO: Add test cases. + { + name: "normal", + want: nil, + wantErr: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := GetLinkInfoByDefaultRoute() + if !tt.wantErr(t, err, fmt.Sprintf("GetLinkInfoByDefaultRoute()")) { + return + } + //assert.Equalf(t, tt.want, got, "GetLinkInfoByDefaultRoute()") + }) + } +} diff --git a/pkg/koordlet/util/system/util_test_tool.go b/pkg/koordlet/util/system/util_test_tool.go index 4384e7726..a79c05b74 100644 --- a/pkg/koordlet/util/system/util_test_tool.go +++ b/pkg/koordlet/util/system/util_test_tool.go @@ -50,6 +50,7 @@ var ( BlkioReadIops, BlkioWriteBps, BlkioWriteIops, + NetClsClassId, } ) diff --git a/pkg/koordlet/util/system/validator.go b/pkg/koordlet/util/system/validator.go index bb4f5e672..dcd6a777d 100644 --- a/pkg/koordlet/util/system/validator.go +++ b/pkg/koordlet/util/system/validator.go @@ -114,3 +114,51 @@ func (r *BlkIORangeValidator) Validate(value string) (bool, string) { return true, "" } + +type NetClsRangeValidator struct { + resource string +} + +const ( + maxClassIdDecimal = 41231686041 + maxClassIdHex = 99999999 +) + +func (r *NetClsRangeValidator) Validate(value string) (bool, string) { + if value == "" { + return false, "value is nil" + } + + if r.resource == NetClsClassIdName { + if strings.HasPrefix(value, "0x") { + value = value[2:] + // You can write hexadecimal values to net_cls.classid; the format for these values is 0xAAAABBBB; + // AAAA is the major handle number and BBBB is the minor handle number. Reading net_cls.classid yields a decimal result. + // so, the max length of this value is 8. + hexVal, err := strconv.Atoi(value) + if err != nil { + return false, err.Error() + } + + if hexVal < 0 || hexVal > maxClassIdHex { + return false, "class id is invalid, decimal value must in 0~0x99999999" + } + } else { + decimalVal, err := strconv.ParseInt(value, 10, 32) + if err != nil { + return false, err.Error() + } + + maxClassId, err := strconv.ParseInt(strconv.Itoa(maxClassIdHex), 16, 32) + if err != nil { + return false, err.Error() + } + + if decimalVal > maxClassId || decimalVal < 0 { + return false, fmt.Sprintf("class id is invaild, decimal vaule must in 0~%d", maxClassId) + } + } + } + + return true, "" +} diff --git a/pkg/koordlet/util/system/validator_test.go b/pkg/koordlet/util/system/validator_test.go index 433a84d86..39f281cd5 100644 --- a/pkg/koordlet/util/system/validator_test.go +++ b/pkg/koordlet/util/system/validator_test.go @@ -70,3 +70,99 @@ func Test_RangeValidate(t *testing.T) { }) } } + +func TestNetClsRangeValidator_Validate(t *testing.T) { + type fields struct { + resource string + } + type args struct { + value string + } + tests := []struct { + name string + fields fields + args args + want bool + }{ + // TODO: Add test cases. + { + name: "nil", + fields: fields{ + resource: NetClsClassIdName, + }, + args: args{ + value: "", + }, + want: false, + }, + { + name: "not number", + fields: fields{ + resource: NetClsClassIdName, + }, + args: args{ + value: "abc", + }, + want: false, + }, + { + name: "decimal negative number", + fields: fields{ + resource: NetClsClassIdName, + }, + args: args{ + value: "-1", + }, + want: false, + }, + { + name: "decimal positive number but too big", + fields: fields{ + resource: NetClsClassIdName, + }, + args: args{ + value: "111111111111111111111111", + }, + want: false, + }, + { + name: "invalid hex number", + fields: fields{ + resource: NetClsClassIdName, + }, + args: args{ + value: "0xmm", + }, + want: false, + }, + { + name: "negative flag in hex number", + fields: fields{ + resource: NetClsClassIdName, + }, + args: args{ + value: "0x-1", + }, + want: false, + }, + { + name: "invalid number but too big", + fields: fields{ + resource: NetClsClassIdName, + }, + args: args{ + value: "0x1111111111111111", + }, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := &NetClsRangeValidator{ + resource: tt.fields.resource, + } + got, _ := r.Validate(tt.args.value) + assert.Equalf(t, tt.want, got, "Validate(%v)", tt.args.value) + }) + } +}