Skip to content

Commit

Permalink
koordlet: add net qos plugin
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <libokang.dev@gmail.com>
  • Loading branch information
l1b0k committed Jan 29, 2024
1 parent 7c7a844 commit ab10daa
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 0 deletions.
5 changes: 5 additions & 0 deletions apis/extension/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
// LabelPodMutatingUpdate is a label key that pods with `pod.koordinator.sh/mutating-update=true` will
// be mutated by Koordinator webhook when updating.
LabelPodMutatingUpdate = PodDomainPrefix + "/mutating-update"

// AnnotationIngressBandwidth and AnnotationEgressBandwidth are annotation keys that set the ingress and egress
// bandwidth of a Pod. The value is a quantity in bits per second (bps); for example, 10M means 10 megabits per second.
AnnotationIngressBandwidth = DomainPrefix + "ingress-bandwidth"
AnnotationEgressBandwidth = DomainPrefix + "egress-bandwidth"
)

type AggregationType string
Expand Down
8 changes: 8 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

Expand Down Expand Up @@ -73,6 +74,11 @@ const (
// owner: @saintube @zwzhang0107
// alpha: v1.4
CoreSched featuregate.Feature = "CoreSched"

// TerwayQoS enables net QoS feature of koordlet.
// owner: @l1b0k
// alpha: v1.5
TerwayQoS featuregate.Feature = "TerwayQoS"
)

var (
Expand All @@ -83,6 +89,7 @@ var (
BatchResource: {Default: true, PreRelease: featuregate.Beta},
CPUNormalization: {Default: false, PreRelease: featuregate.Alpha},
CoreSched: {Default: false, PreRelease: featuregate.Alpha},
TerwayQoS: {Default: false, PreRelease: featuregate.Alpha},
}

runtimeHookPlugins = map[featuregate.Feature]HookPlugin{
Expand All @@ -92,6 +99,7 @@ var (
BatchResource: batchresource.Object(),
CPUNormalization: cpunormalization.Object(),
CoreSched: coresched.Object(),
TerwayQoS: terwayqos.Object(),
}
)

Expand Down
273 changes: 273 additions & 0 deletions pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
package terwayqos

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"

"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/apis/extension"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util"
)

// Locations of the JSON files this plugin writes in update.
const (
// rootPath is the directory that update creates (if missing) and writes into.
rootPath = "/var/lib/terway/qos"
// podConfig is the file name, under rootPath, holding the marshalled list of pods.
podConfig = "pod.json"
// nodeConfig is the file name, under rootPath, holding the marshalled node-level config.
nodeConfig = "node.json"
)

// Identifiers used when registering this plugin's rules.
const (
// name is the plugin name; it is the prefix of both rule names below.
name = "TerwayQoS"
// description is passed to rule.Register for both rules.
description = "network qos management"

// ruleNameForNodeQoS names the rule parsed from the node SLO spec.
ruleNameForNodeQoS = name + " (nodeQoS)"
// ruleNameForAllPods names the rule registered for the all-pods event type.
ruleNameForAllPods = name + " (allPods)"
)

// Plugin is the network QoS runtime hook. It keeps the most recently parsed
// node-level configuration and writes it, together with per-pod data, to JSON
// files under rootPath.
type Plugin struct {
// executor is taken from hooks.Options in Register.
executor resourceexecutor.ResourceUpdateExecutor

// lock guards node: parseRuleForNodeSLO replaces node under the write lock,
// update marshals it under the read lock.
lock sync.RWMutex
node *Node
}

// Register hooks the plugin into the rule framework. It registers two rules,
// one parsed from the node SLO spec and one for the all-pods event type, both
// using p.update as their update callback, and then stores the executor
// supplied in op.
func (p *Plugin) Register(op hooks.Options) {
	klog.V(5).Infof("register hook %v", "terway qos configure generator")
	rule.Register(ruleNameForNodeQoS, description,
		rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO),
		rule.WithUpdateCallback(p.update))
	rule.Register(ruleNameForAllPods, description,
		rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods),
		rule.WithUpdateCallback(p.update))

	p.executor = op.Executor
}

