Skip to content

Commit

Permalink
koordlet: add net qos plugin
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <libokang.dev@gmail.com>
  • Loading branch information
l1b0k committed Jan 29, 2024
1 parent 7c7a844 commit 9fc6aac
Show file tree
Hide file tree
Showing 6 changed files with 444 additions and 0 deletions.
5 changes: 5 additions & 0 deletions apis/extension/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
// LabelPodMutatingUpdate is a label key that pods with `pod.koordinator.sh/mutating-update=true` will
// be mutated by Koordinator webhook when updating.
LabelPodMutatingUpdate = PodDomainPrefix + "/mutating-update"

// AnnotationIngressBandwidth and AnnotationEgressBandwidth are used to set the ingress and egress bandwidth of a Pod.
// The unit is bits per second (bps); for example, 10M means 10 megabits per second.
AnnotationIngressBandwidth = DomainPrefix + "ingress-bandwidth"
AnnotationEgressBandwidth = DomainPrefix + "egress-bandwidth"
)

type AggregationType string
Expand Down
8 changes: 8 additions & 0 deletions pkg/koordlet/runtimehooks/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util/system"
)

Expand Down Expand Up @@ -73,6 +74,11 @@ const (
// owner: @saintube @zwzhang0107
// alpha: v1.4
CoreSched featuregate.Feature = "CoreSched"

// TerwayQoS enables net QoS feature of koordlet.
// owner: @l1b0k
// alpha: v1.5
TerwayQoS featuregate.Feature = "TerwayQoS"
)

var (
Expand All @@ -83,6 +89,7 @@ var (
BatchResource: {Default: true, PreRelease: featuregate.Beta},
CPUNormalization: {Default: false, PreRelease: featuregate.Alpha},
CoreSched: {Default: false, PreRelease: featuregate.Alpha},
TerwayQoS: {Default: false, PreRelease: featuregate.Alpha},
}

runtimeHookPlugins = map[featuregate.Feature]HookPlugin{
Expand All @@ -92,6 +99,7 @@ var (
BatchResource: batchresource.Object(),
CPUNormalization: cpunormalization.Object(),
CoreSched: coresched.Object(),
TerwayQoS: terwayqos.Object(),
}
)

Expand Down
289 changes: 289 additions & 0 deletions pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terwayqos

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"

"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/klog/v2"

"github.com/koordinator-sh/koordinator/apis/extension"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/koordlet/util"
)

const (
// rootPath is the directory in which the QoS configuration files are written.
rootPath = "/var/lib/terway/qos"
// podConfig is the file name of the per-pod QoS configuration under rootPath.
podConfig = "pod.json"
// nodeConfig is the file name of the node-level QoS configuration under rootPath.
nodeConfig = "node.json"
)

const (
// name is the plugin name; it is used as the prefix of the registered rule names.
name = "TerwayQoS"
// description is the description passed when registering both rules.
description = "network qos management"

// ruleNameForNodeQoS is the name of the rule that is parsed from the NodeSLO spec.
ruleNameForNodeQoS = name + " (nodeQoS)"
// ruleNameForAllPods is the name of the rule that is registered for the all-pods type.
ruleNameForAllPods = name + " (allPods)"
)

// Plugin is the TerwayQoS runtime hook plugin. Its update callback serializes
// the pod list and the node-level QoS settings to JSON files under rootPath.
type Plugin struct {
// executor is the resource update executor taken from the hook options in Register.
executor resourceexecutor.ResourceUpdateExecutor

// lock guards node.
lock sync.RWMutex
// node is the node-level QoS configuration most recently parsed from the NodeSLO spec.
node *Node
}

// Register registers the two rules of the plugin and stores the executor
// from op. Both rules share the same update callback:
//   - ruleNameForNodeQoS is parsed from the NodeSLO spec,
//   - ruleNameForAllPods is registered for the all-pods type.
func (p *Plugin) Register(op hooks.Options) {
	klog.V(5).Infof("register hook %v", "terway qos configure generator")
	rule.Register(ruleNameForNodeQoS, description,
		rule.WithParseFunc(statesinformer.RegisterTypeNodeSLOSpec, p.parseRuleForNodeSLO),
		rule.WithUpdateCallback(p.update))
	rule.Register(ruleNameForAllPods, description,
		rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseForAllPods),
		rule.WithUpdateCallback(p.update))

	p.executor = op.Executor
}

