diff --git a/Makefile b/Makefile index 9e14b0adf..214aa786b 100644 --- a/Makefile +++ b/Makefile @@ -62,6 +62,7 @@ manifests: controller-gen ## Generate WebhookConfiguration, ClusterRole and Cust .PHONY: generate generate: controller-gen ## Generate code containing DeepCopy, DeepCopyInto, and DeepCopyObject method implementations. @hack/generate_client.sh + @hack/update-scheduler-codegen.sh $(CONTROLLER_GEN) object:headerFile="$(LICENSE_HEADER_GO)" paths="./apis/..." .PHONY: fmt diff --git a/apis/extension/resource.go b/apis/extension/resource.go index 922f7ffed..cb9794782 100644 --- a/apis/extension/resource.go +++ b/apis/extension/resource.go @@ -33,3 +33,11 @@ var ( }, } ) + +// TranslateResourceNameByPriorityClass translates defaultResourceName to extend resourceName by PriorityClass +func TranslateResourceNameByPriorityClass(priorityClass PriorityClass, defaultResourceName corev1.ResourceName) corev1.ResourceName { + if priorityClass == PriorityProd || priorityClass == PriorityNone { + return defaultResourceName + } + return ResourceNameMap[priorityClass][defaultResourceName] +} diff --git a/apis/extension/scheduling.go b/apis/extension/scheduling.go new file mode 100644 index 000000000..5c3b77830 --- /dev/null +++ b/apis/extension/scheduling.go @@ -0,0 +1,47 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package extension + +import ( + "encoding/json" + + corev1 "k8s.io/api/core/v1" +) + +const ( + // AnnotationCustomUsageThresholds represents the user-defined resource utilization threshold. + // For specific value definitions, see CustomUsageThresholds + AnnotationCustomUsageThresholds = "scheduling.koordinator.sh/usage-thresholds" +) + +// CustomUsageThresholds supports user-defined node resource utilization thresholds. +type CustomUsageThresholds struct { + UsageThresholds map[corev1.ResourceName]int64 `json:"usageThresholds,omitempty"` +} + +func GetCustomUsageThresholds(node *corev1.Node) (*CustomUsageThresholds, error) { + usageThresholds := &CustomUsageThresholds{} + data, ok := node.Annotations[AnnotationCustomUsageThresholds] + if !ok { + return usageThresholds, nil + } + err := json.Unmarshal([]byte(data), usageThresholds) + if err != nil { + return nil, err + } + return usageThresholds, nil +} diff --git a/cmd/koord-scheduler/app/config/config.go b/cmd/koord-scheduler/app/config/config.go new file mode 100644 index 000000000..b1e973ec6 --- /dev/null +++ b/cmd/koord-scheduler/app/config/config.go @@ -0,0 +1,48 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + schedulerappconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" + + koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" +) + +// Config has all the context to run a Scheduler +type Config struct { + *schedulerappconfig.Config + KoordinatorClient koordinatorclientset.Interface + KoordinatorSharedInformerFactory koordinatorinformers.SharedInformerFactory +} + +type completedConfig struct { + *Config +} + +// CompletedConfig same as Config, just to swap private object. +type CompletedConfig struct { + // Embed a private pointer that cannot be instantiated outside of this package. + *completedConfig +} + +// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver. +func (c *Config) Complete() CompletedConfig { + c.Config.Complete() + cc := completedConfig{c} + return CompletedConfig{&cc} +} diff --git a/cmd/koord-scheduler/app/options/options.go b/cmd/koord-scheduler/app/options/options.go new file mode 100644 index 000000000..f4abde20e --- /dev/null +++ b/cmd/koord-scheduler/app/options/options.go @@ -0,0 +1,58 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package options + +import ( + scheduleroptions "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + + schedulerappconfig "github.com/koordinator-sh/koordinator/cmd/koord-scheduler/app/config" + koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" +) + +// Options has all the params needed to run a Scheduler +type Options struct { + *scheduleroptions.Options +} + +// NewOptions returns default scheduler app options. 
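+// It wraps the upstream kube-scheduler options, so the koord-scheduler inherits
+// the upstream defaults and configuration handling.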
+func NewOptions() *Options { + return &Options{ + Options: scheduleroptions.NewOptions(), + } +} + +// Config return a scheduler config object +func (o *Options) Config() (*schedulerappconfig.Config, error) { + config, err := o.Options.Config() + if err != nil { + return nil, err + } + + koordinatorClient, err := koordinatorclientset.NewForConfig(config.KubeConfig) + if err != nil { + return nil, err + } + + koordinatorSharedInformerFactory := koordinatorinformers.NewSharedInformerFactoryWithOptions(koordinatorClient, 0) + + return &schedulerappconfig.Config{ + Config: config, + KoordinatorClient: koordinatorClient, + KoordinatorSharedInformerFactory: koordinatorSharedInformerFactory, + }, nil +} diff --git a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go b/cmd/koord-scheduler/app/server.go similarity index 86% rename from vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go rename to cmd/koord-scheduler/app/server.go index a953d3a37..1ad623a7f 100644 --- a/vendor/k8s.io/kubernetes/cmd/kube-scheduler/app/server.go +++ b/cmd/koord-scheduler/app/server.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2022 The Koordinator Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import ( goruntime "runtime" "github.com/spf13/cobra" - utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apiserver/pkg/authentication/authenticator" "k8s.io/apiserver/pkg/authorization/authorizer" @@ -49,33 +48,35 @@ import ( "k8s.io/component-base/version" "k8s.io/component-base/version/verflag" "k8s.io/klog/v2" - schedulerserverconfig "k8s.io/kubernetes/cmd/kube-scheduler/app/config" - "k8s.io/kubernetes/cmd/kube-scheduler/app/options" + scheduleroptions "k8s.io/kubernetes/cmd/kube-scheduler/app/options" "k8s.io/kubernetes/pkg/scheduler" kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/latest" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" "k8s.io/kubernetes/pkg/scheduler/metrics/resources" "k8s.io/kubernetes/pkg/scheduler/profile" + + schedulerserverconfig "github.com/koordinator-sh/koordinator/cmd/koord-scheduler/app/config" + "github.com/koordinator-sh/koordinator/cmd/koord-scheduler/app/options" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" ) // Option configures a framework.Registry. -type Option func(runtime.Registry) error +type Option func(frameworkext.ExtendedHandle, runtime.Registry) error // NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions func NewSchedulerCommand(registryOptions ...Option) *cobra.Command { opts := options.NewOptions() cmd := &cobra.Command{ - Use: "kube-scheduler", - Long: `The Kubernetes scheduler is a control plane process which assigns -Pods to Nodes. The scheduler determines which Nodes are valid placements for -each Pod in the scheduling queue according to constraints and available -resources. The scheduler then ranks each valid Node and binds the Pod to a -suitable Node. Multiple different schedulers may be used within a cluster; -kube-scheduler is the reference implementation. -See [scheduling](https://kubernetes.io/docs/concepts/scheduling-eviction/) -for more information about scheduling and the kube-scheduler component.`, + Use: "koord-scheduler", + Long: `The Koordinator scheduler is a control plane process which assigns +Pods to Nodes. 
The scheduler is implemented on top of the Kubernetes scheduling framework.
+On the basis of compatibility with community scheduling capabilities, it provides
+richer advanced scheduling capabilities to address scheduling needs in co-located
+scenarios, ensuring the runtime quality of different workloads and meeting users'
+demands for cost reduction and efficiency improvement.
+`,
 		Run: func(cmd *cobra.Command, args []string) {
 			if err := runCommand(cmd, opts, registryOptions...); err != nil {
 				fmt.Fprintf(os.Stderr, "%v\n", err)
@@ -132,7 +133,7 @@ func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Op
 
 // Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
 func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
 	// To help debugging, immediately log version
-	klog.V(1).InfoS("Starting Kubernetes Scheduler version", "version", version.Get())
+	klog.V(1).InfoS("Starting Koordinator Scheduler version", "version", version.Get())
 
 	// Configz registration.
 	if cz, err := configz.New("componentconfig"); err == nil {
@@ -187,9 +188,11 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
 
 	// Start all informers.
 	cc.InformerFactory.Start(ctx.Done())
+	cc.KoordinatorSharedInformerFactory.Start(ctx.Done())
 
 	// Wait for all caches to sync before scheduling.
 	cc.InformerFactory.WaitForCacheSync(ctx.Done())
+	cc.KoordinatorSharedInformerFactory.WaitForCacheSync(ctx.Done())
 
 	// If leader election is enabled, runCommand via LeaderElector until done and exit.
 	if cc.LeaderElection != nil {
@@ -296,8 +299,8 @@ func getRecorderFactory(cc *schedulerserverconfig.CompletedConfig) profile.Recor
 // WithPlugin creates an Option based on plugin name and factory. Please don't remove this function: it is used to register out-of-tree plugins,
 // hence there are no references to it from the kubernetes scheduler code base.
 func WithPlugin(name string, factory runtime.PluginFactory) Option {
-	return func(registry runtime.Registry) error {
-		return registry.Register(name, factory)
+	return func(handle frameworkext.ExtendedHandle, registry runtime.Registry) error {
+		return registry.Register(name, frameworkext.PluginFactoryProxy(handle, factory))
 	}
 }
 
@@ -321,9 +324,13 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions
 	// Get the completed config
 	cc := c.Complete()
+
+	// NOTE(joseph): The K8s scheduling framework does not provide an extension point for initialization.
+	// Currently this is only possible by copying the initialization code and adding the custom initialization here.
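+	// The ExtendedHandle constructed below carries the Koordinator clientset and shared
+	// informer factory into the out-of-tree plugin factories registered via WithPlugin.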
+ extendedHandle := frameworkext.NewExtendedHandle(cc.KoordinatorClient, cc.KoordinatorSharedInformerFactory) + outOfTreeRegistry := make(runtime.Registry) for _, option := range outOfTreeRegistryOptions { - if err := option(outOfTreeRegistry); err != nil { + if err := option(extendedHandle, outOfTreeRegistry); err != nil { return nil, nil, err } } @@ -353,9 +360,12 @@ func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions if err != nil { return nil, nil, err } - if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil { + if err := scheduleroptions.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil { return nil, nil, err } + // TODO(joseph): Some extensions can also be made in the future, + // such as replacing some interfaces in Scheduler to implement custom logic + return &cc, sched, nil } diff --git a/cmd/koord-scheduler/main.go b/cmd/koord-scheduler/main.go index 8b3093bb3..d5db1e570 100644 --- a/cmd/koord-scheduler/main.go +++ b/cmd/koord-scheduler/main.go @@ -22,7 +22,12 @@ import ( "time" "k8s.io/component-base/logs" - "k8s.io/kubernetes/cmd/kube-scheduler/app" + + "github.com/koordinator-sh/koordinator/cmd/koord-scheduler/app" + "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/loadaware" + + // Ensure scheme package is initialized. + _ "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/scheme" ) func main() { @@ -31,7 +36,9 @@ func main() { // Register custom plugins to the scheduler framework. // Later they can consist of scheduler profile(s) and hence // used by various kinds of workloads. - command := app.NewSchedulerCommand() + command := app.NewSchedulerCommand( + app.WithPlugin(loadaware.Name, loadaware.New), + ) logs.InitLogs() defer logs.FlushLogs() diff --git a/go.mod b/go.mod index 0bc8ba11f..02e59dda9 100644 --- a/go.mod +++ b/go.mod @@ -14,6 +14,7 @@ require ( github.com/onsi/gomega v1.15.0 github.com/prashantv/gostub v1.1.0 github.com/prometheus/client_golang v1.11.0 + github.com/spf13/cobra v1.1.3 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.7.0 go.uber.org/atomic v1.7.0 @@ -30,7 +31,8 @@ require ( k8s.io/component-base v0.22.6 k8s.io/cri-api v0.22.6 k8s.io/klog/v2 v2.9.0 - k8s.io/kubernetes v0.0.0-00010101000000-000000000000 + k8s.io/kube-scheduler v0.22.6 + k8s.io/kubernetes v1.22.6 k8s.io/utils v0.0.0-20210819203725-bdf08cb9a70a sigs.k8s.io/controller-runtime v0.10.2 ) @@ -92,7 +94,6 @@ require ( github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect github.com/sirupsen/logrus v1.8.1 // indirect - github.com/spf13/cobra v1.1.3 // indirect go.etcd.io/etcd/api/v3 v3.5.0 // indirect go.etcd.io/etcd/client/pkg/v3 v3.5.0 // indirect go.etcd.io/etcd/client/v3 v3.5.0 // indirect @@ -133,7 +134,6 @@ require ( k8s.io/csi-translation-lib v0.22.6 // indirect k8s.io/gengo v0.0.0-20201214224949-b6c5ce23f027 // indirect k8s.io/kube-openapi v0.0.0-20211109043538-20434351676c // indirect - k8s.io/kube-scheduler v0.0.0 // indirect k8s.io/mount-utils v0.22.6 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.27 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect diff --git a/hack/update-scheduler-codegen.sh b/hack/update-scheduler-codegen.sh new file mode 100755 index 000000000..928253923 --- /dev/null +++ b/hack/update-scheduler-codegen.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +set -o errexit +set -o nounset +set -o pipefail + +GOPATH=`go env GOPATH` 
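+# The code-generator script invoked below expects GOPATH in its environment, so export it explicitly.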
+export GOPATH + +SCRIPT_ROOT=$(dirname "${BASH_SOURCE[@]}")/.. + +CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)} + +bash "${CODEGEN_PKG}"/generate-internal-groups.sh \ + "deepcopy,conversion,defaulter" \ + github.com/koordinator-sh/koordinator/pkg/scheduler/apis/generated \ + github.com/koordinator-sh/koordinator/pkg/scheduler/apis \ + github.com/koordinator-sh/koordinator/pkg/scheduler/apis \ + "config:v1beta2" \ + --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt diff --git a/pkg/scheduler/OWNERS b/pkg/scheduler/OWNERS index 2855db070..dd3b22c69 100644 --- a/pkg/scheduler/OWNERS +++ b/pkg/scheduler/OWNERS @@ -1,3 +1,4 @@ reviewers: + - eahydra - allwmh - zhaoxianyang385 diff --git a/pkg/scheduler/apis/config/doc.go b/pkg/scheduler/apis/config/doc.go new file mode 100644 index 000000000..8c0a897d0 --- /dev/null +++ b/pkg/scheduler/apis/config/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// +k8s:deepcopy-gen=package +// +groupName=scheduler.config.koordinator.sh + +// Package config of the API. +package config diff --git a/pkg/scheduler/apis/config/register.go b/pkg/scheduler/apis/config/register.go new file mode 100644 index 000000000..0815dfa0d --- /dev/null +++ b/pkg/scheduler/apis/config/register.go @@ -0,0 +1,47 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + schedconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" +) + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: schedconfig.GroupName, Version: runtime.APIVersionInternal} + +var ( + localSchemeBuilder = &schedconfig.SchemeBuilder + // AddToScheme is a global function that registers this API group & version to a scheme + AddToScheme = localSchemeBuilder.AddToScheme +) + +// addKnownTypes registers known types to the given scheme +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &LoadAwareSchedulingArgs{}, + ) + return nil +} + +func init() { + // We only register manually written functions here. The registration of the + // generated functions takes place in the generated files. The separation + // makes the code compile even when the generated files are missing. 
+ localSchemeBuilder.Register(addKnownTypes) +} diff --git a/pkg/scheduler/apis/config/scheme/scheme.go b/pkg/scheduler/apis/config/scheme/scheme.go new file mode 100644 index 000000000..f1ef19a4d --- /dev/null +++ b/pkg/scheduler/apis/config/scheme/scheme.go @@ -0,0 +1,45 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheme + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2" +) + +var ( + // Re-use the in-tree Scheme. + Scheme = kubeschedulerscheme.Scheme + + // Codecs provides access to encoding and decoding for the scheme. + Codecs = serializer.NewCodecFactory(Scheme, serializer.EnableStrict) +) + +func init() { + AddToScheme(Scheme) +} + +// AddToScheme builds the kubescheduler scheme using all known versions of the kubescheduler api. +func AddToScheme(scheme *runtime.Scheme) { + utilruntime.Must(config.AddToScheme(scheme)) + utilruntime.Must(v1beta2.AddToScheme(scheme)) +} diff --git a/pkg/scheduler/apis/config/types.go b/pkg/scheduler/apis/config/types.go new file mode 100644 index 000000000..feff30b3b --- /dev/null +++ b/pkg/scheduler/apis/config/types.go @@ -0,0 +1,45 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// LoadAwareSchedulingArgs holds arguments used to configure the LoadAwareScheduling plugin. +type LoadAwareSchedulingArgs struct { + metav1.TypeMeta + + // FilterExpiredNodeMetrics indicates whether to filter nodes where koordlet fails to update NodeMetric. + FilterExpiredNodeMetrics *bool `json:"filterExpiredNodeMetrics,omitempty"` + // NodeMetricExpirationSeconds indicates the NodeMetric expiration in seconds. + // When NodeMetrics expired, the node is considered abnormal. + // Default is 180 seconds. + NodeMetricExpirationSeconds *int64 `json:"nodeMetricExpirationSeconds,omitempty"` + // ResourceWeights indicates the weights of resources. + // The weights of CPU and Memory are both 1 by default. 
+ ResourceWeights map[corev1.ResourceName]int64 `json:"resourceWeights,omitempty"` + // UsageThresholds indicates the resource utilization threshold. + // The default for CPU is 65%, and the default for memory is 95%. + UsageThresholds map[corev1.ResourceName]int64 `json:"usageThresholds,omitempty"` + // EstimatedScalingFactors indicates the factor when estimating resource usage. + // The default value of CPU is 85%, and the default value of Memory is 70%. + EstimatedScalingFactors map[corev1.ResourceName]int64 `json:"estimatedScalingFactors,omitempty"` +} diff --git a/pkg/scheduler/apis/config/v1beta2/defaults.go b/pkg/scheduler/apis/config/v1beta2/defaults.go new file mode 100644 index 000000000..65b6764b1 --- /dev/null +++ b/pkg/scheduler/apis/config/v1beta2/defaults.go @@ -0,0 +1,60 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta2 + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/utils/pointer" +) + +var ( + defaultNodeMetricExpirationSeconds int64 = 180 + + defaultResourceWeights = map[corev1.ResourceName]int64{ + corev1.ResourceCPU: 1, + corev1.ResourceMemory: 1, + } + + defaultUsageThresholds = map[corev1.ResourceName]int64{ + corev1.ResourceCPU: 65, // 65% + corev1.ResourceMemory: 95, // 95% + } + + defaultEstimatedScalingFactors = map[corev1.ResourceName]int64{ + corev1.ResourceCPU: 85, // 85% + corev1.ResourceMemory: 70, // 70% + } +) + +// SetDefaults_LoadAwareSchedulingArgs sets the default parameters for LoadAwareScheduling plugin. +func SetDefaults_LoadAwareSchedulingArgs(obj *LoadAwareSchedulingArgs) { + if obj.FilterExpiredNodeMetrics == nil { + obj.FilterExpiredNodeMetrics = pointer.Bool(true) + } + if obj.NodeMetricExpirationSeconds == nil { + obj.NodeMetricExpirationSeconds = pointer.Int64Ptr(defaultNodeMetricExpirationSeconds) + } + if len(obj.ResourceWeights) == 0 { + obj.ResourceWeights = defaultResourceWeights + } + if len(obj.UsageThresholds) == 0 { + obj.UsageThresholds = defaultUsageThresholds + } + if len(obj.EstimatedScalingFactors) == 0 { + obj.EstimatedScalingFactors = defaultEstimatedScalingFactors + } +} diff --git a/pkg/scheduler/apis/config/v1beta2/doc.go b/pkg/scheduler/apis/config/v1beta2/doc.go new file mode 100644 index 000000000..a6f73ace0 --- /dev/null +++ b/pkg/scheduler/apis/config/v1beta2/doc.go @@ -0,0 +1,24 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +// +k8s:deepcopy-gen=package +// +k8s:conversion-gen=github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config +// +k8s:defaulter-gen=TypeMeta +// +k8s:defaulter-gen-input=. +// +groupName=scheduler.config.koordinator.sh + +// Package v1beta2 +package v1beta2 diff --git a/pkg/scheduler/apis/config/v1beta2/register.go b/pkg/scheduler/apis/config/v1beta2/register.go new file mode 100644 index 000000000..b238f81c6 --- /dev/null +++ b/pkg/scheduler/apis/config/v1beta2/register.go @@ -0,0 +1,48 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta2 + +import ( + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + schedschemev1beta2 "k8s.io/kube-scheduler/config/v1beta2" +) + +// SchemeGroupVersion is group version used to register these objects +var SchemeGroupVersion = schema.GroupVersion{Group: schedschemev1beta2.GroupName, Version: "v1beta2"} + +var ( + localSchemeBuilder = &schedschemev1beta2.SchemeBuilder + // AddToScheme is a global function that registers this API group & version to a scheme + AddToScheme = localSchemeBuilder.AddToScheme +) + +// addKnownTypes registers known types to the given scheme +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &LoadAwareSchedulingArgs{}, + ) + return nil +} + +func init() { + // We only register manually written functions here. The registration of the + // generated functions takes place in the generated files. The separation + // makes the code compile even when the generated files are missing. + localSchemeBuilder.Register(addKnownTypes) + localSchemeBuilder.Register(RegisterDefaults) +} diff --git a/pkg/scheduler/apis/config/v1beta2/types.go b/pkg/scheduler/apis/config/v1beta2/types.go new file mode 100644 index 000000000..08465dd29 --- /dev/null +++ b/pkg/scheduler/apis/config/v1beta2/types.go @@ -0,0 +1,45 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta2 + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// LoadAwareSchedulingArgs holds arguments used to configure the LoadAwareScheduling plugin. +type LoadAwareSchedulingArgs struct { + metav1.TypeMeta `json:",inline"` + + // FilterExpiredNodeMetrics indicates whether to filter nodes where koordlet fails to update NodeMetric. 
+ FilterExpiredNodeMetrics *bool `json:"filterExpiredNodeMetrics,omitempty"` + // NodeMetricExpirationSeconds indicates the NodeMetric expiration in seconds. + // When NodeMetrics expired, the node is considered abnormal. + // Default is 180 seconds. + NodeMetricExpirationSeconds *int64 `json:"nodeMetricExpirationSeconds,omitempty"` + // ResourceWeights indicates the weights of resources. + // The weights of CPU and Memory are both 1 by default. + ResourceWeights map[corev1.ResourceName]int64 `json:"resourceWeights,omitempty"` + // UsageThresholds indicates the resource utilization threshold. + // The default for CPU is 65%, and the default for memory is 95%. + UsageThresholds map[corev1.ResourceName]int64 `json:"usageThresholds,omitempty"` + // EstimatedScalingFactors indicates the factor when estimating resource usage. + // The default value of CPU is 85%, and the default value of Memory is 70%. + EstimatedScalingFactors map[corev1.ResourceName]int64 `json:"estimatedScalingFactors,omitempty"` +} diff --git a/pkg/scheduler/apis/config/v1beta2/zz_generated.conversion.go b/pkg/scheduler/apis/config/v1beta2/zz_generated.conversion.go new file mode 100644 index 000000000..b2968929f --- /dev/null +++ b/pkg/scheduler/apis/config/v1beta2/zz_generated.conversion.go @@ -0,0 +1,79 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by conversion-gen. DO NOT EDIT. + +package v1beta2 + +import ( + unsafe "unsafe" + + config "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + v1 "k8s.io/api/core/v1" + conversion "k8s.io/apimachinery/pkg/conversion" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +func init() { + localSchemeBuilder.Register(RegisterConversions) +} + +// RegisterConversions adds conversion functions to the given scheme. +// Public to allow building arbitrary schemes. 
+func RegisterConversions(s *runtime.Scheme) error { + if err := s.AddGeneratedConversionFunc((*LoadAwareSchedulingArgs)(nil), (*config.LoadAwareSchedulingArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(a.(*LoadAwareSchedulingArgs), b.(*config.LoadAwareSchedulingArgs), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.LoadAwareSchedulingArgs)(nil), (*LoadAwareSchedulingArgs)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_LoadAwareSchedulingArgs_To_v1beta2_LoadAwareSchedulingArgs(a.(*config.LoadAwareSchedulingArgs), b.(*LoadAwareSchedulingArgs), scope) + }); err != nil { + return err + } + return nil +} + +func autoConvert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(in *LoadAwareSchedulingArgs, out *config.LoadAwareSchedulingArgs, s conversion.Scope) error { + out.FilterExpiredNodeMetrics = (*bool)(unsafe.Pointer(in.FilterExpiredNodeMetrics)) + out.NodeMetricExpirationSeconds = (*int64)(unsafe.Pointer(in.NodeMetricExpirationSeconds)) + out.ResourceWeights = *(*map[v1.ResourceName]int64)(unsafe.Pointer(&in.ResourceWeights)) + out.UsageThresholds = *(*map[v1.ResourceName]int64)(unsafe.Pointer(&in.UsageThresholds)) + out.EstimatedScalingFactors = *(*map[v1.ResourceName]int64)(unsafe.Pointer(&in.EstimatedScalingFactors)) + return nil +} + +// Convert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs is an autogenerated conversion function. +func Convert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(in *LoadAwareSchedulingArgs, out *config.LoadAwareSchedulingArgs, s conversion.Scope) error { + return autoConvert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(in, out, s) +} + +func autoConvert_config_LoadAwareSchedulingArgs_To_v1beta2_LoadAwareSchedulingArgs(in *config.LoadAwareSchedulingArgs, out *LoadAwareSchedulingArgs, s conversion.Scope) error { + out.FilterExpiredNodeMetrics = (*bool)(unsafe.Pointer(in.FilterExpiredNodeMetrics)) + out.NodeMetricExpirationSeconds = (*int64)(unsafe.Pointer(in.NodeMetricExpirationSeconds)) + out.ResourceWeights = *(*map[v1.ResourceName]int64)(unsafe.Pointer(&in.ResourceWeights)) + out.UsageThresholds = *(*map[v1.ResourceName]int64)(unsafe.Pointer(&in.UsageThresholds)) + out.EstimatedScalingFactors = *(*map[v1.ResourceName]int64)(unsafe.Pointer(&in.EstimatedScalingFactors)) + return nil +} + +// Convert_config_LoadAwareSchedulingArgs_To_v1beta2_LoadAwareSchedulingArgs is an autogenerated conversion function. +func Convert_config_LoadAwareSchedulingArgs_To_v1beta2_LoadAwareSchedulingArgs(in *config.LoadAwareSchedulingArgs, out *LoadAwareSchedulingArgs, s conversion.Scope) error { + return autoConvert_config_LoadAwareSchedulingArgs_To_v1beta2_LoadAwareSchedulingArgs(in, out, s) +} diff --git a/pkg/scheduler/apis/config/v1beta2/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/v1beta2/zz_generated.deepcopy.go new file mode 100644 index 000000000..8455a55d9 --- /dev/null +++ b/pkg/scheduler/apis/config/v1beta2/zz_generated.deepcopy.go @@ -0,0 +1,83 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1beta2 + +import ( + v1 "k8s.io/api/core/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LoadAwareSchedulingArgs) DeepCopyInto(out *LoadAwareSchedulingArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.FilterExpiredNodeMetrics != nil { + in, out := &in.FilterExpiredNodeMetrics, &out.FilterExpiredNodeMetrics + *out = new(bool) + **out = **in + } + if in.NodeMetricExpirationSeconds != nil { + in, out := &in.NodeMetricExpirationSeconds, &out.NodeMetricExpirationSeconds + *out = new(int64) + **out = **in + } + if in.ResourceWeights != nil { + in, out := &in.ResourceWeights, &out.ResourceWeights + *out = make(map[v1.ResourceName]int64, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.UsageThresholds != nil { + in, out := &in.UsageThresholds, &out.UsageThresholds + *out = make(map[v1.ResourceName]int64, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.EstimatedScalingFactors != nil { + in, out := &in.EstimatedScalingFactors, &out.EstimatedScalingFactors + *out = make(map[v1.ResourceName]int64, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LoadAwareSchedulingArgs. +func (in *LoadAwareSchedulingArgs) DeepCopy() *LoadAwareSchedulingArgs { + if in == nil { + return nil + } + out := new(LoadAwareSchedulingArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *LoadAwareSchedulingArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/pkg/scheduler/apis/config/v1beta2/zz_generated.defaults.go b/pkg/scheduler/apis/config/v1beta2/zz_generated.defaults.go new file mode 100644 index 000000000..f310a26d1 --- /dev/null +++ b/pkg/scheduler/apis/config/v1beta2/zz_generated.defaults.go @@ -0,0 +1,38 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by defaulter-gen. DO NOT EDIT. + +package v1beta2 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// RegisterDefaults adds defaulters functions to the given scheme. +// Public to allow building arbitrary schemes. +// All generated defaulters are covering - they call all nested defaulters. 
+func RegisterDefaults(scheme *runtime.Scheme) error { + scheme.AddTypeDefaultingFunc(&LoadAwareSchedulingArgs{}, func(obj interface{}) { SetObjectDefaults_LoadAwareSchedulingArgs(obj.(*LoadAwareSchedulingArgs)) }) + return nil +} + +func SetObjectDefaults_LoadAwareSchedulingArgs(in *LoadAwareSchedulingArgs) { + SetDefaults_LoadAwareSchedulingArgs(in) +} diff --git a/pkg/scheduler/apis/config/validation/validation_pluginargs.go b/pkg/scheduler/apis/config/validation/validation_pluginargs.go new file mode 100644 index 000000000..8b9e5aff7 --- /dev/null +++ b/pkg/scheduler/apis/config/validation/validation_pluginargs.go @@ -0,0 +1,93 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package validation + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/validation/field" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" +) + +// ValidateLoadAwareSchedulingArgs validates that LoadAwareSchedulingArgs are correct. +func ValidateLoadAwareSchedulingArgs(args *config.LoadAwareSchedulingArgs) error { + var allErrs field.ErrorList + + if args.NodeMetricExpirationSeconds != nil && *args.NodeMetricExpirationSeconds <= 0 { + allErrs = append(allErrs, field.Invalid(field.NewPath("nodeMetricExpiredSeconds"), *args.NodeMetricExpirationSeconds, "nodeMetricExpiredSeconds should be a positive value")) + } + + if err := validateResourceWeights(args.ResourceWeights); err != nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("resourceWeights"), args.ResourceWeights, err.Error())) + } + if err := validateResourceThresholds(args.UsageThresholds); err != nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("usageThresholds"), args.UsageThresholds, err.Error())) + } + if err := validateEstimatedResourceThresholds(args.EstimatedScalingFactors); err != nil { + allErrs = append(allErrs, field.Invalid(field.NewPath("estimatedScalingFactors"), args.EstimatedScalingFactors, err.Error())) + } + + for resourceName := range args.ResourceWeights { + if _, ok := args.EstimatedScalingFactors[resourceName]; !ok { + allErrs = append(allErrs, field.NotFound(field.NewPath("estimatedScalingFactors"), resourceName)) + break + } + } + + if len(allErrs) == 0 { + return nil + } + return allErrs.ToAggregate() +} + +func validateResourceWeights(resources map[corev1.ResourceName]int64) error { + for resourceName, weight := range resources { + if weight <= 0 { + return fmt.Errorf("resource Weight of %v should be a positive value, got %v", resourceName, weight) + } + if weight > 100 { + return fmt.Errorf("resource Weight of %v should be less than 100, got %v", resourceName, weight) + } + } + return nil +} + +func validateResourceThresholds(thresholds map[corev1.ResourceName]int64) error { + for resourceName, thresholdPercent := range thresholds { + if thresholdPercent < 0 { + return fmt.Errorf("resource Threshold of %v should be a positive value, got %v", resourceName, thresholdPercent) + } + if thresholdPercent > 100 { + return 
fmt.Errorf("resource Threshold of %v should be less than 100, got %v", resourceName, thresholdPercent) + } + } + return nil +} + +func validateEstimatedResourceThresholds(thresholds map[corev1.ResourceName]int64) error { + for resourceName, thresholdPercent := range thresholds { + if thresholdPercent <= 0 { + return fmt.Errorf("estimated resource Threshold of %v should be a positive value, got %v", resourceName, thresholdPercent) + } + if thresholdPercent > 100 { + return fmt.Errorf("estimated resource Threshold of %v should be less than 100, got %v", resourceName, thresholdPercent) + } + } + return nil +} diff --git a/pkg/scheduler/apis/config/zz_generated.deepcopy.go b/pkg/scheduler/apis/config/zz_generated.deepcopy.go new file mode 100644 index 000000000..ab7d66052 --- /dev/null +++ b/pkg/scheduler/apis/config/zz_generated.deepcopy.go @@ -0,0 +1,83 @@ +//go:build !ignore_autogenerated +// +build !ignore_autogenerated + +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package config + +import ( + v1 "k8s.io/api/core/v1" + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LoadAwareSchedulingArgs) DeepCopyInto(out *LoadAwareSchedulingArgs) { + *out = *in + out.TypeMeta = in.TypeMeta + if in.FilterExpiredNodeMetrics != nil { + in, out := &in.FilterExpiredNodeMetrics, &out.FilterExpiredNodeMetrics + *out = new(bool) + **out = **in + } + if in.NodeMetricExpirationSeconds != nil { + in, out := &in.NodeMetricExpirationSeconds, &out.NodeMetricExpirationSeconds + *out = new(int64) + **out = **in + } + if in.ResourceWeights != nil { + in, out := &in.ResourceWeights, &out.ResourceWeights + *out = make(map[v1.ResourceName]int64, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.UsageThresholds != nil { + in, out := &in.UsageThresholds, &out.UsageThresholds + *out = make(map[v1.ResourceName]int64, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.EstimatedScalingFactors != nil { + in, out := &in.EstimatedScalingFactors, &out.EstimatedScalingFactors + *out = make(map[v1.ResourceName]int64, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LoadAwareSchedulingArgs. +func (in *LoadAwareSchedulingArgs) DeepCopy() *LoadAwareSchedulingArgs { + if in == nil { + return nil + } + out := new(LoadAwareSchedulingArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. 
+func (in *LoadAwareSchedulingArgs) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/pkg/scheduler/frameworkext/framework_extender.go b/pkg/scheduler/frameworkext/framework_extender.go new file mode 100644 index 000000000..cd4cd8e1a --- /dev/null +++ b/pkg/scheduler/frameworkext/framework_extender.go @@ -0,0 +1,72 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package frameworkext + +import ( + "sync" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/framework" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + + koordinatorclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" +) + +// ExtendedHandle extends the k8s scheduling framework Handle interface +// to facilitate plugins to access Koordinator's resources and states. +type ExtendedHandle interface { + framework.Handle + KoordinatorClientSet() koordinatorclientset.Interface + KoordinatorSharedInformerFactory() koordinatorinformers.SharedInformerFactory +} + +type frameworkExtendedHandleImpl struct { + once sync.Once + framework.Handle + koordinatorClientSet koordinatorclientset.Interface + koordinatorSharedInformerFactory koordinatorinformers.SharedInformerFactory +} + +func NewExtendedHandle( + koordinatorClientSet koordinatorclientset.Interface, + koordinatorSharedInformerFactory koordinatorinformers.SharedInformerFactory, +) ExtendedHandle { + return &frameworkExtendedHandleImpl{ + koordinatorClientSet: koordinatorClientSet, + koordinatorSharedInformerFactory: koordinatorSharedInformerFactory, + } +} + +func (ext *frameworkExtendedHandleImpl) KoordinatorClientSet() koordinatorclientset.Interface { + return ext.koordinatorClientSet +} + +func (ext *frameworkExtendedHandleImpl) KoordinatorSharedInformerFactory() koordinatorinformers.SharedInformerFactory { + return ext.koordinatorSharedInformerFactory +} + +// PluginFactoryProxy is used to proxy the call to the PluginFactory function and pass in the ExtendedHandle for the custom plugin +func PluginFactoryProxy(extendHandle ExtendedHandle, factoryFn frameworkruntime.PluginFactory) frameworkruntime.PluginFactory { + return func(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + impl := extendHandle.(*frameworkExtendedHandleImpl) + impl.once.Do(func() { + impl.Handle = handle + }) + return factoryFn(args, extendHandle) + } +} diff --git a/pkg/scheduler/plugins/loadaware/load_aware.go b/pkg/scheduler/plugins/loadaware/load_aware.go new file mode 100644 index 000000000..a5681a535 --- /dev/null +++ b/pkg/scheduler/plugins/loadaware/load_aware.go @@ -0,0 +1,304 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadaware + +import ( + "context" + "fmt" + "math" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/klog/v2" + resourceapi "k8s.io/kubernetes/pkg/api/v1/resource" + "k8s.io/kubernetes/pkg/scheduler/framework" + + "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + slolisters "github.com/koordinator-sh/koordinator/pkg/client/listers/slo/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/validation" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" +) + +const ( + Name = "LoadAwareScheduling" + ErrReasonNodeMetricExpired = "node(s) nodeMetric expired" + ErrReasonUsageExceedThreshold = "node(s) %s usage exceed threshold" +) + +const ( + // DefaultMilliCPURequest defines default milli cpu request number. + DefaultMilliCPURequest int64 = 250 // 0.25 core + // DefaultMemoryRequest defines default memory request size. + DefaultMemoryRequest int64 = 200 * 1024 * 1024 // 200 MB + // DefaultNodeMetricReportInterval defines the default koodlet report NodeMetric interval. + DefaultNodeMetricReportInterval = 60 * time.Second +) + +var ( + _ framework.FilterPlugin = &Plugin{} + _ framework.ScorePlugin = &Plugin{} + _ framework.ReservePlugin = &Plugin{} +) + +type Plugin struct { + handle framework.Handle + args *config.LoadAwareSchedulingArgs + nodeMetricLister slolisters.NodeMetricLister + podAssignCache *podAssignCache +} + +func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { + pluginArgs, ok := args.(*config.LoadAwareSchedulingArgs) + if !ok { + return nil, fmt.Errorf("want args to be of type LoadAwareSchedulingArgs, got %T", args) + } + + if err := validation.ValidateLoadAwareSchedulingArgs(pluginArgs); err != nil { + return nil, err + } + + frameworkExtender, ok := handle.(frameworkext.ExtendedHandle) + if !ok { + return nil, fmt.Errorf("want handle to be of type frameworkext.ExtendedHandle, got %T", handle) + } + + assignCache := newPodAssignCache() + frameworkExtender.SharedInformerFactory().Core().V1().Pods().Informer().AddEventHandler(assignCache) + nodeMetricLister := frameworkExtender.KoordinatorSharedInformerFactory().Slo().V1alpha1().NodeMetrics().Lister() + + return &Plugin{ + handle: handle, + args: pluginArgs, + nodeMetricLister: nodeMetricLister, + podAssignCache: assignCache, + }, nil +} + +func (p *Plugin) Name() string { return Name } + +func (p *Plugin) Filter(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + node := nodeInfo.Node() + if node == nil { + return framework.NewStatus(framework.Error, "node not found") + } + + nodeMetric, err := p.nodeMetricLister.Get(node.Name) + if err != nil { + return framework.NewStatus(framework.Error, err.Error()) + } + + if p.args.FilterExpiredNodeMetrics != nil && *p.args.FilterExpiredNodeMetrics && p.args.NodeMetricExpirationSeconds != nil { + if 
isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds) { + return framework.NewStatus(framework.Unschedulable, ErrReasonNodeMetricExpired) + } + } + + usageThresholds := p.args.UsageThresholds + customUsageThresholds, err := extension.GetCustomUsageThresholds(node) + if err != nil { + klog.V(5).ErrorS(err, "failed to GetCustomUsageThresholds from", "node", node.Name) + } else { + if len(customUsageThresholds.UsageThresholds) > 0 { + usageThresholds = customUsageThresholds.UsageThresholds + } + } + + if len(usageThresholds) > 0 { + if nodeMetric.Status.NodeMetric == nil { + return nil + } + for resourceName, threshold := range usageThresholds { + if threshold == 0 { + continue + } + total := node.Status.Allocatable[resourceName] + if total.IsZero() { + continue + } + used := nodeMetric.Status.NodeMetric.NodeUsage.ResourceList[resourceName] + usage := int64(math.Round(float64(used.MilliValue()) / float64(total.MilliValue()) * 100)) + if usage >= threshold { + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, resourceName)) + } + } + } + + return nil +} + +func (p *Plugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +func (p *Plugin) Reserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { + p.podAssignCache.assign(nodeName, pod) + return nil +} + +func (p *Plugin) Unreserve(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) { + p.podAssignCache.unAssign(nodeName, pod) +} + +func (p *Plugin) Score(ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string) (int64, *framework.Status) { + nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, fmt.Sprintf("getting node %q from Snapshot: %v", nodeName, err)) + } + node := nodeInfo.Node() + if node == nil { + return 0, framework.NewStatus(framework.Error, "node not found") + } + nodeMetric, err := p.nodeMetricLister.Get(nodeName) + if err != nil { + return 0, framework.NewStatus(framework.Error, "nodeMetric not found") + } + if p.args.NodeMetricExpirationSeconds != nil && isNodeMetricExpired(nodeMetric, *p.args.NodeMetricExpirationSeconds) { + return 0, nil + } + + estimatedUsed := estimatedPodUsed(pod, p.args.ResourceWeights, p.args.EstimatedScalingFactors) + estimatedAssignedPodUsage := p.estimatedAssignedPodUsage(nodeName, nodeMetric) + for resourceName, value := range estimatedAssignedPodUsage { + estimatedUsed[resourceName] += value + } + + allocatable := make(map[corev1.ResourceName]int64) + for resourceName := range p.args.ResourceWeights { + quantity := node.Status.Allocatable[resourceName] + if resourceName == corev1.ResourceCPU { + allocatable[resourceName] = quantity.MilliValue() + } else { + allocatable[resourceName] = quantity.Value() + } + if nodeMetric.Status.NodeMetric != nil { + quantity = nodeMetric.Status.NodeMetric.NodeUsage.ResourceList[resourceName] + if resourceName == corev1.ResourceCPU { + estimatedUsed[resourceName] += quantity.MilliValue() + } else { + estimatedUsed[resourceName] += quantity.Value() + } + } + } + + score := loadAwareSchedulingScorer(p.args.ResourceWeights, estimatedUsed, allocatable) + return score, nil +} + +func isNodeMetricExpired(nodeMetric *slov1alpha1.NodeMetric, nodeMetricExpirationSeconds int64) bool { + return nodeMetric == nil || + nodeMetric.Status.UpdateTime == nil || + nodeMetricExpirationSeconds > 0 && + 
time.Since(nodeMetric.Status.UpdateTime.Time) >= time.Duration(nodeMetricExpirationSeconds)*time.Second +} + +func (p *Plugin) estimatedAssignedPodUsage(nodeName string, nodeMetric *slov1alpha1.NodeMetric) map[corev1.ResourceName]int64 { + estimatedUsed := make(map[corev1.ResourceName]int64) + nodeMetricReportInterval := getNodeMetricReportInterval(nodeMetric) + p.podAssignCache.lock.RLock() + defer p.podAssignCache.lock.RUnlock() + for _, assignInfo := range p.podAssignCache.podInfoItems[nodeName] { + if assignInfo.timestamp.After(nodeMetric.Status.UpdateTime.Time) || + assignInfo.timestamp.Before(nodeMetric.Status.UpdateTime.Time) && + nodeMetric.Status.UpdateTime.Sub(assignInfo.timestamp) < nodeMetricReportInterval { + estimated := estimatedPodUsed(assignInfo.pod, p.args.ResourceWeights, p.args.EstimatedScalingFactors) + for resourceName, value := range estimated { + estimatedUsed[resourceName] += value + } + } + } + return estimatedUsed +} + +func getNodeMetricReportInterval(nodeMetric *slov1alpha1.NodeMetric) time.Duration { + if nodeMetric.Spec.CollectPolicy == nil || nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds == nil { + return DefaultNodeMetricReportInterval + } + return time.Duration(*nodeMetric.Spec.CollectPolicy.ReportIntervalSeconds) * time.Second +} + +func estimatedPodUsed(pod *corev1.Pod, resourceWeights map[corev1.ResourceName]int64, scalingFactors map[corev1.ResourceName]int64) map[corev1.ResourceName]int64 { + requests, limits := resourceapi.PodRequestsAndLimits(pod) + estimatedUsed := make(map[corev1.ResourceName]int64) + priorityClass := extension.GetPriorityClass(pod) + for resourceName := range resourceWeights { + realResourceName := extension.TranslateResourceNameByPriorityClass(priorityClass, resourceName) + estimatedUsed[resourceName] = estimatedUsedByResource(requests, limits, realResourceName, scalingFactors[resourceName]) + } + return estimatedUsed +} + +func estimatedUsedByResource(requests, limits corev1.ResourceList, resourceName corev1.ResourceName, scalingFactor int64) int64 { + limitQuantity := limits[resourceName] + requestQuantity := requests[resourceName] + var quantity resource.Quantity + if limitQuantity.Cmp(requestQuantity) > 0 { + scalingFactor = 100 + quantity = limitQuantity + } else { + quantity = requestQuantity + } + + if quantity.IsZero() { + switch resourceName { + case corev1.ResourceCPU, extension.BatchCPU: + return DefaultMilliCPURequest + case corev1.ResourceMemory, extension.BatchMemory: + return DefaultMemoryRequest + } + return 0 + } + + var estimatedUsed int64 + switch resourceName { + case corev1.ResourceCPU: + estimatedUsed = int64(math.Round(float64(quantity.MilliValue()) * float64(scalingFactor) / 100)) + if estimatedUsed > limitQuantity.MilliValue() { + estimatedUsed = limitQuantity.MilliValue() + } + default: + estimatedUsed = int64(math.Round(float64(quantity.Value()) * float64(scalingFactor) / 100)) + if estimatedUsed > limitQuantity.Value() { + estimatedUsed = limitQuantity.Value() + } + } + return estimatedUsed +} + +func loadAwareSchedulingScorer(resToWeightMap map[corev1.ResourceName]int64, used, allocatable map[corev1.ResourceName]int64) int64 { + var nodeScore, weightSum int64 + for resourceName, weight := range resToWeightMap { + resourceScore := leastRequestedScore(used[resourceName], allocatable[resourceName]) + nodeScore += resourceScore * weight + weightSum += weight + } + return nodeScore / weightSum +} + +func leastRequestedScore(requested, capacity int64) int64 { + if capacity == 0 { + return 0 + } + if 
requested > capacity { + return 0 + } + + return ((capacity - requested) * framework.MaxNodeScore) / capacity +} diff --git a/pkg/scheduler/plugins/loadaware/load_aware_test.go b/pkg/scheduler/plugins/loadaware/load_aware_test.go new file mode 100644 index 000000000..af65b56e6 --- /dev/null +++ b/pkg/scheduler/plugins/loadaware/load_aware_test.go @@ -0,0 +1,1052 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadaware + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + scheduledconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort" + "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/utils/pointer" + + "github.com/koordinator-sh/koordinator/apis/extension" + slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" + koordfake "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" + koordinatorinformers "github.com/koordinator-sh/koordinator/pkg/client/informers/externalversions" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2" + "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" +) + +var _ framework.SharedLister = &testSharedLister{} + +type testSharedLister struct { + nodes []*corev1.Node + nodeInfos []*framework.NodeInfo + nodeInfoMap map[string]*framework.NodeInfo +} + +func newTestSharedLister(pods []*corev1.Pod, nodes []*corev1.Node) *testSharedLister { + nodeInfoMap := make(map[string]*framework.NodeInfo) + nodeInfos := make([]*framework.NodeInfo, 0) + for _, pod := range pods { + nodeName := pod.Spec.NodeName + if _, ok := nodeInfoMap[nodeName]; !ok { + nodeInfoMap[nodeName] = framework.NewNodeInfo() + } + nodeInfoMap[nodeName].AddPod(pod) + } + for _, node := range nodes { + if _, ok := nodeInfoMap[node.Name]; !ok { + nodeInfoMap[node.Name] = framework.NewNodeInfo() + } + nodeInfoMap[node.Name].SetNode(node) + } + + for _, v := range nodeInfoMap { + nodeInfos = append(nodeInfos, v) + } + + return &testSharedLister{ + nodes: nodes, + nodeInfos: nodeInfos, + nodeInfoMap: nodeInfoMap, + } +} + +func (f *testSharedLister) NodeInfos() framework.NodeInfoLister { + return f +} + +func (f *testSharedLister) List() ([]*framework.NodeInfo, error) { + return f.nodeInfos, nil +} + +func (f *testSharedLister) HavePodsWithAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) 
HavePodsWithRequiredAntiAffinityList() ([]*framework.NodeInfo, error) { + return nil, nil +} + +func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) { + return f.nodeInfoMap[nodeName], nil +} + +func TestNew(t *testing.T) { + var v1beta2args v1beta2.LoadAwareSchedulingArgs + v1beta2.SetDefaults_LoadAwareSchedulingArgs(&v1beta2args) + var loadAwareSchedulingArgs config.LoadAwareSchedulingArgs + err := v1beta2.Convert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(&v1beta2args, &loadAwareSchedulingArgs, nil) + assert.NoError(t, err) + + loadAwareSchedulingPluginConfig := scheduledconfig.PluginConfig{ + Name: Name, + Args: &loadAwareSchedulingArgs, + } + + koordClientSet := koordfake.NewSimpleClientset() + koordSharedInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClientSet, 0) + extendHandle := frameworkext.NewExtendedHandle(koordClientSet, koordSharedInformerFactory) + proxyNew := frameworkext.PluginFactoryProxy(extendHandle, New) + + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + func(reg *runtime.Registry, profile *scheduledconfig.KubeSchedulerProfile) { + profile.PluginConfig = []scheduledconfig.PluginConfig{ + loadAwareSchedulingPluginConfig, + } + }, + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + schedulertesting.RegisterFilterPlugin(Name, proxyNew), + schedulertesting.RegisterScorePlugin(Name, proxyNew, 1), + schedulertesting.RegisterReservePlugin(Name, proxyNew), + } + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + snapshot := newTestSharedLister(nil, nil) + fh, err := schedulertesting.NewFramework(registeredPlugins, "koord-scheduler", + runtime.WithClientSet(cs), + runtime.WithInformerFactory(informerFactory), + runtime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&loadAwareSchedulingArgs, fh) + assert.NotNil(t, p) + assert.Nil(t, err) +} + +func TestFilterExpiredNodeMetric(t *testing.T) { + tests := []struct { + name string + nodeMetric *slov1alpha1.NodeMetric + wantStatus *framework.Status + }{ + { + name: "filter healthy nodeMetrics", + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + }, + }, + wantStatus: nil, + }, + { + name: "filter unhealthy nodeMetric with nil updateTime", + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + }, + wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNodeMetricExpired), + }, + { + name: "filter unhealthy nodeMetric with expired updateTime", + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now().Add(-180 * time.Second), + }, + }, + }, + wantStatus: framework.NewStatus(framework.Unschedulable, 
ErrReasonNodeMetricExpired), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var v1beta2args v1beta2.LoadAwareSchedulingArgs + v1beta2.SetDefaults_LoadAwareSchedulingArgs(&v1beta2args) + var loadAwareSchedulingArgs config.LoadAwareSchedulingArgs + err := v1beta2.Convert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(&v1beta2args, &loadAwareSchedulingArgs, nil) + assert.NoError(t, err) + + loadAwareSchedulingPluginConfig := scheduledconfig.PluginConfig{ + Name: Name, + Args: &loadAwareSchedulingArgs, + } + + koordClientSet := koordfake.NewSimpleClientset() + koordSharedInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClientSet, 0) + extendHandle := frameworkext.NewExtendedHandle(koordClientSet, koordSharedInformerFactory) + proxyNew := frameworkext.PluginFactoryProxy(extendHandle, New) + + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + func(reg *runtime.Registry, profile *scheduledconfig.KubeSchedulerProfile) { + profile.PluginConfig = []scheduledconfig.PluginConfig{ + loadAwareSchedulingPluginConfig, + } + }, + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + schedulertesting.RegisterFilterPlugin(Name, proxyNew), + schedulertesting.RegisterScorePlugin(Name, proxyNew, 1), + schedulertesting.RegisterReservePlugin(Name, proxyNew), + } + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: tt.nodeMetric.Name, + }, + }, + } + + snapshot := newTestSharedLister(nil, nodes) + fh, err := schedulertesting.NewFramework(registeredPlugins, "koord-scheduler", + runtime.WithClientSet(cs), + runtime.WithInformerFactory(informerFactory), + runtime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&loadAwareSchedulingArgs, fh) + assert.NotNil(t, p) + assert.Nil(t, err) + + _, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{}) + assert.NoError(t, err) + + koordSharedInformerFactory.Start(context.TODO().Done()) + koordSharedInformerFactory.WaitForCacheSync(context.TODO().Done()) + + cycleState := framework.NewCycleState() + + nodeInfo, err := snapshot.Get(tt.nodeMetric.Name) + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + + status := p.(*Plugin).Filter(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo) + assert.True(t, tt.wantStatus.Equal(status), "want status: %s, but got %s", tt.wantStatus.Message(), status.Message()) + }) + } +} + +func TestFilterUsage(t *testing.T) { + tests := []struct { + name string + usageThresholds map[corev1.ResourceName]int64 + customUsageThresholds map[corev1.ResourceName]int64 + nodeMetric *slov1alpha1.NodeMetric + wantStatus *framework.Status + }{ + { + name: "filter normal usage", + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("60"), + corev1.ResourceMemory: resource.MustParse("256Gi"), + }, + }, + }, + }, + }, + wantStatus: nil, 
+ }, + { + name: "filter exceed cpu usage", + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("70"), + corev1.ResourceMemory: resource.MustParse("256Gi"), + }, + }, + }, + }, + }, + wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, corev1.ResourceCPU)), + }, + { + name: "filter exceed memory usage", + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("30"), + corev1.ResourceMemory: resource.MustParse("500Gi"), + }, + }, + }, + }, + }, + wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, corev1.ResourceMemory)), + }, + { + name: "filter exceed memory usage by custom usage thresholds", + customUsageThresholds: map[corev1.ResourceName]int64{ + corev1.ResourceMemory: 60, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("30"), + corev1.ResourceMemory: resource.MustParse("316Gi"), + }, + }, + }, + }, + }, + wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonUsageExceedThreshold, corev1.ResourceMemory)), + }, + { + name: "disable filter exceed memory usage", + usageThresholds: map[corev1.ResourceName]int64{ + corev1.ResourceMemory: 0, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("30"), + corev1.ResourceMemory: resource.MustParse("500Gi"), + }, + }, + }, + }, + }, + wantStatus: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var v1beta2args v1beta2.LoadAwareSchedulingArgs + v1beta2args.FilterExpiredNodeMetrics = pointer.Bool(false) + if len(tt.usageThresholds) > 0 { + v1beta2args.UsageThresholds = tt.usageThresholds + } + v1beta2.SetDefaults_LoadAwareSchedulingArgs(&v1beta2args) + var loadAwareSchedulingArgs config.LoadAwareSchedulingArgs + err := 
v1beta2.Convert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(&v1beta2args, &loadAwareSchedulingArgs, nil) + assert.NoError(t, err) + + loadAwareSchedulingPluginConfig := scheduledconfig.PluginConfig{ + Name: Name, + Args: &loadAwareSchedulingArgs, + } + + koordClientSet := koordfake.NewSimpleClientset() + koordSharedInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClientSet, 0) + extendHandle := frameworkext.NewExtendedHandle(koordClientSet, koordSharedInformerFactory) + proxyNew := frameworkext.PluginFactoryProxy(extendHandle, New) + + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + func(reg *runtime.Registry, profile *scheduledconfig.KubeSchedulerProfile) { + profile.PluginConfig = []scheduledconfig.PluginConfig{ + loadAwareSchedulingPluginConfig, + } + }, + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + schedulertesting.RegisterFilterPlugin(Name, proxyNew), + schedulertesting.RegisterScorePlugin(Name, proxyNew, 1), + schedulertesting.RegisterReservePlugin(Name, proxyNew), + } + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: tt.nodeMetric.Name, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + }, + }, + }, + } + + if len(tt.customUsageThresholds) > 0 { + data, err := json.Marshal(&extension.CustomUsageThresholds{UsageThresholds: tt.customUsageThresholds}) + if err != nil { + t.Errorf("failed to marshal, err: %v", err) + } + node := nodes[0] + if len(node.Annotations) == 0 { + node.Annotations = map[string]string{} + } + node.Annotations[extension.AnnotationCustomUsageThresholds] = string(data) + } + + snapshot := newTestSharedLister(nil, nodes) + fh, err := schedulertesting.NewFramework(registeredPlugins, "koord-scheduler", + runtime.WithClientSet(cs), + runtime.WithInformerFactory(informerFactory), + runtime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&loadAwareSchedulingArgs, fh) + assert.NotNil(t, p) + assert.Nil(t, err) + + _, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{}) + assert.NoError(t, err) + + koordSharedInformerFactory.Start(context.TODO().Done()) + koordSharedInformerFactory.WaitForCacheSync(context.TODO().Done()) + + cycleState := framework.NewCycleState() + + nodeInfo, err := snapshot.Get(tt.nodeMetric.Name) + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + + status := p.(*Plugin).Filter(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo) + assert.True(t, tt.wantStatus.Equal(status), "want status: %s, but got %s", tt.wantStatus.Message(), status.Message()) + }) + } +} + +func TestScore(t *testing.T) { + tests := []struct { + name string + pod *corev1.Pod + assignedPod []*podAssignInfo + nodeMetric *slov1alpha1.NodeMetric + wantScore int64 + wantStatus *framework.Status + }{ + { + name: "score node with expired nodeMetric", + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now().Add(-180 
* time.Second), + }, + }, + }, + wantScore: 0, + wantStatus: nil, + }, + { + name: "score empty node", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + }, + }, + wantScore: 90, + wantStatus: nil, + }, + { + name: "score cert load node", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourceMemory: resource.MustParse("10Gi"), + }, + }, + }, + }, + }, + wantScore: 72, + wantStatus: nil, + }, + { + name: "score cert load node with just assigned pod", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + assignedPod: []*podAssignInfo{ + { + timestamp: time.Now(), + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + 
corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourceMemory: resource.MustParse("10Gi"), + }, + }, + }, + }, + }, + wantScore: 63, + wantStatus: nil, + }, + { + name: "score cert load node with just assigned pod where after updateTime", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + assignedPod: []*podAssignInfo{ + { + timestamp: time.Now(), + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now().Add(-10 * time.Second), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourceMemory: resource.MustParse("10Gi"), + }, + }, + }, + }, + }, + wantScore: 63, + wantStatus: nil, + }, + { + name: "score cert load node with just assigned pod where before updateTime", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + assignedPod: []*podAssignInfo{ + { + timestamp: time.Now().Add(-10 * time.Second), + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + NodeMetric: &slov1alpha1.NodeMetricInfo{ + NodeUsage: slov1alpha1.ResourceMap{ + ResourceList: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourceMemory: resource.MustParse("10Gi"), + }, + }, + }, + }, + }, + wantScore: 63, + 
wantStatus: nil, + }, + { + name: "score batch Pod", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Priority: pointer.Int32(extension.PriorityBatchValueMin), + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + extension.BatchCPU: resource.MustParse("16000"), + extension.BatchMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + extension.BatchCPU: resource.MustParse("16000"), + extension.BatchMemory: resource.MustParse("32Gi"), + }, + }, + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + }, + }, + wantScore: 90, + wantStatus: nil, + }, + { + name: "score request less than limit", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("8"), + corev1.ResourceMemory: resource.MustParse("16Gi"), + }, + }, + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + }, + }, + wantScore: 88, + wantStatus: nil, + }, + { + name: "score empty pod", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + }, + }, + }, + }, + nodeMetric: &slov1alpha1.NodeMetric{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + Spec: slov1alpha1.NodeMetricSpec{ + CollectPolicy: &slov1alpha1.NodeMetricCollectPolicy{ + ReportIntervalSeconds: pointer.Int64(60), + }, + }, + Status: slov1alpha1.NodeMetricStatus{ + UpdateTime: &metav1.Time{ + Time: time.Now(), + }, + }, + }, + wantScore: 99, + wantStatus: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var v1beta2args v1beta2.LoadAwareSchedulingArgs + v1beta2.SetDefaults_LoadAwareSchedulingArgs(&v1beta2args) + var loadAwareSchedulingArgs config.LoadAwareSchedulingArgs + err := v1beta2.Convert_v1beta2_LoadAwareSchedulingArgs_To_config_LoadAwareSchedulingArgs(&v1beta2args, &loadAwareSchedulingArgs, nil) + assert.NoError(t, err) + + loadAwareSchedulingPluginConfig := scheduledconfig.PluginConfig{ + Name: Name, + Args: &loadAwareSchedulingArgs, + } + + koordClientSet := koordfake.NewSimpleClientset() + koordSharedInformerFactory := koordinatorinformers.NewSharedInformerFactory(koordClientSet, 0) + extendHandle := frameworkext.NewExtendedHandle(koordClientSet, koordSharedInformerFactory) + proxyNew := frameworkext.PluginFactoryProxy(extendHandle, New) + + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + func(reg *runtime.Registry, profile *scheduledconfig.KubeSchedulerProfile) { + profile.PluginConfig = []scheduledconfig.PluginConfig{ + loadAwareSchedulingPluginConfig, + } + }, + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + 
schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + schedulertesting.RegisterFilterPlugin(Name, proxyNew), + schedulertesting.RegisterScorePlugin(Name, proxyNew, 1), + schedulertesting.RegisterReservePlugin(Name, proxyNew), + } + + cs := kubefake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(cs, 0) + + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: tt.nodeMetric.Name, + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("512Gi"), + }, + }, + }, + } + + snapshot := newTestSharedLister(nil, nodes) + fh, err := schedulertesting.NewFramework(registeredPlugins, "koord-scheduler", + runtime.WithClientSet(cs), + runtime.WithInformerFactory(informerFactory), + runtime.WithSnapshotSharedLister(snapshot), + ) + assert.Nil(t, err) + + p, err := proxyNew(&loadAwareSchedulingArgs, fh) + assert.NotNil(t, p) + assert.Nil(t, err) + + _, err = koordClientSet.SloV1alpha1().NodeMetrics().Create(context.TODO(), tt.nodeMetric, metav1.CreateOptions{}) + assert.NoError(t, err) + + koordSharedInformerFactory.Start(context.TODO().Done()) + koordSharedInformerFactory.WaitForCacheSync(context.TODO().Done()) + + cycleState := framework.NewCycleState() + + nodeInfo, err := snapshot.Get(tt.nodeMetric.Name) + assert.NoError(t, err) + assert.NotNil(t, nodeInfo) + + assignCache := p.(*Plugin).podAssignCache + for _, v := range tt.assignedPod { + m := assignCache.podInfoItems[tt.nodeMetric.Name] + if m == nil { + m = map[types.UID]*podAssignInfo{} + assignCache.podInfoItems[tt.nodeMetric.Name] = m + } + v.pod.UID = uuid.NewUUID() + m[v.pod.UID] = v + } + + score, status := p.(*Plugin).Score(context.TODO(), cycleState, tt.pod, tt.nodeMetric.Name) + assert.Equal(t, tt.wantScore, score) + assert.True(t, tt.wantStatus.Equal(status), "want status: %s, but got %s", tt.wantStatus.Message(), status.Message()) + }) + } +} diff --git a/pkg/scheduler/plugins/loadaware/pod_assign_cache.go b/pkg/scheduler/plugins/loadaware/pod_assign_cache.go new file mode 100644 index 000000000..0f89e926b --- /dev/null +++ b/pkg/scheduler/plugins/loadaware/pod_assign_cache.go @@ -0,0 +1,117 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadaware + +import ( + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" + + "github.com/koordinator-sh/koordinator/pkg/util" +) + +var ( + timeNowFn = time.Now +) + +// podAssignCache stores the Pod information that has been successfully scheduled or is about to be bound +type podAssignCache struct { + lock sync.RWMutex + // podInfoItems stores podAssignInfo according to each node. 
+	// podAssignInfo is indexed using the Pod's types.UID
+	podInfoItems map[string]map[types.UID]*podAssignInfo
+}
+
+type podAssignInfo struct {
+	timestamp time.Time
+	pod       *corev1.Pod
+}
+
+func newPodAssignCache() *podAssignCache {
+	return &podAssignCache{
+		podInfoItems: map[string]map[types.UID]*podAssignInfo{},
+	}
+}
+
+func (p *podAssignCache) assign(nodeName string, pod *corev1.Pod) {
+	if nodeName == "" || util.IsPodTerminated(pod) {
+		return
+	}
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	m := p.podInfoItems[nodeName]
+	if m == nil {
+		m = make(map[types.UID]*podAssignInfo)
+		p.podInfoItems[nodeName] = m
+	}
+	m[pod.UID] = &podAssignInfo{
+		timestamp: timeNowFn(),
+		pod:       pod,
+	}
+}
+
+func (p *podAssignCache) unAssign(nodeName string, pod *corev1.Pod) {
+	if nodeName == "" {
+		return
+	}
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	delete(p.podInfoItems[nodeName], pod.UID)
+	if len(p.podInfoItems[nodeName]) == 0 {
+		delete(p.podInfoItems, nodeName)
+	}
+}
+
+func (p *podAssignCache) OnAdd(obj interface{}) {
+	pod, ok := obj.(*corev1.Pod)
+	if !ok {
+		return
+	}
+	p.assign(pod.Spec.NodeName, pod)
+}
+
+func (p *podAssignCache) OnUpdate(oldObj, newObj interface{}) {
+	pod, ok := newObj.(*corev1.Pod)
+	if !ok {
+		return
+	}
+	if util.IsPodTerminated(pod) {
+		p.unAssign(pod.Spec.NodeName, pod)
+	} else {
+		p.assign(pod.Spec.NodeName, pod)
+	}
+}
+
+func (p *podAssignCache) OnDelete(obj interface{}) {
+	var pod *corev1.Pod
+	switch t := obj.(type) {
+	case *corev1.Pod:
+		pod = t
+	case cache.DeletedFinalStateUnknown:
+		var ok bool
+		pod, ok = t.Obj.(*corev1.Pod)
+		if !ok {
+			return
+		}
+	default:
+		return
+	}
+	p.unAssign(pod.Spec.NodeName, pod)
+}
diff --git a/pkg/scheduler/plugins/loadaware/pod_assign_cache_test.go b/pkg/scheduler/plugins/loadaware/pod_assign_cache_test.go
new file mode 100644
index 000000000..6a189aaea
--- /dev/null
+++ b/pkg/scheduler/plugins/loadaware/pod_assign_cache_test.go
@@ -0,0 +1,253 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package loadaware + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +var fakeTimeNowFn = func() time.Time { + t := time.Time{} + t.Add(100 * time.Second) + return t +} + +func TestPodAssignCache_OnAdd(t *testing.T) { + tests := []struct { + name string + pod *corev1.Pod + wantCache map[string]map[types.UID]*podAssignInfo + }{ + { + name: "update pending pod", + pod: &corev1.Pod{}, + wantCache: map[string]map[types.UID]*podAssignInfo{}, + }, + { + name: "update terminated pod", + pod: &corev1.Pod{ + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + wantCache: map[string]map[types.UID]*podAssignInfo{}, + }, + { + name: "update scheduled running pod", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + wantCache: map[string]map[types.UID]*podAssignInfo{ + "test-node": { + "123456789": &podAssignInfo{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + timestamp: fakeTimeNowFn(), + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + assignCache := newPodAssignCache() + assignCache.OnAdd(tt.pod) + assert.Equal(t, tt.wantCache, assignCache.podInfoItems) + }) + } +} + +func TestPodAssignCache_OnUpdate(t *testing.T) { + tests := []struct { + name string + pod *corev1.Pod + assignCache *podAssignCache + wantCache map[string]map[types.UID]*podAssignInfo + }{ + { + name: "update pending pod", + pod: &corev1.Pod{}, + wantCache: map[string]map[types.UID]*podAssignInfo{}, + }, + { + name: "update terminated pod", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + }, + assignCache: &podAssignCache{ + podInfoItems: map[string]map[types.UID]*podAssignInfo{ + "test-node": { + "123456789": &podAssignInfo{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + timestamp: fakeTimeNowFn(), + }, + }, + }, + }, + wantCache: map[string]map[types.UID]*podAssignInfo{}, + }, + { + name: "update scheduled running pod", + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + wantCache: map[string]map[types.UID]*podAssignInfo{ + "test-node": { + "123456789": &podAssignInfo{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + timestamp: fakeTimeNowFn(), + }, + }, + }, + 
}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + assignCache := tt.assignCache + if assignCache == nil { + assignCache = newPodAssignCache() + } + assignCache.OnUpdate(nil, tt.pod) + assert.Equal(t, tt.wantCache, assignCache.podInfoItems) + }) + } +} + +func TestPodAssignCache_OnDelete(t *testing.T) { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodFailed, + }, + } + assignCache := &podAssignCache{ + podInfoItems: map[string]map[types.UID]*podAssignInfo{ + "test-node": { + "123456789": &podAssignInfo{ + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "123456789", + Namespace: "default", + Name: "test", + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + }, + timestamp: fakeTimeNowFn(), + }, + }, + }, + } + assignCache.OnDelete(pod) + wantCache := map[string]map[types.UID]*podAssignInfo{} + assert.Equal(t, wantCache, assignCache.podInfoItems) +} diff --git a/pkg/util/pod.go b/pkg/util/pod.go index 337e55235..2adabf3bc 100644 --- a/pkg/util/pod.go +++ b/pkg/util/pod.go @@ -243,3 +243,7 @@ func GetPodRequest(pod *corev1.Pod, resourceNames ...corev1.ResourceName) corev1 } return result } + +func IsPodTerminated(pod *corev1.Pod) bool { + return pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed +} diff --git a/pkg/webhook/OWNERS b/pkg/webhook/OWNERS index 8c167275b..3b98633c3 100644 --- a/pkg/webhook/OWNERS +++ b/pkg/webhook/OWNERS @@ -1,2 +1,2 @@ reviewers: - - joseph.t.lee + - eahydra diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/testing/fake_extender.go b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/fake_extender.go new file mode 100644 index 000000000..0422fc55d --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/fake_extender.go @@ -0,0 +1,376 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "context" + "fmt" + "sort" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + corev1helpers "k8s.io/component-helpers/scheduling/corev1" + extenderv1 "k8s.io/kube-scheduler/extender/v1" + "k8s.io/kubernetes/pkg/scheduler/framework" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + "k8s.io/kubernetes/pkg/scheduler/util" +) + +// FitPredicate is a function type which is used in fake extender. +type FitPredicate func(pod *v1.Pod, node *v1.Node) *framework.Status + +// PriorityFunc is a function type which is used in fake extender. +type PriorityFunc func(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error) + +// PriorityConfig is used in fake extender to perform Prioritize function. 
+type PriorityConfig struct { + Function PriorityFunc + Weight int64 +} + +// ErrorPredicateExtender implements FitPredicate function to always return error status. +func ErrorPredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status { + return framework.NewStatus(framework.Error, "some error") +} + +// FalsePredicateExtender implements FitPredicate function to always return unschedulable status. +func FalsePredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status { + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("pod is unschedulable on the node %q", node.Name)) +} + +// FalseAndUnresolvePredicateExtender implements fitPredicate to always return unschedulable and unresolvable status. +func FalseAndUnresolvePredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("pod is unschedulable and unresolvable on the node %q", node.Name)) +} + +// TruePredicateExtender implements FitPredicate function to always return success status. +func TruePredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status { + return framework.NewStatus(framework.Success) +} + +// Node1PredicateExtender implements FitPredicate function to return true +// when the given node's name is "node1"; otherwise return false. +func Node1PredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status { + if node.Name == "node1" { + return framework.NewStatus(framework.Success) + } + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name)) +} + +// Node2PredicateExtender implements FitPredicate function to return true +// when the given node's name is "node2"; otherwise return false. +func Node2PredicateExtender(pod *v1.Pod, node *v1.Node) *framework.Status { + if node.Name == "node2" { + return framework.NewStatus(framework.Success) + } + return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("node %q is not allowed", node.Name)) +} + +// ErrorPrioritizerExtender implements PriorityFunc function to always return error. +func ErrorPrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error) { + return &framework.NodeScoreList{}, fmt.Errorf("some error") +} + +// Node1PrioritizerExtender implements PriorityFunc function to give score 10 +// if the given node's name is "node1"; otherwise score 1. +func Node1PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error) { + result := framework.NodeScoreList{} + for _, node := range nodes { + score := 1 + if node.Name == "node1" { + score = 10 + } + result = append(result, framework.NodeScore{Name: node.Name, Score: int64(score)}) + } + return &result, nil +} + +// Node2PrioritizerExtender implements PriorityFunc function to give score 10 +// if the given node's name is "node2"; otherwise score 1. +func Node2PrioritizerExtender(pod *v1.Pod, nodes []*v1.Node) (*framework.NodeScoreList, error) { + result := framework.NodeScoreList{} + for _, node := range nodes { + score := 1 + if node.Name == "node2" { + score = 10 + } + result = append(result, framework.NodeScore{Name: node.Name, Score: int64(score)}) + } + return &result, nil +} + +type node2PrioritizerPlugin struct{} + +// NewNode2PrioritizerPlugin returns a factory function to build node2PrioritizerPlugin. 
+func NewNode2PrioritizerPlugin() frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &node2PrioritizerPlugin{}, nil + } +} + +// Name returns name of the plugin. +func (pl *node2PrioritizerPlugin) Name() string { + return "Node2Prioritizer" +} + +// Score return score 100 if the given nodeName is "node2"; otherwise return score 10. +func (pl *node2PrioritizerPlugin) Score(_ context.Context, _ *framework.CycleState, _ *v1.Pod, nodeName string) (int64, *framework.Status) { + score := 10 + if nodeName == "node2" { + score = 100 + } + return int64(score), nil +} + +// ScoreExtensions returns nil. +func (pl *node2PrioritizerPlugin) ScoreExtensions() framework.ScoreExtensions { + return nil +} + +// FakeExtender is a data struct which implements the Extender interface. +type FakeExtender struct { + Predicates []FitPredicate + Prioritizers []PriorityConfig + Weight int64 + NodeCacheCapable bool + FilteredNodes []*v1.Node + UnInterested bool + Ignorable bool + + // Cached node information for fake extender + CachedNodeNameToInfo map[string]*framework.NodeInfo +} + +// Name returns name of the extender. +func (f *FakeExtender) Name() string { + return "FakeExtender" +} + +// IsIgnorable returns a bool value indicating whether internal errors can be ignored. +func (f *FakeExtender) IsIgnorable() bool { + return f.Ignorable +} + +// SupportsPreemption returns true indicating the extender supports preemption. +func (f *FakeExtender) SupportsPreemption() bool { + // Assume preempt verb is always defined. + return true +} + +// ProcessPreemption implements the extender preempt function. +func (f *FakeExtender) ProcessPreemption( + pod *v1.Pod, + nodeNameToVictims map[string]*extenderv1.Victims, + nodeInfos framework.NodeInfoLister, +) (map[string]*extenderv1.Victims, error) { + nodeNameToVictimsCopy := map[string]*extenderv1.Victims{} + // We don't want to change the original nodeNameToVictims + for k, v := range nodeNameToVictims { + // In real world implementation, extender's user should have their own way to get node object + // by name if needed (e.g. query kube-apiserver etc). + // + // For test purpose, we just use node from parameters directly. + nodeNameToVictimsCopy[k] = v + } + + for nodeName, victims := range nodeNameToVictimsCopy { + // Try to do preemption on extender side. + nodeInfo, _ := nodeInfos.Get(nodeName) + extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, nodeInfo.Node()) + if err != nil { + return nil, err + } + // If it's unfit after extender's preemption, this node is unresolvable by preemption overall, + // let's remove it from potential preemption nodes. + if !fits { + delete(nodeNameToVictimsCopy, nodeName) + } else { + // Append new victims to original victims + nodeNameToVictimsCopy[nodeName].Pods = append(victims.Pods, extenderVictimPods...) + nodeNameToVictimsCopy[nodeName].NumPDBViolations = victims.NumPDBViolations + int64(extenderPDBViolations) + } + } + return nodeNameToVictimsCopy, nil +} + +// selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side. +// Returns: +// 1. More victim pods (if any) amended by preemption phase of extender. +// 2. Number of violating victim (used to calculate PDB). +// 3. Fits or not after preemption phase on extender's side. 
+func (f *FakeExtender) selectVictimsOnNodeByExtender(pod *v1.Pod, node *v1.Node) ([]*v1.Pod, int, bool, error) { + // If a extender support preemption but have no cached node info, let's run filter to make sure + // default scheduler's decision still stand with given pod and node. + if !f.NodeCacheCapable { + err := f.runPredicate(pod, node) + if err.IsSuccess() { + return []*v1.Pod{}, 0, true, nil + } else if err.IsUnschedulable() { + return nil, 0, false, nil + } else { + return nil, 0, false, err.AsError() + } + } + + // Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available + // and get cached node info by given node name. + nodeInfoCopy := f.CachedNodeNameToInfo[node.GetName()].Clone() + + var potentialVictims []*v1.Pod + + removePod := func(rp *v1.Pod) { + nodeInfoCopy.RemovePod(rp) + } + addPod := func(ap *v1.Pod) { + nodeInfoCopy.AddPod(ap) + } + // As the first step, remove all the lower priority pods from the node and + // check if the given pod can be scheduled. + podPriority := corev1helpers.PodPriority(pod) + for _, p := range nodeInfoCopy.Pods { + if corev1helpers.PodPriority(p.Pod) < podPriority { + potentialVictims = append(potentialVictims, p.Pod) + removePod(p.Pod) + } + } + sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i], potentialVictims[j]) }) + + // If the new pod does not fit after removing all the lower priority pods, + // we are almost done and this node is not suitable for preemption. + status := f.runPredicate(pod, nodeInfoCopy.Node()) + if status.IsSuccess() { + // pass + } else if status.IsUnschedulable() { + // does not fit + return nil, 0, false, nil + } else { + // internal errors + return nil, 0, false, status.AsError() + } + + var victims []*v1.Pod + + // TODO(harry): handle PDBs in the future. + numViolatingVictim := 0 + + reprievePod := func(p *v1.Pod) bool { + addPod(p) + status := f.runPredicate(pod, nodeInfoCopy.Node()) + if !status.IsSuccess() { + removePod(p) + victims = append(victims, p) + } + return status.IsSuccess() + } + + // For now, assume all potential victims to be non-violating. + // Now we try to reprieve non-violating victims. + for _, p := range potentialVictims { + reprievePod(p) + } + + return victims, numViolatingVictim, true, nil +} + +// runPredicate run predicates of extender one by one for given pod and node. +// Returns: fits or not. +func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) *framework.Status { + for _, predicate := range f.Predicates { + status := predicate(pod, node) + if !status.IsSuccess() { + return status + } + } + return framework.NewStatus(framework.Success) +} + +// Filter implements the extender Filter function. 
+func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, extenderv1.FailedNodesMap, error) { + var filtered []*v1.Node + failedNodesMap := extenderv1.FailedNodesMap{} + failedAndUnresolvableMap := extenderv1.FailedNodesMap{} + for _, node := range nodes { + status := f.runPredicate(pod, node) + if status.IsSuccess() { + filtered = append(filtered, node) + } else if status.Code() == framework.Unschedulable { + failedNodesMap[node.Name] = fmt.Sprintf("FakeExtender: node %q failed", node.Name) + } else if status.Code() == framework.UnschedulableAndUnresolvable { + failedAndUnresolvableMap[node.Name] = fmt.Sprintf("FakeExtender: node %q failed and unresolvable", node.Name) + } else { + return nil, nil, nil, status.AsError() + } + } + + f.FilteredNodes = filtered + if f.NodeCacheCapable { + return filtered, failedNodesMap, failedAndUnresolvableMap, nil + } + return filtered, failedNodesMap, failedAndUnresolvableMap, nil +} + +// Prioritize implements the extender Prioritize function. +func (f *FakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) { + result := extenderv1.HostPriorityList{} + combinedScores := map[string]int64{} + for _, prioritizer := range f.Prioritizers { + weight := prioritizer.Weight + if weight == 0 { + continue + } + priorityFunc := prioritizer.Function + prioritizedList, err := priorityFunc(pod, nodes) + if err != nil { + return &extenderv1.HostPriorityList{}, 0, err + } + for _, hostEntry := range *prioritizedList { + combinedScores[hostEntry.Name] += hostEntry.Score * weight + } + } + for host, score := range combinedScores { + result = append(result, extenderv1.HostPriority{Host: host, Score: score}) + } + return &result, f.Weight, nil +} + +// Bind implements the extender Bind function. +func (f *FakeExtender) Bind(binding *v1.Binding) error { + if len(f.FilteredNodes) != 0 { + for _, node := range f.FilteredNodes { + if node.Name == binding.Target.Name { + f.FilteredNodes = nil + return nil + } + } + err := fmt.Errorf("Node %v not in filtered nodes %v", binding.Target.Name, f.FilteredNodes) + f.FilteredNodes = nil + return err + } + return nil +} + +// IsBinder returns true indicating the extender implements the Binder function. +func (f *FakeExtender) IsBinder() bool { + return true +} + +// IsInterested returns a bool true indicating whether extender +func (f *FakeExtender) IsInterested(pod *v1.Pod) bool { + return !f.UnInterested +} + +var _ framework.Extender = &FakeExtender{} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/testing/fake_plugins.go b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/fake_plugins.go new file mode 100644 index 000000000..b025cb15c --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/fake_plugins.go @@ -0,0 +1,233 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package testing + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/kubernetes/pkg/scheduler/framework" + frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" +) + +// ErrReasonFake is a fake error message denotes the filter function errored. +const ErrReasonFake = "Nodes failed the fake plugin" + +// FalseFilterPlugin is a filter plugin which always return Unschedulable when Filter function is called. +type FalseFilterPlugin struct{} + +// Name returns name of the plugin. +func (pl *FalseFilterPlugin) Name() string { + return "FalseFilter" +} + +// Filter invoked at the filter extension point. +func (pl *FalseFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + return framework.NewStatus(framework.Unschedulable, ErrReasonFake) +} + +// NewFalseFilterPlugin initializes a FalseFilterPlugin and returns it. +func NewFalseFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &FalseFilterPlugin{}, nil +} + +// TrueFilterPlugin is a filter plugin which always return Success when Filter function is called. +type TrueFilterPlugin struct{} + +// Name returns name of the plugin. +func (pl *TrueFilterPlugin) Name() string { + return "TrueFilter" +} + +// Filter invoked at the filter extension point. +func (pl *TrueFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + return nil +} + +// NewTrueFilterPlugin initializes a TrueFilterPlugin and returns it. +func NewTrueFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &TrueFilterPlugin{}, nil +} + +// FakeFilterPlugin is a test filter plugin to record how many times its Filter() function have +// been called, and it returns different 'Code' depending on its internal 'failedNodeReturnCodeMap'. +type FakeFilterPlugin struct { + NumFilterCalled int32 + FailedNodeReturnCodeMap map[string]framework.Code +} + +// Name returns name of the plugin. +func (pl *FakeFilterPlugin) Name() string { + return "FakeFilter" +} + +// Filter invoked at the filter extension point. +func (pl *FakeFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + atomic.AddInt32(&pl.NumFilterCalled, 1) + + if returnCode, ok := pl.FailedNodeReturnCodeMap[nodeInfo.Node().Name]; ok { + return framework.NewStatus(returnCode, fmt.Sprintf("injecting failure for pod %v", pod.Name)) + } + + return nil +} + +// NewFakeFilterPlugin initializes a fakeFilterPlugin and returns it. +func NewFakeFilterPlugin(failedNodeReturnCodeMap map[string]framework.Code) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &FakeFilterPlugin{ + FailedNodeReturnCodeMap: failedNodeReturnCodeMap, + }, nil + } +} + +// MatchFilterPlugin is a filter plugin which return Success when the evaluated pod and node +// have the same name; otherwise return Unschedulable. +type MatchFilterPlugin struct{} + +// Name returns name of the plugin. +func (pl *MatchFilterPlugin) Name() string { + return "MatchFilter" +} + +// Filter invoked at the filter extension point. 
+func (pl *MatchFilterPlugin) Filter(_ context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { + node := nodeInfo.Node() + if node == nil { + return framework.NewStatus(framework.Error, "node not found") + } + if pod.Name == node.Name { + return nil + } + return framework.NewStatus(framework.Unschedulable, ErrReasonFake) +} + +// NewMatchFilterPlugin initializes a MatchFilterPlugin and returns it. +func NewMatchFilterPlugin(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &MatchFilterPlugin{}, nil +} + +// FakePreFilterPlugin is a test filter plugin. +type FakePreFilterPlugin struct { + Status *framework.Status +} + +// Name returns name of the plugin. +func (pl *FakePreFilterPlugin) Name() string { + return "FakePreFilter" +} + +// PreFilter invoked at the PreFilter extension point. +func (pl *FakePreFilterPlugin) PreFilter(_ context.Context, _ *framework.CycleState, pod *v1.Pod) *framework.Status { + return pl.Status +} + +// PreFilterExtensions no extensions implemented by this plugin. +func (pl *FakePreFilterPlugin) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + +// NewFakePreFilterPlugin initializes a fakePreFilterPlugin and returns it. +func NewFakePreFilterPlugin(status *framework.Status) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &FakePreFilterPlugin{ + Status: status, + }, nil + } +} + +// FakeReservePlugin is a test reserve plugin. +type FakeReservePlugin struct { + Status *framework.Status +} + +// Name returns name of the plugin. +func (pl *FakeReservePlugin) Name() string { + return "FakeReserve" +} + +// Reserve invoked at the Reserve extension point. +func (pl *FakeReservePlugin) Reserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status { + return pl.Status +} + +// Unreserve invoked at the Unreserve extension point. +func (pl *FakeReservePlugin) Unreserve(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) { +} + +// NewFakeReservePlugin initializes a fakeReservePlugin and returns it. +func NewFakeReservePlugin(status *framework.Status) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &FakeReservePlugin{ + Status: status, + }, nil + } +} + +// FakePreBindPlugin is a test prebind plugin. +type FakePreBindPlugin struct { + Status *framework.Status +} + +// Name returns name of the plugin. +func (pl *FakePreBindPlugin) Name() string { + return "FakePreBind" +} + +// PreBind invoked at the PreBind extension point. +func (pl *FakePreBindPlugin) PreBind(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) *framework.Status { + return pl.Status +} + +// NewFakePreBindPlugin initializes a fakePreBindPlugin and returns it. +func NewFakePreBindPlugin(status *framework.Status) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &FakePreBindPlugin{ + Status: status, + }, nil + } +} + +// FakePermitPlugin is a test permit plugin. +type FakePermitPlugin struct { + Status *framework.Status + Timeout time.Duration +} + +// Name returns name of the plugin. +func (pl *FakePermitPlugin) Name() string { + return "FakePermit" +} + +// Permit invoked at the Permit extension point. 
+func (pl *FakePermitPlugin) Permit(_ context.Context, _ *framework.CycleState, _ *v1.Pod, _ string) (*framework.Status, time.Duration) { + return pl.Status, pl.Timeout +} + +// NewFakePermitPlugin initializes a fakePermitPlugin and returns it. +func NewFakePermitPlugin(status *framework.Status, timeout time.Duration) frameworkruntime.PluginFactory { + return func(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { + return &FakePermitPlugin{ + Status: status, + Timeout: timeout, + }, nil + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/testing/framework_helpers.go b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/framework_helpers.go new file mode 100644 index 000000000..904e7f2e1 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/framework_helpers.go @@ -0,0 +1,145 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/kube-scheduler/config/v1beta2" + schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config" + "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" + "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/runtime" +) + +var configDecoder = scheme.Codecs.UniversalDecoder() + +// NewFramework creates a Framework from the register functions and options. +func NewFramework(fns []RegisterPluginFunc, profileName string, opts ...runtime.Option) (framework.Framework, error) { + registry := runtime.Registry{} + profile := &schedulerapi.KubeSchedulerProfile{ + SchedulerName: profileName, + Plugins: &schedulerapi.Plugins{}, + } + for _, f := range fns { + f(®istry, profile) + } + return runtime.NewFramework(registry, profile, opts...) +} + +// RegisterPluginFunc is a function signature used in method RegisterFilterPlugin() +// to register a Filter Plugin to a given registry. +type RegisterPluginFunc func(reg *runtime.Registry, profile *schedulerapi.KubeSchedulerProfile) + +// RegisterQueueSortPlugin returns a function to register a QueueSort Plugin to a given registry. +func RegisterQueueSortPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "QueueSort") +} + +// RegisterPreFilterPlugin returns a function to register a PreFilter Plugin to a given registry. +func RegisterPreFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreFilter") +} + +// RegisterFilterPlugin returns a function to register a Filter Plugin to a given registry. +func RegisterFilterPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc { + return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Filter") +} + +// RegisterReservePlugin returns a function to register a Reserve Plugin to a given registry. 
+func RegisterReservePlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
+	return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Reserve")
+}
+
+// RegisterPermitPlugin returns a function to register a Permit Plugin to a given registry.
+func RegisterPermitPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
+	return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Permit")
+}
+
+// RegisterPreBindPlugin returns a function to register a PreBind Plugin to a given registry.
+func RegisterPreBindPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
+	return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreBind")
+}
+
+// RegisterScorePlugin returns a function to register a Score Plugin to a given registry.
+func RegisterScorePlugin(pluginName string, pluginNewFunc runtime.PluginFactory, weight int32) RegisterPluginFunc {
+	return RegisterPluginAsExtensionsWithWeight(pluginName, weight, pluginNewFunc, "Score")
+}
+
+// RegisterPreScorePlugin returns a function to register a PreScore Plugin to a given registry.
+func RegisterPreScorePlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
+	return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "PreScore")
+}
+
+// RegisterBindPlugin returns a function to register a Bind Plugin to a given registry.
+func RegisterBindPlugin(pluginName string, pluginNewFunc runtime.PluginFactory) RegisterPluginFunc {
+	return RegisterPluginAsExtensions(pluginName, pluginNewFunc, "Bind")
+}
+
+// RegisterPluginAsExtensions returns a function to register a Plugin as given extensionPoints to a given registry.
+func RegisterPluginAsExtensions(pluginName string, pluginNewFunc runtime.PluginFactory, extensions ...string) RegisterPluginFunc {
+	return RegisterPluginAsExtensionsWithWeight(pluginName, 1, pluginNewFunc, extensions...)
+}
+
+// RegisterPluginAsExtensionsWithWeight returns a function to register a Plugin as given extensionPoints with weight to a given registry.
+func RegisterPluginAsExtensionsWithWeight(pluginName string, weight int32, pluginNewFunc runtime.PluginFactory, extensions ...string) RegisterPluginFunc {
+	return func(reg *runtime.Registry, profile *schedulerapi.KubeSchedulerProfile) {
+		reg.Register(pluginName, pluginNewFunc)
+		for _, extension := range extensions {
+			ps := getPluginSetByExtension(profile.Plugins, extension)
+			if ps == nil {
+				continue
+			}
+			ps.Enabled = append(ps.Enabled, schedulerapi.Plugin{Name: pluginName, Weight: weight})
+		}
+		// Use defaults from latest config API version.
+ var gvk schema.GroupVersionKind + gvk = v1beta2.SchemeGroupVersion.WithKind(pluginName + "Args") + if args, _, err := configDecoder.Decode(nil, &gvk, nil); err == nil { + profile.PluginConfig = append(profile.PluginConfig, schedulerapi.PluginConfig{ + Name: pluginName, + Args: args, + }) + } + } +} + +func getPluginSetByExtension(plugins *schedulerapi.Plugins, extension string) *schedulerapi.PluginSet { + switch extension { + case "QueueSort": + return &plugins.QueueSort + case "Filter": + return &plugins.Filter + case "PreFilter": + return &plugins.PreFilter + case "PreScore": + return &plugins.PreScore + case "Score": + return &plugins.Score + case "Bind": + return &plugins.Bind + case "Reserve": + return &plugins.Reserve + case "Permit": + return &plugins.Permit + case "PreBind": + return &plugins.PreBind + case "PostBind": + return &plugins.PostBind + default: + return nil + } +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/testing/workload_prep.go b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/workload_prep.go new file mode 100644 index 000000000..d8e7b0068 --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/workload_prep.go @@ -0,0 +1,137 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + + v1 "k8s.io/api/core/v1" +) + +type keyVal struct { + k string + v string +} + +// MakeNodesAndPodsForEvenPodsSpread serves as a testing helper for EvenPodsSpread feature. +// It builds a fake cluster containing running Pods and Nodes. +// The size of Pods and Nodes are determined by input arguments. +// The specs of Pods and Nodes are generated with the following rules: +// - Each generated node is applied with a unique label: "node: node". +// - Each generated node is applied with a rotating label: "zone: zone[0-9]". +// - Depending on the input labels, each generated pod will be applied with +// label "key1", "key1,key2", ..., "key1,key2,...,keyN" in a rotating manner. +func MakeNodesAndPodsForEvenPodsSpread(labels map[string]string, existingPodsNum, allNodesNum, filteredNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node, filteredNodes []*v1.Node) { + var labelPairs []keyVal + for k, v := range labels { + labelPairs = append(labelPairs, keyVal{k: k, v: v}) + } + zones := 10 + // build nodes + for i := 0; i < allNodesNum; i++ { + node := MakeNode().Name(fmt.Sprintf("node%d", i)). + Label(v1.LabelTopologyZone, fmt.Sprintf("zone%d", i%zones)). 
+ Label(v1.LabelHostname, fmt.Sprintf("node%d", i)).Obj() + allNodes = append(allNodes, node) + } + filteredNodes = allNodes[:filteredNodesNum] + // build pods + for i := 0; i < existingPodsNum; i++ { + podWrapper := MakePod().Name(fmt.Sprintf("pod%d", i)).Node(fmt.Sprintf("node%d", i%allNodesNum)) + // apply labels[0], labels[0,1], ..., labels[all] to each pod in turn + for _, p := range labelPairs[:i%len(labelPairs)+1] { + podWrapper = podWrapper.Label(p.k, p.v) + } + existingPods = append(existingPods, podWrapper.Obj()) + } + return +} + +// MakeNodesAndPodsForPodAffinity serves as a testing helper for Pod(Anti)Affinity feature. +// It builds a fake cluster containing running Pods and Nodes. +// For simplicity, the Nodes will be labelled with "region", "zone" and "node". Nodes[i] will be applied with: +// - "region": "region" + i%3 +// - "zone": "zone" + i%10 +// - "node": "node" + i +// The Pods will be applied with various combinations of PodAffinity and PodAntiAffinity terms. +func MakeNodesAndPodsForPodAffinity(existingPodsNum, allNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node) { + tpKeyToSizeMap := map[string]int{ + "region": 3, + "zone": 10, + "node": allNodesNum, + } + // build nodes to spread across all topology domains + for i := 0; i < allNodesNum; i++ { + nodeName := fmt.Sprintf("node%d", i) + nodeWrapper := MakeNode().Name(nodeName) + for tpKey, size := range tpKeyToSizeMap { + nodeWrapper = nodeWrapper.Label(tpKey, fmt.Sprintf("%s%d", tpKey, i%size)) + } + allNodes = append(allNodes, nodeWrapper.Obj()) + } + + labels := []string{"foo", "bar", "baz"} + tpKeys := []string{"region", "zone", "node"} + + // Build pods. + // Each pod will be created with one affinity and one anti-affinity terms using all combinations of + // affinity and anti-affinity kinds listed below + // e.g., the first pod will have {affinity, anti-affinity} terms of kinds {NilPodAffinity, NilPodAffinity}; + // the second will be {NilPodAffinity, PodAntiAffinityWithRequiredReq}, etc. + affinityKinds := []PodAffinityKind{ + NilPodAffinity, + PodAffinityWithRequiredReq, + PodAffinityWithPreferredReq, + PodAffinityWithRequiredPreferredReq, + } + antiAffinityKinds := []PodAffinityKind{ + NilPodAffinity, + PodAntiAffinityWithRequiredReq, + PodAntiAffinityWithPreferredReq, + PodAntiAffinityWithRequiredPreferredReq, + } + + totalSize := len(affinityKinds) * len(antiAffinityKinds) + for i := 0; i < existingPodsNum; i++ { + podWrapper := MakePod().Name(fmt.Sprintf("pod%d", i)).Node(fmt.Sprintf("node%d", i%allNodesNum)) + label, tpKey := labels[i%len(labels)], tpKeys[i%len(tpKeys)] + + affinityIdx := i % totalSize + // len(affinityKinds) is equal to len(antiAffinityKinds) + leftIdx, rightIdx := affinityIdx/len(affinityKinds), affinityIdx%len(affinityKinds) + podWrapper = podWrapper.PodAffinityExists(label, tpKey, affinityKinds[leftIdx]) + podWrapper = podWrapper.PodAntiAffinityExists(label, tpKey, antiAffinityKinds[rightIdx]) + existingPods = append(existingPods, podWrapper.Obj()) + } + + return +} + +// MakeNodesAndPods serves as a testing helper to generate regular Nodes and Pods +// that don't use any advanced scheduling features. 
+func MakeNodesAndPods(existingPodsNum, allNodesNum int) (existingPods []*v1.Pod, allNodes []*v1.Node) { + // build nodes + for i := 0; i < allNodesNum; i++ { + allNodes = append(allNodes, MakeNode().Name(fmt.Sprintf("node%d", i)).Obj()) + } + // build pods + for i := 0; i < existingPodsNum; i++ { + podWrapper := MakePod().Name(fmt.Sprintf("pod%d", i)).Node(fmt.Sprintf("node%d", i%allNodesNum)) + existingPods = append(existingPods, podWrapper.Obj()) + } + return +} diff --git a/vendor/k8s.io/kubernetes/pkg/scheduler/testing/wrappers.go b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/wrappers.go new file mode 100644 index 000000000..599700a8e --- /dev/null +++ b/vendor/k8s.io/kubernetes/pkg/scheduler/testing/wrappers.go @@ -0,0 +1,489 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/pointer" +) + +var zero int64 + +// NodeSelectorWrapper wraps a NodeSelector inside. +type NodeSelectorWrapper struct{ v1.NodeSelector } + +// MakeNodeSelector creates a NodeSelector wrapper. +func MakeNodeSelector() *NodeSelectorWrapper { + return &NodeSelectorWrapper{v1.NodeSelector{}} +} + +// In injects a matchExpression (with an operator IN) as a selectorTerm +// to the inner nodeSelector. +// NOTE: appended selecterTerms are ORed. +func (s *NodeSelectorWrapper) In(key string, vals []string) *NodeSelectorWrapper { + expression := v1.NodeSelectorRequirement{ + Key: key, + Operator: v1.NodeSelectorOpIn, + Values: vals, + } + selectorTerm := v1.NodeSelectorTerm{} + selectorTerm.MatchExpressions = append(selectorTerm.MatchExpressions, expression) + s.NodeSelectorTerms = append(s.NodeSelectorTerms, selectorTerm) + return s +} + +// NotIn injects a matchExpression (with an operator NotIn) as a selectorTerm +// to the inner nodeSelector. +func (s *NodeSelectorWrapper) NotIn(key string, vals []string) *NodeSelectorWrapper { + expression := v1.NodeSelectorRequirement{ + Key: key, + Operator: v1.NodeSelectorOpNotIn, + Values: vals, + } + selectorTerm := v1.NodeSelectorTerm{} + selectorTerm.MatchExpressions = append(selectorTerm.MatchExpressions, expression) + s.NodeSelectorTerms = append(s.NodeSelectorTerms, selectorTerm) + return s +} + +// Obj returns the inner NodeSelector. +func (s *NodeSelectorWrapper) Obj() *v1.NodeSelector { + return &s.NodeSelector +} + +// LabelSelectorWrapper wraps a LabelSelector inside. +type LabelSelectorWrapper struct{ metav1.LabelSelector } + +// MakeLabelSelector creates a LabelSelector wrapper. +func MakeLabelSelector() *LabelSelectorWrapper { + return &LabelSelectorWrapper{metav1.LabelSelector{}} +} + +// Label applies a {k,v} pair to the inner LabelSelector. 
+func (s *LabelSelectorWrapper) Label(k, v string) *LabelSelectorWrapper { + if s.MatchLabels == nil { + s.MatchLabels = make(map[string]string) + } + s.MatchLabels[k] = v + return s +} + +// In injects a matchExpression (with an operator In) to the inner labelSelector. +func (s *LabelSelectorWrapper) In(key string, vals []string) *LabelSelectorWrapper { + expression := metav1.LabelSelectorRequirement{ + Key: key, + Operator: metav1.LabelSelectorOpIn, + Values: vals, + } + s.MatchExpressions = append(s.MatchExpressions, expression) + return s +} + +// NotIn injects a matchExpression (with an operator NotIn) to the inner labelSelector. +func (s *LabelSelectorWrapper) NotIn(key string, vals []string) *LabelSelectorWrapper { + expression := metav1.LabelSelectorRequirement{ + Key: key, + Operator: metav1.LabelSelectorOpNotIn, + Values: vals, + } + s.MatchExpressions = append(s.MatchExpressions, expression) + return s +} + +// Exists injects a matchExpression (with an operator Exists) to the inner labelSelector. +func (s *LabelSelectorWrapper) Exists(k string) *LabelSelectorWrapper { + expression := metav1.LabelSelectorRequirement{ + Key: k, + Operator: metav1.LabelSelectorOpExists, + } + s.MatchExpressions = append(s.MatchExpressions, expression) + return s +} + +// NotExist injects a matchExpression (with an operator NotExist) to the inner labelSelector. +func (s *LabelSelectorWrapper) NotExist(k string) *LabelSelectorWrapper { + expression := metav1.LabelSelectorRequirement{ + Key: k, + Operator: metav1.LabelSelectorOpDoesNotExist, + } + s.MatchExpressions = append(s.MatchExpressions, expression) + return s +} + +// Obj returns the inner LabelSelector. +func (s *LabelSelectorWrapper) Obj() *metav1.LabelSelector { + return &s.LabelSelector +} + +// PodWrapper wraps a Pod inside. +type PodWrapper struct{ v1.Pod } + +// MakePod creates a Pod wrapper. +func MakePod() *PodWrapper { + return &PodWrapper{v1.Pod{}} +} + +// Obj returns the inner Pod. +func (p *PodWrapper) Obj() *v1.Pod { + return &p.Pod +} + +// Name sets `s` as the name of the inner pod. +func (p *PodWrapper) Name(s string) *PodWrapper { + p.SetName(s) + return p +} + +// UID sets `s` as the UID of the inner pod. +func (p *PodWrapper) UID(s string) *PodWrapper { + p.SetUID(types.UID(s)) + return p +} + +// SchedulerName sets `s` as the scheduler name of the inner pod. +func (p *PodWrapper) SchedulerName(s string) *PodWrapper { + p.Spec.SchedulerName = s + return p +} + +// Namespace sets `s` as the namespace of the inner pod. +func (p *PodWrapper) Namespace(s string) *PodWrapper { + p.SetNamespace(s) + return p +} + +// OwnerReference updates the owning controller of the pod. +func (p *PodWrapper) OwnerReference(name string, gvk schema.GroupVersionKind) *PodWrapper { + p.OwnerReferences = []metav1.OwnerReference{ + { + APIVersion: gvk.GroupVersion().String(), + Kind: gvk.Kind, + Name: name, + Controller: pointer.BoolPtr(true), + }, + } + return p +} + +// Container appends a container into PodSpec of the inner pod. +func (p *PodWrapper) Container(s string) *PodWrapper { + p.Spec.Containers = append(p.Spec.Containers, v1.Container{ + Name: fmt.Sprintf("con%d", len(p.Spec.Containers)), + Image: s, + }) + return p +} + +// Priority sets a priority value into PodSpec of the inner pod. +func (p *PodWrapper) Priority(val int32) *PodWrapper { + p.Spec.Priority = &val + return p +} + +// Terminating sets the inner pod's deletionTimestamp to current timestamp. 
+func (p *PodWrapper) Terminating() *PodWrapper { + now := metav1.Now() + p.DeletionTimestamp = &now + return p +} + +// ZeroTerminationGracePeriod sets the TerminationGracePeriodSeconds of the inner pod to zero. +func (p *PodWrapper) ZeroTerminationGracePeriod() *PodWrapper { + p.Spec.TerminationGracePeriodSeconds = &zero + return p +} + +// Node sets `s` as the nodeName of the inner pod. +func (p *PodWrapper) Node(s string) *PodWrapper { + p.Spec.NodeName = s + return p +} + +// NodeSelector sets `m` as the nodeSelector of the inner pod. +func (p *PodWrapper) NodeSelector(m map[string]string) *PodWrapper { + p.Spec.NodeSelector = m + return p +} + +// NodeAffinityIn creates a HARD node affinity (with the operator In) +// and injects into the inner pod. +func (p *PodWrapper) NodeAffinityIn(key string, vals []string) *PodWrapper { + if p.Spec.Affinity == nil { + p.Spec.Affinity = &v1.Affinity{} + } + if p.Spec.Affinity.NodeAffinity == nil { + p.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{} + } + nodeSelector := MakeNodeSelector().In(key, vals).Obj() + p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = nodeSelector + return p +} + +// NodeAffinityNotIn creates a HARD node affinity (with the operator NotIn) +// and injects into the inner pod. +func (p *PodWrapper) NodeAffinityNotIn(key string, vals []string) *PodWrapper { + if p.Spec.Affinity == nil { + p.Spec.Affinity = &v1.Affinity{} + } + if p.Spec.Affinity.NodeAffinity == nil { + p.Spec.Affinity.NodeAffinity = &v1.NodeAffinity{} + } + nodeSelector := MakeNodeSelector().NotIn(key, vals).Obj() + p.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution = nodeSelector + return p +} + +// StartTime sets `t` as .status.startTime for the inner pod. +func (p *PodWrapper) StartTime(t metav1.Time) *PodWrapper { + p.Status.StartTime = &t + return p +} + +// NominatedNodeName sets `n` as the .Status.NominatedNodeName of the inner pod. +func (p *PodWrapper) NominatedNodeName(n string) *PodWrapper { + p.Status.NominatedNodeName = n + return p +} + +// Toleration creates a toleration (with the operator Exists) +// and injects into the inner pod. +func (p *PodWrapper) Toleration(key string) *PodWrapper { + p.Spec.Tolerations = append(p.Spec.Tolerations, v1.Toleration{ + Key: key, + Operator: v1.TolerationOpExists, + }) + return p +} + +// HostPort creates a container with a hostPort valued `hostPort`, +// and injects into the inner pod. +func (p *PodWrapper) HostPort(port int32) *PodWrapper { + p.Spec.Containers = append(p.Spec.Containers, v1.Container{ + Ports: []v1.ContainerPort{{HostPort: port}}, + }) + return p +} + +// PodAffinityKind represents different kinds of PodAffinity. +type PodAffinityKind int + +const ( + // NilPodAffinity is a no-op which doesn't apply any PodAffinity. + NilPodAffinity PodAffinityKind = iota + // PodAffinityWithRequiredReq applies a HARD requirement to pod.spec.affinity.PodAffinity. + PodAffinityWithRequiredReq + // PodAffinityWithPreferredReq applies a SOFT requirement to pod.spec.affinity.PodAffinity. + PodAffinityWithPreferredReq + // PodAffinityWithRequiredPreferredReq applies HARD and SOFT requirements to pod.spec.affinity.PodAffinity. + PodAffinityWithRequiredPreferredReq + // PodAntiAffinityWithRequiredReq applies a HARD requirement to pod.spec.affinity.PodAntiAffinity. + PodAntiAffinityWithRequiredReq + // PodAntiAffinityWithPreferredReq applies a SOFT requirement to pod.spec.affinity.PodAntiAffinity. 
+ PodAntiAffinityWithPreferredReq + // PodAntiAffinityWithRequiredPreferredReq applies HARD and SOFT requirements to pod.spec.affinity.PodAntiAffinity. + PodAntiAffinityWithRequiredPreferredReq +) + +// PodAffinityExists creates an PodAffinity with the operator "Exists" +// and injects into the inner pod. +func (p *PodWrapper) PodAffinityExists(labelKey, topologyKey string, kind PodAffinityKind) *PodWrapper { + if kind == NilPodAffinity { + return p + } + + if p.Spec.Affinity == nil { + p.Spec.Affinity = &v1.Affinity{} + } + if p.Spec.Affinity.PodAffinity == nil { + p.Spec.Affinity.PodAffinity = &v1.PodAffinity{} + } + labelSelector := MakeLabelSelector().Exists(labelKey).Obj() + term := v1.PodAffinityTerm{LabelSelector: labelSelector, TopologyKey: topologyKey} + switch kind { + case PodAffinityWithRequiredReq: + p.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution, + term, + ) + case PodAffinityWithPreferredReq: + p.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + v1.WeightedPodAffinityTerm{Weight: 1, PodAffinityTerm: term}, + ) + case PodAffinityWithRequiredPreferredReq: + p.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAffinity.RequiredDuringSchedulingIgnoredDuringExecution, + term, + ) + p.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + v1.WeightedPodAffinityTerm{Weight: 1, PodAffinityTerm: term}, + ) + } + return p +} + +// PodAntiAffinityExists creates an PodAntiAffinity with the operator "Exists" +// and injects into the inner pod. +func (p *PodWrapper) PodAntiAffinityExists(labelKey, topologyKey string, kind PodAffinityKind) *PodWrapper { + if kind == NilPodAffinity { + return p + } + + if p.Spec.Affinity == nil { + p.Spec.Affinity = &v1.Affinity{} + } + if p.Spec.Affinity.PodAntiAffinity == nil { + p.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{} + } + labelSelector := MakeLabelSelector().Exists(labelKey).Obj() + term := v1.PodAffinityTerm{LabelSelector: labelSelector, TopologyKey: topologyKey} + switch kind { + case PodAntiAffinityWithRequiredReq: + p.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, + term, + ) + case PodAntiAffinityWithPreferredReq: + p.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + v1.WeightedPodAffinityTerm{Weight: 1, PodAffinityTerm: term}, + ) + case PodAntiAffinityWithRequiredPreferredReq: + p.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution, + term, + ) + p.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution = append( + p.Spec.Affinity.PodAntiAffinity.PreferredDuringSchedulingIgnoredDuringExecution, + v1.WeightedPodAffinityTerm{Weight: 1, PodAffinityTerm: term}, + ) + } + return p +} + +// SpreadConstraint constructs a TopologySpreadConstraint object and injects +// into the inner pod. 
+func (p *PodWrapper) SpreadConstraint(maxSkew int, tpKey string, mode v1.UnsatisfiableConstraintAction, selector *metav1.LabelSelector) *PodWrapper {
+	c := v1.TopologySpreadConstraint{
+		MaxSkew:           int32(maxSkew),
+		TopologyKey:       tpKey,
+		WhenUnsatisfiable: mode,
+		LabelSelector:     selector,
+	}
+	p.Spec.TopologySpreadConstraints = append(p.Spec.TopologySpreadConstraints, c)
+	return p
+}
+
+// Label sets a {k,v} pair to the inner pod.
+func (p *PodWrapper) Label(k, v string) *PodWrapper {
+	if p.Labels == nil {
+		p.Labels = make(map[string]string)
+	}
+	p.Labels[k] = v
+	return p
+}
+
+// Req adds a new container to the inner pod with the given resource map.
+func (p *PodWrapper) Req(resMap map[v1.ResourceName]string) *PodWrapper {
+	if len(resMap) == 0 {
+		return p
+	}
+
+	res := v1.ResourceList{}
+	for k, v := range resMap {
+		res[k] = resource.MustParse(v)
+	}
+	p.Spec.Containers = append(p.Spec.Containers, v1.Container{
+		Resources: v1.ResourceRequirements{
+			Requests: res,
+		},
+	})
+	return p
+}
+
+// PreemptionPolicy sets the given preemption policy to the inner pod.
+func (p *PodWrapper) PreemptionPolicy(policy v1.PreemptionPolicy) *PodWrapper {
+	p.Spec.PreemptionPolicy = &policy
+	return p
+}
+
+// NodeWrapper wraps a Node inside.
+type NodeWrapper struct{ v1.Node }
+
+// MakeNode creates a Node wrapper.
+func MakeNode() *NodeWrapper {
+	w := &NodeWrapper{v1.Node{}}
+	return w.Capacity(nil)
+}
+
+// Obj returns the inner Node.
+func (n *NodeWrapper) Obj() *v1.Node {
+	return &n.Node
+}
+
+// Name sets `s` as the name of the inner node.
+func (n *NodeWrapper) Name(s string) *NodeWrapper {
+	n.SetName(s)
+	return n
+}
+
+// UID sets `s` as the UID of the inner node.
+func (n *NodeWrapper) UID(s string) *NodeWrapper {
+	n.SetUID(types.UID(s))
+	return n
+}
+
+// Label applies a {k,v} label pair to the inner node.
+func (n *NodeWrapper) Label(k, v string) *NodeWrapper {
+	if n.Labels == nil {
+		n.Labels = make(map[string]string)
+	}
+	n.Labels[k] = v
+	return n
+}
+
+// Capacity sets the capacity and the allocatable resources of the inner node.
+// Each entry in `resources` corresponds to a resource name and its quantity.
+// By default, the capacity and allocatable number of pods are set to 32.
+func (n *NodeWrapper) Capacity(resources map[v1.ResourceName]string) *NodeWrapper {
+	res := v1.ResourceList{
+		v1.ResourcePods: resource.MustParse("32"),
+	}
+	for name, value := range resources {
+		res[name] = resource.MustParse(value)
+	}
+	n.Status.Capacity, n.Status.Allocatable = res, res
+	return n
+}
+
+// Images sets the images of the inner node. Each entry in `images` corresponds
+// to an image name and its size in bytes.
+func (n *NodeWrapper) Images(images map[string]int64) *NodeWrapper { + var containerImages []v1.ContainerImage + for name, size := range images { + containerImages = append(containerImages, v1.ContainerImage{Names: []string{name}, SizeBytes: size}) + } + n.Status.Images = containerImages + return n +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 6f5cc93b7..77e2872a0 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1254,15 +1254,14 @@ k8s.io/kube-openapi/pkg/util k8s.io/kube-openapi/pkg/util/proto k8s.io/kube-openapi/pkg/util/sets k8s.io/kube-openapi/pkg/validation/spec -# k8s.io/kube-scheduler v0.0.0 => k8s.io/kube-scheduler v0.22.6 +# k8s.io/kube-scheduler v0.22.6 => k8s.io/kube-scheduler v0.22.6 ## explicit; go 1.16 k8s.io/kube-scheduler/config/v1 k8s.io/kube-scheduler/config/v1beta1 k8s.io/kube-scheduler/config/v1beta2 k8s.io/kube-scheduler/extender/v1 -# k8s.io/kubernetes v0.0.0-00010101000000-000000000000 => k8s.io/kubernetes v1.22.6 +# k8s.io/kubernetes v1.22.6 => k8s.io/kubernetes v1.22.6 ## explicit; go 1.16 -k8s.io/kubernetes/cmd/kube-scheduler/app k8s.io/kubernetes/cmd/kube-scheduler/app/config k8s.io/kubernetes/cmd/kube-scheduler/app/options k8s.io/kubernetes/pkg/api/legacyscheme @@ -1330,6 +1329,7 @@ k8s.io/kubernetes/pkg/scheduler/internal/queue k8s.io/kubernetes/pkg/scheduler/metrics k8s.io/kubernetes/pkg/scheduler/metrics/resources k8s.io/kubernetes/pkg/scheduler/profile +k8s.io/kubernetes/pkg/scheduler/testing k8s.io/kubernetes/pkg/scheduler/util k8s.io/kubernetes/pkg/security/apparmor k8s.io/kubernetes/pkg/securitycontext
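
For context, the vendored pkg/scheduler/testing helpers above (pod/node wrappers, fake plugins, and NewFramework) are meant to be consumed from scheduler plugin unit tests. The following is a minimal sketch of such a test, assuming the kube-scheduler v1.22 framework API (queuesort and defaultbinder as the mandatory QueueSort/Bind plugins, RunFilterPlugins returning a PluginToStatus map); the package name, test name, node/pod fixtures, and the FakeFilter failure map are illustrative assumptions, not code from this patch.

package plugins_test

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
	st "k8s.io/kubernetes/pkg/scheduler/testing"
)

func TestFakeFilterRejectsNode(t *testing.T) {
	// A profile needs the mandatory QueueSort and Bind plugins; the fake filter
	// is configured to fail on "node-a" with an Unschedulable status.
	registeredPlugins := []st.RegisterPluginFunc{
		st.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
		st.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
		st.RegisterFilterPlugin("FakeFilter", st.NewFakeFilterPlugin(map[string]framework.Code{
			"node-a": framework.Unschedulable,
		})),
	}

	fwk, err := st.NewFramework(registeredPlugins, "koord-test-scheduler")
	if err != nil {
		t.Fatal(err)
	}

	// The wrappers keep fixtures short: a pod requesting 1 CPU, a node with 4 CPUs.
	pod := st.MakePod().Name("p").Namespace("default").
		Req(map[v1.ResourceName]string{v1.ResourceCPU: "1"}).Obj()
	node := st.MakeNode().Name("node-a").
		Capacity(map[v1.ResourceName]string{v1.ResourceCPU: "4"}).Obj()

	nodeInfo := framework.NewNodeInfo()
	nodeInfo.SetNode(node)

	// Run only the Filter extension point and check the merged status.
	statuses := fwk.RunFilterPlugins(context.Background(), framework.NewCycleState(), pod, nodeInfo)
	if code := statuses.Merge().Code(); code != framework.Unschedulable {
		t.Errorf("expected Unschedulable for node-a, got %v", code)
	}
}

The same pattern extends to the other fakes vendored here: swap RegisterFilterPlugin for RegisterPreFilterPlugin, RegisterReservePlugin, or RegisterPermitPlugin and drive the corresponding Run* method on the framework.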