// parseRuleForNodeSLO converts the merged node SLO spec into a Node config and
// stores it on the plugin for the next update call.
//
// It returns (true, nil) whenever the spec was parsed successfully, and
// (false, err) when the argument is not a non-nil *NodeSLOSpec or when
// parseNetQoS rejects the spec. On error the previously stored node config is
// left untouched.
func (p *Plugin) parseRuleForNodeSLO(mergedNodeSLOIf interface{}) (bool, error) {
	// Use the checked assertion (as parseForAllPods does) so that an unexpected
	// payload yields an error instead of a panic.
	mergedNodeSLO, ok := mergedNodeSLOIf.(*slov1alpha1.NodeSLOSpec)
	if !ok {
		return false, fmt.Errorf("invalid rule type %T", mergedNodeSLOIf)
	}
	// A typed nil pointer passes the assertion but would be dereferenced by
	// parseNetQoS.
	if mergedNodeSLO == nil {
		return false, fmt.Errorf("node slo spec is nil")
	}

	n := &Node{}
	if err := parseNetQoS(mergedNodeSLO, n); err != nil {
		return false, err
	}

	p.lock.Lock()
	p.node = n
	p.lock.Unlock()
	return true, nil
}

// parseForAllPods validates the payload of the all-pods rule. The payload is
// expected to be a *struct{}; anything else is reported as an error. A valid
// payload always yields (true, nil).
func (p *Plugin) parseForAllPods(e interface{}) (bool, error) {
	switch e.(type) {
	case *struct{}:
		return true, nil
	default:
		return false, fmt.Errorf("invalid rule type %T", e)
	}
}

// update regenerates the QoS configuration files under rootPath.
//
// It ensures rootPath exists, builds one Pod entry for every pod in target
// that has a non-nil Pod object and parseable bandwidth annotations, writes
// the list to podConfig, and then writes the currently stored node config to
// nodeConfig. Pods whose annotations cannot be parsed are logged and skipped.
//
// It returns an error when target is nil or when creating the directory,
// marshalling, or writing either file fails.
func (p *Plugin) update(target *statesinformer.CallbackTarget) error {
	if target == nil {
		return fmt.Errorf("callback target is nil")
	}

	// os.ModeDir carries no permission bits, so passing it as perm would create
	// the directory with mode 000. Use explicit permission bits instead.
	if err := os.MkdirAll(rootPath, 0755); err != nil {
		return fmt.Errorf("create qos config dir %q: %w", rootPath, err)
	}

	pods := make([]Pod, 0, len(target.Pods))
	for _, meta := range target.Pods {
		if meta.Pod == nil {
			continue
		}

		v4, v6 := util.GetIPs(meta.Pod)

		ing, egress, err := getPodQoS(meta.Pod.Annotations)
		if err != nil {
			// Best effort: a single malformed pod must not block the others.
			klog.Errorf("get pod qos failed for pod %s/%s, err: %v", meta.Pod.Namespace, meta.Pod.Name, err)
			continue
		}
		pods = append(pods, Pod{
			PodName:      meta.Pod.Name,
			PodNamespace: meta.Pod.Namespace,
			PodUID:       string(meta.Pod.UID),
			Prio:         prioMapping[meta.Pod.Labels[extension.LabelPodQoS]],
			IPv4:         v4,
			IPv6:         v6,
			HostNetwork:  meta.Pod.Spec.HostNetwork,
			CgroupDir:    meta.CgroupDir,
			QoSConfig: QoSConfig{
				IngressBandwidth: ing,
				EgressBandwidth:  egress,
			},
		})
	}

	outPods, err := json.Marshal(pods)
	if err != nil {
		return fmt.Errorf("marshal pod qos config: %w", err)
	}
	podPath := filepath.Join(rootPath, podConfig)
	if err := os.WriteFile(podPath, outPods, 0644); err != nil {
		return fmt.Errorf("write pod qos config %q: %w", podPath, err)
	}

	// node is replaced concurrently by parseRuleForNodeSLO; read it under the lock.
	p.lock.RLock()
	defer p.lock.RUnlock()
	outNode, err := json.Marshal(p.node)
	if err != nil {
		return fmt.Errorf("marshal node qos config: %w", err)
	}
	nodePath := filepath.Join(rootPath, nodeConfig)
	if err := os.WriteFile(nodePath, outNode, 0644); err != nil {
		return fmt.Errorf("write node qos config %q: %w", nodePath, err)
	}
	return nil
}