// parseRuleForNodeSLO converts the merged NodeSLO spec into the node-level QoS
// configuration and stores it on the plugin under the write lock.
//
// It returns (true, nil) when the configuration was parsed and stored. It
// returns an error, and leaves the stored configuration untouched, when the
// argument is not a non-nil *NodeSLOSpec or when the spec cannot be parsed.
func (p *Plugin) parseRuleForNodeSLO(mergedNodeSLOIf interface{}) (bool, error) {
	// Use the comma-ok form so that an unexpected argument is reported as an
	// error instead of panicking.
	mergedNodeSLO, ok := mergedNodeSLOIf.(*slov1alpha1.NodeSLOSpec)
	if !ok {
		return false, fmt.Errorf("invalid rule type %T", mergedNodeSLOIf)
	}
	// parseNetQoS dereferences the spec, so a typed nil pointer must be rejected here.
	if mergedNodeSLO == nil {
		return false, fmt.Errorf("node slo spec is nil")
	}

	n := &Node{}
	if err := parseNetQoS(mergedNodeSLO, n); err != nil {
		return false, err
	}

	p.lock.Lock()
	p.node = n
	p.lock.Unlock()
	return true, nil
}

// parseForAllPods is the parse function of the all-pods rule. It accepts only
// a *struct{} argument and reports the rule as updated; any other argument
// type is rejected with an error.
func (p *Plugin) parseForAllPods(e interface{}) (bool, error) {
	if _, isMarker := e.(*struct{}); isMarker {
		return true, nil
	}
	return false, fmt.Errorf("invalid rule type %T", e)
}

// update rewrites the pod and node QoS configuration files under rootPath.
//
// For every pod in target it collects the identity, IPs, cgroup directory,
// priority and the bandwidth parsed from the pod annotations, and writes the
// list as JSON to podConfig. Pods whose bandwidth annotations cannot be parsed
// are logged and left out of the file. The node configuration currently held
// by the plugin is then written as JSON to nodeConfig.
//
// It returns an error when target is nil, or when the directory cannot be
// created, or when marshalling or writing either file fails.
func (p *Plugin) update(target *statesinformer.CallbackTarget) error {
	if target == nil {
		return fmt.Errorf("callback target is nil")
	}

	// os.ModeDir carries no permission bits, so passing it alone would create
	// the directory with mode 0000. MkdirAll needs explicit permission bits.
	if err := os.MkdirAll(rootPath, 0755); err != nil {
		return fmt.Errorf("create qos config dir %s: %w", rootPath, err)
	}

	pods := make([]Pod, 0, len(target.Pods))
	for _, meta := range target.Pods {
		if meta.Pod == nil {
			continue
		}

		v4, v6 := util.GetIPs(meta.Pod)

		ing, egress, err := getPodQoS(meta.Pod.Annotations)
		if err != nil {
			// Skip only the offending pod; the remaining pods are still written.
			klog.Errorf("get pod qos failed, pod: %s/%s, err: %v", meta.Pod.Namespace, meta.Pod.Name, err)
			continue
		}
		pods = append(pods, Pod{
			PodName:      meta.Pod.Name,
			PodNamespace: meta.Pod.Namespace,
			PodUID:       string(meta.Pod.UID),
			Prio:         prioMapping[meta.Pod.Labels[extension.LabelPodQoS]],
			IPv4:         v4,
			IPv6:         v6,
			HostNetwork:  meta.Pod.Spec.HostNetwork,
			CgroupDir:    meta.CgroupDir,
			QoSConfig: QoSConfig{
				IngressBandwidth: ing,
				EgressBandwidth:  egress,
			},
		})
	}

	outPods, err := json.Marshal(pods)
	if err != nil {
		return fmt.Errorf("marshal pod qos config: %w", err)
	}
	podPath := filepath.Join(rootPath, podConfig)
	if err = os.WriteFile(podPath, outPods, 0644); err != nil {
		return fmt.Errorf("write %s: %w", podPath, err)
	}

	// Serialize the node configuration under the read lock, then release the
	// lock before doing file I/O so that writers are not blocked on the disk.
	p.lock.RLock()
	outNode, err := json.Marshal(p.node)
	p.lock.RUnlock()
	if err != nil {
		return fmt.Errorf("marshal node qos config: %w", err)
	}

	nodePath := filepath.Join(rootPath, nodeConfig)
	if err = os.WriteFile(nodePath, outNode, 0644); err != nil {
		return fmt.Errorf("write %s: %w", nodePath, err)
	}
	return nil
}

