diff --git a/cmd/scheduler/app/options/options_test.go b/cmd/scheduler/app/options/options_test.go
index e815e6ff3e1..30999528d84 100644
--- a/cmd/scheduler/app/options/options_test.go
+++ b/cmd/scheduler/app/options/options_test.go
@@ -57,6 +57,7 @@ func TestAddFlags(t *testing.T) {
         PercentageOfNodesToFind: defaultPercentageOfNodesToFind,
         EnableLeaderElection:    true,
         LockObjectNamespace:     defaultLockObjectNamespace,
+        NodeWorkerThreads:       defaultNodeWorkers,
     }
 
     if !reflect.DeepEqual(expected, s) {
diff --git a/pkg/scheduler/actions/allocate/allocate_test.go b/pkg/scheduler/actions/allocate/allocate_test.go
index fa36d32e382..0d8b46a7201 100644
--- a/pkg/scheduler/actions/allocate/allocate_test.go
+++ b/pkg/scheduler/actions/allocate/allocate_test.go
@@ -269,7 +269,7 @@ func TestAllocate(t *testing.T) {
         }
 
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)
@@ -455,7 +455,7 @@ func TestAllocateWithDynamicPVC(t *testing.T) {
             schedulerCache.AddPod(pod)
         }
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
 
         trueValue := true