// parseNetQoS fills node from the node SLO spec. Only the LS and BE classes
// are taken into account: LS settings populate the L1* fields and BE settings
// populate the L2* fields.
//
// A spec without a system strategy leaves node untouched. A negative total
// bandwidth, or a class setting rejected by parseQoS, is returned as an error;
// fields written before the error was hit stay populated.
func parseNetQoS(slo *slov1alpha1.NodeSLOSpec, node *Node) error {
	sys := slo.SystemStrategy
	if sys == nil {
		return nil
	}

	rawTotal := sys.TotalNetworkBandwidth.Value()
	if rawTotal < 0 {
		return fmt.Errorf("invalid total network bandwidth %d", rawTotal)
	}
	// Scale by 10^6/8; the original note on this step labels the result "Byte/s".
	total := uint64(rawTotal) * 1000 * 1000 / 8
	node.HwTxBpsMax, node.HwRxBpsMax = total, total

	strategy := slo.ResourceQOSStrategy
	if strategy == nil {
		return nil
	}

	// active reports whether a class carries an explicitly enabled network QoS config.
	active := func(cfg *slov1alpha1.NetworkQOSCfg) bool {
		return cfg != nil && cfg.Enable != nil && *cfg.Enable
	}

	if ls := strategy.LSClass; ls != nil && active(ls.NetworkQOS) {
		lsQoS, err := parseQoS(ls.NetworkQOS, total)
		if err != nil {
			return err
		}
		node.L1RxBpsMin = lsQoS.IngressRequestBps
		node.L1RxBpsMax = lsQoS.IngressLimitBps
		node.L1TxBpsMin = lsQoS.EgressRequestBps
		node.L1TxBpsMax = lsQoS.EgressLimitBps
	}

	if be := strategy.BEClass; be != nil && active(be.NetworkQOS) {
		beQoS, err := parseQoS(be.NetworkQOS, total)
		if err != nil {
			return err
		}
		node.L2RxBpsMin = beQoS.IngressRequestBps
		node.L2RxBpsMax = beQoS.IngressLimitBps
		node.L2TxBpsMin = beQoS.EgressRequestBps
		node.L2TxBpsMax = beQoS.EgressLimitBps
	}

	return nil
}

// parseQoS resolves the four bandwidth settings of a network QoS config
// (ingress/egress request and limit) against total via parseQuantity.
//
// A config whose Enable flag is unset or false yields an all-zero QoS and no
// error. If any setting fails to parse, a zero QoS and that error are returned.
func parseQoS(qos *slov1alpha1.NetworkQOSCfg, total uint64) (QoS, error) {
	if qos.Enable == nil || !*qos.Enable {
		return QoS{}, nil
	}

	// Resolved in the same order as the fields of the result below.
	settings := [4]*intstr.IntOrString{
		qos.IngressRequest,
		qos.IngressLimit,
		qos.EgressRequest,
		qos.EgressLimit,
	}
	var resolved [4]uint64
	for i, setting := range settings {
		val, err := parseQuantity(setting, total)
		if err != nil {
			return QoS{}, err
		}
		resolved[i] = val
	}

	return QoS{
		IngressRequestBps: resolved[0],
		IngressLimitBps:   resolved[1],
		EgressRequestBps:  resolved[2],
		EgressLimitBps:    resolved[3],
	}, nil
}

// parseQuantity resolves a bandwidth setting against total.
//
// A nil v yields 0. A string v is parsed as a resource quantity and used as
// an absolute value; it must be non-negative and must not exceed total. Any
// other v is treated as an integer percentage of total and must lie in
// [0, 100].
//
// It returns an error when the string cannot be parsed or when the value is
// out of range.
func parseQuantity(v *intstr.IntOrString, total uint64) (uint64, error) {
	if v == nil {
		return 0, nil
	}

	if v.Type == intstr.String {
		val, err := resource.ParseQuantity(v.String())
		if err != nil {
			return 0, err
		}
		// Reject negatives before converting: uint64(-1) would wrap around.
		if val.Value() < 0 {
			return 0, fmt.Errorf("quantity %s is negative", v.String())
		}
		r := uint64(val.Value())
		if r > total {
			return 0, fmt.Errorf("quantity %s is larger than total %d", v.String(), total)
		}
		return r, nil
	}

	// Integer form: a percentage of total. Values outside [0, 100] would either
	// wrap around (negative) or exceed total, which the string form rejects.
	percent := v.IntValue()
	if percent < 0 || percent > 100 {
		return 0, fmt.Errorf("percentage %d is out of range [0, 100]", percent)
	}
	return uint64(percent) * total / 100, nil
}

