add mock cache for ut and performance test #3269

Merged · 1 commit · Feb 2, 2024
8 changes: 8 additions & 0 deletions cmd/scheduler/app/options/options.go
@@ -191,3 +191,11 @@ func (s *ServerOption) ParseCAFiles(decryptFunc DecryptFunc) error {

return nil
}

// Default creates a new ServerOption, adds its flags to pflag.CommandLine, and registers it as the default one
func Default() *ServerOption {
s := NewServerOption()
s.AddFlags(pflag.CommandLine)
s.RegisterOptions()
return s
}
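
For context, here is a minimal sketch (not part of the diff) of how a test or benchmark might bootstrap the scheduler options before building a mock cache. It assumes RegisterOptions publishes the instance as the package-level options.ServerOpts that cache_mock.go reads, and that the flags land on pflag.CommandLine, so it should run at most once per process:

// Illustrative bootstrap only; the main() wrapper is hypothetical.
package main

import (
	"fmt"

	"volcano.sh/volcano/cmd/scheduler/app/options"
)

func main() {
	// Build a ServerOption with default flag values, add its flags to
	// pflag.CommandLine, and register it globally for packages that read
	// options.ServerOpts (e.g. the mock scheduler cache added below).
	opts := options.Default()
	fmt.Printf("registered scheduler options: %T\n", opts)
}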
146 changes: 85 additions & 61 deletions pkg/scheduler/cache/cache.go
@@ -407,22 +407,64 @@ func (pgb *podgroupBinder) Bind(job *schedulingapi.JobInfo, cluster string) (*sc
return job, nil
}

func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
logger := klog.FromContext(context.TODO())
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
// updateNodeSelectors parses the configured node-selector key:value pairs and records them in the scheduler cache
func (sc *SchedulerCache) updateNodeSelectors(nodeSelectors []string) {
for _, nodeSelectorLabel := range nodeSelectors {
nodeSelectorLabelLen := len(nodeSelectorLabel)
if nodeSelectorLabelLen <= 0 {
continue
}
// check input
index := strings.Index(nodeSelectorLabel, ":")
if index < 0 || index >= (nodeSelectorLabelLen-1) {
continue
}
nodeSelectorLabelName := strings.TrimSpace(nodeSelectorLabel[:index])
nodeSelectorLabelValue := strings.TrimSpace(nodeSelectorLabel[index+1:])
key := nodeSelectorLabelName + ":" + nodeSelectorLabelValue
sc.nodeSelectorLabels[key] = ""
}
vcClient, err := vcclient.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init vcClient, with err: %v", err))
}

// setBatchBindParallel configures the batch parallelism used when binding tasks to the apiserver
func (sc *SchedulerCache) setBatchBindParallel() {
sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
var batchNum int
batchNum, err := strconv.Atoi(os.Getenv("BATCH_BIND_NUM"))
if err == nil && batchNum > 0 {
sc.batchNum = batchNum
} else {
sc.batchNum = 1
}
eventClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
}

func (sc *SchedulerCache) setDefaultVolumeBinder() {
logger := klog.FromContext(context.TODO())
var capacityCheck *volumescheduling.CapacityCheck
if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
capacityCheck = &volumescheduling.CapacityCheck{
CSIDriverInformer: sc.csiDriverInformer,
CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
}
}
sc.VolumeBinder = &defaultVolumeBinder{
volumeBinder: volumescheduling.NewVolumeBinder(
logger,
sc.kubeClient,
sc.podInformer,
sc.nodeInformer,
sc.csiNodeInformer,
sc.pvcInformer,
sc.pvInformer,
sc.scInformer,
capacityCheck,
30*time.Second,
),
}
}

// create default queue
// newDefaultQueue creates the default queue if it does not already exist
func newDefaultQueue(vcClient vcclient.Interface, defaultQueue string) {
reclaimable := true
defaultQue := vcv1beta1.Queue{
ObjectMeta: metav1.ObjectMeta{
@@ -434,7 +476,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
},
}

err = retry.OnError(wait.Backoff{
err := retry.OnError(wait.Backoff{
Steps: 60,
Duration: time.Second,
Factor: 1,
@@ -448,6 +490,24 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
if err != nil && !apierrors.IsAlreadyExists(err) {
panic(fmt.Errorf("failed init default queue, with err: %v", err))
}
}

func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
}
vcClient, err := vcclient.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init vcClient, with err: %v", err))
}
eventClient, err := kubernetes.NewForConfig(config)
if err != nil {
panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
}

// create default queue
newDefaultQueue(vcClient, defaultQueue)
klog.Infof("Create init queue named default")

sc := &SchedulerCache{
@@ -479,42 +539,21 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
sc.IgnoredCSIProvisioners = ignoredProvisionersSet

if len(nodeSelectors) > 0 {
for _, nodeSelectorLabel := range nodeSelectors {
nodeSelectorLabelLen := len(nodeSelectorLabel)
if nodeSelectorLabelLen <= 0 {
continue
}
// check input
index := strings.Index(nodeSelectorLabel, ":")
if index < 0 || index >= (nodeSelectorLabelLen-1) {
continue
}
nodeSelectorLabelName := strings.TrimSpace(nodeSelectorLabel[:index])
nodeSelectorLabelValue := strings.TrimSpace(nodeSelectorLabel[index+1:])
key := nodeSelectorLabelName + ":" + nodeSelectorLabelValue
sc.nodeSelectorLabels[key] = ""
}
sc.updateNodeSelectors(nodeSelectors)
}
// Prepare event clients.
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(sc.schedulerNames)})

sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
// set concurrency configuration when binding
sc.setBatchBindParallel()
if bindMethodMap == nil {
klog.V(3).Info("no registered bind method, new a default one")
bindMethodMap = NewDefaultBinder(sc.kubeClient, sc.Recorder)
}
sc.Binder = GetBindMethod()

var batchNum int
batchNum, err = strconv.Atoi(os.Getenv("BATCH_BIND_NUM"))
if err == nil && batchNum > 0 {
sc.batchNum = batchNum
} else {
sc.batchNum = 1
}

sc.Evictor = &defaultEvictor{
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
@@ -530,6 +569,14 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
vcclient: sc.vcClient,
}

// add all event handlers
sc.addEventHandler()
// finally, initialize the default volume binder, which depends on the informers set up above
sc.setDefaultVolumeBinder()
return sc
}

func (sc *SchedulerCache) addEventHandler() {
informerFactory := informers.NewSharedInformerFactory(sc.kubeClient, 0)
sc.informerFactory = informerFactory
mySchedulerPodName, c := getMultiSchedulerInfo()
@@ -604,31 +651,9 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
},
)

var capacityCheck *volumescheduling.CapacityCheck
if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers()
sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities()
capacityCheck = &volumescheduling.CapacityCheck{
CSIDriverInformer: sc.csiDriverInformer,
CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
}
} else {
capacityCheck = nil
}

sc.VolumeBinder = &defaultVolumeBinder{
volumeBinder: volumescheduling.NewVolumeBinder(
logger,
sc.kubeClient,
sc.podInformer,
sc.nodeInformer,
sc.csiNodeInformer,
sc.pvcInformer,
sc.pvInformer,
sc.scInformer,
capacityCheck,
30*time.Second,
),
}

// create informer for pod information
@@ -637,7 +662,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
FilterFunc: func(obj interface{}) bool {
switch v := obj.(type) {
case *v1.Pod:
if !responsibleForPod(v, schedulerNames, mySchedulerPodName, c) {
if !responsibleForPod(v, sc.schedulerNames, mySchedulerPodName, c) {
if len(v.Spec.NodeName) == 0 {
return false
}
@@ -728,7 +753,6 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
DeleteFunc: sc.DeleteNumaInfoV1alpha1,
})
}
return sc
}

// Run starts the schedulerCache
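
As a side note on the updateNodeSelectors helper extracted above: it expects each node-selector entry in "key:value" form and stores the trimmed pair as a single "key:value" map key, silently skipping malformed entries. A small hypothetical helper mirroring that parsing, for illustration only:

// Not part of the PR; mirrors the parsing rules of updateNodeSelectors.
package main

import (
	"fmt"
	"strings"
)

func parseNodeSelectors(nodeSelectors []string) map[string]string {
	labels := map[string]string{}
	for _, nodeSelectorLabel := range nodeSelectors {
		if len(nodeSelectorLabel) == 0 {
			continue
		}
		// require a ":" that is neither missing nor the last character
		index := strings.Index(nodeSelectorLabel, ":")
		if index < 0 || index >= len(nodeSelectorLabel)-1 {
			continue
		}
		name := strings.TrimSpace(nodeSelectorLabel[:index])
		value := strings.TrimSpace(nodeSelectorLabel[index+1:])
		labels[name+":"+value] = ""
	}
	return labels
}

func main() {
	// Keys become "zone:us-east-1" and "disktype:ssd" with empty values; "malformed" is dropped.
	fmt.Println(parseNodeSelectors([]string{"zone: us-east-1", "disktype:ssd", "malformed"}))
}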
126 changes: 126 additions & 0 deletions pkg/scheduler/cache/cache_mock.go
@@ -0,0 +1,126 @@
package cache

import (
"math"
"os"
"strconv"

schedulingv1 "k8s.io/api/scheduling/v1"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

fakevcClient "volcano.sh/apis/pkg/client/clientset/versioned/fake"
"volcano.sh/volcano/cmd/scheduler/app/options"
schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
)

// NewCustomMockSchedulerCache returns a mock scheduler cache built with caller-provided interface implementations
func NewCustomMockSchedulerCache(schedulerName string,
binder Binder,
evictor Evictor,
statusUpdater StatusUpdater,
PodGroupBinder BatchBinder,
volumeBinder VolumeBinder,
recorder record.EventRecorder,
) *SchedulerCache {
msc := newMockSchedulerCache(schedulerName)
// add all event handlers
msc.addEventHandler()
msc.Recorder = recorder
msc.Binder = binder
msc.Evictor = evictor
msc.StatusUpdater = statusUpdater
msc.PodGroupBinder = PodGroupBinder
// use custom volume binder
msc.VolumeBinder = volumeBinder
checkAndSetDefaultInterface(msc)
return msc
}

// NewDefaultMockSchedulerCache returns a mock scheduler cache whose interfaces are mocked with default fake clients.
// Note that the default event recorder's buffer holds only 100 events;
// for performance tests, pass a &FakeRecorder{} with no length limit via NewCustomMockSchedulerCache to avoid blocking.
func NewDefaultMockSchedulerCache(schedulerName string) *SchedulerCache {
msc := newMockSchedulerCache(schedulerName)
// add all event handlers
msc.addEventHandler()
checkAndSetDefaultInterface(msc)
return msc
}

func checkAndSetDefaultInterface(sc *SchedulerCache) {
if sc.Recorder == nil {
sc.Recorder = record.NewFakeRecorder(100) // to avoid blocking, we can pass in &FakeRecorder{} to NewCustomMockSchedulerCache
}
if sc.Binder == nil {
sc.Binder = &DefaultBinder{
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
}
}
if sc.Evictor == nil {
sc.Evictor = &defaultEvictor{
kubeclient: sc.kubeClient,
recorder: sc.Recorder,
}
}
if sc.StatusUpdater == nil {
sc.StatusUpdater = &defaultStatusUpdater{
kubeclient: sc.kubeClient,
vcclient: sc.vcClient,
}
}
if sc.PodGroupBinder == nil {
sc.PodGroupBinder = &podgroupBinder{
kubeclient: sc.kubeClient,
vcclient: sc.vcClient,
}
}
// finally, init default fake volume binder which has dependencies on other informers
if sc.VolumeBinder == nil {
sc.setDefaultVolumeBinder()
}
}

func getNodeWorkers() uint32 {
if options.ServerOpts.NodeWorkerThreads > 0 {
return options.ServerOpts.NodeWorkerThreads
}
threads, err := strconv.Atoi(os.Getenv("NODE_WORKER_THREADS"))
if err == nil && threads > 0 && threads <= math.MaxUint32 {
return uint32(threads)
}
return 2 // default to 2 node workers
}

// newMockSchedulerCache initializes the mock scheduler cache structure
func newMockSchedulerCache(schedulerName string) *SchedulerCache {
msc := &SchedulerCache{
Jobs: make(map[schedulingapi.JobID]*schedulingapi.JobInfo),
Nodes: make(map[string]*schedulingapi.NodeInfo),
Queues: make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo),
PriorityClasses: make(map[string]*schedulingv1.PriorityClass),
errTasks: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
nodeQueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
DeletedJobs: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
kubeClient: fake.NewSimpleClientset(),
vcClient: fakevcClient.NewSimpleClientset(),
restConfig: nil,
defaultQueue: "default",
schedulerNames: []string{schedulerName},
nodeSelectorLabels: make(map[string]string),
NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
CSINodesStatus: make(map[string]*schedulingapi.CSINodeStatusInfo),
imageStates: make(map[string]*imageState),

NodeList: []string{},
}
if len(options.ServerOpts.NodeSelector) > 0 {
msc.updateNodeSelectors(options.ServerOpts.NodeSelector)
}
msc.setBatchBindParallel()
msc.nodeWorkers = getNodeWorkers()

return msc
}
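
A hedged sketch of how a unit test or benchmark might consume the new mock constructors. The test name and wiring are illustrative; it assumes options.ServerOpts is the package-level *ServerOption populated by options.Default(), and relies on checkAndSetDefaultInterface to backfill the nil arguments with fake defaults:

// Illustrative test only; assumed to live in package cache.
package cache

import (
	"os"
	"testing"

	"k8s.io/client-go/tools/record"

	"volcano.sh/volcano/cmd/scheduler/app/options"
)

func TestMockSchedulerCacheSketch(t *testing.T) {
	// The mock reads the global options, so make sure they are registered once.
	if options.ServerOpts == nil {
		options.Default()
	}
	// Optional: tune the env-driven knobs read by setBatchBindParallel and getNodeWorkers.
	os.Setenv("BATCH_BIND_NUM", "10")
	os.Setenv("NODE_WORKER_THREADS", "4")

	// Default mock: fake kube/vc clients plus a FakeRecorder holding at most 100 events.
	sc := NewDefaultMockSchedulerCache("volcano")
	if sc.Binder == nil || sc.VolumeBinder == nil {
		t.Fatal("expected default fake interfaces to be set")
	}

	// For performance tests, use &record.FakeRecorder{} (no buffer, never blocks) and
	// let the nil arguments fall back to the fake defaults.
	perf := NewCustomMockSchedulerCache("volcano", nil, nil, nil, nil, nil, &record.FakeRecorder{})
	_ = perf
}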