Skip to content

Commit

Permalink
koordlet: add net qos plugin
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <libokang.lbk@alibaba-inc.com>
  • Loading branch information
l1b0k committed Jan 16, 2024
1 parent 6c737c6 commit 1141e90
Show file tree
Hide file tree
Showing 7 changed files with 536 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/koordlet/qosmanager/framework/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
MemoryEvictCoolTimeSeconds int
CPUEvictCoolTimeSeconds int
QOSExtensionCfg *QOSExtensionConfig
NetQOSProvider string
}

func NewDefaultConfig() *Config {
Expand All @@ -39,6 +40,7 @@ func NewDefaultConfig() *Config {
MemoryEvictCoolTimeSeconds: 4,
CPUEvictCoolTimeSeconds: 20,
QOSExtensionCfg: &QOSExtensionConfig{FeatureGates: map[string]bool{}},
NetQOSProvider: "koordlet",
}
}

Expand All @@ -49,5 +51,7 @@ func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.IntVar(&c.MemoryEvictIntervalSeconds, "memory-evict-interval-seconds", c.MemoryEvictIntervalSeconds, "evict be pod(memory) interval by seconds")
fs.IntVar(&c.MemoryEvictCoolTimeSeconds, "memory-evict-cool-time-seconds", c.MemoryEvictCoolTimeSeconds, "cooling time: memory next evict time should after lastEvictTime + MemoryEvictCoolTimeSeconds")
fs.IntVar(&c.CPUEvictCoolTimeSeconds, "cpu-evict-cool-time-seconds", c.CPUEvictCoolTimeSeconds, "cooltime: CPU next evict time should after lastEvictTime + CPUEvictCoolTimeSeconds")
fs.StringVar(&c.NetQOSProvider, "net-qos-provider", c.NetQOSProvider, "net qos provider, support: koordlet,terway default: koordlet")

c.QOSExtensionCfg.InitFlags(fs)
}
76 changes: 76 additions & 0 deletions pkg/koordlet/qosmanager/plugins/netqos/koordlet/default.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package koordlet

import (
"encoding/json"
"net/netip"
"os"
"path/filepath"

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/types"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util"
)

const rootConfigPath = "/etc/koordlet/net-qos"

type Pod struct {
PodName string `json:"podName"`
PodNamespace string `json:"podNamespace"`
PodUID string `json:"podUID"`
QoSClass string `json:"qosClass"`

IPv4 netip.Addr `json:"ipv4"`
IPv6 netip.Addr `json:"ipv6"`

HostNetwork bool `json:"hostNetwork"`

CgroupDir string `json:"cgroupDir"`
}

type Koordlet struct {
}

func (t *Koordlet) Sync(node *types.Node, podsMeta []*statesinformer.PodMeta) error {
// write the node config
out, err := json.Marshal(node)
if err != nil {
return err
}

err = os.MkdirAll(rootConfigPath, os.ModeDir)
if err != nil {
return err
}

err = os.WriteFile(filepath.Join(rootConfigPath, "node.json"), out, 0644)
if err != nil {
return err
}

pods := make([]Pod, 0, len(podsMeta))
for _, meta := range podsMeta {
if meta.Pod == nil {
continue
}

v4, v6 := util.GetIPs(meta.Pod)

pods = append(pods, Pod{
PodName: meta.Pod.Name,
PodNamespace: meta.Pod.Namespace,
PodUID: string(meta.Pod.UID),
QoSClass: meta.Pod.Labels[extension.LabelPodQoS],
IPv4: v4,
IPv6: v6,
HostNetwork: meta.Pod.Spec.HostNetwork,
CgroupDir: meta.CgroupDir,
})
}
outPods, err := json.Marshal(pods)
if err != nil {
return err
}

return os.WriteFile(filepath.Join(rootConfigPath, "pods.json"), outPods, 0644)
}
207 changes: 207 additions & 0 deletions pkg/koordlet/qosmanager/plugins/netqos/net_qos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package netqos

import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/features"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/framework"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/koordlet"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/terway"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins/netqos/types"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
)

const (
NetQoSName = "NetQoS"
)

var providers = map[string]Interface{
"terway": &terway.Terway{},
"koordlet": &koordlet.Koordlet{},
}

type Interface interface {
Sync(node *types.Node, podsMeta []*statesinformer.PodMeta) error
}

type nodeConfig struct {
provider Interface

reconcileInterval time.Duration
statesInformer statesinformer.StatesInformer
}

func (n *nodeConfig) Enabled() bool {
return features.DefaultKoordletFeatureGate.Enabled(features.NetQoS) && n.reconcileInterval > 0
}

func (n *nodeConfig) Setup(context *framework.Context) {}

func (n *nodeConfig) Run(stopCh <-chan struct{}) {
go wait.Until(n.reconcile, n.reconcileInterval, stopCh)
}

var _ framework.QOSStrategy = &nodeConfig{}

func New(opt *framework.Options) framework.QOSStrategy {
klog.Infof("enable net qos plugin %s", opt.Config.NetQOSProvider)
return &nodeConfig{
statesInformer: opt.StatesInformer,
reconcileInterval: time.Duration(opt.Config.ReconcileIntervalSeconds) * time.Second,
provider: providers[opt.Config.NetQOSProvider],
}
}