// parseNetQoS fills node from the NodeSLO spec. Only the LS and BE classes are
// taken into account: LS settings go to the L1* fields and BE settings go to
// the L2* fields of node.
//
// Nothing is written when the spec has no system strategy. A negative total
// bandwidth, or a class setting that cannot be parsed, is returned as an error.
func parseNetQoS(slo *slov1alpha1.NodeSLOSpec, node *Node) error {
	sys := slo.SystemStrategy
	if sys == nil {
		return nil
	}
	if bw := sys.TotalNetworkBandwidth.Value(); bw < 0 {
		return fmt.Errorf("invalid total network bandwidth %d", bw)
	}

	// Convert to bytes per second. NOTE(review): the 1000*1000/8 factor implies
	// the configured value is in Mbit/s; confirm against the API definition.
	total := uint64(sys.TotalNetworkBandwidth.Value()) * 1000 * 1000 / 8
	node.HwTxBpsMax, node.HwRxBpsMax = total, total

	strategy := slo.ResourceQOSStrategy
	if strategy == nil {
		return nil
	}

	// enabled reports whether a class carries a network QoS config that is switched on.
	enabled := func(cfg *slov1alpha1.NetworkQOSCfg) bool {
		return cfg != nil && cfg.Enable != nil && *cfg.Enable
	}

	if ls := strategy.LSClass; ls != nil && enabled(ls.NetworkQOS) {
		lsQoS, err := parseQoS(ls.NetworkQOS, total)
		if err != nil {
			return err
		}
		node.L1RxBpsMin, node.L1RxBpsMax = lsQoS.IngressRequestBps, lsQoS.IngressLimitBps
		node.L1TxBpsMin, node.L1TxBpsMax = lsQoS.EgressRequestBps, lsQoS.EgressLimitBps
	}

	if be := strategy.BEClass; be != nil && enabled(be.NetworkQOS) {
		beQoS, err := parseQoS(be.NetworkQOS, total)
		if err != nil {
			return err
		}
		node.L2RxBpsMin, node.L2RxBpsMax = beQoS.IngressRequestBps, beQoS.IngressLimitBps
		node.L2TxBpsMin, node.L2TxBpsMax = beQoS.EgressRequestBps, beQoS.EgressLimitBps
	}

	return nil
}

// parseQoS resolves the ingress/egress request and limit settings of qos
// against total. A config whose Enable flag is unset or false yields a zero
// QoS. If any of the four settings fails to parse, a zero QoS and the error
// are returned.
func parseQoS(qos *slov1alpha1.NetworkQOSCfg, total uint64) (QoS, error) {
	var parsed QoS
	if qos.Enable == nil || !*qos.Enable {
		return parsed, nil
	}

	// Settings are resolved in this order; the first failure aborts.
	settings := []struct {
		dst *uint64
		src *intstr.IntOrString
	}{
		{&parsed.IngressRequestBps, qos.IngressRequest},
		{&parsed.IngressLimitBps, qos.IngressLimit},
		{&parsed.EgressRequestBps, qos.EgressRequest},
		{&parsed.EgressLimitBps, qos.EgressLimit},
	}
	for _, s := range settings {
		bps, err := parseQuantity(s.src, total)
		if err != nil {
			return QoS{}, err
		}
		*s.dst = bps
	}

	return parsed, nil
}