// getPodQoS reads the ingress and egress bandwidth annotations of a pod and
// returns them, in that order, each divided by 8 (the annotation values are
// documented as bits per second, so the results are bytes per second).
//
// A missing or empty annotation yields 0 for that direction. If either
// annotation cannot be parsed or is negative, (0, 0, err) is returned.
func getPodQoS(anno map[string]string) (uint64, uint64, error) {
	ingress, err := parseBandwidthAnnotation(anno, extension.AnnotationIngressBandwidth)
	if err != nil {
		return 0, 0, err
	}
	egress, err := parseBandwidthAnnotation(anno, extension.AnnotationEgressBandwidth)
	if err != nil {
		return 0, 0, err
	}
	return ingress, egress, nil
}

// parseBandwidthAnnotation parses the quantity stored under key in anno and
// returns its value divided by 8. A missing or empty annotation yields 0.
func parseBandwidthAnnotation(anno map[string]string, key string) (uint64, error) {
	raw := anno[key]
	if raw == "" {
		return 0, nil
	}
	q, err := resource.ParseQuantity(raw)
	if err != nil {
		return 0, fmt.Errorf("parse annotation %s=%q: %w", key, raw, err)
	}
	bits := q.Value()
	// Reject negatives before converting: uint64 of a negative value wraps around.
	if bits < 0 {
		return 0, fmt.Errorf("annotation %s=%q must not be negative", key, raw)
	}
	return uint64(bits) / 8, nil
}

// newPlugin allocates a zero-valued Plugin; the executor is filled in later by
// Register.
func newPlugin() *Plugin {
	return new(Plugin)
}

var (
	// singleton is the process-wide plugin instance, created on first use.
	singleton *Plugin
	// once ensures singleton is initialised exactly once.
	once sync.Once
)

// Object returns the shared Plugin instance, creating it on the first call.
func Object() *Plugin {
	once.Do(func() { singleton = newPlugin() })
	return singleton
}
35 changes: 35 additions & 0 deletions pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package terwayqos

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/intstr"
)

// TestParseQuantityWithNilValue checks that an absent setting resolves to zero
// without an error.
func TestParseQuantityWithNilValue(t *testing.T) {
	got, err := parseQuantity(nil, 100)

	assert.NoError(t, err)
	assert.Equal(t, uint64(0), got)
}

// TestParseQuantityWithStringTypeAndValidValue checks that a string setting is
// used as an absolute value when it does not exceed the total.
func TestParseQuantityWithStringTypeAndValidValue(t *testing.T) {
	setting := intstr.IntOrString{Type: intstr.String, StrVal: "50"}

	got, err := parseQuantity(&setting, 100)

	assert.NoError(t, err)
	assert.Equal(t, uint64(50), got)
}

// TestParseQuantityWithStringTypeAndValueExceedingTotal checks that a string
// setting larger than the total is rejected and resolves to zero.
func TestParseQuantityWithStringTypeAndValueExceedingTotal(t *testing.T) {
	setting := intstr.IntOrString{Type: intstr.String, StrVal: "200"}

	got, err := parseQuantity(&setting, 100)

	assert.Error(t, err)
	assert.Equal(t, uint64(0), got)
}

// TestParseQuantityWithIntType checks that an integer setting is interpreted
// as a percentage of the total. The total is deliberately not 100, so that a
// percentage (50% of 200 = 100) is distinguishable from an absolute value (50).
func TestParseQuantityWithIntType(t *testing.T) {
	val := intstr.FromInt(50)
	result, err := parseQuantity(&val, 200)
	assert.NoError(t, err)
	assert.Equal(t, uint64(100), result)
}
Loading

0 comments on commit ab10daa

Please sign in to comment.