func (n *nodeConfig) reconcile() {
// lookup the node config
slo := n.statesInformer.GetNodeSLO()
if slo == nil {
return
}

err := n.sync(slo)
if err != nil {
klog.Error(err, "error sync netqos")
}
}

func (n *nodeConfig) sync(slo *v1alpha1.NodeSLO) error {
node := &types.Node{
Leveled: make(map[string]types.QoS),
}

err := parseNodeSLO(slo, node)
if err != nil {
return err
}

err = parseNetQoS(slo, node)
if err != nil {
return err
}

podsMeta := n.statesInformer.GetAllPods()

return n.provider.Sync(node, podsMeta)
}

func parseNodeSLO(slo *v1alpha1.NodeSLO, node *types.Node) error {
if slo.Spec.SystemStrategy == nil {
return nil
}
node.TotalNetworkBandwidth = uint64(slo.Spec.SystemStrategy.TotalNetworkBandwidth.Value())
return nil
}

func parseNetQoS(slo *v1alpha1.NodeSLO, node *types.Node) error {
qos := slo.Spec.ResourceQOSStrategy
if qos == nil {
return nil
}

if qos.LSRClass != nil && qos.LSRClass.NetworkQOS != nil {
q, err := parseQoS(qos.LSRClass.NetworkQOS, node.TotalNetworkBandwidth)
if err != nil {
return err
}
node.Leveled[string(extension.QoSLSR)] = q
}
if qos.LSClass != nil && qos.LSClass.NetworkQOS != nil {
q, err := parseQoS(qos.LSClass.NetworkQOS, node.TotalNetworkBandwidth)
if err != nil {
return err
}
node.Leveled[string(extension.QoSLS)] = q
}

if qos.BEClass != nil && qos.BEClass.NetworkQOS != nil {
q, err := parseQoS(qos.BEClass.NetworkQOS, node.TotalNetworkBandwidth)
if err != nil {
return err
}
node.Leveled[string(extension.QoSBE)] = q
}

if qos.SystemClass != nil && qos.SystemClass.NetworkQOS != nil {
q, err := parseQoS(qos.SystemClass.NetworkQOS, node.TotalNetworkBandwidth)
if err != nil {
return err
}
node.Leveled[string(extension.QoSSystem)] = q
}

if qos.CgroupRoot != nil && qos.CgroupRoot.NetworkQOS != nil {
q, err := parseQoS(qos.CgroupRoot.NetworkQOS, node.TotalNetworkBandwidth)
if err != nil {
return err
}
node.Leveled[string(extension.QoSNone)] = q
}

return nil
}

func parseQoS(qos *v1alpha1.NetworkQOSCfg, total uint64) (types.QoS, error) {
q := types.QoS{
IngressRequestBps: 0,
IngressLimitBps: 0,
EgressRequestBps: 0,
EgressLimitBps: 0,
}
if qos.Enable == nil {
return q, nil
}
if !*qos.Enable {
return q, nil
}

var err error
q.IngressRequestBps, err = parseQuantity(qos.IngressRequest, total)
if err != nil {
return types.QoS{}, err
}

q.IngressLimitBps, err = parseQuantity(qos.IngressLimit, total)
if err != nil {
return types.QoS{}, err
}

q.EgressRequestBps, err = parseQuantity(qos.EgressRequest, total)
if err != nil {
return types.QoS{}, err
}
q.EgressLimitBps, err = parseQuantity(qos.EgressLimit, total)
if err != nil {
return types.QoS{}, err
}

return q, nil
}

func parseQuantity(v *intstr.IntOrString, total uint64) (uint64, error) {
if v == nil {
return 0, nil
}
if v.Type == intstr.String {
val, err := resource.ParseQuantity(v.String())
if err != nil {
return 0, err
}
r := uint64(val.Value())
if r > total {
return 0, fmt.Errorf("quantity %s is larger than total %d", v.String(), total)
}

return r, nil
} else {
return uint64(v.IntValue()) * total / 100, nil
}
}
78 changes: 78 additions & 0 deletions pkg/koordlet/qosmanager/plugins/netqos/net_qos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package netqos

import (
"testing"

"k8s.io/apimachinery/pkg/util/intstr"
)

func Test_parseQuantity(t *testing.T) {
type args struct {
v *intstr.IntOrString
total uint64
}
tests := []struct {
name string
args args
want uint64
wantErr bool
}{
{
name: "null",
args: args{
v: nil,
total: 0,
},
want: 0,
wantErr: false,
},
{
name: "percentage",
args: args{
v: func() *intstr.IntOrString {
v := intstr.FromInt(10)
return &v
}(),
total: 100,
},
want: 10,
wantErr: false,
},
{
name: "quantity",
args: args{
v: func() *intstr.IntOrString {
v := intstr.FromString("100M")
return &v
}(),
total: 100000000,
},
want: 100000000,
wantErr: false,
},
{
name: "too large",
args: args{
v: func() *intstr.IntOrString {
v := intstr.FromString("900M")
return &v
}(),
total: 100000000,
},
want: 0,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := parseQuantity(tt.args.v, tt.args.total)
if (err != nil) != tt.wantErr {
t.Errorf("parseQuantity() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("parseQuantity() got = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit 1141e90

Please sign in to comment.