diff --git a/pkg/scheduler/actions/preempt/preempt_test.go b/pkg/scheduler/actions/preempt/preempt_test.go
index 8a80a762b68..771bf3d44b7 100644
--- a/pkg/scheduler/actions/preempt/preempt_test.go
+++ b/pkg/scheduler/actions/preempt/preempt_test.go
@@ -314,7 +314,7 @@ func TestPreempt(t *testing.T) {
             Value: 10,
         }
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)
diff --git a/pkg/scheduler/actions/reclaim/reclaim_test.go b/pkg/scheduler/actions/reclaim/reclaim_test.go
index f02b7383d36..1c197b1fec9 100644
--- a/pkg/scheduler/actions/reclaim/reclaim_test.go
+++ b/pkg/scheduler/actions/reclaim/reclaim_test.go
@@ -148,7 +148,7 @@ func TestReclaim(t *testing.T) {
             Value: 10,
         }
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)
diff --git a/pkg/scheduler/actions/shuffle/shuffle_test.go b/pkg/scheduler/actions/shuffle/shuffle_test.go
index 226e3ef3c4c..638d5d94dd4 100644
--- a/pkg/scheduler/actions/shuffle/shuffle_test.go
+++ b/pkg/scheduler/actions/shuffle/shuffle_test.go
@@ -163,7 +163,7 @@ func TestShuffle(t *testing.T) {
         }
 
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, q := range test.queues {
             schedulerCache.AddQueueV1beta1(q)
diff --git a/pkg/scheduler/cache/cache_test.go b/pkg/scheduler/cache/cache_test.go
index 90001b038c0..2ec46a78902 100644
--- a/pkg/scheduler/cache/cache_test.go
+++ b/pkg/scheduler/cache/cache_test.go
@@ -33,7 +33,6 @@ import (
     "k8s.io/client-go/tools/cache"
     "k8s.io/client-go/tools/record"
     "k8s.io/client-go/util/workqueue"
-
     "volcano.sh/volcano/pkg/scheduler/api"
     volumescheduling "volcano.sh/volcano/pkg/scheduler/capabilities/volumebinding"
     "volcano.sh/volcano/pkg/scheduler/util"
@@ -163,7 +162,7 @@ func TestSchedulerCache_Bind_NodeWithSufficientResources(t *testing.T) {
     cache.AddPod(pod)
 
     node := buildNode("n1", buildResourceList("2000m", "10G"))
-    cache.AddNode(node)
+    cache.AddOrUpdateNode(node)
 
     task := api.NewTaskInfo(pod)
     task.Job = "j1"
@@ -195,7 +194,7 @@ func TestSchedulerCache_Bind_NodeWithInsufficientResources(t *testing.T) {
     cache.AddPod(pod)
 
     node := buildNode("n1", buildResourceList("2000m", "10G"))
-    cache.AddNode(node)
+    cache.AddOrUpdateNode(node)
 
     task := api.NewTaskInfo(pod)
     task.Job = "j1"
@@ -307,7 +306,7 @@ func TestNodeOperation(t *testing.T) {
         }
 
         for _, n := range test.nodes {
-            cache.AddNode(n)
+            cache.AddOrUpdateNode(n)
         }
 
         if !reflect.DeepEqual(cache, test.expected) {
@@ -316,7 +315,7 @@ func TestNodeOperation(t *testing.T) {
         }
 
         // delete node
-        cache.DeleteNode(test.deletedNode)
+        cache.RemoveNode(test.deletedNode.Name)
         if !reflect.DeepEqual(cache, test.delExpect) {
             t.Errorf("case %d: \n expected %v, \n got %v \n",
                 i, test.delExpect, cache)
@@ -344,6 +343,7 @@ func TestBindTasks(t *testing.T) {
         pvInformer: informerFactory.Core().V1().PersistentVolumes(),
         scInformer: informerFactory.Storage().V1().StorageClasses(),
         errTasks:   workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+        nodeQueue:  workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
     }
 
     sc.Binder = &DefaultBinder{}
@@ -367,12 +367,6 @@ func TestBindTasks(t *testing.T) {
             DeleteFunc: sc.DeletePod,
         },
     )
-    sc.nodeInformer.Informer().AddEventHandler(
-        cache.ResourceEventHandlerFuncs{
-            AddFunc:    sc.AddNode,
-            UpdateFunc: sc.UpdateNode,
-        },
-    )
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
     go wait.Until(sc.processBindTask, time.Millisecond*5, ctx.Done())
@@ -383,9 +377,10 @@ func TestBindTasks(t *testing.T) {
 
     // make sure pod exist when calling fake client binding
     fakeKube.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
-    fakeKube.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{})
     informerFactory.Start(ctx.Done())
     informerFactory.WaitForCacheSync(ctx.Done())
+    // set node in cache directly
+    sc.AddOrUpdateNode(node)
 
     task := api.NewTaskInfo(pod)
     task.NodeName = "n1"
diff --git a/pkg/scheduler/cache/event_handlers_test.go b/pkg/scheduler/cache/event_handlers_test.go
index a82f1393d42..c2035dd638b 100644
--- a/pkg/scheduler/cache/event_handlers_test.go
+++ b/pkg/scheduler/cache/event_handlers_test.go
@@ -75,7 +75,7 @@ func TestSchedulerCache_updateTask(t *testing.T) {
         }
 
         for _, n := range test.Nodes {
-            cache.AddNode(n)
+            cache.AddOrUpdateNode(n)
         }
 
         cache.AddPod(test.OldPod)
@@ -129,7 +129,7 @@ func TestSchedulerCache_UpdatePod(t *testing.T) {
         }
 
         for _, n := range test.Nodes {
-            cache.AddNode(n)
+            cache.AddOrUpdateNode(n)
         }
 
         cache.AddPod(test.OldPod)
@@ -210,7 +210,7 @@ func TestSchedulerCache_AddPodGroupV1beta1(t *testing.T) {
         }
 
         for _, n := range test.Nodes {
-            cache.AddNode(n)
+            cache.AddOrUpdateNode(n)
         }
         test.Pod.Annotations = map[string]string{
             "scheduling.k8s.io/group-name": "j1",
@@ -336,7 +336,7 @@ func TestSchedulerCache_UpdatePodGroupV1beta1(t *testing.T) {
         }
 
         for _, n := range test.Nodes {
-            cache.AddNode(n)
+            cache.AddOrUpdateNode(n)
         }
         test.Pod.Annotations = map[string]string{
             "scheduling.k8s.io/group-name": "j1",
@@ -431,7 +431,7 @@ func TestSchedulerCache_DeletePodGroupV1beta1(t *testing.T) {
         cache.DeletedJobs = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 
         for _, n := range test.Nodes {
-            cache.AddNode(n)
+            cache.AddOrUpdateNode(n)
         }
         test.Pod.Annotations = map[string]string{
             "scheduling.k8s.io/group-name": "j1",
diff --git a/pkg/scheduler/plugins/binpack/binpack_test.go b/pkg/scheduler/plugins/binpack/binpack_test.go
index 0105b3866ba..c35f4e1653e 100644
--- a/pkg/scheduler/plugins/binpack/binpack_test.go
+++ b/pkg/scheduler/plugins/binpack/binpack_test.go
@@ -265,7 +265,7 @@ func TestNode(t *testing.T) {
             Recorder: record.NewFakeRecorder(100),
         }
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)
diff --git a/pkg/scheduler/plugins/drf/hdrf_test.go b/pkg/scheduler/plugins/drf/hdrf_test.go
index c4919f4042a..8bbc4cfc0e6 100644
--- a/pkg/scheduler/plugins/drf/hdrf_test.go
+++ b/pkg/scheduler/plugins/drf/hdrf_test.go
@@ -231,7 +231,7 @@ func TestHDRF(t *testing.T) {
             Recorder: record.NewFakeRecorder(100),
         }
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, q := range test.queueSpecs {
             schedulerCache.AddQueueV1beta1(
diff --git a/pkg/scheduler/plugins/predicates/predicates_test.go b/pkg/scheduler/plugins/predicates/predicates_test.go
index 50541bae078..b39e29676f9 100644
--- a/pkg/scheduler/plugins/predicates/predicates_test.go
+++ b/pkg/scheduler/plugins/predicates/predicates_test.go
@@ -70,7 +70,7 @@ func TestEventHandler(t *testing.T) {
         return
     }
 
-    sc := cache.New(config, option.SchedulerNames, option.DefaultQueue, option.NodeSelector)
+    sc := cache.New(config, option.SchedulerNames, option.DefaultQueue, option.NodeSelector, option.NodeWorkerThreads)
     schedulerCache := sc.(*cache.SchedulerCache)
 
     // pending pods
@@ -157,7 +157,7 @@ func TestEventHandler(t *testing.T) {
             }
         }()
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)
diff --git a/pkg/scheduler/plugins/proportion/proportion_test.go b/pkg/scheduler/plugins/proportion/proportion_test.go
index f7b4398b5c6..dcc0512f565 100644
--- a/pkg/scheduler/plugins/proportion/proportion_test.go
+++ b/pkg/scheduler/plugins/proportion/proportion_test.go
@@ -217,7 +217,7 @@ func TestProportion(t *testing.T) {
         schedulerCache.DeletedJobs = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
 
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)
diff --git a/pkg/scheduler/plugins/tdm/tdm_test.go b/pkg/scheduler/plugins/tdm/tdm_test.go
index c2ec63f1d7f..c696bc77922 100644
--- a/pkg/scheduler/plugins/tdm/tdm_test.go
+++ b/pkg/scheduler/plugins/tdm/tdm_test.go
@@ -251,7 +251,7 @@ func Test_TDM(t *testing.T) {
             Recorder: record.NewFakeRecorder(100),
         }
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
 
         schedulerCache.AddPod(test.pod)
@@ -713,7 +713,7 @@ func Test_TDM_victimsFn(t *testing.T) {
             Recorder: record.NewFakeRecorder(100),
         }
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
 
         for _, pod := range test.pods {
diff --git a/pkg/scheduler/plugins/usage/usage_test.go b/pkg/scheduler/plugins/usage/usage_test.go
index d738cbddc5a..3b1bb4383c6 100644
--- a/pkg/scheduler/plugins/usage/usage_test.go
+++ b/pkg/scheduler/plugins/usage/usage_test.go
@@ -314,7 +314,7 @@ func TestUsage_predicateFn(t *testing.T) {
         }
 
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)
@@ -513,7 +513,7 @@ func TestUsage_nodeOrderFn(t *testing.T) {
         }
 
         for _, node := range test.nodes {
-            schedulerCache.AddNode(node)
+            schedulerCache.AddOrUpdateNode(node)
         }
         for _, pod := range test.pods {
             schedulerCache.AddPod(pod)