diff --git a/apis/extension/netqos_tc.go b/apis/extension/netqos_tc.go new file mode 100644 index 000000000..ef50900fe --- /dev/null +++ b/apis/extension/netqos_tc.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package extension + +import corev1 "k8s.io/api/core/v1" + +type NetQosGlobalConfig struct { + HwTxBpsMax uint64 `json:"hw_tx_bps_max"` + HwRxBpsMax uint64 `json:"hw_rx_bps_max"` + L1TxBpsMin uint64 `json:"l1_tx_bps_min"` + L1TxBpsMax uint64 `json:"l1_tx_bps_max"` + L2TxBpsMin uint64 `json:"l2_tx_bps_min"` + L2TxBpsMax uint64 `json:"l2_tx_bps_max"` + L1RxBpsMin uint64 `json:"l1_rx_bps_min"` + L1RxBpsMax uint64 `json:"l1_rx_bps_max"` + L2RxBpsMin uint64 `json:"l2_rx_bps_min"` + L2RxBpsMax uint64 `json:"l2_rx_bps_max"` +} + +type NetQoSClass string + +const ( + NETQoSHigh NetQoSClass = "high_class" + NETQoSMid NetQoSClass = "mid_class" + NETQoSLow NetQoSClass = "low_class" + NETQoSNone NetQoSClass = "" + + NETQOSConfigPathForNode = "/var/run/koordinator/net/node" + NETQOSConfigPathForPod = "/var/run/koordinator/net/pods" +) + +func GetPodNetQoSClassByName(qos string) NetQoSClass { + q := QoSClass(qos) + + switch q { + case QoSSystem: + return NETQoSHigh + case QoSLSE, QoSLSR, QoSLS: + return NETQoSMid + case QoSBE: + return NETQoSLow + } + + return NETQoSNone +} + +func GetPodNetQoSClass(pod *corev1.Pod) NetQoSClass { + if pod == nil || pod.Labels == nil { + return NETQoSNone + } + return GetNetQoSClassByAttrs(pod.Labels, pod.Annotations) +} + +func GetNetQoSClassByAttrs(labels, annotations map[string]string) NetQoSClass { + // annotations are for old format adaption reason + if q, exist := labels[LabelPodQoS]; exist { + return GetPodNetQoSClassByName(q) + } + return NETQoSNone +} diff --git a/config/crd/bases/slo.koordinator.sh_nodeslos.yaml b/config/crd/bases/slo.koordinator.sh_nodeslos.yaml index 1f5970e82..6fe1b0643 100644 --- a/config/crd/bases/slo.koordinator.sh_nodeslos.yaml +++ b/config/crd/bases/slo.koordinator.sh_nodeslos.yaml @@ -1267,6 +1267,9 @@ spec: policies: description: Policies of pod QoS. properties: + NETQOSPolicy: + description: applied policy for the Net QoS, default = "tc" + type: string cpuPolicy: description: applied policy for the CPU QoS, default = "groupIdentity" type: string diff --git a/docker/koordlet.dockerfile b/docker/koordlet.dockerfile index f1362530b..e1ee75f00 100644 --- a/docker/koordlet.dockerfile +++ b/docker/koordlet.dockerfile @@ -6,6 +6,7 @@ ARG TARGETARCH ENV VERSION $VERSION ENV GOOS linux ENV GOARCH $TARGETARCH +ENV GOPROXY https://goproxy.cn,direct COPY go.mod go.mod COPY go.sum go.sum @@ -35,6 +36,7 @@ RUN go build -a -o koordlet cmd/koordlet/main.go FROM --platform=$TARGETPLATFORM nvidia/cuda:11.6.2-base-ubuntu20.04 WORKDIR / RUN apt-get update && apt-get install -y lvm2 && rm -rf /var/lib/apt/lists/* +RUN apt-get update && apt-get install -y iptables COPY --from=builder /go/src/github.com/koordinator-sh/koordinator/koordlet . COPY --from=builder /usr/local/lib /usr/lib ENTRYPOINT ["/koordlet"] diff --git a/go.mod b/go.mod index acf40aa15..625cf4f3b 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/NVIDIA/go-nvml v0.11.6-0.0.20220823120812-7e2082095e82 github.com/cakturk/go-netstat v0.0.0-20200220111822-e5b49efee7a5 github.com/containerd/nri v0.3.0 + github.com/coreos/go-iptables v0.5.0 github.com/docker/docker v20.10.21+incompatible github.com/evanphx/json-patch v5.6.0+incompatible github.com/fsnotify/fsnotify v1.6.0 @@ -185,7 +186,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635 // indirect github.com/ugorji/go/codec v1.2.7 // indirect - github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect + github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 github.com/vishvananda/netns v0.0.0-20210104183010-2eb08e3e575f // indirect github.com/vmware/govmomi v0.30.0 // indirect go.etcd.io/etcd/api/v3 v3.5.5 // indirect diff --git a/go.sum b/go.sum index 4a5a0ad72..986aa949a 100644 --- a/go.sum +++ b/go.sum @@ -344,6 +344,7 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-iptables v0.4.5/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU= +github.com/coreos/go-iptables v0.5.0 h1:mw6SAibtHKZcNzAsOxjoHIG0gy5YFHhypWSSNc6EjbQ= github.com/coreos/go-iptables v0.5.0/go.mod h1:/mVI274lEDI2ns62jHCDnCyBF9Iwsmekav8Dbxlm1MU= github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index 294a92b7f..c99384eef 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -32,6 +32,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/tc" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -80,6 +81,11 @@ const ( // owner: @l1b0k // alpha: v1.5 TerwayQoS featuregate.Feature = "TerwayQoS" + + // NetQosByTC declines a network qos implementation based on tc. + // owner: @lucming + // alpha: v1.5 + NetQosByTC featuregate.Feature = "TC" ) var ( @@ -91,6 +97,7 @@ var ( CPUNormalization: {Default: false, PreRelease: featuregate.Alpha}, CoreSched: {Default: false, PreRelease: featuregate.Alpha}, TerwayQoS: {Default: false, PreRelease: featuregate.Alpha}, + NetQosByTC: {Default: false, PreRelease: featuregate.Alpha}, } runtimeHookPlugins = map[featuregate.Feature]HookPlugin{ @@ -101,6 +108,7 @@ var ( CPUNormalization: cpunormalization.Object(), CoreSched: coresched.Object(), TerwayQoS: terwayqos.Object(), + NetQosByTC: tc.Object(), } ) diff --git a/pkg/koordlet/runtimehooks/hooks/tc/helper.go b/pkg/koordlet/runtimehooks/hooks/tc/helper.go new file mode 100644 index 000000000..dd823e7f8 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/helper.go @@ -0,0 +1,90 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" +) + +func loadConfigFromNodeSlo(nodesloSpec *slov1alpha1.NodeSLOSpec) *extension.NetQosGlobalConfig { + res := extension.NetQosGlobalConfig{} + var total uint64 = 0 + if nodesloSpec != nil && nodesloSpec.SystemStrategy != nil { + total = uint64(nodesloSpec.SystemStrategy.TotalNetworkBandwidth.Value()) + res.HwRxBpsMax = total + res.HwTxBpsMax = total + } + + if nodesloSpec.ResourceQOSStrategy == nil { + return &res + } + + strategy := nodesloSpec.ResourceQOSStrategy + if strategy.LSClass != nil && + strategy.LSClass.NetworkQOS != nil && + *strategy.LSClass.NetworkQOS.Enable { + cur := strategy.LSClass.NetworkQOS + res.L1RxBpsMin = getBandwidthVal(total, cur.IngressRequest) + res.L1RxBpsMax = getBandwidthVal(total, cur.IngressLimit) + res.L1TxBpsMin = getBandwidthVal(total, cur.EgressRequest) + res.L1TxBpsMax = getBandwidthVal(total, cur.EgressLimit) + } + + if strategy.BEClass != nil && + strategy.BEClass.NetworkQOS != nil && + *strategy.BEClass.NetworkQOS.Enable { + cur := strategy.BEClass.NetworkQOS + res.L2RxBpsMin = getBandwidthVal(total, cur.IngressRequest) + res.L2RxBpsMax = getBandwidthVal(total, cur.IngressLimit) + res.L2TxBpsMin = getBandwidthVal(total, cur.EgressRequest) + res.L2TxBpsMax = getBandwidthVal(total, cur.EgressLimit) + } + + return &res +} + +func getBandwidthVal(total uint64, intOrPercent *intstr.IntOrString) uint64 { + if intOrPercent == nil { + return 0 + } + + switch intOrPercent.Type { + case intstr.String: + return getBandwidthByQuantityFormat(intOrPercent.StrVal) + case intstr.Int: + return getBandwidthByPercentageFormat(total, intOrPercent.IntValue()) + default: + return 0 + } +} + +func getBandwidthByQuantityFormat(quanityStr string) uint64 { + val, err := resource.ParseQuantity(quanityStr) + if err != nil { + return 0 + } + + return uint64(val.Value()) +} + +func getBandwidthByPercentageFormat(total uint64, percentage int) uint64 { + return total * uint64(percentage) / 100 +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/ipset.go b/pkg/koordlet/runtimehooks/hooks/tc/ipset.go new file mode 100644 index 000000000..7197cffc3 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/ipset.go @@ -0,0 +1,78 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "fmt" + + "github.com/vishvananda/netlink" + apierror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" +) + +func (p *tcPlugin) ipsetExisted() (bool, error) { + var errs []error + for _, cur := range ipsets { + if _, err := netlink.IpsetList(cur); err != nil { + errs = append(errs, err) + } + } + + if apierror.NewAggregate(errs) != nil { + return false, apierror.NewAggregate(errs) + } + + return true, nil +} + +func (p *tcPlugin) EnsureIpset() error { + klog.V(5).Infof("start to create ipset.") + var errs []error + for _, cur := range ipsets { + result, err := netlink.IpsetList(cur) + if err == nil && result != nil { + continue + } + + err = netlink.IpsetCreate(cur, "hash:ip", netlink.IpsetCreateOptions{}) + if err != nil { + err = fmt.Errorf("failed to create ipset. err=%v", err) + errs = append(errs, err) + } + } + + return apierror.NewAggregate(errs) +} + +func (p *tcPlugin) DestoryIpset() error { + klog.V(5).Infof("start to delete ipset rules crated by tc plugin.") + var errs []error + for _, cur := range ipsets { + result, err := netlink.IpsetList(cur) + if err == nil && result != nil { + if err := netlink.IpsetDestroy(cur); err != nil { + err = fmt.Errorf("failed to destroy ipset. err=%v", err) + errs = append(errs, err) + } + } + } + + return apierror.NewAggregate(errs) +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/iptables.go b/pkg/koordlet/runtimehooks/hooks/tc/iptables.go new file mode 100644 index 000000000..266c71f30 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/iptables.go @@ -0,0 +1,142 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "fmt" + + apierror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/apis/extension" +) + +func (p *tcPlugin) EnsureIptables() error { + klog.V(5).Infof("start to create iptables.") + if p.iptablesHandler == nil { + return fmt.Errorf("can't create tc iptables rulues,because qos manager is nil") + } + ipsetToClassid := map[extension.NetQoSClass]string{ + extension.NETQoSHigh: "0001:0002", + extension.NETQoSMid: "0001:0003", + extension.NETQoSLow: "0001:0004", + } + + for ipsetName, classid := range ipsetToClassid { + exp := fmt.Sprintf("-A POSTROUTING -m set --match-set %s src -j CLASSIFY --set-class %s", string(ipsetName), classid) + existed := false + + // looks like this one: + // -A POSTROUTING -m set --match-set mid_class src -j CLASSIFY --set-class 0001:0003 + rules, err := p.iptablesHandler.List("mangle", "POSTROUTING") + if err == nil { + for _, rule := range rules { + if rule == exp { + existed = true + break + } + } + } + + if existed { + continue + } + err = p.iptablesHandler.Append("mangle", "POSTROUTING", + "-m", "set", "--match-set", string(ipsetName), "src", + "-j", "CLASSIFY", "--set-class", classid) + if err != nil { + klog.Errorf("ipt append err=%v", err) + return err + } + } + + return nil +} + +func (p *tcPlugin) DelIptables() error { + klog.V(5).Infof("start to delete iptables rules crated by tc plugin.") + if p.iptablesHandler == nil { + return fmt.Errorf("can't create tc iptables rulues,because qos manager is nil") + } + ipsetToClassid := map[extension.NetQoSClass]string{ + extension.NETQoSHigh: "0001:0002", + extension.NETQoSMid: "0001:0003", + extension.NETQoSLow: "0001:0004", + } + var errs []error + + for ipsetName, classid := range ipsetToClassid { + exp := fmt.Sprintf("-A POSTROUTING -m set --match-set %s src -j CLASSIFY --set-class %s", string(ipsetName), classid) + existed := false + + // looks like this one: + // -A POSTROUTING -m set --match-set mid_class src -j CLASSIFY --set-class 0001:0003 + rules, err := p.iptablesHandler.List("mangle", "POSTROUTING") + if err != nil || rules == nil { + continue + } + for _, rule := range rules { + if rule == exp { + existed = true + break + } + } + + if existed { + err = p.iptablesHandler.Delete("mangle", "POSTROUTING", + "-m", "set", "--match-set", string(ipsetName), "src", + "-j", "CLASSIFY", "--set-class", classid) + errs = append(errs, err) + } + } + + return apierror.NewAggregate(errs) +} + +func (p *tcPlugin) iptablesExisted() (bool, error) { + ipsetToClassid := map[extension.NetQoSClass]string{ + extension.NETQoSHigh: "0001:0002", + extension.NETQoSMid: "0001:0003", + extension.NETQoSLow: "0001:0004", + } + + for ipsetName, classid := range ipsetToClassid { + exp := fmt.Sprintf("-A POSTROUTING -m set --match-set %s src -j CLASSIFY --set-class %s", string(ipsetName), classid) + existed := false + + // looks like this one: + // -A POSTROUTING -m set --match-set mid_class src -j CLASSIFY --set-class 0001:0003 + rules, err := p.iptablesHandler.List("mangle", "POSTROUTING") + if err == nil { + for _, rule := range rules { + if rule == exp { + existed = true + break + } + } + } + + if !existed { + return false, fmt.Errorf("iptables for matching ipset(%s) not found", ipsetName) + } + } + + return true, nil +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/rule.go b/pkg/koordlet/runtimehooks/hooks/tc/rule.go new file mode 100644 index 000000000..418a8c37c --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/rule.go @@ -0,0 +1,49 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "sync" + + "github.com/koordinator-sh/koordinator/apis/extension" +) + +type tcRule struct { + lock sync.RWMutex + enable bool + netCfg *extension.NetQosGlobalConfig + speed uint64 +} + +func newRule() *tcRule { + return &tcRule{ + lock: sync.RWMutex{}, + enable: false, + netCfg: nil, + speed: 0, + } +} + +func (r *tcRule) IsEnabled() bool { + r.lock.RLock() + defer r.lock.RUnlock() + return r.enable +} + +func (r *tcRule) GetNetCfg() *extension.NetQosGlobalConfig { + return r.netCfg +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/tc.go b/pkg/koordlet/runtimehooks/hooks/tc/tc.go new file mode 100644 index 000000000..d352febdc --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/tc.go @@ -0,0 +1,46 @@ +//go:build !linux +// +build !linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" +) + +const ( + name = "TCInjection" + description = "set tc rules for nodes" +) + +type tcPlugin struct{} + +func Object() *tcPlugin { + return nil +} + +func (n *tcPlugin) Reconcile() { + klog.Info("net qos plugin start to reconcile in !linux os") + return +} + +func (n *tcPlugin) Register(op hooks.Options) { + klog.V(5).Infof("register hook %v", name) +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/tc_linux.go b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux.go new file mode 100644 index 000000000..19d0f4bcf --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux.go @@ -0,0 +1,550 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "errors" + "fmt" + "net" + "os" + "sync" + + "github.com/coreos/go-iptables/iptables" + "github.com/vishvananda/netlink" + "go.uber.org/atomic" + apierror "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" +) + +const ( + name = "tcPlugin" + description = "setup tc rules for node" + + ruleNameForNodeSLO = name + " (nodeSLO)" + ruleNameForAllPods = name + " (allPods)" +) + +const ( + MAJOR_ID = 1 + QDISC_MINOR_ID = 0 + ROOT_CLASS_MINOR_ID = 1 + HIGH_CLASS_MINOR_ID = 2 + MID_CLASS_MINOR_ID = 3 + LOW_CLASS_MINOR_ID = 4 + + // 0-7, In the round-robin process, classes with the lowest priority field are tried for packets first. + HIGH_CLASS_PRIO = 0 + MID_CLASS_PRIO = 1 + LOW_CLASS_PRIO = 2 + + // Maximum rate this class and all its children are guaranteed. Mandatory. + // attention: the values below only represent the percentage of bandwidth can be used by different tc classes on the host, + // the real values need to be calculated based on the physical network bandwidth. + // eg: eth0: speed:200Mbit => high_clss.rate = 200Mbit * 40 / 100 = 80Mbit + LOW_CLASS_RATE_PERCENTAGE = 30 + MID_CLASS_RATE_PERCENTAGE = 30 + HIGH_CLASS_RATE_PERCENTAGE = 40 + + // Maximum rate at which a class can send, if its parent has bandwidth to spare. Defaults to the configured rate, + // which implies no borrowing + CEIL_PERCENTAGE = 100 + + DEFAULT_INTERFACE_NAME = "eth0" +) + +var ( + rootClass = netlink.MakeHandle(MAJOR_ID, ROOT_CLASS_MINOR_ID) + highClass = netlink.MakeHandle(MAJOR_ID, HIGH_CLASS_MINOR_ID) + midClass = netlink.MakeHandle(MAJOR_ID, MID_CLASS_MINOR_ID) + lowClass = netlink.MakeHandle(MAJOR_ID, LOW_CLASS_MINOR_ID) + + ipsets = []string{"high_class", "mid_class", "low_class"} +) + +type tcPlugin struct { + rule *tcRule + + initialized *atomic.Bool // whether the cache has been initialized + allPodsSyncOnce sync.Once // sync once for AllPods + + // this is the physical NIC on host, default eth0 + interfLink netlink.Link + + // for executing the iptables command. + iptablesHandler *iptables.IPTables + // for executing the tc and ipset command. + netLinkHandler netlink.Handle +} + +var singleton *tcPlugin + +func Object() *tcPlugin { + if singleton == nil { + singleton = newPlugin() + } + return singleton +} + +func newPlugin() *tcPlugin { + return &tcPlugin{ + initialized: atomic.NewBool(false), + netLinkHandler: netlink.Handle{}, + allPodsSyncOnce: sync.Once{}, + rule: newRule(), + } +} + +func (p *tcPlugin) Register(op hooks.Options) { + klog.V(5).Infof("register hook %v", name) + + rule.Register(ruleNameForNodeSLO, description, + rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO), + rule.WithUpdateCallback(p.ruleUpdateCb), + ) + + rule.Register(ruleNameForAllPods, description, + rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods), + rule.WithUpdateCallback(p.ruleUpdateCb)) +} + +func (p *tcPlugin) parseRuleForNodeSLO(mergedNodeSLOIf interface{}) (bool, error) { + mergedNodeSLO := mergedNodeSLOIf.(*slov1alpha1.NodeSLOSpec) + if mergedNodeSLO == nil { + return false, nil + } + qosStrategy := mergedNodeSLO.ResourceQOSStrategy + + // default policy enables + isNETQOSPolicyTC := qosStrategy == nil || qosStrategy.Policies == nil || qosStrategy.Policies.NETQOSPolicy == nil || + *qosStrategy.Policies.NETQOSPolicy == slov1alpha1.NETQOSPolicyTC + + if isNETQOSPolicyTC { + if mergedNodeSLO.SystemStrategy == nil { + return false, nil + } + p.rule.enable = true + p.rule.speed = uint64(mergedNodeSLO.SystemStrategy.TotalNetworkBandwidth.Value()) + p.rule.netCfg = loadConfigFromNodeSlo(mergedNodeSLO) + } else { + p.rule.enable = false + } + + return true, nil +} + +func (p *tcPlugin) parseForAllPods(e interface{}) (bool, error) { + _, ok := e.(*struct{}) + if !ok { + return false, fmt.Errorf("invalid rule type %T", e) + } + + needSync := false + p.allPodsSyncOnce.Do(func() { + needSync = true + klog.V(5).Infof("plugin %s callback the first all pods update", name) + }) + return needSync, nil +} + +func (p *tcPlugin) prepare() error { + linkInfo, err := system.GetLinkInfoByDefaultRoute() + if err != nil { + klog.Errorf("failed to get link info by default route. err=%v\n", err) + return err + } + if linkInfo == nil { + klog.Errorf("link info is nil") + return errors.New("link info is nil") + } + + p.interfLink = linkInfo + + ipt, err := iptables.New() + if err != nil { + klog.Errorf("failed to get iptables handler in those dir(%s). err=%v\n", os.Getenv("PATH"), err) + return err + } + p.iptablesHandler = ipt + p.initialized = atomic.NewBool(true) + + return nil +} + +func (p *tcPlugin) ruleUpdateCb(target *statesinformer.CallbackTarget) error { + if !p.initialized.Load() { + if err := p.prepare(); err != nil { + return err + } + } + + if !p.rule.IsEnabled() { + klog.V(5).Infof("tc plugin is not enabled, ready to cleanup related rules.") + return p.CleanUp() + } + + if target == nil { + return errors.New("callback target is nil") + } + + podMetas := target.Pods + if len(podMetas) <= 0 { + klog.V(5).Infof("plugin %s skipped for rule update, no pod passed from callback", name) + return nil + } + + return p.refreshForAllPods(podMetas) +} + +func (p *tcPlugin) refreshForAllPods(pods []*statesinformer.PodMeta) error { + if err := p.InitRelatedRules(); err != nil { + klog.Errorf("failed to init some necessary rules. err=%v", err) + return err + } + + IPInK8S := make(map[string]sets.String) + IPInIpset := make(map[string]sets.String) + + for _, pod := range pods { + netqos := extension.GetPodNetQoSClass(pod.Pod) + if netqos == extension.NETQoSNone { + continue + } + + if IPInK8S[string(netqos)] == nil { + IPInK8S[string(netqos)] = sets.NewString() + } + IPInK8S[string(netqos)].Insert(pod.Pod.Status.PodIP) + } + + for _, setName := range ipsets { + result, err := netlink.IpsetList(setName) + if err != nil || result == nil { + klog.Errorf("failed to get ipset.err=%v", err) + continue + } + + for _, entry := range result.Entries { + if IPInIpset[setName] == nil { + IPInIpset[setName] = sets.NewString() + } + IPInIpset[setName].Insert(entry.IP.String()) + } + } + + for _, setName := range ipsets { + for ip := range IPInK8S[setName].Difference(IPInIpset[setName]) { + klog.V(5).Infof("ready to add %s to ipset %s.\n", ip, setName) + if err := netlink.IpsetAdd(setName, &netlink.IPSetEntry{IP: net.ParseIP(ip).To4()}); err != nil { + klog.Warningf("failed to write ip %s to ipset %s, err=%v", ip, setName, err) + return err + } + } + + for ip := range IPInIpset[setName].Difference(IPInK8S[setName]) { + klog.V(5).Infof("ready to del ip %s from ipset %s.\n", ip, setName) + if err := netlink.IpsetDel(setName, &netlink.IPSetEntry{IP: net.ParseIP(ip).To4()}); err != nil { + klog.Warningf("failed to write ip %s to ipset %s, err=%v", ip, setName, err) + return err + } + } + } + + return nil +} + +func (p *tcPlugin) InitRelatedRules() error { + return apierror.NewAggregate([]error{ + p.EnsureQdisc(), + p.EnsureClasses(), + p.EnsureIpset(), + p.EnsureIptables(), + }) +} + +func (p *tcPlugin) CleanUp() error { + return apierror.NewAggregate([]error{ + p.DelQdisc(), + p.DelIptables(), + p.DestoryIpset(), + }) +} + +func (p *tcPlugin) EnsureQdisc() error { + klog.V(5).Infoln("start to create qdisc for default net interface") + attrs := netlink.QdiscAttrs{ + LinkIndex: p.interfLink.Attrs().Index, + Handle: netlink.MakeHandle(MAJOR_ID, QDISC_MINOR_ID), + Parent: netlink.HANDLE_ROOT, + } + htb := netlink.NewHtb(attrs) + + qdiscs, err := p.netLinkHandler.QdiscList(p.interfLink) + if err != nil { + return fmt.Errorf("failed to get qdisc. err=%v", err) + } + + for _, qdisc := range qdiscs { + if qdisc.Type() != "htb" { + continue + } + if qdisc.Attrs().Handle == htb.Handle { + return nil + } + if err := netlink.QdiscDel(htb); err != nil { + return fmt.Errorf("failed to delete old qidsc on %s, err=%v", p.interfLink.Attrs().Name, err) + } + } + + return p.netLinkHandler.QdiscAdd(htb) +} + +func (p *tcPlugin) DelQdisc() error { + klog.V(5).Infof("start to delete qdisc crated by tc plugin.") + attrs := netlink.QdiscAttrs{ + LinkIndex: p.interfLink.Attrs().Index, + Handle: netlink.MakeHandle(MAJOR_ID, QDISC_MINOR_ID), + Parent: netlink.HANDLE_ROOT, + } + htb := netlink.NewHtb(attrs) + + qdiscs, err := p.netLinkHandler.QdiscList(p.interfLink) + if err != nil { + return err + } + + for _, qdisc := range qdiscs { + if qdisc.Type() == "htb" && qdisc.Attrs().Handle == htb.Handle { + if err := netlink.QdiscDel(htb); err != nil { + return fmt.Errorf("failed to delete old qidsc on %s, err=%v", p.interfLink.Attrs().Name, err) + } + } + } + + return nil +} + +func (p *tcPlugin) EnsureClasses() error { + klog.V(5).Infof("start to create tc class rules.") + cfg := p.rule.GetNetCfg() + if cfg == nil { + return errors.New("net config is nil") + } + + return apierror.NewAggregate([]error{ + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, netlink.HANDLE_ROOT, rootClass, cfg.HwTxBpsMax, cfg.HwTxBpsMax, 0)), + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, rootClass, highClass, cfg.HwTxBpsMax-cfg.L1TxBpsMin-cfg.L2TxBpsMin, cfg.HwTxBpsMax, HIGH_CLASS_PRIO)), + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, rootClass, midClass, cfg.L1TxBpsMin, cfg.L1TxBpsMax, MID_CLASS_PRIO)), + p.ensureClass(p.interfLink, newClass(p.interfLink.Attrs().Index, rootClass, lowClass, cfg.L2TxBpsMin, cfg.L2TxBpsMax, LOW_CLASS_PRIO)), + }) +} + +func newClass(index int, parent, minor uint32, rate, ceil uint64, prio uint32) *netlink.HtbClass { + attr := netlink.ClassAttrs{ + LinkIndex: index, + Parent: parent, + Handle: minor, + } + classAttr := netlink.HtbClassAttrs{ + Rate: rate, + Ceil: ceil, + Prio: prio, + } + htbClass := NewHtbClass(attr, classAttr) + if htbClass.Cbuffer < 200 { + htbClass.Cbuffer = 200 + } + if htbClass.Buffer < 200 { + htbClass.Buffer = 200 + } + + return htbClass +} + +// NewHtbClass NOTE: function is in here because it uses other linux functions +func NewHtbClass(attrs netlink.ClassAttrs, cattrs netlink.HtbClassAttrs) *netlink.HtbClass { + mtu := 1600 + rate := cattrs.Rate / 8 + ceil := cattrs.Ceil / 8 + buffer := cattrs.Buffer + cbuffer := cattrs.Cbuffer + + if ceil == 0 { + ceil = rate + } + + if buffer == 0 { + buffer = uint32(float64(rate)/netlink.Hz() + float64(mtu)) + klog.V(2).Infof("buffer[%v]=rate[%v]/hz[%v]+mtu[%v]\n", buffer, rate, netlink.Hz(), mtu) + } + dstBuffer := netlink.Xmittime(rate, buffer) + klog.V(2).Infof("buffer[%v]=(1000000*(srcBuffer[%v]/rate[%v]))/tick[%v]\n", dstBuffer, buffer, rate, netlink.TickInUsec()) + + if cbuffer == 0 { + cbuffer = uint32(float64(ceil)/netlink.Hz() + float64(mtu)) + klog.V(2).Infof("cbuffer[%v]=ceil[%v]/hz[%v]+mtu[%v]\n", cbuffer, ceil, netlink.Hz(), mtu) + } + dstCbuffer := netlink.Xmittime(ceil, cbuffer) + klog.V(2).Infof("cbuffer[%v]=(1000000*(srcCbuffer[%v]/ceil[%v]))/tick[%v]", dstCbuffer, cbuffer, ceil, netlink.TickInUsec()) + + return &netlink.HtbClass{ + ClassAttrs: attrs, + Rate: rate, + Ceil: ceil, + Buffer: buffer, + Cbuffer: cbuffer, + Level: 0, + Prio: cattrs.Prio, + Quantum: cattrs.Quantum, + } +} + +func (p *tcPlugin) ensureClass(nic netlink.Link, expect *netlink.HtbClass) error { + classes, err := netlink.ClassList(nic, 0) + if err != nil { + return fmt.Errorf("failed to get tc class. err=%v", err) + } + var existing *netlink.HtbClass + for _, class := range classes { + switch class.(type) { + case *netlink.HtbClass: + htbClass := class.(*netlink.HtbClass) + if htbClass != nil && + htbClass.Handle == expect.Handle && + htbClass.Parent == expect.Parent { + existing = htbClass + break + } + } + } + + if existing != nil { + if expect.Rate == existing.Rate && + expect.Ceil == existing.Ceil && + expect.Prio == existing.Prio && + expect.Buffer == existing.Buffer && + expect.Cbuffer == existing.Cbuffer { + return nil + } + if err := netlink.ClassChange(expect); err != nil { + return fmt.Errorf("failed to change class from %v to %v on interface %s. err=: %v", existing, expect, p.interfLink.Attrs().Name, err) + } + klog.Infof("succeed to changed htb class from %v to %v on interface %s.", existing, expect, p.interfLink.Attrs().Name) + return nil + } + if err := netlink.ClassAdd(expect); err != nil { + return fmt.Errorf("failed to create htb class %v: %v on interface %s. err=%v", expect, err, p.interfLink.Attrs().Name, err) + } + + klog.V(2).Infof("succed to creat htb class: %v on interface %s\n", expect, p.interfLink.Attrs().Name) + return nil +} + +func (p *tcPlugin) checkAllRulesExisted() (bool, error) { + if _, err := p.QdiscExisted(); err != nil { + return false, err + } + + if _, err := p.classesExisted(); err != nil { + return false, err + } + + if _, err := p.ipsetExisted(); err != nil { + return false, err + } + + if _, err := p.iptablesExisted(); err != nil { + return false, err + } + + return true, nil +} + +func (p *tcPlugin) QdiscExisted() (bool, error) { + attrs := netlink.QdiscAttrs{ + LinkIndex: p.interfLink.Attrs().Index, + Handle: netlink.MakeHandle(MAJOR_ID, QDISC_MINOR_ID), + Parent: netlink.HANDLE_ROOT, + } + htb := netlink.NewHtb(attrs) + + qdiscs, err := p.netLinkHandler.QdiscList(p.interfLink) + if err != nil || qdiscs == nil { + return false, err + } + + if len(qdiscs) == 1 && qdiscs[0].Type() == "htb" && + qdiscs[0].Attrs().Handle == htb.Handle { + return true, nil + } + + return false, fmt.Errorf("qdisc not found") +} + +func (p *tcPlugin) classesExisted() (bool, error) { + link, err := netlink.LinkByIndex(p.interfLink.Attrs().Index) + if err != nil { + return false, err + } + + maxCeil := p.rule.speed * CEIL_PERCENTAGE / 100 + // other leaf class + highClassRate := p.rule.speed * HIGH_CLASS_RATE_PERCENTAGE / 100 + midClassRate := p.rule.speed * MID_CLASS_RATE_PERCENTAGE / 100 + lowClassRate := p.rule.speed * LOW_CLASS_RATE_PERCENTAGE / 100 + + errs := apierror.NewAggregate([]error{ + p.classExisted(link, newClass(p.interfLink.Attrs().Index, netlink.HANDLE_ROOT, rootClass, maxCeil, maxCeil, 0)), + p.classExisted(link, newClass(p.interfLink.Attrs().Index, rootClass, highClass, highClassRate, maxCeil, HIGH_CLASS_PRIO)), + p.classExisted(link, newClass(p.interfLink.Attrs().Index, rootClass, midClass, midClassRate, maxCeil, MID_CLASS_PRIO)), + p.classExisted(link, newClass(p.interfLink.Attrs().Index, rootClass, lowClass, lowClassRate, maxCeil, LOW_CLASS_PRIO)), + }) + + if errs != nil { + return false, errs + } + + return true, nil +} + +func (p *tcPlugin) classExisted(nic netlink.Link, expect *netlink.HtbClass) error { + classes, err := netlink.ClassList(nic, 0) + if err != nil { + return err + } + + for _, class := range classes { + htbClass := class.(*netlink.HtbClass) + if class.Type() == "htb" && + htbClass.Handle == expect.Handle && + htbClass.Parent == expect.Parent { + return nil + } + } + + return fmt.Errorf("class(classid:%d) not find", expect.Handle) +} diff --git a/pkg/koordlet/runtimehooks/hooks/tc/tc_linux_test.go b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux_test.go new file mode 100644 index 000000000..e543bcfe1 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/tc/tc_linux_test.go @@ -0,0 +1,282 @@ +//go:build linux +// +build linux + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tc + +import ( + "fmt" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/vishvananda/netlink" + "go.uber.org/atomic" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" +) + +func TestObject(t *testing.T) { + t.Run("test", func(t *testing.T) { + b := Object() + assert.NotNil(t, b) + b1 := Object() + assert.Equal(t, b, b1) + }) +} + +func Test_bvtPlugin_Register(t *testing.T) { + t.Run("register tc plugin", func(t *testing.T) { + r := &tcPlugin{} + r.Register(hooks.Options{}) + }) +} + +func newTestTCPlugin() *tcPlugin { + klog.Info("start to init net qos manager") + n := tcPlugin{ + netLinkHandler: netlink.Handle{}, + rule: &tcRule{ + enable: true, + netCfg: &extension.NetQosGlobalConfig{ + HwTxBpsMax: 1000000000, + HwRxBpsMax: 1000000000, + L1TxBpsMin: 500000000, + L1TxBpsMax: 1000000000, + L2TxBpsMin: 500000000, + L2TxBpsMax: 1000000000, + L1RxBpsMin: 500000000, + L1RxBpsMax: 1000000000, + L2RxBpsMin: 500000000, + L2RxBpsMax: 1000000000, + }, + lock: sync.RWMutex{}, + }, + initialized: atomic.NewBool(true), + } + + return &n +} + +func TestTCPlugin_Init(t *testing.T) { + plugin := newTestTCPlugin() + if err := plugin.prepare(); err != nil { + return + } + + tests := []struct { + name string + preHandle func() error + wantErr bool + endHandle func() error + }{ + { + name: "tc qdisc rules already existed", + preHandle: plugin.EnsureQdisc, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "tc class rules already existed", + preHandle: func() error { + return errors.NewAggregate([]error{ + plugin.EnsureQdisc(), + plugin.EnsureClasses(), + }) + }, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "ipset rules already existed", + preHandle: plugin.EnsureIpset, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "iptables rules already existed", + preHandle: func() error { + return errors.NewAggregate([]error{ + plugin.EnsureIpset(), + plugin.EnsureIptables(), + }) + }, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "all rulues have already been inited", + preHandle: plugin.InitRelatedRules, + wantErr: false, + endHandle: plugin.CleanUp, + }, + { + name: "cleanup all rules will be used in advance", + preHandle: plugin.CleanUp, + wantErr: false, + endHandle: plugin.CleanUp, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if err := tt.preHandle(); err != nil { + t.Errorf("failed to run preHandle.err=%v", err) + return + } + if err := plugin.InitRelatedRules(); (err != nil) != tt.wantErr { + t.Errorf("Init() error = %v, wantErr %v", err, tt.wantErr) + return + } + if err := tt.endHandle(); err != nil { + t.Errorf("failed to run endHandle.err=%v", err) + return + } + }) + } +} + +func genPod(podName, netqos, ip string) *statesinformer.PodMeta { + return &statesinformer.PodMeta{ + Pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + Labels: map[string]string{ + "koordinator.sh/netQoSClass": netqos, + }, + }, + Status: corev1.PodStatus{ + PodIP: ip, + }, + }, + } +} + +func TestTCPlugin_Callback(t *testing.T) { + pod1 := genPod("pod1", "high_class", "192.168.0.1") + pod2 := genPod("pod2", "high_class", "192.168.0.2") + pod3 := genPod("pod3", "mid_class", "192.168.0.3") + pod4 := genPod("pod4", "low_class", "192.168.0.4") + pod5 := genPod("pod5", "", "192.168.0.5") + pod6 := genPod("pod6", "low_class", "192.168.0.6") + + plugin := newTestTCPlugin() + if err := plugin.prepare(); err != nil { + klog.Errorf("failed to init some necessary info tc plugin.") + return + } + defer plugin.CleanUp() + + type args struct { + targets *statesinformer.CallbackTarget + } + + tests := []struct { + name string + args args + wantFields *int64 + ipsetExpected map[string][]string + }{ + { + name: "", + args: args{ + targets: &statesinformer.CallbackTarget{ + Pods: []*statesinformer.PodMeta{ + pod1, pod2, pod3, pod4, pod5, + }, + }, + }, + wantFields: nil, + ipsetExpected: map[string][]string{ + "high_class": {"192.168.0.1", "192.168.0.2"}, + "mid_class": {"192.168.0.3"}, + "low_class": {"192.168.0.4"}, + }, + }, + { + name: "", + args: args{ + targets: &statesinformer.CallbackTarget{ + Pods: []*statesinformer.PodMeta{ + pod2, pod3, pod4, pod5, pod6, + }, + }, + }, + wantFields: nil, + ipsetExpected: map[string][]string{ + "high_class": {"192.168.0.2"}, + "mid_class": {"192.168.0.3"}, + "low_class": {"192.168.0.4", "192.168.0.6"}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + if err := plugin.ruleUpdateCb(tt.args.targets); err != nil { + klog.Errorf("failed to process ruleUpdateCb, err=%v", err) + return + } + if _, err := plugin.checkAllRulesExisted(); err != nil { + t.Errorf("some necessary rules not existed. err=%v", err) + return + } + if !checkIpsetIsRight(tt.ipsetExpected) { + t.Errorf("ipset rules not the same as expected") + return + } + }) + } +} + +func checkIpsetIsRight(rules map[string][]string) bool { + for setName, ips := range rules { + for _, ip := range ips { + if !ipsetEntryExisted(setName, ip) { + fmt.Printf("%s:%s ipset rules not the same as expected\n", setName, ip) + return false + } + } + } + + return true +} + +func ipsetEntryExisted(setName, ip string) bool { + result, err := netlink.IpsetList(setName) + if err != nil || result == nil { + return false + } + + for _, entry := range result.Entries { + if entry.IP.String() == ip { + return true + } + } + + return false +} diff --git a/pkg/koordlet/runtimehooks/rule/rule.go b/pkg/koordlet/runtimehooks/rule/rule.go index de680010d..b7794c896 100644 --- a/pkg/koordlet/runtimehooks/rule/rule.go +++ b/pkg/koordlet/runtimehooks/rule/rule.go @@ -65,7 +65,7 @@ func (r *Rule) runUpdateCallbacks(target *statesinformer.CallbackTarget) { for _, callbackFn := range r.callbacks { if err := callbackFn(target); err != nil { cbName := runtime.FuncForPC(reflect.ValueOf(callbackFn).Pointer()).Name() - klog.Warningf("executing %s callback function %s failed, error %v", r.name, cbName) + klog.Warningf("executing %s callback function %s failed, error %v", r.name, cbName, err) } } } diff --git a/pkg/koordlet/util/system/common_linux.go b/pkg/koordlet/util/system/common_linux.go index 8272d4eba..6e18b085c 100644 --- a/pkg/koordlet/util/system/common_linux.go +++ b/pkg/koordlet/util/system/common_linux.go @@ -23,6 +23,8 @@ import ( "bytes" "fmt" "io" + "io/ioutil" + "net" "os" "os/exec" "path" @@ -36,6 +38,7 @@ import ( "unicode" "github.com/cakturk/go-netstat/netstat" + "github.com/vishvananda/netlink" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/klog/v2" ) @@ -222,3 +225,31 @@ func WorkingDirOf(pid int) (string, error) { return strings.TrimSpace(tokens[1]), nil } } + +func GetLinkInfoByDefaultRoute() (netlink.Link, error) { + routes, err := netlink.RouteListFiltered(netlink.FAMILY_V4, &netlink.Route{}, netlink.RT_FILTER_DST) + if err != nil { + return nil, err + } + if len(routes) == 0 { + return nil, fmt.Errorf("not find route info by dst ip=%s", net.IPv4zero.String()) + } + + linkInfo, err := netlink.LinkByIndex(routes[0].LinkIndex) + if err != nil { + return nil, err + } + + return linkInfo, nil +} + +// GetSpeed get speed of ifName from /sys/class/net/$ifName/speed +func GetSpeed(ifName string) (int, error) { + file := path.Join("/sys/class/net/", ifName, "/speed") + speedStr, err := ioutil.ReadFile(file) + if err != nil { + return 0, err + } + value := strings.Replace(string(speedStr), "\n", "", -1) + return strconv.Atoi(value) +}