// parseQuantity resolves a bandwidth setting against total.
//
// A nil v yields 0. A string v is parsed as a resource quantity and used as an
// absolute value; it must be non-negative and must not exceed total. Any other
// v is treated as a percentage of total and must lie in [0, 100].
//
// An error is returned when the string cannot be parsed or when the value is
// out of range.
func parseQuantity(v *intstr.IntOrString, total uint64) (uint64, error) {
	if v == nil {
		return 0, nil
	}

	if v.Type == intstr.String {
		val, err := resource.ParseQuantity(v.String())
		if err != nil {
			return 0, err
		}
		raw := val.Value()
		// Reject negatives explicitly; converting them to uint64 would wrap around.
		if raw < 0 {
			return 0, fmt.Errorf("quantity %s is negative", v.String())
		}
		r := uint64(raw)
		if r > total {
			return 0, fmt.Errorf("quantity %s is larger than total %d", v.String(), total)
		}
		return r, nil
	}

	// A percentage outside [0, 100] would either wrap around in the uint64
	// conversion or exceed total, which the string form already forbids.
	percent := v.IntValue()
	if percent < 0 || percent > 100 {
		return 0, fmt.Errorf("percentage %d is out of range [0, 100]", percent)
	}
	return uint64(percent) * total / 100, nil
}

// getPodQoS reads the ingress and egress bandwidth annotations of a pod and
// returns them as (ingress, egress), each divided by 8. A missing or empty
// annotation yields 0 for that direction.
//
// An error is returned when an annotation is not a valid resource quantity or
// when it is negative.
func getPodQoS(anno map[string]string) (uint64, uint64, error) {
	ingress, err := parseBandwidthAnnotation(anno, extension.AnnotationIngressBandwidth)
	if err != nil {
		return 0, 0, err
	}
	egress, err := parseBandwidthAnnotation(anno, extension.AnnotationEgressBandwidth)
	if err != nil {
		return 0, 0, err
	}
	return ingress, egress, nil
}

// parseBandwidthAnnotation parses the quantity stored under key in anno and
// returns its value divided by 8, or 0 when the annotation is absent or empty.
func parseBandwidthAnnotation(anno map[string]string, key string) (uint64, error) {
	raw := anno[key]
	if raw == "" {
		return 0, nil
	}
	q, err := resource.ParseQuantity(raw)
	if err != nil {
		return 0, fmt.Errorf("parse annotation %s=%q: %w", key, raw, err)
	}
	val := q.Value()
	// A negative value would wrap around to a huge bandwidth when converted to uint64.
	if val < 0 {
		return 0, fmt.Errorf("annotation %s=%q must not be negative", key, raw)
	}
	return uint64(val) / 8, nil
}

// newPlugin returns a zero-valued Plugin.
func newPlugin() *Plugin {
	return new(Plugin)
}

var (
	// singleton is the package-wide plugin instance returned by Object.
	singleton *Plugin
	// once ensures singleton is created a single time.
	once sync.Once
)

// Object returns the package-wide Plugin instance, creating it on first use.
func Object() *Plugin {
	once.Do(func() { singleton = newPlugin() })
	return singleton
}
51 changes: 51 additions & 0 deletions pkg/koordlet/runtimehooks/hooks/terwayqos/terwayqos_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package terwayqos

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/util/intstr"
)

// TestParseQuantityWithNilValue checks that a nil setting resolves to zero without error.
func TestParseQuantityWithNilValue(t *testing.T) {
	got, err := parseQuantity(nil, 100)

	assert.NoError(t, err)
	assert.Equal(t, uint64(0), got)
}

// TestParseQuantityWithStringTypeAndValidValue checks that a string setting
// within the total is returned as an absolute value.
func TestParseQuantityWithStringTypeAndValidValue(t *testing.T) {
	setting := intstr.FromString("50")

	got, err := parseQuantity(&setting, 100)

	assert.NoError(t, err)
	assert.Equal(t, uint64(50), got)
}

// TestParseQuantityWithStringTypeAndValueExceedingTotal checks that a string
// setting larger than the total is rejected and yields zero.
func TestParseQuantityWithStringTypeAndValueExceedingTotal(t *testing.T) {
	setting := intstr.FromString("200")

	got, err := parseQuantity(&setting, 100)

	assert.Error(t, err)
	assert.Equal(t, uint64(0), got)
}

// TestParseQuantityWithIntType checks that an int setting is applied as a
// percentage of the total (50% of 100 is 50).
func TestParseQuantityWithIntType(t *testing.T) {
	setting := intstr.FromInt(50)

	got, err := parseQuantity(&setting, 100)

	assert.NoError(t, err)
	assert.Equal(t, uint64(50), got)
}
Loading

0 comments on commit 9fc6aac

Please sign in to comment.