diff --git a/pkg/descheduler/apis/config/types_loadaware.go b/pkg/descheduler/apis/config/types_loadaware.go index 061c60bc3..adabc6c49 100644 --- a/pkg/descheduler/apis/config/types_loadaware.go +++ b/pkg/descheduler/apis/config/types_loadaware.go @@ -64,6 +64,11 @@ type LowNodeLoadArgs struct { // LowThresholds defines the low usage threshold of resources LowThresholds ResourceThresholds + + // AnomalyCondition indicates the node load anomaly thresholds, + // the default is 5 consecutive times exceeding HighThresholds, + // it is determined that the node is abnormal, and the Pods need to be migrated to reduce the load. + AnomalyCondition *LoadAnomalyCondition } type LowNodeLoadPodSelector struct { @@ -72,3 +77,10 @@ type LowNodeLoadPodSelector struct { // Selector label query over pods for migrated Selector *metav1.LabelSelector } + +type LoadAnomalyCondition struct { + // Timeout indicates the expiration time of the abnormal state, the default is 1 minute + Timeout metav1.Duration + // ConsecutiveAbnormalities indicates the number of consecutive abnormalities + ConsecutiveAbnormalities uint32 +} diff --git a/pkg/descheduler/apis/config/v1alpha2/defaults.go b/pkg/descheduler/apis/config/v1alpha2/defaults.go index ad2647f2b..d950e5d4f 100644 --- a/pkg/descheduler/apis/config/v1alpha2/defaults.go +++ b/pkg/descheduler/apis/config/v1alpha2/defaults.go @@ -49,6 +49,11 @@ var ( Duration: metav1.Duration{Duration: 5 * time.Minute}, }, } + + defaultLoadAnomalyCondition = &LoadAnomalyCondition{ + Timeout: &metav1.Duration{Duration: 1 * time.Minute}, + ConsecutiveAbnormalities: 5, + } ) func addDefaultingFuncs(scheme *runtime.Scheme) error { @@ -247,4 +252,7 @@ func SetDefaults_LowNodeLoadArgs(obj *LowNodeLoadArgs) { if obj.NodeFit == nil { obj.NodeFit = pointer.Bool(true) } + if obj.AnomalyCondition == nil || obj.AnomalyCondition.ConsecutiveAbnormalities == 0 { + obj.AnomalyCondition = defaultLoadAnomalyCondition + } } diff --git a/pkg/descheduler/apis/config/v1alpha2/types_loadaware.go b/pkg/descheduler/apis/config/v1alpha2/types_loadaware.go index 63ce67156..9d6fe3b22 100644 --- a/pkg/descheduler/apis/config/v1alpha2/types_loadaware.go +++ b/pkg/descheduler/apis/config/v1alpha2/types_loadaware.go @@ -63,6 +63,11 @@ type LowNodeLoadArgs struct { // LowThresholds defines the low usage threshold of resources LowThresholds ResourceThresholds `json:"lowThresholds,omitempty"` + + // AnomalyCondition indicates the node load anomaly thresholds, + // the default is 5 consecutive times exceeding HighThresholds, + // it is determined that the node is abnormal, and the Pods need to be migrated to reduce the load. + AnomalyCondition *LoadAnomalyCondition `json:"anomalyCondition,omitempty"` } type LowNodeLoadPodSelector struct { @@ -71,3 +76,10 @@ type LowNodeLoadPodSelector struct { // Selector label query over pods for migrated Selector *metav1.LabelSelector `json:"selector,omitempty"` } + +type LoadAnomalyCondition struct { + // Timeout indicates the expiration time of the abnormal state, the default is 1 minute + Timeout *metav1.Duration `json:"timeout,omitempty"` + // ConsecutiveAbnormalities indicates the number of consecutive abnormalities + ConsecutiveAbnormalities uint32 `json:"consecutiveAbnormalities,omitempty"` +} diff --git a/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go b/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go index 853301c2c..791f2df1a 100644 --- a/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go +++ b/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go @@ -59,6 +59,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*LoadAnomalyCondition)(nil), (*config.LoadAnomalyCondition)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_LoadAnomalyCondition_To_config_LoadAnomalyCondition(a.(*LoadAnomalyCondition), b.(*config.LoadAnomalyCondition), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.LoadAnomalyCondition)(nil), (*LoadAnomalyCondition)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_LoadAnomalyCondition_To_v1alpha2_LoadAnomalyCondition(a.(*config.LoadAnomalyCondition), b.(*LoadAnomalyCondition), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*LowNodeLoadArgs)(nil), (*config.LowNodeLoadArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha2_LowNodeLoadArgs_To_config_LowNodeLoadArgs(a.(*LowNodeLoadArgs), b.(*config.LowNodeLoadArgs), scope) }); err != nil { @@ -334,6 +344,32 @@ func Convert_config_DeschedulerProfile_To_v1alpha2_DeschedulerProfile(in *config return autoConvert_config_DeschedulerProfile_To_v1alpha2_DeschedulerProfile(in, out, s) } +func autoConvert_v1alpha2_LoadAnomalyCondition_To_config_LoadAnomalyCondition(in *LoadAnomalyCondition, out *config.LoadAnomalyCondition, s conversion.Scope) error { + if err := v1.Convert_Pointer_v1_Duration_To_v1_Duration(&in.Timeout, &out.Timeout, s); err != nil { + return err + } + out.ConsecutiveAbnormalities = in.ConsecutiveAbnormalities + return nil +} + +// Convert_v1alpha2_LoadAnomalyCondition_To_config_LoadAnomalyCondition is an autogenerated conversion function. +func Convert_v1alpha2_LoadAnomalyCondition_To_config_LoadAnomalyCondition(in *LoadAnomalyCondition, out *config.LoadAnomalyCondition, s conversion.Scope) error { + return autoConvert_v1alpha2_LoadAnomalyCondition_To_config_LoadAnomalyCondition(in, out, s) +} + +func autoConvert_config_LoadAnomalyCondition_To_v1alpha2_LoadAnomalyCondition(in *config.LoadAnomalyCondition, out *LoadAnomalyCondition, s conversion.Scope) error { + if err := v1.Convert_v1_Duration_To_Pointer_v1_Duration(&in.Timeout, &out.Timeout, s); err != nil { + return err + } + out.ConsecutiveAbnormalities = in.ConsecutiveAbnormalities + return nil +} + +// Convert_config_LoadAnomalyCondition_To_v1alpha2_LoadAnomalyCondition is an autogenerated conversion function. +func Convert_config_LoadAnomalyCondition_To_v1alpha2_LoadAnomalyCondition(in *config.LoadAnomalyCondition, out *LoadAnomalyCondition, s conversion.Scope) error { + return autoConvert_config_LoadAnomalyCondition_To_v1alpha2_LoadAnomalyCondition(in, out, s) +} + func autoConvert_v1alpha2_LowNodeLoadArgs_To_config_LowNodeLoadArgs(in *LowNodeLoadArgs, out *config.LowNodeLoadArgs, s conversion.Scope) error { if err := v1.Convert_Pointer_bool_To_bool(&in.Paused, &out.Paused, s); err != nil { return err @@ -355,6 +391,15 @@ func autoConvert_v1alpha2_LowNodeLoadArgs_To_config_LowNodeLoadArgs(in *LowNodeL } out.HighThresholds = *(*config.ResourceThresholds)(unsafe.Pointer(&in.HighThresholds)) out.LowThresholds = *(*config.ResourceThresholds)(unsafe.Pointer(&in.LowThresholds)) + if in.AnomalyCondition != nil { + in, out := &in.AnomalyCondition, &out.AnomalyCondition + *out = new(config.LoadAnomalyCondition) + if err := Convert_v1alpha2_LoadAnomalyCondition_To_config_LoadAnomalyCondition(*in, *out, s); err != nil { + return err + } + } else { + out.AnomalyCondition = nil + } return nil } @@ -384,6 +429,15 @@ func autoConvert_config_LowNodeLoadArgs_To_v1alpha2_LowNodeLoadArgs(in *config.L } out.HighThresholds = *(*ResourceThresholds)(unsafe.Pointer(&in.HighThresholds)) out.LowThresholds = *(*ResourceThresholds)(unsafe.Pointer(&in.LowThresholds)) + if in.AnomalyCondition != nil { + in, out := &in.AnomalyCondition, &out.AnomalyCondition + *out = new(LoadAnomalyCondition) + if err := Convert_config_LoadAnomalyCondition_To_v1alpha2_LoadAnomalyCondition(*in, *out, s); err != nil { + return err + } + } else { + out.AnomalyCondition = nil + } return nil } diff --git a/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go b/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go index 91ce5a105..db13a211f 100644 --- a/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go @@ -157,6 +157,27 @@ func (in *DeschedulerProfile) DeepCopy() *DeschedulerProfile { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LoadAnomalyCondition) DeepCopyInto(out *LoadAnomalyCondition) { + *out = *in + if in.Timeout != nil { + in, out := &in.Timeout, &out.Timeout + *out = new(v1.Duration) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LoadAnomalyCondition. +func (in *LoadAnomalyCondition) DeepCopy() *LoadAnomalyCondition { + if in == nil { + return nil + } + out := new(LoadAnomalyCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LowNodeLoadArgs) DeepCopyInto(out *LowNodeLoadArgs) { *out = *in @@ -217,6 +238,11 @@ func (in *LowNodeLoadArgs) DeepCopyInto(out *LowNodeLoadArgs) { (*out)[key] = val } } + if in.AnomalyCondition != nil { + in, out := &in.AnomalyCondition, &out.AnomalyCondition + *out = new(LoadAnomalyCondition) + (*in).DeepCopyInto(*out) + } return } diff --git a/pkg/descheduler/apis/config/validation/validation_loadaware.go b/pkg/descheduler/apis/config/validation/validation_loadaware.go index e414baeab..00f9efe71 100644 --- a/pkg/descheduler/apis/config/validation/validation_loadaware.go +++ b/pkg/descheduler/apis/config/validation/validation_loadaware.go @@ -62,6 +62,11 @@ func ValidateLowLoadUtilizationArgs(path *field.Path, args *deschedulerconfig.Lo } } + if args.AnomalyCondition.ConsecutiveAbnormalities <= 0 { + fieldPath := path.Child("anomalyDetectionThresholds").Child("consecutiveAbnormalities") + allErrs = append(allErrs, field.Invalid(fieldPath, args.AnomalyCondition.ConsecutiveAbnormalities, "consecutiveAbnormalities must be greater than 0")) + } + if len(allErrs) == 0 { return nil } diff --git a/pkg/descheduler/apis/config/zz_generated.deepcopy.go b/pkg/descheduler/apis/config/zz_generated.deepcopy.go index 72469180b..a94f92c21 100644 --- a/pkg/descheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/descheduler/apis/config/zz_generated.deepcopy.go @@ -157,6 +157,23 @@ func (in *Float64OrString) DeepCopy() *Float64OrString { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LoadAnomalyCondition) DeepCopyInto(out *LoadAnomalyCondition) { + *out = *in + out.Timeout = in.Timeout + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LoadAnomalyCondition. +func (in *LoadAnomalyCondition) DeepCopy() *LoadAnomalyCondition { + if in == nil { + return nil + } + out := new(LoadAnomalyCondition) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *LowNodeLoadArgs) DeepCopyInto(out *LowNodeLoadArgs) { *out = *in @@ -192,6 +209,11 @@ func (in *LowNodeLoadArgs) DeepCopyInto(out *LowNodeLoadArgs) { (*out)[key] = val } } + if in.AnomalyCondition != nil { + in, out := &in.AnomalyCondition, &out.AnomalyCondition + *out = new(LoadAnomalyCondition) + **out = **in + } return } diff --git a/pkg/descheduler/framework/plugins/loadaware/low_node_load.go b/pkg/descheduler/framework/plugins/loadaware/low_node_load.go index c79ffaa06..88fbee04e 100644 --- a/pkg/descheduler/framework/plugins/loadaware/low_node_load.go +++ b/pkg/descheduler/framework/plugins/loadaware/low_node_load.go @@ -21,7 +21,9 @@ import ( "fmt" "sort" "strings" + "time" + gocache "github.com/patrickmn/go-cache" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,6 +40,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/descheduler/framework" nodeutil "github.com/koordinator-sh/koordinator/pkg/descheduler/node" podutil "github.com/koordinator-sh/koordinator/pkg/descheduler/pod" + "github.com/koordinator-sh/koordinator/pkg/descheduler/utils/anomaly" "github.com/koordinator-sh/koordinator/pkg/descheduler/utils/sorter" ) @@ -50,10 +53,11 @@ var _ framework.BalancePlugin = &LowNodeLoad{} // LowNodeLoad evicts pods from overutilized nodes to underutilized nodes. // Note that the plugin refers to the actual usage of the node. type LowNodeLoad struct { - handle framework.Handle - podFilter framework.FilterFunc - nodeMetricLister koordslolisters.NodeMetricLister - args *deschedulerconfig.LowNodeLoadArgs + handle framework.Handle + podFilter framework.FilterFunc + nodeMetricLister koordslolisters.NodeMetricLister + args *deschedulerconfig.LowNodeLoadArgs + nodeAnomalyDetectors *gocache.Cache } // NewLowNodeLoad builds plugin from its arguments while passing a handle @@ -104,16 +108,19 @@ func NewLowNodeLoad(args runtime.Object, handle framework.Handle) (framework.Plu koordSharedInformerFactory.Start(context.TODO().Done()) koordSharedInformerFactory.WaitForCacheSync(context.TODO().Done()) + nodeAnomalyDetectors := gocache.New(5*time.Minute, 5*time.Minute) + return &LowNodeLoad{ - handle: handle, - nodeMetricLister: nodeMetricInformer.Lister(), - args: loadLoadUtilizationArgs, - podFilter: podFilter, + handle: handle, + nodeMetricLister: nodeMetricInformer.Lister(), + args: loadLoadUtilizationArgs, + podFilter: podFilter, + nodeAnomalyDetectors: nodeAnomalyDetectors, }, nil } // Name retrieves the plugin name -func (l *LowNodeLoad) Name() string { +func (pl *LowNodeLoad) Name() string { return LowLoadUtilizationName } @@ -126,13 +133,13 @@ func (l *LowNodeLoad) Name() string { // it is possible that the utilization rate of the filtered Pods is higher than that of the candidate Pods to be descheduled. // Balance extension point implementation for the plugin -func (l *LowNodeLoad) Balance(ctx context.Context, nodes []*corev1.Node) *framework.Status { - if l.args.Paused { +func (pl *LowNodeLoad) Balance(ctx context.Context, nodes []*corev1.Node) *framework.Status { + if pl.args.Paused { klog.Infof("LowNodeLoad is paused and will do nothing.") return nil } - nodes, err := filterNodes(l.args.NodeSelector, nodes) + nodes, err := filterNodes(pl.args.NodeSelector, nodes) if err != nil { return &framework.Status{Err: err} } @@ -141,10 +148,10 @@ func (l *LowNodeLoad) Balance(ctx context.Context, nodes []*corev1.Node) *framew return nil } - lowThresholds, highThresholds := newThresholds(l.args) + lowThresholds, highThresholds := newThresholds(pl.args) resourceNames := getResourceNames(lowThresholds) - nodeUsages := getNodeUsage(nodes, resourceNames, l.nodeMetricLister, l.handle.GetPodsAssignedToNodeFunc()) - nodeThresholds := getNodeThresholds(nodeUsages, lowThresholds, highThresholds, resourceNames, l.args.UseDeviationThresholds) + nodeUsages := getNodeUsage(nodes, resourceNames, pl.nodeMetricLister, pl.handle.GetPodsAssignedToNodeFunc()) + nodeThresholds := getNodeThresholds(nodeUsages, lowThresholds, highThresholds, resourceNames, pl.args.UseDeviationThresholds) lowNodes, sourceNodes := classifyNodes(nodeUsages, nodeThresholds, lowThresholdFilter, highThresholdFilter) logUtilizationCriteria("Criteria for a node under low thresholds", lowThresholds, len(lowNodes)) @@ -155,8 +162,10 @@ func (l *LowNodeLoad) Balance(ctx context.Context, nodes []*corev1.Node) *framew return nil } - if len(lowNodes) <= int(l.args.NumberOfNodes) { - klog.V(4).InfoS("Number of nodes underutilized is less or equal than NumberOfNodes, nothing to do here", "underutilizedNodes", len(lowNodes), "numberOfNodes", l.args.NumberOfNodes) + markNormalNodes(lowNodes, pl.nodeAnomalyDetectors) + + if len(lowNodes) <= int(pl.args.NumberOfNodes) { + klog.V(4).InfoS("Number of nodes underutilized is less or equal than NumberOfNodes, nothing to do here", "underutilizedNodes", len(lowNodes), "numberOfNodes", pl.args.NumberOfNodes) return nil } @@ -170,8 +179,15 @@ func (l *LowNodeLoad) Balance(ctx context.Context, nodes []*corev1.Node) *framew return nil } + abnormalNodes := filterRealAbnormalNodes(sourceNodes, pl.nodeAnomalyDetectors, pl.args.AnomalyCondition) + if len(abnormalNodes) == 0 { + klog.V(4).InfoS("None of the nodes were detected as anomalous, nothing to do here") + return nil + } + continueEvictionCond := func(nodeInfo NodeInfo, totalAvailableUsages map[corev1.ResourceName]*resource.Quantity) bool { if _, overutilized := isNodeOverutilized(nodeInfo.NodeUsage.usage, nodeInfo.thresholds.highResourceThreshold); !overutilized { + markNormalNodes([]NodeInfo{nodeInfo}, pl.nodeAnomalyDetectors) return false } for _, resourceName := range resourceNames { @@ -185,17 +201,17 @@ func (l *LowNodeLoad) Balance(ctx context.Context, nodes []*corev1.Node) *framew } resourceToWeightMap := sorter.GenDefaultResourceToWeightMap(resourceNames) - sortNodesByUsage(sourceNodes, resourceToWeightMap, false) + sortNodesByUsage(abnormalNodes, resourceToWeightMap, false) evictPodsFromSourceNodes( ctx, - sourceNodes, + abnormalNodes, lowNodes, - l.args.DryRun, - l.args.NodeFit, - l.handle.Evictor(), - l.podFilter, - l.handle.GetPodsAssignedToNodeFunc(), + pl.args.DryRun, + pl.args.NodeFit, + pl.handle.Evictor(), + pl.podFilter, + pl.handle.GetPodsAssignedToNodeFunc(), resourceNames, continueEvictionCond, overUtilizedEvictionReason(highThresholds), @@ -204,6 +220,40 @@ func (l *LowNodeLoad) Balance(ctx context.Context, nodes []*corev1.Node) *framew return nil } +func markNormalNodes(lowNodes []NodeInfo, nodeAnomalyDetectors *gocache.Cache) { + for _, v := range lowNodes { + if obj, ok := nodeAnomalyDetectors.Get(v.node.Name); ok { + anomalyDetector := obj.(anomaly.Detector) + anomalyDetector.Mark(true) + } + } +} + +func filterRealAbnormalNodes(sourceNodes []NodeInfo, nodeAnomalyDetectors *gocache.Cache, anomalyCondition *deschedulerconfig.LoadAnomalyCondition) []NodeInfo { + if anomalyCondition == nil || anomalyCondition.ConsecutiveAbnormalities == 1 { + return sourceNodes + } + var abnormalNodes []NodeInfo + for _, v := range sourceNodes { + obj, ok := nodeAnomalyDetectors.Get(v.node.Name) + if !ok { + opts := anomaly.Options{ + Timeout: anomalyCondition.Timeout.Duration, + AnomalyConditionFn: func(counter anomaly.Counter) bool { + return counter.ConsecutiveAbnormalities > anomalyCondition.ConsecutiveAbnormalities + }, + } + obj = anomaly.NewBasicDetector(v.node.Name, opts) + } + anomalyDetector := obj.(anomaly.Detector) + if state, _ := anomalyDetector.Mark(false); state == anomaly.StateAnomaly { + abnormalNodes = append(abnormalNodes, v) + } + nodeAnomalyDetectors.Set(v.node.Name, anomalyDetector, gocache.DefaultExpiration) + } + return abnormalNodes +} + func newThresholds(args *deschedulerconfig.LowNodeLoadArgs) (thresholds, highThresholds deschedulerconfig.ResourceThresholds) { useDeviationThresholds := args.UseDeviationThresholds thresholds = args.LowThresholds diff --git a/pkg/descheduler/framework/plugins/loadaware/low_node_load_test.go b/pkg/descheduler/framework/plugins/loadaware/low_node_load_test.go index 3d63bc60f..1c8c04a84 100644 --- a/pkg/descheduler/framework/plugins/loadaware/low_node_load_test.go +++ b/pkg/descheduler/framework/plugins/loadaware/low_node_load_test.go @@ -19,7 +19,9 @@ package loadaware import ( "context" "testing" + "time" + gocache "github.com/patrickmn/go-cache" "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1beta1" @@ -27,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" coretesting "k8s.io/client-go/testing" @@ -1020,6 +1023,9 @@ func TestLowNodeUtilization(t *testing.T) { HighThresholds: tt.targetThresholds, UseDeviationThresholds: tt.useDeviationThresholds, EvictableNamespaces: tt.evictableNamespaces, + AnomalyCondition: &deschedulerconfig.LoadAnomalyCondition{ + ConsecutiveAbnormalities: 1, + }, } koordClientSet := koordfake.NewSimpleClientset() @@ -1207,3 +1213,106 @@ func Test_filterNodes(t *testing.T) { }) } } + +func Test_markNormalNodes(t *testing.T) { + node := NodeInfo{ + NodeUsage: &NodeUsage{ + node: test.BuildTestNode("test-node-", 4000, 3000, 10, nil), + }, + } + sourceNodes := []NodeInfo{node} + + condition := &deschedulerconfig.LoadAnomalyCondition{ + ConsecutiveAbnormalities: 2, + } + nodeAnomalyDetectors := gocache.New(5*time.Minute, 5*time.Minute) + for i := 0; i < int(condition.ConsecutiveAbnormalities); i++ { + filterRealAbnormalNodes(sourceNodes, nodeAnomalyDetectors, condition) + } + abnormalNodes := filterRealAbnormalNodes(sourceNodes, nodeAnomalyDetectors, condition) + assert.Equal(t, sourceNodes, abnormalNodes) + + markNormalNodes(sourceNodes, nodeAnomalyDetectors) + abnormalNodes = filterRealAbnormalNodes(sourceNodes, nodeAnomalyDetectors, condition) + assert.Equal(t, []NodeInfo(nil), abnormalNodes) +} + +func Test_filterRealAbnormalNodes(t *testing.T) { + tests := []struct { + name string + sourceNodes []string + abnormalNodes []string + anomalyCondition *deschedulerconfig.LoadAnomalyCondition + detectCounts int + want []string + }{ + { + name: "ConsecutiveAbnormalities 1 times and detected abnormality", + sourceNodes: []string{"test-node-1", "test-node-2"}, + anomalyCondition: &deschedulerconfig.LoadAnomalyCondition{ + ConsecutiveAbnormalities: 1, + }, + want: []string{"test-node-1", "test-node-2"}, + }, + { + name: "ConsecutiveAbnormalities 2 times and did not detect abnormality", + sourceNodes: []string{"test-node-1", "test-node-2"}, + anomalyCondition: &deschedulerconfig.LoadAnomalyCondition{ + ConsecutiveAbnormalities: 2, + }, + want: nil, + }, + { + name: "ConsecutiveAbnormalities 2 times and detect 2 times", + sourceNodes: []string{"test-node-1", "test-node-2"}, + anomalyCondition: &deschedulerconfig.LoadAnomalyCondition{ + ConsecutiveAbnormalities: 2, + }, + detectCounts: 2, + want: []string{"test-node-1", "test-node-2"}, + }, + { + name: "mix abnormal nodes and normal nodes", + sourceNodes: []string{"test-node-1", "test-node-2"}, + abnormalNodes: []string{"test-node-2"}, + anomalyCondition: &deschedulerconfig.LoadAnomalyCondition{ + ConsecutiveAbnormalities: 2, + }, + want: []string{"test-node-2"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + abnormalNodes := sets.NewString(tt.abnormalNodes...) + var sourceNodes []NodeInfo + var alreadyAbnormalNodes []NodeInfo + for _, v := range tt.sourceNodes { + node := NodeInfo{ + NodeUsage: &NodeUsage{ + node: test.BuildTestNode(v, 4000, 3000, 10, nil), + }, + } + sourceNodes = append(sourceNodes, node) + if abnormalNodes.Has(v) { + alreadyAbnormalNodes = append(alreadyAbnormalNodes, node) + } + } + nodeAnomalyDetectors := gocache.New(5*time.Minute, 5*time.Minute) + + for i := 0; i < int(tt.anomalyCondition.ConsecutiveAbnormalities); i++ { + filterRealAbnormalNodes(alreadyAbnormalNodes, nodeAnomalyDetectors, tt.anomalyCondition) + } + + for i := 0; i < tt.detectCounts; i++ { + filterRealAbnormalNodes(sourceNodes, nodeAnomalyDetectors, tt.anomalyCondition) + } + + got := filterRealAbnormalNodes(sourceNodes, nodeAnomalyDetectors, tt.anomalyCondition) + var gotNodes []string + for _, v := range got { + gotNodes = append(gotNodes, v.node.Name) + } + assert.Equal(t, tt.want, gotNodes) + }) + } +} diff --git a/pkg/descheduler/utils/anomaly/basic_detector.go b/pkg/descheduler/utils/anomaly/basic_detector.go new file mode 100644 index 000000000..dadc9029a --- /dev/null +++ b/pkg/descheduler/utils/anomaly/basic_detector.go @@ -0,0 +1,182 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package anomaly + +import ( + "sync" + "time" +) + +const ( + defaultTimeout = 60 * time.Second +) + +func defaultAnomalyCondition(counter Counter) bool { + return counter.ConsecutiveAbnormalities > 5 +} + +// Options configures BasicDetector +type Options struct { + // Timeout is the period of the open state, + // after which the state of the BasicDetector becomes half-open. + // If Timeout is less than or equal to 0, the timeout value of the BasicDetector is set to 60 seconds. + Timeout time.Duration + // AnomalyConditionFn is called with a copy of Counter whenever a request fails in the ok state. + // If AnomalyConditionFn returns true, the BasicDetector will be placed into the anomaly state. + // If AnomalyConditionFn is nil, default AnomalyConditionFn is used. + // Default AnomalyConditionFn returns true when the number of consecutive abnormalities is more than 5. + AnomalyConditionFn func(counter Counter) bool + // OnStateChange is called whenever the state of the BasicDetector changes. + OnStateChange func(name string, from State, to State) +} + +var _ Detector = &BasicDetector{} + +// BasicDetector is a state machine to prevent sending requests that are likely to fail. +type BasicDetector struct { + name string + timeout time.Duration + anomalyConditionFn func(counts Counter) bool + onStateChange func(name string, from State, to State) + + mutex sync.Mutex + state State + generation uint64 + counter Counter + expiration time.Time +} + +// NewBasicDetector returns a new BasicDetector configured with the given Options. +func NewBasicDetector(name string, opts Options) *BasicDetector { + d := &BasicDetector{ + name: name, + timeout: defaultTimeout, + anomalyConditionFn: defaultAnomalyCondition, + onStateChange: opts.OnStateChange, + } + if opts.Timeout > 0 { + d.timeout = opts.Timeout + } + if opts.AnomalyConditionFn != nil { + d.anomalyConditionFn = opts.AnomalyConditionFn + } + + d.toNewGeneration(time.Now()) + return d +} + +// Name returns the name of the BasicDetector. +func (d *BasicDetector) Name() string { + return d.name +} + +// State returns the current state of the BasicDetector. +func (d *BasicDetector) State() State { + d.mutex.Lock() + defer d.mutex.Unlock() + + now := time.Now() + state := d.currentState(now) + return state +} + +// Counter returns internal counters +func (d *BasicDetector) Counter() Counter { + d.mutex.Lock() + defer d.mutex.Unlock() + + return d.counter +} + +func (d *BasicDetector) Mark(normality bool) (State, error) { + d.mutex.Lock() + defer d.mutex.Unlock() + + now := time.Now() + state := d.currentState(now) + + d.counter.onMark() + if normality { + d.onNormality(state, now) + } else { + d.onAbnormalities(state, now) + } + return d.currentState(now), nil +} + +func (d *BasicDetector) onNormality(state State, now time.Time) { + switch state { + case StateOK: + d.counter.onNormality() + case StateAnomaly: + d.counter.onNormality() + d.setState(StateOK, now) + } +} + +func (d *BasicDetector) onAbnormalities(state State, now time.Time) { + switch state { + case StateOK: + d.counter.onAbnormalities() + if d.anomalyConditionFn(d.counter) { + d.setState(StateAnomaly, now) + } + } +} + +func (d *BasicDetector) currentState(now time.Time) State { + switch d.state { + case StateOK: + if !d.expiration.IsZero() && d.expiration.Before(now) { + d.toNewGeneration(now) + } + case StateAnomaly: + if d.expiration.Before(now) { + d.setState(StateOK, now) + } + } + return d.state +} + +func (d *BasicDetector) setState(state State, now time.Time) { + if d.state == state { + return + } + + prev := d.state + d.state = state + + d.toNewGeneration(now) + + if d.onStateChange != nil { + d.onStateChange(d.name, prev, state) + } +} + +func (d *BasicDetector) toNewGeneration(now time.Time) { + d.counter.clear() + + var zero time.Time + switch d.state { + case StateOK: + d.expiration = zero + case StateAnomaly: + d.expiration = now.Add(d.timeout) + default: + d.expiration = zero + } +} diff --git a/pkg/descheduler/utils/anomaly/basic_detector_test.go b/pkg/descheduler/utils/anomaly/basic_detector_test.go new file mode 100644 index 000000000..f6ae99358 --- /dev/null +++ b/pkg/descheduler/utils/anomaly/basic_detector_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package anomaly + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestBasicDetector(t *testing.T) { + callStateChangeTimes := 0 + opts := Options{ + Timeout: 3 * time.Second, + AnomalyConditionFn: func(counter Counter) bool { + return counter.ConsecutiveAbnormalities >= 2 + }, + OnStateChange: func(name string, from State, to State) { + callStateChangeTimes++ + }, + } + detector := NewBasicDetector("test", opts) + assert.Equal(t, "test", detector.Name()) + assert.Equal(t, StateOK, detector.State()) + + state, _ := detector.Mark(true) + assert.Equal(t, StateOK, state) + + state, _ = detector.Mark(false) + assert.Equal(t, StateOK, state) + + assert.Equal(t, Counter{ + TotalDetects: 2, + TotalNormalities: 1, + TotalAbnormalities: 1, + ConsecutiveNormalities: 0, + ConsecutiveAbnormalities: 1, + }, detector.Counter()) + + state, _ = detector.Mark(false) + assert.Equal(t, StateAnomaly, state) + + state, _ = detector.Mark(true) + assert.Equal(t, StateOK, state) + + detector.Mark(false) + detector.Mark(false) + + // wait for expiration + time.Sleep(opts.Timeout) + assert.Equal(t, StateOK, detector.State()) + state, _ = detector.Mark(false) + assert.Equal(t, StateOK, state) + + state, _ = detector.Mark(false) + assert.Equal(t, StateAnomaly, state) + + assert.NotZero(t, callStateChangeTimes) + assert.Equal(t, Counter{}, detector.Counter()) +} diff --git a/pkg/descheduler/utils/anomaly/counter.go b/pkg/descheduler/utils/anomaly/counter.go new file mode 100644 index 000000000..fd2811aff --- /dev/null +++ b/pkg/descheduler/utils/anomaly/counter.go @@ -0,0 +1,49 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package anomaly + +type Counter struct { + TotalDetects uint32 + TotalNormalities uint32 + TotalAbnormalities uint32 + ConsecutiveNormalities uint32 + ConsecutiveAbnormalities uint32 +} + +func (c *Counter) onMark() { + c.TotalDetects++ +} + +func (c *Counter) onNormality() { + c.TotalNormalities++ + c.ConsecutiveNormalities++ + c.ConsecutiveAbnormalities = 0 +} + +func (c *Counter) onAbnormalities() { + c.TotalAbnormalities++ + c.ConsecutiveAbnormalities++ + c.ConsecutiveNormalities = 0 +} + +func (c *Counter) clear() { + c.TotalDetects = 0 + c.TotalNormalities = 0 + c.TotalAbnormalities = 0 + c.ConsecutiveNormalities = 0 + c.ConsecutiveAbnormalities = 0 +} diff --git a/pkg/descheduler/utils/anomaly/types.go b/pkg/descheduler/utils/anomaly/types.go new file mode 100644 index 000000000..e3a3ef504 --- /dev/null +++ b/pkg/descheduler/utils/anomaly/types.go @@ -0,0 +1,48 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package anomaly + +import ( + "fmt" +) + +// State is a type that represents a state of Detector. +type State int + +// These constants are states of Detector. +const ( + StateOK State = iota + StateAnomaly +) + +// String implements stringer interface. +func (s State) String() string { + switch s { + case StateOK: + return "ok" + case StateAnomaly: + return "anomaly" + default: + return fmt.Sprintf("unknown state: %d", s) + } +} + +type Detector interface { + Name() string + Mark(normality bool) (State, error) + State() State +}