diff --git a/cmd/scheduler/app/options/options.go b/cmd/scheduler/app/options/options.go
index c5205cca18..4f2838292e 100644
--- a/cmd/scheduler/app/options/options.go
+++ b/cmd/scheduler/app/options/options.go
@@ -191,3 +191,11 @@ func (s *ServerOption) ParseCAFiles(decryptFunc DecryptFunc) error {
 
     return nil
 }
+
+// Default creates a new ServerOption, adds its flags, and registers it as the default one
+func Default() *ServerOption {
+    s := NewServerOption()
+    s.AddFlags(pflag.CommandLine)
+    s.RegisterOptions()
+    return s
+}
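For reference, Default() is what lets a test binary bootstrap the package-global options that the cache code reads through options.ServerOpts. A minimal sketch, mirroring the main_test.go added at the end of this diff, and assuming (as the callers below rely on) that RegisterOptions publishes the receiver as options.ServerOpts:

package cache

import (
    "os"
    "testing"

    "volcano.sh/volcano/cmd/scheduler/app/options"
)

// TestMain runs once per test binary; calling options.Default() here is
// assumed to leave options.ServerOpts non-nil, so helpers such as
// setDefaultVolumeBinder and getNodeWorkers can read it safely.
func TestMain(m *testing.M) {
    options.Default()
    os.Exit(m.Run())
}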
diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go
index 8a66085d95..d150630c52 100644
--- a/pkg/scheduler/cache/cache.go
+++ b/pkg/scheduler/cache/cache.go
@@ -407,22 +407,64 @@ func (pgb *podgroupBinder) Bind(job *schedulingapi.JobInfo, cluster string) (*sc
     return job, nil
 }
 
-func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
-    logger := klog.FromContext(context.TODO())
-    kubeClient, err := kubernetes.NewForConfig(config)
-    if err != nil {
-        panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
+// updateNodeSelectors parses node selector "key:value" pairs and records them in the scheduler cache
+func (sc *SchedulerCache) updateNodeSelectors(nodeSelectors []string) {
+    for _, nodeSelectorLabel := range nodeSelectors {
+        nodeSelectorLabelLen := len(nodeSelectorLabel)
+        if nodeSelectorLabelLen <= 0 {
+            continue
+        }
+        // check input
+        index := strings.Index(nodeSelectorLabel, ":")
+        if index < 0 || index >= (nodeSelectorLabelLen-1) {
+            continue
+        }
+        nodeSelectorLabelName := strings.TrimSpace(nodeSelectorLabel[:index])
+        nodeSelectorLabelValue := strings.TrimSpace(nodeSelectorLabel[index+1:])
+        key := nodeSelectorLabelName + ":" + nodeSelectorLabelValue
+        sc.nodeSelectorLabels[key] = ""
     }
-    vcClient, err := vcclient.NewForConfig(config)
-    if err != nil {
-        panic(fmt.Sprintf("failed init vcClient, with err: %v", err))
+}
+
+// setBatchBindParallel configures the batch parallelism used when binding tasks to the apiserver
+func (sc *SchedulerCache) setBatchBindParallel() {
+    sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
+    var batchNum int
+    batchNum, err := strconv.Atoi(os.Getenv("BATCH_BIND_NUM"))
+    if err == nil && batchNum > 0 {
+        sc.batchNum = batchNum
+    } else {
+        sc.batchNum = 1
     }
-    eventClient, err := kubernetes.NewForConfig(config)
-    if err != nil {
-        panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
+}
+
+func (sc *SchedulerCache) setDefaultVolumeBinder() {
+    logger := klog.FromContext(context.TODO())
+    var capacityCheck *volumescheduling.CapacityCheck
+    if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
+        capacityCheck = &volumescheduling.CapacityCheck{
+            CSIDriverInformer:          sc.csiDriverInformer,
+            CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
+        }
     }
+    sc.VolumeBinder = &defaultVolumeBinder{
+        volumeBinder: volumescheduling.NewVolumeBinder(
+            logger,
+            sc.kubeClient,
+            sc.podInformer,
+            sc.nodeInformer,
+            sc.csiNodeInformer,
+            sc.pvcInformer,
+            sc.pvInformer,
+            sc.scInformer,
+            capacityCheck,
+            30*time.Second,
+        ),
+    }
+}
 
-    // create default queue
+// newDefaultQueue creates the default queue
+func newDefaultQueue(vcClient vcclient.Interface, defaultQueue string) {
     reclaimable := true
     defaultQue := vcv1beta1.Queue{
         ObjectMeta: metav1.ObjectMeta{
@@ -434,7 +476,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
         },
     }
 
-    err = retry.OnError(wait.Backoff{
+    err := retry.OnError(wait.Backoff{
         Steps:    60,
         Duration: time.Second,
         Factor:   1,
@@ -448,6 +490,24 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
     if err != nil && !apierrors.IsAlreadyExists(err) {
         panic(fmt.Errorf("failed init default queue, with err: %v", err))
     }
+}
+
+func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueue string, nodeSelectors []string, nodeWorkers uint32, ignoredProvisioners []string) *SchedulerCache {
+    kubeClient, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        panic(fmt.Sprintf("failed init kubeClient, with err: %v", err))
+    }
+    vcClient, err := vcclient.NewForConfig(config)
+    if err != nil {
+        panic(fmt.Sprintf("failed init vcClient, with err: %v", err))
+    }
+    eventClient, err := kubernetes.NewForConfig(config)
+    if err != nil {
+        panic(fmt.Sprintf("failed init eventClient, with err: %v", err))
+    }
+
+    // create default queue
+    newDefaultQueue(vcClient, defaultQueue)
     klog.Infof("Create init queue named default")
 
     sc := &SchedulerCache{
@@ -479,42 +539,21 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
     sc.IgnoredCSIProvisioners = ignoredProvisionersSet
 
     if len(nodeSelectors) > 0 {
-        for _, nodeSelectorLabel := range nodeSelectors {
-            nodeSelectorLabelLen := len(nodeSelectorLabel)
-            if nodeSelectorLabelLen <= 0 {
-                continue
-            }
-            // check input
-            index := strings.Index(nodeSelectorLabel, ":")
-            if index < 0 || index >= (nodeSelectorLabelLen-1) {
-                continue
-            }
-            nodeSelectorLabelName := strings.TrimSpace(nodeSelectorLabel[:index])
-            nodeSelectorLabelValue := strings.TrimSpace(nodeSelectorLabel[index+1:])
-            key := nodeSelectorLabelName + ":" + nodeSelectorLabelValue
-            sc.nodeSelectorLabels[key] = ""
-        }
+        sc.updateNodeSelectors(nodeSelectors)
     }
     // Prepare event clients.
     broadcaster := record.NewBroadcaster()
     broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: eventClient.CoreV1().Events("")})
     sc.Recorder = broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: commonutil.GenerateComponentName(sc.schedulerNames)})
 
-    sc.BindFlowChannel = make(chan *schedulingapi.TaskInfo, 5000)
+    // set the concurrency configuration for binding
+    sc.setBatchBindParallel()
     if bindMethodMap == nil {
         klog.V(3).Info("no registered bind method, new a default one")
         bindMethodMap = NewDefaultBinder(sc.kubeClient, sc.Recorder)
     }
     sc.Binder = GetBindMethod()
 
-    var batchNum int
-    batchNum, err = strconv.Atoi(os.Getenv("BATCH_BIND_NUM"))
-    if err == nil && batchNum > 0 {
-        sc.batchNum = batchNum
-    } else {
-        sc.batchNum = 1
-    }
-
     sc.Evictor = &defaultEvictor{
         kubeclient: sc.kubeClient,
         recorder:   sc.Recorder,
@@ -530,6 +569,14 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
         vcclient:   sc.vcClient,
     }
 
+    // add all event handlers
+    sc.addEventHandler()
+    // finally, init the default volume binder, which depends on the informers above
+    sc.setDefaultVolumeBinder()
+    return sc
+}
+
+func (sc *SchedulerCache) addEventHandler() {
     informerFactory := informers.NewSharedInformerFactory(sc.kubeClient, 0)
     sc.informerFactory = informerFactory
     mySchedulerPodName, c := getMultiSchedulerInfo()
@@ -604,31 +651,9 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
         },
     )
 
-    var capacityCheck *volumescheduling.CapacityCheck
     if options.ServerOpts.EnableCSIStorage && utilfeature.DefaultFeatureGate.Enabled(features.CSIStorage) {
         sc.csiDriverInformer = informerFactory.Storage().V1().CSIDrivers()
         sc.csiStorageCapacityInformer = informerFactory.Storage().V1beta1().CSIStorageCapacities()
-        capacityCheck = &volumescheduling.CapacityCheck{
-            CSIDriverInformer:          sc.csiDriverInformer,
-            CSIStorageCapacityInformer: sc.csiStorageCapacityInformer,
-        }
-    } else {
-        capacityCheck = nil
-    }
-
-    sc.VolumeBinder = &defaultVolumeBinder{
-        volumeBinder: volumescheduling.NewVolumeBinder(
-            logger,
-            sc.kubeClient,
-            sc.podInformer,
-            sc.nodeInformer,
-            sc.csiNodeInformer,
-            sc.pvcInformer,
-            sc.pvInformer,
-            sc.scInformer,
-            capacityCheck,
-            30*time.Second,
-        ),
     }
 
     // create informer for pod information
@@ -637,7 +662,7 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
             FilterFunc: func(obj interface{}) bool {
                 switch v := obj.(type) {
                 case *v1.Pod:
-                    if !responsibleForPod(v, schedulerNames, mySchedulerPodName, c) {
+                    if !responsibleForPod(v, sc.schedulerNames, mySchedulerPodName, c) {
                         if len(v.Spec.NodeName) == 0 {
                             return false
                         }
@@ -728,7 +753,6 @@ func newSchedulerCache(config *rest.Config, schedulerNames []string, defaultQueu
             DeleteFunc: sc.DeleteNumaInfoV1alpha1,
         })
     }
-    return sc
 }
 
 // Run starts the schedulerCache
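The extracted setBatchBindParallel keeps the previous behavior: it reads BATCH_BIND_NUM from the environment and falls back to a batch size of 1 when the variable is unset or invalid. A minimal in-package test sketch of that contract (the test name and assertions are illustrative, not part of this change):

package cache

import "testing"

// Exercises the unexported batchNum field, so this sketch assumes it
// compiles inside package cache alongside the code above.
func TestSetBatchBindParallel(t *testing.T) {
    sc := &SchedulerCache{}

    t.Setenv("BATCH_BIND_NUM", "10")
    sc.setBatchBindParallel()
    if sc.batchNum != 10 {
        t.Fatalf("expected batchNum 10, got %d", sc.batchNum)
    }

    t.Setenv("BATCH_BIND_NUM", "not-a-number")
    sc.setBatchBindParallel()
    if sc.batchNum != 1 {
        t.Fatalf("expected fallback batchNum 1, got %d", sc.batchNum)
    }
}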
diff --git a/pkg/scheduler/cache/cache_mock.go b/pkg/scheduler/cache/cache_mock.go
new file mode 100644
index 0000000000..03f08cc190
--- /dev/null
+++ b/pkg/scheduler/cache/cache_mock.go
@@ -0,0 +1,126 @@
+package cache
+
+import (
+    "math"
+    "os"
+    "strconv"
+
+    schedulingv1 "k8s.io/api/scheduling/v1"
+    "k8s.io/client-go/kubernetes/fake"
+    "k8s.io/client-go/tools/record"
+    "k8s.io/client-go/util/workqueue"
+
+    fakevcClient "volcano.sh/apis/pkg/client/clientset/versioned/fake"
+    "volcano.sh/volcano/cmd/scheduler/app/options"
+    schedulingapi "volcano.sh/volcano/pkg/scheduler/api"
+)
+
+// NewCustomMockSchedulerCache returns a mock scheduler cache with custom interfaces
+func NewCustomMockSchedulerCache(schedulerName string,
+    binder Binder,
+    evictor Evictor,
+    statusUpdater StatusUpdater,
+    PodGroupBinder BatchBinder,
+    volumeBinder VolumeBinder,
+    recorder record.EventRecorder,
+) *SchedulerCache {
+    msc := newMockSchedulerCache(schedulerName)
+    // add all event handlers
+    msc.addEventHandler()
+    msc.Recorder = recorder
+    msc.Binder = binder
+    msc.Evictor = evictor
+    msc.StatusUpdater = statusUpdater
+    msc.PodGroupBinder = PodGroupBinder
+    // use the custom volume binder
+    msc.VolumeBinder = volumeBinder
+    checkAndSetDefaultInterface(msc)
+    return msc
+}
+
+// NewDefaultMockSchedulerCache returns a mock scheduler cache whose interfaces are mocked with default fake clients.
+// Note that the default event recorder's buffer only has a length of 100;
+// for performance tests, pass a &FakeRecorder{} without a length limit to avoid blocking.
+func NewDefaultMockSchedulerCache(schedulerName string) *SchedulerCache {
+    msc := newMockSchedulerCache(schedulerName)
+    // add all event handlers
+    msc.addEventHandler()
+    checkAndSetDefaultInterface(msc)
+    return msc
+}
+
+func checkAndSetDefaultInterface(sc *SchedulerCache) {
+    if sc.Recorder == nil {
+        sc.Recorder = record.NewFakeRecorder(100) // to avoid blocking, we can pass in &FakeRecorder{} to NewCustomMockSchedulerCache
+    }
+    if sc.Binder == nil {
+        sc.Binder = &DefaultBinder{
+            kubeclient: sc.kubeClient,
+            recorder:   sc.Recorder,
+        }
+    }
+    if sc.Evictor == nil {
+        sc.Evictor = &defaultEvictor{
+            kubeclient: sc.kubeClient,
+            recorder:   sc.Recorder,
+        }
+    }
+    if sc.StatusUpdater == nil {
+        sc.StatusUpdater = &defaultStatusUpdater{
+            kubeclient: sc.kubeClient,
+            vcclient:   sc.vcClient,
+        }
+    }
+    if sc.PodGroupBinder == nil {
+        sc.PodGroupBinder = &podgroupBinder{
+            kubeclient: sc.kubeClient,
+            vcclient:   sc.vcClient,
+        }
+    }
+    // finally, init the default fake volume binder, which depends on the other informers
+    if sc.VolumeBinder == nil {
+        sc.setDefaultVolumeBinder()
+    }
+}
+
+func getNodeWorkers() uint32 {
+    if options.ServerOpts.NodeWorkerThreads > 0 {
+        return options.ServerOpts.NodeWorkerThreads
+    }
+    threads, err := strconv.Atoi(os.Getenv("NODE_WORKER_THREADS"))
+    if err == nil && threads > 0 && threads <= math.MaxUint32 {
+        return uint32(threads)
+    }
+    return 2 // default to 2 workers
+}
+
+// newMockSchedulerCache initializes the mock scheduler cache structure
+func newMockSchedulerCache(schedulerName string) *SchedulerCache {
+    msc := &SchedulerCache{
+        Jobs:                make(map[schedulingapi.JobID]*schedulingapi.JobInfo),
+        Nodes:               make(map[string]*schedulingapi.NodeInfo),
+        Queues:              make(map[schedulingapi.QueueID]*schedulingapi.QueueInfo),
+        PriorityClasses:     make(map[string]*schedulingv1.PriorityClass),
+        errTasks:            workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+        nodeQueue:           workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+        DeletedJobs:         workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+        kubeClient:          fake.NewSimpleClientset(),
+        vcClient:            fakevcClient.NewSimpleClientset(),
+        restConfig:          nil,
+        defaultQueue:        "default",
+        schedulerNames:      []string{schedulerName},
+        nodeSelectorLabels:  make(map[string]string),
+        NamespaceCollection: make(map[string]*schedulingapi.NamespaceCollection),
+        CSINodesStatus:      make(map[string]*schedulingapi.CSINodeStatusInfo),
+        imageStates:         make(map[string]*imageState),
+
+        NodeList: []string{},
+    }
+    if len(options.ServerOpts.NodeSelector) > 0 {
+        msc.updateNodeSelectors(options.ServerOpts.NodeSelector)
+    }
+    msc.setBatchBindParallel()
+    msc.nodeWorkers = getNodeWorkers()
+
+    return msc
+}
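As the doc comment on NewDefaultMockSchedulerCache warns, the default FakeRecorder buffers only 100 events, so a high-volume test can block once the buffer fills. A zero-value &record.FakeRecorder{} has a nil Events channel and silently drops events instead; it can be threaded through NewCustomMockSchedulerCache, with the remaining interfaces left nil so checkAndSetDefaultInterface fills in the fake-client defaults. A sketch (the test name is illustrative):

package cache

import (
    "testing"

    "k8s.io/client-go/tools/record"
)

// Relies on this package's TestMain having called options.Default(),
// since newMockSchedulerCache reads options.ServerOpts.
func TestMockCacheWithUnboundedRecorder(t *testing.T) {
    // a zero-value FakeRecorder never blocks: its nil Events channel
    // causes recorded events to be discarded rather than queued
    recorder := &record.FakeRecorder{}
    sc := NewCustomMockSchedulerCache("volcano",
        nil, nil, nil, nil, nil, // filled in by checkAndSetDefaultInterface
        recorder)
    if sc.Recorder != recorder {
        t.Fatal("custom recorder should be kept as-is")
    }
}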
diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go
index c0fea2a5cf..a0edc1dfd3 100644
--- a/pkg/scheduler/cache/cache_test.go
+++ b/pkg/scheduler/cache/cache_test.go
@@ -26,16 +26,8 @@ import (
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/types"
-    "k8s.io/apimachinery/pkg/util/wait"
-    "k8s.io/client-go/informers"
-    "k8s.io/client-go/kubernetes/fake"
-    "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
-    "k8s.io/client-go/util/workqueue"
-    "k8s.io/klog/v2"
-
     "volcano.sh/volcano/pkg/scheduler/api"
-    volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
     "volcano.sh/volcano/pkg/scheduler/util"
 )
@@ -318,73 +310,35 @@ func TestNodeOperation(t *testing.T) {
 }
 
 func TestBindTasks(t *testing.T) {
-    logger := klog.FromContext(context.TODO())
     owner := buildOwnerReference("j1")
     scheduler := "fake-scheduler"
-    fakeKube := fake.NewSimpleClientset()
-    informerFactory := informers.NewSharedInformerFactory(fakeKube, 0)
-    sc := &SchedulerCache{
-        Jobs:            make(map[api.JobID]*api.JobInfo),
-        Nodes:           make(map[string]*api.NodeInfo),
-        kubeClient:      fakeKube,
-        schedulerNames:  []string{scheduler},
-        Recorder:        record.NewFakeRecorder(10),
-        BindFlowChannel: make(chan *api.TaskInfo, 5000),
-        podInformer:     informerFactory.Core().V1().Pods(),
-        nodeInformer:    informerFactory.Core().V1().Nodes(),
-        csiNodeInformer: informerFactory.Storage().V1().CSINodes(),
-        pvcInformer:     informerFactory.Core().V1().PersistentVolumeClaims(),
-        pvInformer:      informerFactory.Core().V1().PersistentVolumes(),
-        scInformer:      informerFactory.Storage().V1().StorageClasses(),
-        errTasks:        workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-        nodeQueue:       workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
-    }
-
-    sc.Binder = &DefaultBinder{sc.kubeClient, sc.Recorder}
-    sc.VolumeBinder = &defaultVolumeBinder{
-        volumeBinder: volumescheduling.NewVolumeBinder(
-            logger,
-            sc.kubeClient,
-            sc.podInformer,
-            sc.nodeInformer,
-            sc.csiNodeInformer,
-            sc.pvcInformer,
-            sc.pvInformer,
-            sc.scInformer,
-            nil,
-            100*time.Millisecond,
-        )}
-
-    sc.podInformer.Informer().AddEventHandler(
-        cache.ResourceEventHandlerFuncs{
-            AddFunc:    sc.AddPod,
-            UpdateFunc: sc.UpdatePod,
-            DeleteFunc: sc.DeletePod,
-        },
-    )
 
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    go wait.Until(sc.processBindTask, time.Millisecond*5, ctx.Done())
+
+    sc := NewDefaultMockSchedulerCache(scheduler)
+    sc.Run(ctx.Done())
+    kubeCli := sc.kubeClient
+
     pod := buildPod("c1", "p1", "", v1.PodPending, api.BuildResourceList("1000m", "1G"), []metav1.OwnerReference{owner}, make(map[string]string))
     node := buildNode("n1", api.BuildResourceList("2000m", "10G", []api.ScalarResource{{Name: "pods", Value: "10"}}...))
     pod.Annotations = map[string]string{"scheduling.k8s.io/group-name": "j1"}
     pod.Spec.SchedulerName = scheduler
 
     // make sure pod exist when calling fake client binding
-    fakeKube.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
-    informerFactory.Start(ctx.Done())
-    informerFactory.WaitForCacheSync(ctx.Done())
+    kubeCli.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
 
     // set node in cache directly
-    sc.AddOrUpdateNode(node)
+    kubeCli.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
+    // wait for the pod to be synced
+    time.Sleep(100 * time.Millisecond)
 
     task := api.NewTaskInfo(pod)
     task.NodeName = "n1"
     err := sc.AddBindTask(task)
     if err != nil {
         t.Errorf("failed to bind pod to node: %v", err)
     }
 
-    time.Sleep(200 * time.Millisecond)
+    time.Sleep(100 * time.Millisecond)
 
     r := sc.Recorder.(*record.FakeRecorder)
     if len(r.Events) != 1 {
         t.Fatalf("succesfully binding task should have 1 event")
diff --git a/pkg/scheduler/cache/main_test.go b/pkg/scheduler/cache/main_test.go
new file mode 100644
index 0000000000..f1e027e049
--- /dev/null
+++ b/pkg/scheduler/cache/main_test.go
@@ -0,0 +1,13 @@
+package cache
+
+import (
+    "os"
+    "testing"
+
+    "volcano.sh/volcano/cmd/scheduler/app/options"
+)
+
+func TestMain(m *testing.M) {
+    options.Default()
+    os.Exit(m.Run())
+}