From d6de2aac19863f602b83b3e6332897d22273b917 Mon Sep 17 00:00:00 2001 From: zwzhang <101699117+zwzhang0107@users.noreply.github.com> Date: Mon, 23 May 2022 10:10:43 +0800 Subject: [PATCH] add cpu qos and mv nodeslo informer to states informer (#153) Signed-off-by: zwzhang0107 --- apis/slo/v1alpha1/nodeslo_types.go | 14 + apis/slo/v1alpha1/zz_generated.deepcopy.go | 46 ++ .../bases/slo.koordinator.sh_nodeslos.yaml | 60 +++ pkg/koordlet/koordlet.go | 4 +- pkg/koordlet/resmanager/cpu_burst_test.go | 2 +- pkg/koordlet/resmanager/cpu_suppress_test.go | 2 +- pkg/koordlet/resmanager/memory_evict_test.go | 9 +- .../resmanager/resctrl_reconcile_test.go | 5 +- pkg/koordlet/resmanager/resmanager.go | 147 +----- pkg/koordlet/resmanager/resmanager_test.go | 214 --------- .../resource_update_executor_test.go | 20 +- pkg/koordlet/resmanager/strategy_util_test.go | 234 --------- pkg/koordlet/runtimehooks/config.go | 1 + pkg/koordlet/runtimehooks/rule/rule.go | 13 +- pkg/koordlet/runtimehooks/runtimehooks.go | 16 +- .../runtimehooks/runtimehooks_test.go | 11 +- pkg/koordlet/runtimehooks/server/service.go | 7 + .../statesinformer/mockstatesinformer/mock.go | 57 ++- pkg/koordlet/statesinformer/register.go | 62 +++ pkg/koordlet/statesinformer/register_test.go | 64 +++ .../statesinformer/states_informer.go | 203 +++----- .../statesinformer/states_informer_test.go | 76 ++- pkg/koordlet/statesinformer/states_node.go | 63 +++ .../states_nodeslo.go} | 108 ++++- .../statesinformer/states_nodeslo_test.go | 452 ++++++++++++++++++ pkg/koordlet/statesinformer/states_pod.go | 80 ++++ pkg/util/config.go | 43 ++ 27 files changed, 1213 insertions(+), 800 deletions(-) delete mode 100644 pkg/koordlet/resmanager/strategy_util_test.go create mode 100644 pkg/koordlet/statesinformer/register.go create mode 100644 pkg/koordlet/statesinformer/register_test.go create mode 100644 pkg/koordlet/statesinformer/states_node.go rename pkg/koordlet/{resmanager/strategy_util.go => statesinformer/states_nodeslo.go} 
(52%) create mode 100644 pkg/koordlet/statesinformer/states_nodeslo_test.go create mode 100644 pkg/koordlet/statesinformer/states_pod.go diff --git a/apis/slo/v1alpha1/nodeslo_types.go b/apis/slo/v1alpha1/nodeslo_types.go index 374620dd3e..d52645e0f0 100644 --- a/apis/slo/v1alpha1/nodeslo_types.go +++ b/apis/slo/v1alpha1/nodeslo_types.go @@ -23,6 +23,12 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +// CPUQoS enables cpu qos features. +type CPUQoS struct { + // group identity value for pods, default = 0 + GroupIdentity *int64 `json:"groupIdentity,omitempty"` +} + // MemoryQoS enables memory qos features. type MemoryQoS struct { // memcg qos @@ -103,6 +109,13 @@ type PodMemoryQoSConfig struct { MemoryQoS `json:",inline"` } +// CPUQoSCfg stores node-level config of cpu qos +type CPUQoSCfg struct { + // Enable indicates whether the cpu qos is enabled. + Enable *bool `json:"enable,omitempty"` + CPUQoS `json:",inline"` +} + // MemoryQoSCfg stores node-level config of memory qos type MemoryQoSCfg struct { // Enable indicates whether the memory qos is enabled (default: false). @@ -113,6 +126,7 @@ type MemoryQoSCfg struct { } type ResourceQoS struct { + CPUQoS *CPUQoSCfg `json:"cpuQoS,omitempty"` MemoryQoS *MemoryQoSCfg `json:"memoryQoS,omitempty"` ResctrlQoS *ResctrlQoSCfg `json:"resctrlQoS,omitempty"` } diff --git a/apis/slo/v1alpha1/zz_generated.deepcopy.go b/apis/slo/v1alpha1/zz_generated.deepcopy.go index 2cda401f29..68be5468fb 100644 --- a/apis/slo/v1alpha1/zz_generated.deepcopy.go +++ b/apis/slo/v1alpha1/zz_generated.deepcopy.go @@ -77,6 +77,47 @@ func (in *CPUBurstStrategy) DeepCopy() *CPUBurstStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *CPUQoS) DeepCopyInto(out *CPUQoS) { + *out = *in + if in.GroupIdentity != nil { + in, out := &in.GroupIdentity, &out.GroupIdentity + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CPUQoS. +func (in *CPUQoS) DeepCopy() *CPUQoS { + if in == nil { + return nil + } + out := new(CPUQoS) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CPUQoSCfg) DeepCopyInto(out *CPUQoSCfg) { + *out = *in + if in.Enable != nil { + in, out := &in.Enable, &out.Enable + *out = new(bool) + **out = **in + } + in.CPUQoS.DeepCopyInto(&out.CPUQoS) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CPUQoSCfg. +func (in *CPUQoSCfg) DeepCopy() *CPUQoSCfg { + if in == nil { + return nil + } + out := new(CPUQoSCfg) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *MemoryQoS) DeepCopyInto(out *MemoryQoS) { *out = *in @@ -525,6 +566,11 @@ func (in *ResourceMap) DeepCopy() *ResourceMap { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ResourceQoS) DeepCopyInto(out *ResourceQoS) { *out = *in + if in.CPUQoS != nil { + in, out := &in.CPUQoS, &out.CPUQoS + *out = new(CPUQoSCfg) + (*in).DeepCopyInto(*out) + } if in.MemoryQoS != nil { in, out := &in.MemoryQoS, &out.MemoryQoS *out = new(MemoryQoSCfg) diff --git a/config/crd/bases/slo.koordinator.sh_nodeslos.yaml b/config/crd/bases/slo.koordinator.sh_nodeslos.yaml index d7b7bec2b6..e32dac50f7 100644 --- a/config/crd/bases/slo.koordinator.sh_nodeslos.yaml +++ b/config/crd/bases/slo.koordinator.sh_nodeslos.yaml @@ -73,6 +73,18 @@ spec: be: description: ResourceQoS for BE pods. 
properties: + cpuQoS: + description: CPUQoSCfg stores node-level config of cpu qos + properties: + enable: + description: Enable indicates whether the cpu qos is enabled. + type: boolean + groupIdentity: + description: group identity value for pods, default = + 0 + format: int64 + type: integer + type: object memoryQoS: description: MemoryQoSCfg stores node-level config of memory qos @@ -208,6 +220,18 @@ spec: cgroupRoot: description: ResourceQoS for root cgroup. properties: + cpuQoS: + description: CPUQoSCfg stores node-level config of cpu qos + properties: + enable: + description: Enable indicates whether the cpu qos is enabled. + type: boolean + groupIdentity: + description: group identity value for pods, default = + 0 + format: int64 + type: integer + type: object memoryQoS: description: MemoryQoSCfg stores node-level config of memory qos @@ -343,6 +367,18 @@ spec: ls: description: ResourceQoS for LS pods. properties: + cpuQoS: + description: CPUQoSCfg stores node-level config of cpu qos + properties: + enable: + description: Enable indicates whether the cpu qos is enabled. + type: boolean + groupIdentity: + description: group identity value for pods, default = + 0 + format: int64 + type: integer + type: object memoryQoS: description: MemoryQoSCfg stores node-level config of memory qos @@ -478,6 +514,18 @@ spec: lsr: description: ResourceQoS for LSR pods. properties: + cpuQoS: + description: CPUQoSCfg stores node-level config of cpu qos + properties: + enable: + description: Enable indicates whether the cpu qos is enabled. 
+ type: boolean + groupIdentity: + description: group identity value for pods, default = + 0 + format: int64 + type: integer + type: object memoryQoS: description: MemoryQoSCfg stores node-level config of memory qos @@ -613,6 +661,18 @@ spec: system: description: ResourceQoS for system pods properties: + cpuQoS: + description: CPUQoSCfg stores node-level config of cpu qos + properties: + enable: + description: Enable indicates whether the cpu qos is enabled. + type: boolean + groupIdentity: + description: group identity value for pods, default = + 0 + format: int64 + type: integer + type: object memoryQoS: description: MemoryQoSCfg stores node-level config of memory qos diff --git a/pkg/koordlet/koordlet.go b/pkg/koordlet/koordlet.go index 2871bb99d6..3b1670fe4f 100644 --- a/pkg/koordlet/koordlet.go +++ b/pkg/koordlet/koordlet.go @@ -106,7 +106,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) { return nil, err } - statesInformer := statesinformer.NewStatesInformer(config.StatesInformerConf, kubeClient, pleg, nodeName) + statesInformer := statesinformer.NewStatesInformer(config.StatesInformerConf, kubeClient, crdClient, pleg, nodeName) metricCache, err := metriccache.NewMetricCache(config.MetricCacheConf) if err != nil { return nil, err @@ -117,7 +117,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) { resManagerService := resmanager.NewResManager(config.ResManagerConf, scheme, kubeClient, crdClient, nodeName, statesInformer, metricCache, int64(config.CollectorConf.CollectResUsedIntervalSeconds)) - runtimeHook, err := runtimehooks.NewRuntimeHook(config.RuntimeHookConf) + runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf) if err != nil { return nil, err } diff --git a/pkg/koordlet/resmanager/cpu_burst_test.go b/pkg/koordlet/resmanager/cpu_burst_test.go index 6e5fbe6f43..cf0572130a 100644 --- a/pkg/koordlet/resmanager/cpu_burst_test.go +++ b/pkg/koordlet/resmanager/cpu_burst_test.go @@ -1535,6 
+1535,7 @@ func TestCPUBurst_start(t *testing.T) { ctl := gomock.NewController(t) mockStatesInformer := mock_statesinformer.NewMockStatesInformer(ctl) mockStatesInformer.EXPECT().GetAllPods().Return(getPodMetas(tt.fields.pods)).AnyTimes() + mockStatesInformer.EXPECT().GetNodeSLO().Return(tt.fields.nodeSLO).AnyTimes() mockMetricCache := mock_metriccache.NewMockMetricCache(ctl) mockMetricCache.EXPECT().GetNodeResourceMetric(gomock.Any()).Return(tt.fields.nodeMetric).AnyTimes() @@ -1560,7 +1561,6 @@ func TestCPUBurst_start(t *testing.T) { metricCache: mockMetricCache, eventRecorder: fakeRecorder, kubeClient: client, - nodeSLO: tt.fields.nodeSLO, } testHelper := system.NewFileTestUtil(t) diff --git a/pkg/koordlet/resmanager/cpu_suppress_test.go b/pkg/koordlet/resmanager/cpu_suppress_test.go index ec60e50ee0..54ec869ea6 100644 --- a/pkg/koordlet/resmanager/cpu_suppress_test.go +++ b/pkg/koordlet/resmanager/cpu_suppress_test.go @@ -573,6 +573,7 @@ func Test_cpuSuppress_suppressBECPU(t *testing.T) { si := mockstatesinformer.NewMockStatesInformer(ctl) si.EXPECT().GetAllPods().Return(tt.args.podMetas).AnyTimes() si.EXPECT().GetNode().Return(tt.args.node).AnyTimes() + si.EXPECT().GetNodeSLO().Return(getNodeSLOByThreshold(tt.args.thresholdConfig)).AnyTimes() // prepareData: mockMetricCache pods node beMetrics(AVG,current) mockMetricCache := mockmetriccache.NewMockMetricCache(ctl) @@ -603,7 +604,6 @@ func Test_cpuSuppress_suppressBECPU(t *testing.T) { statesInformer: si, metricCache: mockMetricCache, config: NewDefaultConfig(), - nodeSLO: getNodeSLOByThreshold(tt.args.thresholdConfig), collectResUsedIntervalSeconds: 1, } cpuSuppress := NewCPUSuppress(r) diff --git a/pkg/koordlet/resmanager/memory_evict_test.go b/pkg/koordlet/resmanager/memory_evict_test.go index ce0e3972b7..3dfc1ab62f 100644 --- a/pkg/koordlet/resmanager/memory_evict_test.go +++ b/pkg/koordlet/resmanager/memory_evict_test.go @@ -374,6 +374,7 @@ func Test_memoryEvict(t *testing.T) { mockStatesInformer := 
mock_statesinformer.NewMockStatesInformer(ctl) mockStatesInformer.EXPECT().GetAllPods().Return(getPodMetas(tt.pods)).AnyTimes() mockStatesInformer.EXPECT().GetNode().Return(tt.node).AnyTimes() + mockStatesInformer.EXPECT().GetNodeSLO().Return(getNodeSLOByThreshold(tt.thresholdConfig)).AnyTimes() mockMetricCache := mock_metriccache.NewMockMetricCache(ctl) mockNodeQueryResult := metriccache.NodeResourceQueryResult{Metric: tt.nodeMetric} @@ -385,7 +386,13 @@ func Test_memoryEvict(t *testing.T) { fakeRecorder := &FakeRecorder{} client := clientsetfake.NewSimpleClientset() - resmanager := &resmanager{statesInformer: mockStatesInformer, podsEvicted: cache.NewCacheDefault(), eventRecorder: fakeRecorder, metricCache: mockMetricCache, kubeClient: client, nodeSLO: getNodeSLOByThreshold(tt.thresholdConfig), config: NewDefaultConfig()} + resmanager := &resmanager{ + statesInformer: mockStatesInformer, + podsEvicted: cache.NewCacheDefault(), + eventRecorder: fakeRecorder, + metricCache: mockMetricCache, + kubeClient: client, + config: NewDefaultConfig()} stop := make(chan struct{}) _ = resmanager.podsEvicted.Run(stop) defer func() { stop <- struct{}{} }() diff --git a/pkg/koordlet/resmanager/resctrl_reconcile_test.go b/pkg/koordlet/resmanager/resctrl_reconcile_test.go index 96d6953f49..475c9b8257 100644 --- a/pkg/koordlet/resmanager/resctrl_reconcile_test.go +++ b/pkg/koordlet/resmanager/resctrl_reconcile_test.go @@ -1164,12 +1164,12 @@ func TestResctrlReconcile_reconcile(t *testing.T) { statesInformer := mock_statesinformer.NewMockStatesInformer(ctrl) metricCache := mock_metriccache.NewMockMetricCache(ctrl) statesInformer.EXPECT().GetAllPods().Return([]*statesinformer.PodMeta{testingPodMeta}).AnyTimes() + statesInformer.EXPECT().GetNodeSLO().Return(testingNodeSLO).AnyTimes() metricCache.EXPECT().GetNodeCPUInfo(&metriccache.QueryParam{}).Return(testingNodeCPUInfo, nil).AnyTimes() rm := &resmanager{ statesInformer: statesInformer, metricCache: metricCache, config: 
NewDefaultConfig(), - nodeSLO: testingNodeSLO, } helper := system.NewFileTestUtil(t) @@ -1206,7 +1206,8 @@ func TestResctrlReconcile_reconcile(t *testing.T) { r.reconcile() // test strategy parse error - r.resManager.nodeSLO.Spec.ResourceQoSStrategy = nil + testingNodeSLO.Spec.ResourceQoSStrategy = nil + statesInformer.EXPECT().GetNodeSLO().Return(testingNodeSLO).AnyTimes() r.reconcile() }) diff --git a/pkg/koordlet/resmanager/resmanager.go b/pkg/koordlet/resmanager/resmanager.go index 0a7ef6f1e9..753ef05635 100644 --- a/pkg/koordlet/resmanager/resmanager.go +++ b/pkg/koordlet/resmanager/resmanager.go @@ -19,9 +19,6 @@ package resmanager import ( "context" "fmt" - "reflect" - "sync" - "time" corev1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" @@ -29,7 +26,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apiruntime "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" @@ -39,7 +35,6 @@ import ( slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" - slolisterv1alpha1 "github.com/koordinator-sh/koordinator/pkg/client/listers/slo/v1alpha1" "github.com/koordinator-sh/koordinator/pkg/features" "github.com/koordinator-sh/koordinator/pkg/koordlet/audit" "github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache" @@ -67,112 +62,16 @@ type resmanager struct { statesInformer statesinformer.StatesInformer metricCache metriccache.MetricCache podsEvicted *expireCache.Cache - nodeSLOInformer cache.SharedIndexInformer - nodeSLOLister slolisterv1alpha1.NodeSLOLister kubeClient clientset.Interface eventRecorder record.EventRecorder - - // nodeSLO stores the latest nodeSLO object for the current node - nodeSLO *slov1alpha1.NodeSLO - nodeSLORWMutex sync.RWMutex -} - 
-func newNodeSLOInformer(client koordclientset.Interface, nodeName string) cache.SharedIndexInformer { - tweakListOptionFunc := func(opt *metav1.ListOptions) { - opt.FieldSelector = "metadata.name=" + nodeName - } - return cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) { - tweakListOptionFunc(&options) - return client.SloV1alpha1().NodeSLOs().List(context.TODO(), options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - tweakListOptionFunc(&options) - return client.SloV1alpha1().NodeSLOs().Watch(context.TODO(), options) - }, - }, - &slov1alpha1.NodeSLO{}, - time.Hour*12, - cache.Indexers{}, - ) -} - -// mergeNodeSLOSpec merges nodeSLO with default config; ensure use the function with a RWMutex -func (r *resmanager) mergeNodeSLOSpec(nodeSLO *slov1alpha1.NodeSLO) { - if r.nodeSLO == nil || nodeSLO == nil { - klog.Errorf("failed to merge with nil nodeSLO, old: %v, new: %v", r.nodeSLO, nodeSLO) - return - } - - // merge ResourceUsedThresholdWithBE individually for nil-ResourceUsedThresholdWithBE case - mergedResourceUsedThresholdWithBESpec := mergeSLOSpecResourceUsedThresholdWithBE(util.DefaultNodeSLOSpecConfig().ResourceUsedThresholdWithBE, - nodeSLO.Spec.ResourceUsedThresholdWithBE) - if mergedResourceUsedThresholdWithBESpec != nil { - r.nodeSLO.Spec.ResourceUsedThresholdWithBE = mergedResourceUsedThresholdWithBESpec - } - - // merge ResourceQoSStrategy - mergedResourceQoSStrategySpec := mergeSLOSpecResourceQoSStrategy(util.DefaultNodeSLOSpecConfig().ResourceQoSStrategy, - nodeSLO.Spec.ResourceQoSStrategy) - mergeNoneResourceQoSIfDisabled(mergedResourceQoSStrategySpec) - if mergedResourceQoSStrategySpec != nil { - r.nodeSLO.Spec.ResourceQoSStrategy = mergedResourceQoSStrategySpec - } - - // merge CPUBurstStrategy - mergedCPUBurstStrategySpec := mergeSLOSpecCPUBurstStrategy(util.DefaultNodeSLOSpecConfig().CPUBurstStrategy, - nodeSLO.Spec.CPUBurstStrategy) - if 
mergedCPUBurstStrategySpec != nil { - r.nodeSLO.Spec.CPUBurstStrategy = mergedCPUBurstStrategySpec - } -} - -func (r *resmanager) createNodeSLO(nodeSLO *slov1alpha1.NodeSLO) { - r.nodeSLORWMutex.Lock() - defer r.nodeSLORWMutex.Unlock() - - oldNodeSLOStr := util.DumpJSON(r.nodeSLO) - - r.nodeSLO = nodeSLO.DeepCopy() - r.nodeSLO.Spec = nodeSLO.Spec - - // merge nodeSLO spec with the default config - r.mergeNodeSLOSpec(nodeSLO) - - newNodeSLOStr := util.DumpJSON(r.nodeSLO) - klog.Infof("update nodeSLO content: old %s, new %s", oldNodeSLOStr, newNodeSLOStr) } func (r *resmanager) getNodeSLOCopy() *slov1alpha1.NodeSLO { - r.nodeSLORWMutex.Lock() - defer r.nodeSLORWMutex.Unlock() - - if r.nodeSLO == nil { - return nil - } - nodeSLOCopy := r.nodeSLO.DeepCopy() - return nodeSLOCopy -} - -func (r *resmanager) updateNodeSLOSpec(nodeSLO *slov1alpha1.NodeSLO) { - r.nodeSLORWMutex.Lock() - defer r.nodeSLORWMutex.Unlock() - - oldNodeSLOStr := util.DumpJSON(r.nodeSLO) - - r.nodeSLO.Spec = nodeSLO.Spec - - // merge nodeSLO spec with the default config - r.mergeNodeSLOSpec(nodeSLO) - - newNodeSLOStr := util.DumpJSON(r.nodeSLO) - klog.Infof("update nodeSLO content: old %s, new %s", oldNodeSLOStr, newNodeSLOStr) + return r.statesInformer.GetNodeSLO() } func NewResManager(cfg *Config, schema *apiruntime.Scheme, kubeClient clientset.Interface, crdClient *koordclientset.Clientset, nodeName string, statesInformer statesinformer.StatesInformer, metricCache metriccache.MetricCache, collectResUsedIntervalSeconds int64) ResManager { - informer := newNodeSLOInformer(crdClient, nodeName) eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) @@ -185,38 +84,10 @@ func NewResManager(cfg *Config, schema *apiruntime.Scheme, kubeClient clientset. 
statesInformer: statesInformer, metricCache: metricCache, podsEvicted: expireCache.NewCacheDefault(), - nodeSLOInformer: informer, - nodeSLOLister: slolisterv1alpha1.NewNodeSLOLister(informer.GetIndexer()), kubeClient: kubeClient, eventRecorder: recorder, collectResUsedIntervalSeconds: collectResUsedIntervalSeconds, } - informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - nodeSLO, ok := obj.(*slov1alpha1.NodeSLO) - if ok { - r.createNodeSLO(nodeSLO) - klog.Infof("create NodeSLO %v", nodeSLO) - } else { - klog.Errorf("node slo informer add func parse nodeSLO failed") - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldNodeSLO, oldOK := oldObj.(*slov1alpha1.NodeSLO) - newNodeSLO, newOK := newObj.(*slov1alpha1.NodeSLO) - if !oldOK || !newOK { - klog.Errorf("unable to convert object to *slov1alpha1.NodeSLO, old %T, new %T", oldObj, newObj) - return - } - if reflect.DeepEqual(oldNodeSLO.Spec, newNodeSLO.Spec) { - klog.V(5).Infof("find NodeSLO spec %s has not changed", newNodeSLO.Name) - return - } - klog.Infof("update NodeSLO spec %v", newNodeSLO.Spec) - r.updateNodeSLOSpec(newNodeSLO) - }, - }) - return r } @@ -244,18 +115,9 @@ func (r *resmanager) Run(stopCh <-chan struct{}) error { r.podsEvicted.Run(stopCh) - klog.Infof("starting informer for NodeSLO") - go r.nodeSLOInformer.Run(stopCh) - if !cache.WaitForCacheSync(stopCh, r.nodeSLOInformer.HasSynced) { - return fmt.Errorf("time out waiting for node slo caches to sync") - } - if !cache.WaitForCacheSync(stopCh, r.statesInformer.HasSynced) { return fmt.Errorf("time out waiting for kubelet meta service caches to sync") } - if !cache.WaitForCacheSync(stopCh, r.hasSynced) { - return fmt.Errorf("time out waiting for sync NodeSLO") - } util.RunFeature(r.reconcileBECgroup, []featuregate.Feature{features.BECgroupReconcile}, r.config.ReconcileIntervalSeconds, stopCh) @@ -283,13 +145,6 @@ func (r *resmanager) Run(stopCh <-chan struct{}) error { return nil } -func (r 
*resmanager) hasSynced() bool { - r.nodeSLORWMutex.Lock() - defer r.nodeSLORWMutex.Unlock() - - return r.nodeSLO != nil && r.nodeSLO.Spec.ResourceUsedThresholdWithBE != nil -} - func (r *resmanager) evictPodsIfNotEvicted(evictPods []*corev1.Pod, node *corev1.Node, reason string, message string) { for _, evictPod := range evictPods { r.evictPodIfNotEvicted(evictPod, node, reason, message) diff --git a/pkg/koordlet/resmanager/resmanager_test.go b/pkg/koordlet/resmanager/resmanager_test.go index 12b0d5ba9f..eb4a900c33 100644 --- a/pkg/koordlet/resmanager/resmanager_test.go +++ b/pkg/koordlet/resmanager/resmanager_test.go @@ -41,7 +41,6 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor" mock_statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer" "github.com/koordinator-sh/koordinator/pkg/tools/cache" - "github.com/koordinator-sh/koordinator/pkg/util" ) var podsResource = schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"} @@ -62,219 +61,6 @@ func TestNewResManager(t *testing.T) { }) } -func Test_mergeNodeSLOSpec(t *testing.T) { - testingCustomNodeSLOSpec := slov1alpha1.NodeSLOSpec{ - ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ - CPUSuppressThresholdPercent: pointer.Int64Ptr(80), - }, - ResourceQoSStrategy: &slov1alpha1.ResourceQoSStrategy{ - LSR: util.NoneResourceQoS(apiext.QoSLSR), - LS: util.NoneResourceQoS(apiext.QoSLS), - BE: &slov1alpha1.ResourceQoS{ - - MemoryQoS: &slov1alpha1.MemoryQoSCfg{ - Enable: pointer.BoolPtr(true), - }, - ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ - Enable: pointer.BoolPtr(true), - ResctrlQoS: slov1alpha1.ResctrlQoS{ - CATRangeEndPercent: pointer.Int64Ptr(50), - }, - }, - }, - }, - CPUBurstStrategy: &slov1alpha1.CPUBurstStrategy{ - CPUBurstConfig: slov1alpha1.CPUBurstConfig{}, - SharePoolThresholdPercent: nil, - }, - } - testingMergedNodeSLOSpec := util.DefaultNodeSLOSpecConfig() - mergedInterface, err := 
util.MergeCfg(&testingMergedNodeSLOSpec, &testingCustomNodeSLOSpec) - assert.NoError(t, err) - testingMergedNodeSLOSpec = *mergedInterface.(*slov1alpha1.NodeSLOSpec) - type args struct { - nodeSLO *slov1alpha1.NodeSLO - } - type field struct { - nodeSLO *slov1alpha1.NodeSLO - } - tests := []struct { - name string - args args - field field - want *slov1alpha1.NodeSLO - }{ - { - name: "skip the merge if the old one is nil", - args: args{ - nodeSLO: &slov1alpha1.NodeSLO{}, - }, - field: field{nodeSLO: nil}, - want: nil, - }, - { - name: "skip the merge if the new one is nil", - field: field{ - nodeSLO: &slov1alpha1.NodeSLO{}, - }, - want: &slov1alpha1.NodeSLO{}, - }, - { - name: "use default and do not panic if the new is nil", - field: field{ - nodeSLO: &slov1alpha1.NodeSLO{ - Spec: util.DefaultNodeSLOSpecConfig(), - }, - }, - want: &slov1alpha1.NodeSLO{ - Spec: util.DefaultNodeSLOSpecConfig(), - }, - }, - { - name: "merge with the default", - args: args{ - nodeSLO: &slov1alpha1.NodeSLO{ - Spec: testingCustomNodeSLOSpec, - }, - }, - field: field{ - nodeSLO: &slov1alpha1.NodeSLO{ - Spec: slov1alpha1.NodeSLOSpec{ - ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ - CPUSuppressThresholdPercent: pointer.Int64Ptr(100), - MemoryEvictThresholdPercent: pointer.Int64Ptr(100), - }, - ResourceQoSStrategy: &slov1alpha1.ResourceQoSStrategy{ - LSR: &slov1alpha1.ResourceQoS{ - ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ - ResctrlQoS: slov1alpha1.ResctrlQoS{ - CATRangeStartPercent: pointer.Int64Ptr(0), - CATRangeEndPercent: pointer.Int64Ptr(100), - }, - }, - }, - LS: &slov1alpha1.ResourceQoS{ - ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ - ResctrlQoS: slov1alpha1.ResctrlQoS{ - CATRangeStartPercent: pointer.Int64Ptr(0), - CATRangeEndPercent: pointer.Int64Ptr(100), - }, - }, - }, - BE: &slov1alpha1.ResourceQoS{ - ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ - ResctrlQoS: slov1alpha1.ResctrlQoS{ - CATRangeStartPercent: pointer.Int64Ptr(0), - CATRangeEndPercent: 
pointer.Int64Ptr(40), - }, - }, - }, - }, - }, - }, - }, - want: &slov1alpha1.NodeSLO{ - Spec: testingMergedNodeSLOSpec, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := resmanager{nodeSLO: tt.field.nodeSLO} - r.mergeNodeSLOSpec(tt.args.nodeSLO) - assert.Equal(t, tt.want, r.nodeSLO) - }) - } -} - -func Test_createNodeSLO(t *testing.T) { - testingNewNodeSLO := &slov1alpha1.NodeSLO{ - Spec: util.DefaultNodeSLOSpecConfig(), - } - testingNewNodeSLO.Spec.ResourceUsedThresholdWithBE = &slov1alpha1.ResourceThresholdStrategy{ - Enable: pointer.BoolPtr(true), - CPUSuppressThresholdPercent: pointer.Int64Ptr(80), - } - - testingNewNodeSLO.Spec.ResourceQoSStrategy.BE = &slov1alpha1.ResourceQoS{ - ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ - Enable: pointer.BoolPtr(true), - ResctrlQoS: slov1alpha1.ResctrlQoS{ - CATRangeStartPercent: pointer.Int64Ptr(0), - CATRangeEndPercent: pointer.Int64Ptr(20), - }, - }, - } - - testingCreatedNodeSLO := &slov1alpha1.NodeSLO{ - Spec: util.DefaultNodeSLOSpecConfig(), - } - testingCreatedNodeSLO.Spec.ResourceUsedThresholdWithBE.Enable = pointer.BoolPtr(true) - testingCreatedNodeSLO.Spec.ResourceUsedThresholdWithBE.CPUSuppressThresholdPercent = pointer.Int64Ptr(80) - testingCreatedNodeSLO.Spec.ResourceQoSStrategy.LSR = util.NoneResourceQoS(apiext.QoSLSR) - testingCreatedNodeSLO.Spec.ResourceQoSStrategy.LS = util.NoneResourceQoS(apiext.QoSLS) - testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE = util.NoneResourceQoS(apiext.QoSBE) - testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.Enable = pointer.BoolPtr(true) - testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeStartPercent = pointer.Int64Ptr(0) - testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeEndPercent = pointer.Int64Ptr(20) - - r := resmanager{nodeSLO: nil} - - r.createNodeSLO(testingNewNodeSLO) - assert.Equal(t, testingCreatedNodeSLO, r.nodeSLO) -} - -func Test_updateNodeSLOSpec(t *testing.T) { - 
testingNewNodeSLO := &slov1alpha1.NodeSLO{ - Spec: slov1alpha1.NodeSLOSpec{ - ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ - Enable: pointer.BoolPtr(true), - CPUSuppressThresholdPercent: pointer.Int64Ptr(80), - }, - ResourceQoSStrategy: &slov1alpha1.ResourceQoSStrategy{ - BE: &slov1alpha1.ResourceQoS{ - ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ - Enable: pointer.BoolPtr(true), - ResctrlQoS: slov1alpha1.ResctrlQoS{ - CATRangeStartPercent: pointer.Int64Ptr(0), - CATRangeEndPercent: pointer.Int64Ptr(20), - }, - }, - }, - }, - }, - } - testingUpdatedNodeSLO := &slov1alpha1.NodeSLO{ - Spec: util.DefaultNodeSLOSpecConfig(), - } - testingUpdatedNodeSLO.Spec.ResourceUsedThresholdWithBE.Enable = pointer.BoolPtr(true) - testingUpdatedNodeSLO.Spec.ResourceUsedThresholdWithBE.CPUSuppressThresholdPercent = pointer.Int64Ptr(80) - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LSR.MemoryQoS.MemoryQoS = *util.NoneMemoryQoS() - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LSR.ResctrlQoS.ResctrlQoS = *util.NoneResctrlQoS() - - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LS.MemoryQoS.MemoryQoS = *util.NoneMemoryQoS() - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LS.ResctrlQoS.ResctrlQoS = *util.NoneResctrlQoS() - - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.MemoryQoS.MemoryQoS = *util.NoneMemoryQoS() - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.Enable = pointer.BoolPtr(true) - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeStartPercent = pointer.Int64Ptr(0) - testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeEndPercent = pointer.Int64Ptr(20) - - r := resmanager{ - nodeSLO: &slov1alpha1.NodeSLO{ - Spec: slov1alpha1.NodeSLOSpec{ - ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ - CPUSuppressThresholdPercent: pointer.Int64Ptr(90), - MemoryEvictThresholdPercent: pointer.Int64Ptr(90), - }, - }, - }, - } - - r.updateNodeSLOSpec(testingNewNodeSLO) - assert.Equal(t, 
testingUpdatedNodeSLO, r.nodeSLO) -} - func Test_isFeatureDisabled(t *testing.T) { type args struct { nodeSLO *slov1alpha1.NodeSLO diff --git a/pkg/koordlet/resmanager/resource_update_executor_test.go b/pkg/koordlet/resmanager/resource_update_executor_test.go index 798e18aa40..adf43d46d6 100644 --- a/pkg/koordlet/resmanager/resource_update_executor_test.go +++ b/pkg/koordlet/resmanager/resource_update_executor_test.go @@ -1,17 +1,17 @@ /* -Copyright 2022 The Koordinator Authors. + Copyright 2022 The Koordinator Authors. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. */ package resmanager diff --git a/pkg/koordlet/resmanager/strategy_util_test.go b/pkg/koordlet/resmanager/strategy_util_test.go deleted file mode 100644 index e2f0ef6669..0000000000 --- a/pkg/koordlet/resmanager/strategy_util_test.go +++ /dev/null @@ -1,234 +0,0 @@ -/* - Copyright 2022 The Koordinator Authors. 
- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ - -package resmanager - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/utils/pointer" - - apiext "github.com/koordinator-sh/koordinator/apis/extension" - slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" - "github.com/koordinator-sh/koordinator/pkg/util" -) - -func Test_mergeSLOSpecResourceUsedThresholdWithBE(t *testing.T) { - testingDefaultSpec := util.DefaultResourceThresholdStrategy() - testingNewSpec := &slov1alpha1.ResourceThresholdStrategy{ - Enable: pointer.BoolPtr(true), - CPUSuppressThresholdPercent: pointer.Int64Ptr(80), - MemoryEvictThresholdPercent: pointer.Int64Ptr(75), - } - testingNewSpec1 := &slov1alpha1.ResourceThresholdStrategy{ - Enable: pointer.BoolPtr(true), - CPUSuppressThresholdPercent: pointer.Int64Ptr(80), - } - testingMergedSpec := &slov1alpha1.ResourceThresholdStrategy{ - Enable: pointer.BoolPtr(true), - CPUSuppressThresholdPercent: pointer.Int64Ptr(80), - MemoryEvictThresholdPercent: pointer.Int64Ptr(70), - CPUSuppressPolicy: slov1alpha1.CPUSetPolicy, - } - type args struct { - defaultSpec *slov1alpha1.ResourceThresholdStrategy - newSpec *slov1alpha1.ResourceThresholdStrategy - } - tests := []struct { - name string - args args - want *slov1alpha1.ResourceThresholdStrategy - }{ - { - name: "both empty", - args: args{ - defaultSpec: &slov1alpha1.ResourceThresholdStrategy{}, - newSpec: &slov1alpha1.ResourceThresholdStrategy{}, - }, - want: 
&slov1alpha1.ResourceThresholdStrategy{}, - }, - { - name: "totally use new", - args: args{ - defaultSpec: &slov1alpha1.ResourceThresholdStrategy{}, - newSpec: testingNewSpec, - }, - want: testingNewSpec, - }, - { - name: "totally use new 1", - args: args{ - defaultSpec: testingDefaultSpec, - newSpec: testingNewSpec, - }, - want: &slov1alpha1.ResourceThresholdStrategy{ - Enable: pointer.BoolPtr(true), - CPUSuppressThresholdPercent: pointer.Int64Ptr(80), - MemoryEvictThresholdPercent: pointer.Int64Ptr(75), - CPUSuppressPolicy: slov1alpha1.CPUSetPolicy, - }, - }, - { - name: "partially use new, merging with the default", - args: args{ - defaultSpec: testingDefaultSpec, - newSpec: testingNewSpec1, - }, - want: testingMergedSpec, - }, - { - name: "new overwrite a nil", - args: args{ - defaultSpec: testingDefaultSpec, - }, - want: testingDefaultSpec, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := mergeSLOSpecResourceUsedThresholdWithBE(tt.args.defaultSpec, tt.args.newSpec) - assert.Equal(t, tt.want, got) - }) - } -} - -func Test_mergeSLOSpecResourceQoSStrategy(t *testing.T) { - testingDefaultSpec := util.DefaultResourceQoSStrategy() - - testingNewSpec := testingDefaultSpec.DeepCopy() - testingNewSpec.BE.MemoryQoS.WmarkRatio = pointer.Int64Ptr(0) - - testingNewSpec1 := &slov1alpha1.ResourceQoSStrategy{ - BE: &slov1alpha1.ResourceQoS{ - MemoryQoS: &slov1alpha1.MemoryQoSCfg{ - Enable: pointer.BoolPtr(true), - MemoryQoS: slov1alpha1.MemoryQoS{ - WmarkRatio: pointer.Int64Ptr(90), - }, - }, - }, - } - - testingMergedSpec := testingDefaultSpec.DeepCopy() - testingMergedSpec.BE.MemoryQoS.Enable = pointer.BoolPtr(true) - testingMergedSpec.BE.MemoryQoS.WmarkRatio = pointer.Int64Ptr(90) - - type args struct { - defaultSpec *slov1alpha1.ResourceQoSStrategy - newSpec *slov1alpha1.ResourceQoSStrategy - } - tests := []struct { - name string - args args - want *slov1alpha1.ResourceQoSStrategy - }{ - { - name: "both empty", - args: args{ - 
defaultSpec: &slov1alpha1.ResourceQoSStrategy{}, - newSpec: &slov1alpha1.ResourceQoSStrategy{}, - }, - want: &slov1alpha1.ResourceQoSStrategy{}, - }, - { - name: "totally use new", - args: args{ - defaultSpec: &slov1alpha1.ResourceQoSStrategy{}, - newSpec: testingNewSpec, - }, - want: testingNewSpec, - }, - { - name: "totally use new 1", - args: args{ - defaultSpec: testingDefaultSpec, - newSpec: testingNewSpec, - }, - want: testingNewSpec, - }, - { - name: "partially use new, merging with the default", - args: args{ - defaultSpec: testingDefaultSpec, - newSpec: testingNewSpec1, - }, - want: testingMergedSpec, - }, - { - name: "new overwrite a nil", - args: args{ - defaultSpec: testingDefaultSpec, - }, - want: testingDefaultSpec, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := mergeSLOSpecResourceQoSStrategy(tt.args.defaultSpec, tt.args.newSpec) - assert.Equal(t, tt.want, got) - }) - } -} - -func Test_mergeNoneResourceQoSIfDisabled(t *testing.T) { - testDefault := util.DefaultResourceQoSStrategy() - testAllNone := util.NoneResourceQoSStrategy() - - testLSMemQOSEnabled := testDefault.DeepCopy() - testLSMemQOSEnabled.LS.MemoryQoS.Enable = pointer.BoolPtr(true) - testLSMemQOSEnabledResult := util.NoneResourceQoSStrategy() - testLSMemQOSEnabledResult.LS.MemoryQoS.Enable = pointer.BoolPtr(true) - testLSMemQOSEnabledResult.LS.MemoryQoS.MemoryQoS = *util.DefaultMemoryQoS(apiext.QoSLS) - - type args struct { - nodeCfg *slov1alpha1.NodeSLO - } - tests := []struct { - name string - args args - want *slov1alpha1.ResourceQoSStrategy - }{ - { - name: "all disabled", - args: args{ - nodeCfg: &slov1alpha1.NodeSLO{ - Spec: slov1alpha1.NodeSLOSpec{ - ResourceQoSStrategy: testDefault, - }, - }, - }, - want: testAllNone, - }, - { - name: "only ls memory qos enabled", - args: args{ - nodeCfg: &slov1alpha1.NodeSLO{ - Spec: slov1alpha1.NodeSLOSpec{ - ResourceQoSStrategy: testLSMemQOSEnabled, - }, - }, - }, - want: testLSMemQOSEnabledResult, - }, - } 
- for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mergeNoneResourceQoSIfDisabled(tt.args.nodeCfg.Spec.ResourceQoSStrategy) - assert.Equal(t, tt.want, tt.args.nodeCfg.Spec.ResourceQoSStrategy) - }) - } -} diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index 1e91cd3a22..2938217feb 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -23,6 +23,7 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" cliflag "k8s.io/component-base/cli/flag" "k8s.io/component-base/featuregate" + // "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity" ) const ( diff --git a/pkg/koordlet/runtimehooks/rule/rule.go b/pkg/koordlet/runtimehooks/rule/rule.go index 14d5ff7b92..bdcbdbbceb 100644 --- a/pkg/koordlet/runtimehooks/rule/rule.go +++ b/pkg/koordlet/runtimehooks/rule/rule.go @@ -75,21 +75,25 @@ func find(name string) (*Rule, bool) { return newRule, false } -func UpdateRules(nodeSLO *slov1alpha1.NodeSLOSpec) { +func UpdateRules(s statesinformer.StatesInformer, nodeSLOIf interface{}) { + nodeSLO, ok := nodeSLOIf.(*slov1alpha1.NodeSLO) + if !ok { + klog.Errorf("update rules with type %T is illegal", nodeSLOIf) + return + } klog.Infof("applying rules with new NodeSLO %v", nodeSLO) for _, r := range globalHookRules { if !r.systemSupported { klog.Infof("system unsupported for rule %s, do nothing during UpdateRules", r.name) return } - updated, err := r.parseFunc(nodeSLO) + updated, err := r.parseFunc(&nodeSLO.Spec) if err != nil { klog.Warningf("parse rule %s from nodeSLO failed, error: %v", r.name, err) continue } if updated { - // TODO get all pods from stats informer *synchronously* - var pods []*statesinformer.PodMeta + pods := s.GetAllPods() r.runUpdateCallbacks(pods) } } @@ -97,5 +101,4 @@ func UpdateRules(nodeSLO *slov1alpha1.NodeSLOSpec) { func init() { globalHookRules = map[string]*Rule{} - // TODO register rule.UpdateRules to states-informer, as callback 
function when nodeSLO Spec created or updated } diff --git a/pkg/koordlet/runtimehooks/runtimehooks.go b/pkg/koordlet/runtimehooks/runtimehooks.go index e9274d999f..00c3738308 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks.go +++ b/pkg/koordlet/runtimehooks/runtimehooks.go @@ -17,6 +17,11 @@ package runtimehooks import ( + "reflect" + + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" "k8s.io/klog/v2" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/server" @@ -32,7 +37,8 @@ type RuntimeHook interface { } type runtimeHook struct { - server server.Server + statesInformer statesinformer.StatesInformer + server server.Server } func (r *runtimeHook) Run(stopCh <-chan struct{}) error { @@ -45,14 +51,18 @@ func (r *runtimeHook) Run(stopCh <-chan struct{}) error { return nil } -func NewRuntimeHook(cfg *Config) (RuntimeHook, error) { +func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, error) { s, err := server.NewServer(server.Options{Network: cfg.RuntimeHooksNetwork, Address: cfg.RuntimeHooksAddr}) if err != nil { return nil, err } r := &runtimeHook{ - server: s, + statesInformer: si, + server: s, } + si.RegisterCallbacks(reflect.TypeOf(&slov1alpha1.NodeSLO{}), "runtime-hooks-rule", + "Update hooks rule can run callbacks if NodeSLO spec update", + rule.UpdateRules) if err := s.Setup(); err != nil { klog.Fatal("failed to setup runtime hook server, error %v", err) return nil, err diff --git a/pkg/koordlet/runtimehooks/runtimehooks_test.go b/pkg/koordlet/runtimehooks/runtimehooks_test.go index acc61dad51..dd6d46f468 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks_test.go +++ b/pkg/koordlet/runtimehooks/runtimehooks_test.go @@ -18,9 +18,14 @@ package runtimehooks import ( "path" + "reflect" "testing" + "github.com/golang/mock/gomock" 
"github.com/stretchr/testify/assert" + + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + mockstatesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer" ) func Test_runtimeHook_Run(t *testing.T) { @@ -58,7 +63,11 @@ func Test_runtimeHook_Run(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r, err := NewRuntimeHook(tt.fields.config) + ctrl := gomock.NewController(t) + defer ctrl.Finish() + si := mockstatesinformer.NewMockStatesInformer(ctrl) + si.EXPECT().RegisterCallbacks(reflect.TypeOf(&slov1alpha1.NodeSLO{}), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + r, err := NewRuntimeHook(si, tt.fields.config) assert.NoError(t, err) stop := make(chan struct{}) go func() { stop <- struct{}{} }() diff --git a/pkg/koordlet/runtimehooks/server/service.go b/pkg/koordlet/runtimehooks/server/service.go index 6a293d2f2d..fe332223e9 100644 --- a/pkg/koordlet/runtimehooks/server/service.go +++ b/pkg/koordlet/runtimehooks/server/service.go @@ -19,6 +19,8 @@ package server import ( "context" + "k8s.io/klog/v2" + rmconfig "github.com/koordinator-sh/koordinator/pkg/runtime-manager/config" runtimeapi "github.com/koordinator-sh/koordinator/apis/runtime/v1alpha1" @@ -27,6 +29,7 @@ import ( func (s *server) PreRunPodSandboxHook(ctx context.Context, req *runtimeapi.RunPodSandboxHookRequest) (*runtimeapi.RunPodSandboxHookResponse, error) { + klog.V(5).Infof("receive PreRunPodSandboxHook request %v", req.String()) resp := &runtimeapi.RunPodSandboxHookResponse{ Labels: req.Labels, Annotations: req.Annotations, @@ -39,6 +42,7 @@ func (s *server) PreRunPodSandboxHook(ctx context.Context, func (s *server) PreStartContainerHook(ctx context.Context, req *runtimeapi.ContainerResourceHookRequest) (*runtimeapi.ContainerResourceHookResponse, error) { + klog.V(5).Infof("receive PreStartContainerHook request %v", req.String()) resp := &runtimeapi.ContainerResourceHookResponse{ ContainerAnnotations: 
req.ContainerAnnotations, ContainerResources: req.ContainerResources, @@ -49,6 +53,7 @@ func (s *server) PreStartContainerHook(ctx context.Context, func (s *server) PostStartContainerHook(ctx context.Context, req *runtimeapi.ContainerResourceHookRequest) (*runtimeapi.ContainerResourceHookResponse, error) { + klog.V(5).Infof("receive PostStartContainerHook request %v", req.String()) resp := &runtimeapi.ContainerResourceHookResponse{ ContainerAnnotations: req.ContainerAnnotations, ContainerResources: req.ContainerResources, @@ -59,6 +64,7 @@ func (s *server) PostStartContainerHook(ctx context.Context, func (s *server) PostStopContainerHook(ctx context.Context, req *runtimeapi.ContainerResourceHookRequest) (*runtimeapi.ContainerResourceHookResponse, error) { + klog.V(5).Infof("receive PostStopContainerHook request %v", req.String()) resp := &runtimeapi.ContainerResourceHookResponse{ ContainerAnnotations: req.ContainerAnnotations, ContainerResources: req.ContainerResources, @@ -69,6 +75,7 @@ func (s *server) PostStopContainerHook(ctx context.Context, func (s *server) PreUpdateContainerResourcesHook(ctx context.Context, req *runtimeapi.ContainerResourceHookRequest) (*runtimeapi.ContainerResourceHookResponse, error) { + klog.V(5).Infof("receive PreUpdateContainerResourcesHook request %v", req.String()) resp := &runtimeapi.ContainerResourceHookResponse{ ContainerAnnotations: req.ContainerAnnotations, ContainerResources: req.ContainerResources, diff --git a/pkg/koordlet/statesinformer/mockstatesinformer/mock.go b/pkg/koordlet/statesinformer/mockstatesinformer/mock.go index cb6658b62a..24d48e9d2e 100644 --- a/pkg/koordlet/statesinformer/mockstatesinformer/mock.go +++ b/pkg/koordlet/statesinformer/mockstatesinformer/mock.go @@ -1,18 +1,18 @@ -/* - Copyright 2022 The Koordinator Authors. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. 
- You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. -*/ +// /* +// Copyright 2022 The Koordinator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// */ // Code generated by MockGen. DO NOT EDIT. // Source: pkg/koordlet/statesinformer/states_informer.go @@ -24,6 +24,7 @@ import ( reflect "reflect" gomock "github.com/golang/mock/gomock" + v1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" v1 "k8s.io/api/core/v1" ) @@ -79,6 +80,20 @@ func (mr *MockStatesInformerMockRecorder) GetNode() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNode", reflect.TypeOf((*MockStatesInformer)(nil).GetNode)) } +// GetNodeSLO mocks base method. +func (m *MockStatesInformer) GetNodeSLO() *v1alpha1.NodeSLO { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetNodeSLO") + ret0, _ := ret[0].(*v1alpha1.NodeSLO) + return ret0 +} + +// GetNodeSLO indicates an expected call of GetNodeSLO. 
+func (mr *MockStatesInformerMockRecorder) GetNodeSLO() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNodeSLO", reflect.TypeOf((*MockStatesInformer)(nil).GetNodeSLO)) +} + // HasSynced mocks base method. func (m *MockStatesInformer) HasSynced() bool { m.ctrl.T.Helper() @@ -93,6 +108,18 @@ func (mr *MockStatesInformerMockRecorder) HasSynced() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasSynced", reflect.TypeOf((*MockStatesInformer)(nil).HasSynced)) } +// RegisterCallbacks mocks base method. +func (m *MockStatesInformer) RegisterCallbacks(objType reflect.Type, name, description string, callbackFn statesinformer.UpdateCbFn) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "RegisterCallbacks", objType, name, description, callbackFn) +} + +// RegisterCallbacks indicates an expected call of RegisterCallbacks. +func (mr *MockStatesInformerMockRecorder) RegisterCallbacks(objType, name, description, callbackFn interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterCallbacks", reflect.TypeOf((*MockStatesInformer)(nil).RegisterCallbacks), objType, name, description, callbackFn) +} + // Run mocks base method. func (m *MockStatesInformer) Run(stopCh <-chan struct{}) error { m.ctrl.T.Helper() diff --git a/pkg/koordlet/statesinformer/register.go b/pkg/koordlet/statesinformer/register.go new file mode 100644 index 0000000000..04cec884d6 --- /dev/null +++ b/pkg/koordlet/statesinformer/register.go @@ -0,0 +1,62 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. 
+ You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package statesinformer + +import ( + "reflect" + + "k8s.io/klog/v2" +) + +type updateCallback struct { + name string + description string + fn UpdateCbFn +} + +type UpdateCbFn func(s StatesInformer, statesObj interface{}) + +func (s *statesInformer) RegisterCallbacks(objType reflect.Type, name, description string, callbackFn UpdateCbFn) { + callbacks, legal := s.stateUpdateCallbacks[objType] + if !legal { + klog.Fatalf("states informer callback register with type %v is illegal", objType) + } + for _, c := range callbacks { + if c.name == name { + klog.Fatalf("states informer callback register %s with type %v already registered", name, objType) + } + } + newCb := updateCallback{ + name: name, + description: description, + fn: callbackFn, + } + s.stateUpdateCallbacks[objType] = append(s.stateUpdateCallbacks[objType], newCb) + klog.Infof("states informer callback %s has registered", name) +} + +func (s *statesInformer) runCallbacks(objType reflect.Type, obj interface{}) { + callbacks, exist := s.stateUpdateCallbacks[objType] + if !exist { + klog.Errorf("states informer callbacks type %v not exist", objType) + return + } + for _, c := range callbacks { + klog.V(5).Infof("start running callback function %v for type %v", c.name, objType) + c.fn(s, obj) + } +} diff --git a/pkg/koordlet/statesinformer/register_test.go b/pkg/koordlet/statesinformer/register_test.go new file mode 100644 index 0000000000..85be04d5fc --- /dev/null +++ b/pkg/koordlet/statesinformer/register_test.go @@ -0,0 +1,64 @@ +/* + Copyright 2022 The Koordinator Authors. 
+ + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package statesinformer + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/utils/pointer" + + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" +) + +func TestRegisterCallbacksAndRun(t *testing.T) { + type args struct { + objType reflect.Type + name string + description string + } + tests := []struct { + name string + args args + }{ + { + name: "register and run", + args: args{ + objType: reflect.TypeOf(&slov1alpha1.NodeSLO{}), + name: "set-bool-var", + description: "set test bool var as true", + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testVar := pointer.BoolPtr(false) + callbackFn := func(si StatesInformer, statesObj interface{}) { + *testVar = true + } + si := &statesInformer{ + stateUpdateCallbacks: map[reflect.Type][]updateCallback{ + reflect.TypeOf(&slov1alpha1.NodeSLO{}): {}, + }, + } + si.RegisterCallbacks(tt.args.objType, tt.args.name, tt.args.description, callbackFn) + si.runCallbacks(tt.args.objType, &slov1alpha1.NodeSLO{}) + assert.Equal(t, *testVar, true) + }) + } +} diff --git a/pkg/koordlet/statesinformer/states_informer.go b/pkg/koordlet/statesinformer/states_informer.go index 18042f4941..7825c255f5 100644 --- a/pkg/koordlet/statesinformer/states_informer.go +++ b/pkg/koordlet/statesinformer/states_informer.go @@ -24,19 +24,18 @@ import ( "time" "go.uber.org/atomic" - "golang.org/x/time/rate" corev1 
"k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" + apiruntime "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" "github.com/koordinator-sh/koordinator/pkg/koordlet/pleg" - "github.com/koordinator-sh/koordinator/pkg/util" ) type StatesInformer interface { @@ -44,14 +43,17 @@ type StatesInformer interface { HasSynced() bool GetNode() *corev1.Node + GetNodeSLO() *slov1alpha1.NodeSLO GetAllPods() []*PodMeta + + RegisterCallbacks(objType reflect.Type, name, description string, callbackFn UpdateCbFn) } type statesInformer struct { - config *Config - kubelet KubeletStub - hasSynced *atomic.Bool + config *Config + kubelet KubeletStub + podHasSynced *atomic.Bool // use pleg to accelerate the efficiency of Pod meta update pleg pleg.Pleg podCreated chan string @@ -60,102 +62,92 @@ type statesInformer struct { nodeRWMutex sync.RWMutex node *corev1.Node + nodeSLOInformer cache.SharedIndexInformer + nodeSLORWMutex sync.RWMutex + nodeSLO *slov1alpha1.NodeSLO + podRWMutex sync.RWMutex podMap map[string]*PodMeta podUpdatedTime time.Time + + stateUpdateCallbacks map[reflect.Type][]updateCallback } -func NewStatesInformer(config *Config, kubeClient clientset.Interface, pleg pleg.Pleg, nodeName string) StatesInformer { +func NewStatesInformer(config *Config, kubeClient clientset.Interface, crdClient koordclientset.Interface, pleg pleg.Pleg, nodeName string) StatesInformer { nodeInformer := newNodeInformer(kubeClient, nodeName) + nodeSLOInformer := newNodeSLOInformer(crdClient, nodeName) - m := &statesInformer{ - config: config, - kubelet: 
NewKubeletStub(config.KubeletIPAddr, config.KubeletHTTPPort, config.KubeletSyncTimeoutSeconds), - hasSynced: atomic.NewBool(false), + return &statesInformer{ + config: config, + kubelet: NewKubeletStub(config.KubeletIPAddr, config.KubeletHTTPPort, config.KubeletSyncTimeoutSeconds), + podHasSynced: atomic.NewBool(false), pleg: pleg, - nodeInformer: nodeInformer, + nodeInformer: nodeInformer, + nodeSLOInformer: nodeSLOInformer, podMap: map[string]*PodMeta{}, podCreated: make(chan string, 1), // set 1 buffer - } - nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - node, ok := obj.(*corev1.Node) - if ok { - m.syncNode(node) - } else { - klog.Errorf("node informer add func parse Node failed, obj %T", obj) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldNode, oldOK := oldObj.(*corev1.Node) - newNode, newOK := newObj.(*corev1.Node) - if !oldOK || !newOK { - klog.Errorf("unable to convert object to *corev1.Node, old %T, new %T", oldObj, newObj) - return - } - if reflect.DeepEqual(oldNode, newNode) { - klog.V(5).Infof("find node %s has not changed", newNode.Name) - return - } - m.syncNode(newNode) + + stateUpdateCallbacks: map[reflect.Type][]updateCallback{ + reflect.TypeOf(&slov1alpha1.NodeSLO{}): {}, }, - }) - return m + } } -func (m *statesInformer) Run(stopCh <-chan struct{}) error { +func (s *statesInformer) Run(stopCh <-chan struct{}) error { defer utilruntime.HandleCrash() - klog.Infof("starting statesInformer") - - klog.Infof("starting informer for Node") - go m.nodeInformer.Run(stopCh) - if !cache.WaitForCacheSync(stopCh, m.nodeInformer.HasSynced) { - return fmt.Errorf("timed out waiting for node caches to sync") + klog.Infof("setup statesInformer") + s.setupInformers() + klog.Infof("starting informers") + go s.nodeInformer.Run(stopCh) + go s.nodeSLOInformer.Run(stopCh) + waitInformersSynced := []cache.InformerSynced{s.nodeInformer.HasSynced, s.nodeSLOInformer.HasSynced} + if 
!cache.WaitForCacheSync(stopCh, waitInformersSynced...) { + return fmt.Errorf("timed out waiting for states informer caches to sync") } - if m.config.KubeletSyncIntervalSeconds > 0 { - hdlID := m.pleg.AddHandler(pleg.PodLifeCycleHandlerFuncs{ + if s.config.KubeletSyncIntervalSeconds > 0 { + hdlID := s.pleg.AddHandler(pleg.PodLifeCycleHandlerFuncs{ PodAddedFunc: func(podID string) { // There is no need to notify to update the data when the channel is not empty - if len(m.podCreated) == 0 { - m.podCreated <- podID + if len(s.podCreated) == 0 { + s.podCreated <- podID } }, }) - defer m.pleg.RemoverHandler(hdlID) + defer s.pleg.RemoverHandler(hdlID) - go m.syncKubeletLoop(time.Duration(m.config.KubeletSyncIntervalSeconds)*time.Second, stopCh) + go s.syncKubeletLoop(time.Duration(s.config.KubeletSyncIntervalSeconds)*time.Second, stopCh) } else { klog.Infof("KubeletSyncIntervalSeconds is %d, statesInformer sync of kubelet is disabled", - m.config.KubeletSyncIntervalSeconds) + s.config.KubeletSyncIntervalSeconds) } - klog.Infof("start meta service successfully") + klog.Infof("start states informer successfully") <-stopCh - klog.Infof("shutting down meta service daemon") + klog.Infof("shutting down states informer daemon") return nil } -func (m *statesInformer) HasSynced() bool { - return m.hasSynced.Load() +func (s *statesInformer) HasSynced() bool { + return s.podHasSynced.Load() } -func (m *statesInformer) GetNode() *corev1.Node { - m.nodeRWMutex.RLock() - defer m.nodeRWMutex.RUnlock() - if m.node == nil { +func (s *statesInformer) GetNode() *corev1.Node { + s.nodeRWMutex.RLock() + defer s.nodeRWMutex.RUnlock() + if s.node == nil { return nil } - return m.node.DeepCopy() + return s.node.DeepCopy() } -func (m *statesInformer) GetAllPods() []*PodMeta { - m.podRWMutex.RLock() - defer m.podRWMutex.RUnlock() - pods := make([]*PodMeta, 0, len(m.podMap)) - for _, pod := range m.podMap { +func (s *statesInformer) GetAllPods() []*PodMeta { + s.podRWMutex.RLock() + defer 
s.podRWMutex.RUnlock() + pods := make([]*PodMeta, 0, len(s.podMap)) + for _, pod := range s.podMap { pods = append(pods, pod.DeepCopy()) } return pods @@ -168,7 +160,7 @@ func newNodeInformer(client clientset.Interface, nodeName string) cache.SharedIn return cache.NewSharedIndexInformer( &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) { tweakListOptionsFunc(&options) return client.CoreV1().Nodes().List(context.TODO(), options) }, @@ -183,65 +175,28 @@ func newNodeInformer(client clientset.Interface, nodeName string) cache.SharedIn ) } -func (m *statesInformer) syncNode(newNode *corev1.Node) { - klog.V(5).Infof("node update detail %v", newNode) - m.nodeRWMutex.Lock() - defer m.nodeRWMutex.Unlock() - m.node = newNode - - // also register node for metrics - metrics.Register(newNode) -} - -func (m *statesInformer) syncKubelet() error { - podList, err := m.kubelet.GetAllPods() - if err != nil { - klog.Warningf("get pods from kubelet failed, err: %v", err) - return err - } - newPodMap := make(map[string]*PodMeta, len(podList.Items)) - for _, pod := range podList.Items { - newPodMap[string(pod.UID)] = &PodMeta{ - Pod: pod.DeepCopy(), - CgroupDir: genPodCgroupParentDir(&pod), - } - } - m.podMap = newPodMap - m.hasSynced.Store(true) - m.podUpdatedTime = time.Now() - klog.Infof("get pods from kubelet success, len %d", len(m.podMap)) - return nil -} - -func (m *statesInformer) syncKubeletLoop(duration time.Duration, stopCh <-chan struct{}) { - timer := time.NewTimer(duration) - defer timer.Stop() - // TODO add a config to setup the values - rateLimiter := rate.NewLimiter(5, 10) - for { - select { - case <-m.podCreated: - if rateLimiter.Allow() { - // sync kubelet triggered immediately when the Pod is created - m.syncKubelet() - // reset timer to - if !timer.Stop() { - <-timer.C - } - timer.Reset(duration) - } - case <-timer.C: - timer.Reset(duration) - 
m.syncKubelet() - case <-stopCh: - klog.Infof("sync kubelet loop is exited") - return - } +func newNodeSLOInformer(client koordclientset.Interface, nodeName string) cache.SharedIndexInformer { + tweakListOptionFunc := func(opt *metav1.ListOptions) { + opt.FieldSelector = "metadata.name=" + nodeName } + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (apiruntime.Object, error) { + tweakListOptionFunc(&options) + return client.SloV1alpha1().NodeSLOs().List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + tweakListOptionFunc(&options) + return client.SloV1alpha1().NodeSLOs().Watch(context.TODO(), options) + }, + }, + &slov1alpha1.NodeSLO{}, + time.Hour*12, + cache.Indexers{}, + ) } -func genPodCgroupParentDir(pod *corev1.Pod) string { - // todo use cri interface to get pod cgroup dir - // e.g. kubepods-burstable.slice/kubepods-burstable-pod9dba1d9e_67ba_4db6_8a73_fb3ea297c363.slice/ - return util.GetPodKubeRelativePath(pod) +func (s *statesInformer) setupInformers() { + s.setupNodeInformer() + s.setupNodeSLOInformer() } diff --git a/pkg/koordlet/statesinformer/states_informer_test.go b/pkg/koordlet/statesinformer/states_informer_test.go index 0fdab58c20..decb861199 100644 --- a/pkg/koordlet/statesinformer/states_informer_test.go +++ b/pkg/koordlet/statesinformer/states_informer_test.go @@ -20,11 +20,13 @@ import ( "path/filepath" "testing" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + koordclientfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" + "github.com/koordinator-sh/koordinator/pkg/koordlet/pleg" "github.com/koordinator-sh/koordinator/pkg/util/system" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientsetfake "k8s.io/client-go/kubernetes/fake" ) func 
Test_genPodCgroupParentDirWithSystemdDriver(t *testing.T) { @@ -137,7 +139,7 @@ func Test_genPodCgroupParentDirWithCgroupfsDriver(t *testing.T) { } } -func Test_metaService_syncNode(t *testing.T) { +func Test_statesInformer_syncNode(t *testing.T) { testingNode := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "test", @@ -152,39 +154,33 @@ func Test_metaService_syncNode(t *testing.T) { m.syncNode(testingNode) } -// TODO: fix data race, https://github.com/koordinator-sh/koordinator/issues/77 - -// type testKubeletStub struct { -// pods corev1.PodList -// } -// -// func (t *testKubeletStub) GetAllPods() (corev1.PodList, error) { -// return t.pods, nil -// } -// -// func Test_metaService_syncPods(t *testing.T) { -// client := clientsetfake.NewSimpleClientset() -// pleg, _ := pleg.NewPLEG(sysutil.Conf.CgroupRootDir) -// stopCh := make(chan struct{}) -// defer close(stopCh) -// -// c := NewDefaultConfig() -// c.KubeletSyncIntervalSeconds = 60 -// m := NewStatesInformer(c, client, pleg, "localhost") -// m.(*statesInformer).kubelet = &testKubeletStub{pods: corev1.PodList{ -// Items: []corev1.Pod{ -// {}, -// }, -// }} -// -// go m.Run(stopCh) -// -// m.(*statesInformer).podCreated <- "pod1" -// time.Sleep(200 * time.Millisecond) -// if time.Since(m.(*statesInformer).podUpdatedTime) > time.Second { -// t.Errorf("failed to triggle update by pod created event") -// } -// if len(m.(*statesInformer).podMap) != 1 { -// t.Errorf("failed to update pods") -// } -// } +type testKubeletStub struct { + pods corev1.PodList +} + +func (t *testKubeletStub) GetAllPods() (corev1.PodList, error) { + return t.pods, nil +} + +func Test_statesInformer_syncPods(t *testing.T) { + client := clientsetfake.NewSimpleClientset() + crdClient := koordclientfake.NewSimpleClientset() + pleg, _ := pleg.NewPLEG(system.Conf.CgroupRootDir) + stopCh := make(chan struct{}, 1) + defer close(stopCh) + + c := NewDefaultConfig() + c.KubeletSyncIntervalSeconds = 60 + m := NewStatesInformer(c, client, crdClient, 
pleg, "localhost") + m.(*statesInformer).kubelet = &testKubeletStub{pods: corev1.PodList{ + Items: []corev1.Pod{ + {}, + }, + }} + + m.(*statesInformer).syncKubelet() + + if len(m.(*statesInformer).GetAllPods()) != 1 { + t.Errorf("failed to update pods") + } +} diff --git a/pkg/koordlet/statesinformer/states_node.go b/pkg/koordlet/statesinformer/states_node.go new file mode 100644 index 0000000000..d8cdb2497e --- /dev/null +++ b/pkg/koordlet/statesinformer/states_node.go @@ -0,0 +1,63 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package statesinformer + +import ( + "reflect" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" +) + +func (s *statesInformer) setupNodeInformer() { + s.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + node, ok := obj.(*corev1.Node) + if ok { + s.syncNode(node) + } else { + klog.Errorf("node informer add func parse Node failed, obj %T", obj) + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldNode, oldOK := oldObj.(*corev1.Node) + newNode, newOK := newObj.(*corev1.Node) + if !oldOK || !newOK { + klog.Errorf("unable to convert object to *corev1.Node, old %T, new %T", oldObj, newObj) + return + } + if reflect.DeepEqual(oldNode, newNode) { + klog.V(5).Infof("find node %s has not changed", newNode.Name) + return + } + s.syncNode(newNode) + }, + }) +} + +func (s *statesInformer) syncNode(newNode *corev1.Node) { + klog.V(5).Infof("node update detail %v", newNode) + s.nodeRWMutex.Lock() + defer s.nodeRWMutex.Unlock() + s.node = newNode + + // also register node for metrics + metrics.Register(newNode) +} diff --git a/pkg/koordlet/resmanager/strategy_util.go b/pkg/koordlet/statesinformer/states_nodeslo.go similarity index 52% rename from pkg/koordlet/resmanager/strategy_util.go rename to pkg/koordlet/statesinformer/states_nodeslo.go index 4ac9b5837e..c163dd75a6 100644 --- a/pkg/koordlet/resmanager/strategy_util.go +++ b/pkg/koordlet/statesinformer/states_nodeslo.go @@ -14,17 +14,106 @@ limitations under the License. 
*/ -package resmanager +package statesinformer import ( "encoding/json" + "reflect" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" "github.com/koordinator-sh/koordinator/pkg/util" ) +func (s *statesInformer) GetNodeSLO() *slov1alpha1.NodeSLO { + s.nodeSLORWMutex.RLock() + defer s.nodeSLORWMutex.RUnlock() + return s.nodeSLO.DeepCopy() +} + +func (s *statesInformer) setupNodeSLOInformer() { + s.nodeSLOInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + nodeSLO, ok := obj.(*slov1alpha1.NodeSLO) + if ok { + s.updateNodeSLOSpec(nodeSLO) + klog.Infof("create NodeSLO %v", nodeSLO) + } else { + klog.Errorf("node slo informer add func parse nodeSLO failed") + } + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldNodeSLO, oldOK := oldObj.(*slov1alpha1.NodeSLO) + newNodeSLO, newOK := newObj.(*slov1alpha1.NodeSLO) + if !oldOK || !newOK { + klog.Errorf("unable to convert object to *slov1alpha1.NodeSLO, old %T, new %T", oldObj, newObj) + return + } + if reflect.DeepEqual(oldNodeSLO.Spec, newNodeSLO.Spec) { + klog.V(5).Infof("find NodeSLO spec %s has not changed", newNodeSLO.Name) + return + } + klog.Infof("update NodeSLO spec %v", newNodeSLO.Spec) + s.updateNodeSLOSpec(newNodeSLO) + }, + }) +} + +func (s *statesInformer) updateNodeSLOSpec(nodeSLO *slov1alpha1.NodeSLO) { + s.setNodeSLOSpec(nodeSLO) + s.runCallbacks(reflect.TypeOf(&slov1alpha1.NodeSLO{}), s.GetNodeSLO()) +} + +func (s *statesInformer) setNodeSLOSpec(nodeSLO *slov1alpha1.NodeSLO) { + s.nodeSLORWMutex.Lock() + defer s.nodeSLORWMutex.Unlock() + + oldNodeSLOStr := util.DumpJSON(s.nodeSLO) + + if s.nodeSLO == nil { + s.nodeSLO = nodeSLO.DeepCopy() + } else { + s.nodeSLO.Spec = nodeSLO.Spec + } + + // merge nodeSLO spec with the default config + s.mergeNodeSLOSpec(nodeSLO) + + newNodeSLOStr := util.DumpJSON(s.nodeSLO) + klog.Infof("update nodeSLO content: old %s, new %s", oldNodeSLOStr, 
newNodeSLOStr) +} + +func (s *statesInformer) mergeNodeSLOSpec(nodeSLO *slov1alpha1.NodeSLO) { + if s.nodeSLO == nil || nodeSLO == nil { + klog.Errorf("failed to merge with nil nodeSLO, old is nil: %v, new is nil: %v", s.nodeSLO == nil, nodeSLO == nil) + return + } + + // merge ResourceUsedThresholdWithBE individually for nil-ResourceUsedThresholdWithBE case + mergedResourceUsedThresholdWithBESpec := mergeSLOSpecResourceUsedThresholdWithBE(util.DefaultNodeSLOSpecConfig().ResourceUsedThresholdWithBE, + nodeSLO.Spec.ResourceUsedThresholdWithBE) + if mergedResourceUsedThresholdWithBESpec != nil { + s.nodeSLO.Spec.ResourceUsedThresholdWithBE = mergedResourceUsedThresholdWithBESpec + } + + // merge ResourceQoSStrategy + mergedResourceQoSStrategySpec := mergeSLOSpecResourceQoSStrategy(util.DefaultNodeSLOSpecConfig().ResourceQoSStrategy, + nodeSLO.Spec.ResourceQoSStrategy) + mergeNoneResourceQoSIfDisabled(mergedResourceQoSStrategySpec) + if mergedResourceQoSStrategySpec != nil { + s.nodeSLO.Spec.ResourceQoSStrategy = mergedResourceQoSStrategySpec + } + + // merge CPUBurstStrategy + mergedCPUBurstStrategySpec := mergeSLOSpecCPUBurstStrategy(util.DefaultNodeSLOSpecConfig().CPUBurstStrategy, + nodeSLO.Spec.CPUBurstStrategy) + if mergedCPUBurstStrategySpec != nil { + s.nodeSLO.Spec.CPUBurstStrategy = mergedCPUBurstStrategySpec + } +} + // mergeSLOSpecResourceUsedThresholdWithBE merges the nodeSLO ResourceUsedThresholdWithBE with default configs func mergeSLOSpecResourceUsedThresholdWithBE(defaultSpec, newSpec *slov1alpha1.ResourceThresholdStrategy) *slov1alpha1.ResourceThresholdStrategy { spec := &slov1alpha1.ResourceThresholdStrategy{} @@ -69,6 +158,7 @@ func mergeSLOSpecCPUBurstStrategy(defaultSpec, // mergeNoneResourceQoSIfDisabled complete ResourceQoSStrategy according to enable statuses of qos features func mergeNoneResourceQoSIfDisabled(resourceQoS *slov1alpha1.ResourceQoSStrategy) { + mergeNoneCPUQoSIfDisabled(resourceQoS) mergeNoneResctrlQoSIfDisabled(resourceQoS) 
mergeNoneMemoryQoSIfDisabled(resourceQoS) klog.V(5).Infof("get merged node ResourceQoS %v", util.DumpJSON(resourceQoS)) @@ -106,3 +196,19 @@ func mergeNoneMemoryQoSIfDisabled(resourceQoS *slov1alpha1.ResourceQoSStrategy) resourceQoS.BE.MemoryQoS.MemoryQoS = *util.NoneMemoryQoS() } } + +func mergeNoneCPUQoSIfDisabled(resourceQoS *slov1alpha1.ResourceQoSStrategy) { + // if CPUQoS.Enabled=false, merge with NoneCPUQoS + if resourceQoS.LSR != nil && resourceQoS.LSR.CPUQoS != nil && + resourceQoS.LSR.CPUQoS.Enable != nil && !(*resourceQoS.LSR.CPUQoS.Enable) { + resourceQoS.LSR.CPUQoS.CPUQoS = *util.NoneCPUQoS() + } + if resourceQoS.LS != nil && resourceQoS.LS.CPUQoS != nil && + resourceQoS.LS.CPUQoS.Enable != nil && !(*resourceQoS.LS.CPUQoS.Enable) { + resourceQoS.LS.CPUQoS.CPUQoS = *util.NoneCPUQoS() + } + if resourceQoS.BE != nil && resourceQoS.BE.CPUQoS != nil && + resourceQoS.BE.CPUQoS.Enable != nil && !(*resourceQoS.BE.CPUQoS.Enable) { + resourceQoS.BE.CPUQoS.CPUQoS = *util.NoneCPUQoS() + } +} diff --git a/pkg/koordlet/statesinformer/states_nodeslo_test.go b/pkg/koordlet/statesinformer/states_nodeslo_test.go new file mode 100644 index 0000000000..2b72ec7dad --- /dev/null +++ b/pkg/koordlet/statesinformer/states_nodeslo_test.go @@ -0,0 +1,452 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package statesinformer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/utils/pointer" + + apiext "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/util" +) + +func Test_mergeNodeSLOSpec(t *testing.T) { + testingCustomNodeSLOSpec := slov1alpha1.NodeSLOSpec{ + ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ + CPUSuppressThresholdPercent: pointer.Int64Ptr(80), + }, + ResourceQoSStrategy: &slov1alpha1.ResourceQoSStrategy{ + LSR: util.NoneResourceQoS(apiext.QoSLSR), + LS: util.NoneResourceQoS(apiext.QoSLS), + BE: &slov1alpha1.ResourceQoS{ + CPUQoS: &slov1alpha1.CPUQoSCfg{ + Enable: pointer.BoolPtr(true), + }, + MemoryQoS: &slov1alpha1.MemoryQoSCfg{ + Enable: pointer.BoolPtr(true), + }, + ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ + Enable: pointer.BoolPtr(true), + ResctrlQoS: slov1alpha1.ResctrlQoS{ + CATRangeEndPercent: pointer.Int64Ptr(50), + }, + }, + }, + }, + CPUBurstStrategy: &slov1alpha1.CPUBurstStrategy{ + CPUBurstConfig: slov1alpha1.CPUBurstConfig{}, + SharePoolThresholdPercent: nil, + }, + } + testingMergedNodeSLOSpec := util.DefaultNodeSLOSpecConfig() + mergedInterface, err := util.MergeCfg(&testingMergedNodeSLOSpec, &testingCustomNodeSLOSpec) + assert.NoError(t, err) + testingMergedNodeSLOSpec = *mergedInterface.(*slov1alpha1.NodeSLOSpec) + type args struct { + nodeSLO *slov1alpha1.NodeSLO + } + type field struct { + nodeSLO *slov1alpha1.NodeSLO + } + tests := []struct { + name string + args args + field field + want *slov1alpha1.NodeSLO + }{ + { + name: "skip the merge if the old one is nil", + args: args{ + nodeSLO: &slov1alpha1.NodeSLO{}, + }, + field: field{nodeSLO: nil}, + want: nil, + }, + { + name: "skip the merge if the new one is nil", + field: field{ + nodeSLO: &slov1alpha1.NodeSLO{}, + }, + want: &slov1alpha1.NodeSLO{}, + }, + { + name: "use default and do not panic if the new 
is nil", + field: field{ + nodeSLO: &slov1alpha1.NodeSLO{ + Spec: util.DefaultNodeSLOSpecConfig(), + }, + }, + want: &slov1alpha1.NodeSLO{ + Spec: util.DefaultNodeSLOSpecConfig(), + }, + }, + { + name: "merge with the default", + args: args{ + nodeSLO: &slov1alpha1.NodeSLO{ + Spec: testingCustomNodeSLOSpec, + }, + }, + field: field{ + nodeSLO: &slov1alpha1.NodeSLO{ + Spec: slov1alpha1.NodeSLOSpec{ + ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ + CPUSuppressThresholdPercent: pointer.Int64Ptr(100), + MemoryEvictThresholdPercent: pointer.Int64Ptr(100), + }, + ResourceQoSStrategy: &slov1alpha1.ResourceQoSStrategy{ + LSR: &slov1alpha1.ResourceQoS{ + ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ + ResctrlQoS: slov1alpha1.ResctrlQoS{ + CATRangeStartPercent: pointer.Int64Ptr(0), + CATRangeEndPercent: pointer.Int64Ptr(100), + }, + }, + }, + LS: &slov1alpha1.ResourceQoS{ + ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ + ResctrlQoS: slov1alpha1.ResctrlQoS{ + CATRangeStartPercent: pointer.Int64Ptr(0), + CATRangeEndPercent: pointer.Int64Ptr(100), + }, + }, + }, + BE: &slov1alpha1.ResourceQoS{ + ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ + ResctrlQoS: slov1alpha1.ResctrlQoS{ + CATRangeStartPercent: pointer.Int64Ptr(0), + CATRangeEndPercent: pointer.Int64Ptr(40), + }, + }, + }, + }, + }, + }, + }, + want: &slov1alpha1.NodeSLO{ + Spec: testingMergedNodeSLOSpec, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + r := statesInformer{nodeSLO: tt.field.nodeSLO} + r.mergeNodeSLOSpec(tt.args.nodeSLO) + assert.Equal(t, tt.want, r.nodeSLO) + }) + } +} + +func Test_createNodeSLO(t *testing.T) { + testingNewNodeSLO := &slov1alpha1.NodeSLO{ + Spec: util.DefaultNodeSLOSpecConfig(), + } + testingNewNodeSLO.Spec.ResourceUsedThresholdWithBE = &slov1alpha1.ResourceThresholdStrategy{ + Enable: pointer.BoolPtr(true), + CPUSuppressThresholdPercent: pointer.Int64Ptr(80), + } + + testingNewNodeSLO.Spec.ResourceQoSStrategy.BE = &slov1alpha1.ResourceQoS{ + 
ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ + Enable: pointer.BoolPtr(true), + ResctrlQoS: slov1alpha1.ResctrlQoS{ + CATRangeStartPercent: pointer.Int64Ptr(0), + CATRangeEndPercent: pointer.Int64Ptr(20), + }, + }, + } + + testingCreatedNodeSLO := &slov1alpha1.NodeSLO{ + Spec: util.DefaultNodeSLOSpecConfig(), + } + testingCreatedNodeSLO.Spec.ResourceUsedThresholdWithBE.Enable = pointer.BoolPtr(true) + testingCreatedNodeSLO.Spec.ResourceUsedThresholdWithBE.CPUSuppressThresholdPercent = pointer.Int64Ptr(80) + testingCreatedNodeSLO.Spec.ResourceQoSStrategy.LSR = util.NoneResourceQoS(apiext.QoSLSR) + testingCreatedNodeSLO.Spec.ResourceQoSStrategy.LS = util.NoneResourceQoS(apiext.QoSLS) + testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE = util.NoneResourceQoS(apiext.QoSBE) + testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.Enable = pointer.BoolPtr(true) + testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeStartPercent = pointer.Int64Ptr(0) + testingCreatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeEndPercent = pointer.Int64Ptr(20) + + r := statesInformer{nodeSLO: nil} + + r.updateNodeSLOSpec(testingNewNodeSLO) + assert.Equal(t, testingCreatedNodeSLO, r.nodeSLO) +} + +func Test_updateNodeSLOSpec(t *testing.T) { + testingNewNodeSLO := &slov1alpha1.NodeSLO{ + Spec: slov1alpha1.NodeSLOSpec{ + ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ + Enable: pointer.BoolPtr(true), + CPUSuppressThresholdPercent: pointer.Int64Ptr(80), + }, + ResourceQoSStrategy: &slov1alpha1.ResourceQoSStrategy{ + BE: &slov1alpha1.ResourceQoS{ + ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ + Enable: pointer.BoolPtr(true), + ResctrlQoS: slov1alpha1.ResctrlQoS{ + CATRangeStartPercent: pointer.Int64Ptr(0), + CATRangeEndPercent: pointer.Int64Ptr(20), + }, + }, + }, + }, + }, + } + testingUpdatedNodeSLO := &slov1alpha1.NodeSLO{ + Spec: util.DefaultNodeSLOSpecConfig(), + } + testingUpdatedNodeSLO.Spec.ResourceUsedThresholdWithBE.Enable = 
pointer.BoolPtr(true) + testingUpdatedNodeSLO.Spec.ResourceUsedThresholdWithBE.CPUSuppressThresholdPercent = pointer.Int64Ptr(80) + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LSR.CPUQoS.CPUQoS = *util.NoneCPUQoS() + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LSR.MemoryQoS.MemoryQoS = *util.NoneMemoryQoS() + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LSR.ResctrlQoS.ResctrlQoS = *util.NoneResctrlQoS() + + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LS.CPUQoS.CPUQoS = *util.NoneCPUQoS() + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LS.MemoryQoS.MemoryQoS = *util.NoneMemoryQoS() + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.LS.ResctrlQoS.ResctrlQoS = *util.NoneResctrlQoS() + + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.CPUQoS.CPUQoS = *util.NoneCPUQoS() + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.MemoryQoS.MemoryQoS = *util.NoneMemoryQoS() + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.Enable = pointer.BoolPtr(true) + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeStartPercent = pointer.Int64Ptr(0) + testingUpdatedNodeSLO.Spec.ResourceQoSStrategy.BE.ResctrlQoS.CATRangeEndPercent = pointer.Int64Ptr(20) + + r := statesInformer{ + nodeSLO: &slov1alpha1.NodeSLO{ + Spec: slov1alpha1.NodeSLOSpec{ + ResourceUsedThresholdWithBE: &slov1alpha1.ResourceThresholdStrategy{ + CPUSuppressThresholdPercent: pointer.Int64Ptr(90), + MemoryEvictThresholdPercent: pointer.Int64Ptr(90), + }, + }, + }, + } + + r.updateNodeSLOSpec(testingNewNodeSLO) + assert.Equal(t, testingUpdatedNodeSLO, r.nodeSLO) +} + +func Test_mergeSLOSpecResourceUsedThresholdWithBE(t *testing.T) { + testingDefaultSpec := util.DefaultResourceThresholdStrategy() + testingNewSpec := &slov1alpha1.ResourceThresholdStrategy{ + Enable: pointer.BoolPtr(true), + CPUSuppressThresholdPercent: pointer.Int64Ptr(80), + MemoryEvictThresholdPercent: pointer.Int64Ptr(75), + } + testingNewSpec1 := &slov1alpha1.ResourceThresholdStrategy{ + Enable: 
pointer.BoolPtr(true), + CPUSuppressThresholdPercent: pointer.Int64Ptr(80), + } + testingMergedSpec := &slov1alpha1.ResourceThresholdStrategy{ + Enable: pointer.BoolPtr(true), + CPUSuppressThresholdPercent: pointer.Int64Ptr(80), + MemoryEvictThresholdPercent: pointer.Int64Ptr(70), + CPUSuppressPolicy: slov1alpha1.CPUSetPolicy, + } + type args struct { + defaultSpec *slov1alpha1.ResourceThresholdStrategy + newSpec *slov1alpha1.ResourceThresholdStrategy + } + tests := []struct { + name string + args args + want *slov1alpha1.ResourceThresholdStrategy + }{ + { + name: "both empty", + args: args{ + defaultSpec: &slov1alpha1.ResourceThresholdStrategy{}, + newSpec: &slov1alpha1.ResourceThresholdStrategy{}, + }, + want: &slov1alpha1.ResourceThresholdStrategy{}, + }, + { + name: "totally use new", + args: args{ + defaultSpec: &slov1alpha1.ResourceThresholdStrategy{}, + newSpec: testingNewSpec, + }, + want: testingNewSpec, + }, + { + name: "totally use new 1", + args: args{ + defaultSpec: testingDefaultSpec, + newSpec: testingNewSpec, + }, + want: &slov1alpha1.ResourceThresholdStrategy{ + Enable: pointer.BoolPtr(true), + CPUSuppressThresholdPercent: pointer.Int64Ptr(80), + MemoryEvictThresholdPercent: pointer.Int64Ptr(75), + CPUSuppressPolicy: slov1alpha1.CPUSetPolicy, + }, + }, + { + name: "partially use new, merging with the default", + args: args{ + defaultSpec: testingDefaultSpec, + newSpec: testingNewSpec1, + }, + want: testingMergedSpec, + }, + { + name: "new overwrite a nil", + args: args{ + defaultSpec: testingDefaultSpec, + }, + want: testingDefaultSpec, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := mergeSLOSpecResourceUsedThresholdWithBE(tt.args.defaultSpec, tt.args.newSpec) + assert.Equal(t, tt.want, got) + }) + } +} + +func Test_mergeSLOSpecResourceQoSStrategy(t *testing.T) { + testingDefaultSpec := util.DefaultResourceQoSStrategy() + + testingNewSpec := testingDefaultSpec.DeepCopy() + testingNewSpec.BE.MemoryQoS.WmarkRatio 
= pointer.Int64Ptr(0) + + testingNewSpec1 := &slov1alpha1.ResourceQoSStrategy{ + BE: &slov1alpha1.ResourceQoS{ + MemoryQoS: &slov1alpha1.MemoryQoSCfg{ + Enable: pointer.BoolPtr(true), + MemoryQoS: slov1alpha1.MemoryQoS{ + WmarkRatio: pointer.Int64Ptr(90), + }, + }, + }, + } + + testingMergedSpec := testingDefaultSpec.DeepCopy() + testingMergedSpec.BE.MemoryQoS.Enable = pointer.BoolPtr(true) + testingMergedSpec.BE.MemoryQoS.WmarkRatio = pointer.Int64Ptr(90) + + type args struct { + defaultSpec *slov1alpha1.ResourceQoSStrategy + newSpec *slov1alpha1.ResourceQoSStrategy + } + tests := []struct { + name string + args args + want *slov1alpha1.ResourceQoSStrategy + }{ + { + name: "both empty", + args: args{ + defaultSpec: &slov1alpha1.ResourceQoSStrategy{}, + newSpec: &slov1alpha1.ResourceQoSStrategy{}, + }, + want: &slov1alpha1.ResourceQoSStrategy{}, + }, + { + name: "totally use new", + args: args{ + defaultSpec: &slov1alpha1.ResourceQoSStrategy{}, + newSpec: testingNewSpec, + }, + want: testingNewSpec, + }, + { + name: "totally use new 1", + args: args{ + defaultSpec: testingDefaultSpec, + newSpec: testingNewSpec, + }, + want: testingNewSpec, + }, + { + name: "partially use new, merging with the default", + args: args{ + defaultSpec: testingDefaultSpec, + newSpec: testingNewSpec1, + }, + want: testingMergedSpec, + }, + { + name: "new overwrite a nil", + args: args{ + defaultSpec: testingDefaultSpec, + }, + want: testingDefaultSpec, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := mergeSLOSpecResourceQoSStrategy(tt.args.defaultSpec, tt.args.newSpec) + assert.Equal(t, tt.want, got) + }) + } +} + +func Test_mergeNoneResourceQoSIfDisabled(t *testing.T) { + testDefault := util.DefaultResourceQoSStrategy() + testAllNone := util.NoneResourceQoSStrategy() + + testLSMemQOSEnabled := testDefault.DeepCopy() + testLSMemQOSEnabled.LS.MemoryQoS.Enable = pointer.BoolPtr(true) + testLSMemQOSEnabledResult := util.NoneResourceQoSStrategy() + 
testLSMemQOSEnabledResult.LS.MemoryQoS.Enable = pointer.BoolPtr(true) + testLSMemQOSEnabledResult.LS.MemoryQoS.MemoryQoS = *util.DefaultMemoryQoS(apiext.QoSLS) + + type args struct { + nodeCfg *slov1alpha1.NodeSLO + } + tests := []struct { + name string + args args + want *slov1alpha1.ResourceQoSStrategy + }{ + { + name: "all disabled", + args: args{ + nodeCfg: &slov1alpha1.NodeSLO{ + Spec: slov1alpha1.NodeSLOSpec{ + ResourceQoSStrategy: testDefault, + }, + }, + }, + want: testAllNone, + }, + { + name: "only ls memory qos enabled", + args: args{ + nodeCfg: &slov1alpha1.NodeSLO{ + Spec: slov1alpha1.NodeSLOSpec{ + ResourceQoSStrategy: testLSMemQOSEnabled, + }, + }, + }, + want: testLSMemQOSEnabledResult, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mergeNoneResourceQoSIfDisabled(tt.args.nodeCfg.Spec.ResourceQoSStrategy) + assert.Equal(t, tt.want, tt.args.nodeCfg.Spec.ResourceQoSStrategy) + }) + } +} diff --git a/pkg/koordlet/statesinformer/states_pod.go b/pkg/koordlet/statesinformer/states_pod.go new file mode 100644 index 0000000000..c8c595440c --- /dev/null +++ b/pkg/koordlet/statesinformer/states_pod.go @@ -0,0 +1,80 @@ +/* + Copyright 2022 The Koordinator Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+*/ + +package statesinformer + +import ( + "time" + + "golang.org/x/time/rate" + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/pkg/util" +) + +func (s *statesInformer) syncKubelet() error { + podList, err := s.kubelet.GetAllPods() + if err != nil { + klog.Warningf("get pods from kubelet failed, err: %v", err) + return err + } + newPodMap := make(map[string]*PodMeta, len(podList.Items)) + for _, pod := range podList.Items { + newPodMap[string(pod.UID)] = &PodMeta{ + Pod: pod.DeepCopy(), + CgroupDir: genPodCgroupParentDir(&pod), + } + } + s.podMap = newPodMap + s.podHasSynced.Store(true) + s.podUpdatedTime = time.Now() + klog.Infof("get pods from kubelet success, len %d", len(s.podMap)) + return nil +} + +func (s *statesInformer) syncKubeletLoop(duration time.Duration, stopCh <-chan struct{}) { + timer := time.NewTimer(duration) + defer timer.Stop() + // TODO add a config to setup the values + rateLimiter := rate.NewLimiter(5, 10) + for { + select { + case <-s.podCreated: + if rateLimiter.Allow() { + // sync kubelet triggered immediately when the Pod is created + s.syncKubelet() + // reset timer to + if !timer.Stop() { + <-timer.C + } + timer.Reset(duration) + } + case <-timer.C: + timer.Reset(duration) + s.syncKubelet() + case <-stopCh: + klog.Infof("sync kubelet loop is exited") + return + } + } +} + +func genPodCgroupParentDir(pod *corev1.Pod) string { + // todo use cri interface to get pod cgroup dir + // e.g. 
kubepods-burstable.slice/kubepods-burstable-pod9dba1d9e_67ba_4db6_8a73_fb3ea297c363.slice/ + return util.GetPodKubeRelativePath(pod) +} diff --git a/pkg/util/config.go b/pkg/util/config.go index b0b0b49e4d..595676809f 100644 --- a/pkg/util/config.go +++ b/pkg/util/config.go @@ -42,6 +42,27 @@ func DefaultResourceThresholdStrategy() *slov1alpha1.ResourceThresholdStrategy { } } +func DefaultCPUQoS(qos apiext.QoSClass) *slov1alpha1.CPUQoS { + var cpuQoS *slov1alpha1.CPUQoS + switch qos { + case apiext.QoSLSR: + cpuQoS = &slov1alpha1.CPUQoS{ + GroupIdentity: pointer.Int64Ptr(2), + } + case apiext.QoSLS: + cpuQoS = &slov1alpha1.CPUQoS{ + GroupIdentity: pointer.Int64Ptr(2), + } + case apiext.QoSBE: + cpuQoS = &slov1alpha1.CPUQoS{ + GroupIdentity: pointer.Int64Ptr(-1), + } + default: + klog.Infof("cpu qos has no auto config for qos %s", qos) + } + return cpuQoS +} + // TODO https://github.com/koordinator-sh/koordinator/pull/94#discussion_r858786733 func DefaultResctrlQoS(qos apiext.QoSClass) *slov1alpha1.ResctrlQoS { var resctrlQoS *slov1alpha1.ResctrlQoS @@ -128,6 +149,10 @@ func DefaultMemoryQoS(qos apiext.QoSClass) *slov1alpha1.MemoryQoS { func DefaultResourceQoSStrategy() *slov1alpha1.ResourceQoSStrategy { return &slov1alpha1.ResourceQoSStrategy{ LSR: &slov1alpha1.ResourceQoS{ + CPUQoS: &slov1alpha1.CPUQoSCfg{ + Enable: pointer.BoolPtr(false), + CPUQoS: *DefaultCPUQoS(apiext.QoSLSR), + }, ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ Enable: pointer.BoolPtr(false), ResctrlQoS: *DefaultResctrlQoS(apiext.QoSLSR), @@ -138,6 +163,10 @@ func DefaultResourceQoSStrategy() *slov1alpha1.ResourceQoSStrategy { }, }, LS: &slov1alpha1.ResourceQoS{ + CPUQoS: &slov1alpha1.CPUQoSCfg{ + Enable: pointer.BoolPtr(false), + CPUQoS: *DefaultCPUQoS(apiext.QoSLS), + }, ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ Enable: pointer.BoolPtr(false), ResctrlQoS: *DefaultResctrlQoS(apiext.QoSLS), @@ -148,6 +177,10 @@ func DefaultResourceQoSStrategy() *slov1alpha1.ResourceQoSStrategy { }, }, BE: 
&slov1alpha1.ResourceQoS{ + CPUQoS: &slov1alpha1.CPUQoSCfg{ + Enable: pointer.BoolPtr(false), + CPUQoS: *DefaultCPUQoS(apiext.QoSBE), + }, ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ Enable: pointer.BoolPtr(false), ResctrlQoS: *DefaultResctrlQoS(apiext.QoSBE), @@ -162,6 +195,10 @@ func DefaultResourceQoSStrategy() *slov1alpha1.ResourceQoSStrategy { func NoneResourceQoS(qos apiext.QoSClass) *slov1alpha1.ResourceQoS { return &slov1alpha1.ResourceQoS{ + CPUQoS: &slov1alpha1.CPUQoSCfg{ + Enable: pointer.BoolPtr(false), + CPUQoS: *NoneCPUQoS(), + }, ResctrlQoS: &slov1alpha1.ResctrlQoSCfg{ Enable: pointer.BoolPtr(false), ResctrlQoS: *NoneResctrlQoS(), @@ -173,6 +210,12 @@ func NoneResourceQoS(qos apiext.QoSClass) *slov1alpha1.ResourceQoS { } } +func NoneCPUQoS() *slov1alpha1.CPUQoS { + return &slov1alpha1.CPUQoS{ + GroupIdentity: pointer.Int64(0), + } +} + func NoneResctrlQoS() *slov1alpha1.ResctrlQoS { return &slov1alpha1.ResctrlQoS{ CATRangeStartPercent: pointer.Int64Ptr(0),