From 7bee28e3940b94c8a551329fe34cb8cb7b1a40a6 Mon Sep 17 00:00:00 2001 From: wangyang Date: Wed, 14 Jun 2023 20:35:36 +0800 Subject: [PATCH 1/6] Transform the predicate interface to support the outer judgment of allocate and preempt at the same time Signed-off-by: wangyang --- docs/design/device-sharing.md | 21 +++- pkg/scheduler/actions/allocate/allocate.go | 52 +++++++-- pkg/scheduler/actions/backfill/backfill.go | 62 +++++++--- pkg/scheduler/actions/preempt/preempt.go | 27 ++++- pkg/scheduler/actions/reclaim/reclaim.go | 38 ++++++- .../devices/nvidia/gpushare/device_info.go | 14 ++- .../api/devices/nvidia/vgpu/device_info.go | 10 +- pkg/scheduler/api/devices/util.go | 38 +++++++ pkg/scheduler/api/shared_device_pool.go | 20 +++- pkg/scheduler/api/types.go | 32 +++++- pkg/scheduler/framework/session_plugins.go | 20 ++-- pkg/scheduler/framework/util.go | 32 +++++- pkg/scheduler/plugins/extender/argument.go | 2 +- pkg/scheduler/plugins/extender/extender.go | 12 +- pkg/scheduler/plugins/numaaware/numaaware.go | 18 ++- .../plugins/predicates/predicates.go | 106 ++++++++++++------ .../plugins/predicates/proportional.go | 12 +- .../plugins/predicates/proportional_test.go | 12 +- pkg/scheduler/plugins/tdm/tdm.go | 16 ++- pkg/scheduler/plugins/tdm/tdm_test.go | 13 ++- pkg/scheduler/plugins/usage/usage.go | 19 +++- pkg/scheduler/util/predicate_helper.go | 2 +- 22 files changed, 453 insertions(+), 125 deletions(-) create mode 100644 pkg/scheduler/api/devices/util.go diff --git a/docs/design/device-sharing.md b/docs/design/device-sharing.md index e2e5828b88..6d5bb9de45 100644 --- a/docs/design/device-sharing.md +++ b/docs/design/device-sharing.md @@ -26,7 +26,26 @@ type Devices interface { //HasDeviceRequest checks if the 'pod' request this device HasDeviceRequest(pod *v1.Pod) bool //FiltreNode checks if the 'pod' fit in current node - FilterNode(pod *v1.Pod) (bool, error) + // The first return value represents the filtering result, and the value range is "0, 1, 2, 3" + // 
0: Success + // Success means that plugin ran correctly and found pod schedulable. + + // 1: Error + // Error is used for internal plugin errors, unexpected input, etc. + + // 2: Unschedulable + // Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to + // preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the + // scheduler skip preemption. + // The accompanying status message should explain why the pod is unschedulable. + + // 3: UnschedulableAndUnresolvable + // UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and + // preemption would not change anything. Plugins should return Unschedulable if it is possible + // that the pod can get scheduled with preemption. + // The accompanying status message should explain why the pod is unschedulable. + FilterNode(pod *v1.Pod) (int, string, error) + //Allocate action in predicate Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error //Release action in predicate diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 02fd566426..3db2762c64 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -17,6 +17,7 @@ package allocate import ( + "fmt" "time" "k8s.io/klog/v2" @@ -96,13 +97,22 @@ func (alloc *Action) Execute(ssn *framework.Session) { pendingTasks := map[api.JobID]*util.PriorityQueue{} allNodes := ssn.NodeList - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { // Check for Resource Predicate if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok { - return api.NewFitError(task, node, reason) + return nil, api.NewFitError(task, node, reason) + } + predicateStatus, err := ssn.PredicateFn(task, node) + if err != nil { + return nil, err + } + for _, status := range 
predicateStatus { + if status != nil && status.Code != api.Success { + return nil, api.NewFitError(task, node, status.Reason) + } } - return ssn.PredicateFn(task, node) + return nil, nil } // To pick tuple for job, we choose to pick namespace firstly. @@ -167,13 +177,7 @@ func (alloc *Action) Execute(ssn *framework.Session) { klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name) - if err := ssn.PrePredicateFn(task); err != nil { - klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) - fitErrors := api.NewFitErrors() - for _, ni := range allNodes { - fitErrors.SetNodeError(ni.Name, err) - } - job.NodesFitErrors[task.UID] = fitErrors + if err := prePredicateforAllocate(ssn, task, allNodes, job); err != nil { break } @@ -250,4 +254,32 @@ func (alloc *Action) Execute(ssn *framework.Session) { } } +func prePredicateforAllocate(ssn *framework.Session, task *api.TaskInfo, allNodes []*api.NodeInfo, job *api.JobInfo) error { + prePredicateStatus, err := ssn.PrePredicateFn(task) + if err != nil { + klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + fitErrors := api.NewFitErrors() + for _, ni := range allNodes { + fitErrors.SetNodeError(ni.Name, err) + } + job.NodesFitErrors[task.UID] = fitErrors + return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + } + + for _, status := range prePredicateStatus { + if status != nil && status.Code != api.Success { + klog.V(3).Infof("PrePredicate for task %s/%s failed, Code is %d, reason is %s", + task.Namespace, task.Name, status.Code, status.Reason) + fitErrors := api.NewFitErrors() + err := fmt.Errorf("PrePredicate for task %s/%s, %s", task.Namespace, task.Name, status.Reason) + for _, ni := range allNodes { + fitErrors.SetNodeError(ni.Name, err) + } + job.NodesFitErrors[task.UID] = fitErrors + return err + } + } + return nil +} + func (alloc *Action) UnInitialize() 
{} diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 18437136ba..2e464a2d10 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -17,6 +17,7 @@ limitations under the License. package backfill import ( + "fmt" "time" "k8s.io/klog/v2" @@ -56,26 +57,20 @@ func (backfill *Action) Execute(ssn *framework.Session) { for _, task := range job.TaskStatusIndex[api.Pending] { if task.InitResreq.IsEmpty() { allocated := false - fe := api.NewFitErrors() - - if err := ssn.PrePredicateFn(task); err != nil { - klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) - for _, ni := range ssn.Nodes { - fe.SetNodeError(ni.Name, err) - } - job.NodesFitErrors[task.UID] = fe - break + if err := prePredicateforBackfill(ssn, task, job); err != nil { + klog.V(3).Infof("backfill %s", err.Error()) + continue } + fe := api.NewFitErrors() // As task did not request resources, so it only need to meet predicates. // TODO (k82cn): need to prioritize nodes to avoid pod hole. for _, node := range ssn.Nodes { // TODO (k82cn): predicates did not consider pod number for now, there'll // be ping-pong case here. 
- if err := ssn.PredicateFn(task, node); err != nil { - klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) - fe.SetNodeError(node.Name, err) + err := predicateforBackfill(ssn, task, node, fe) + if err != nil { + klog.V(3).Infof("backfill %s", err.Error()) continue } @@ -101,4 +96,47 @@ func (backfill *Action) Execute(ssn *framework.Session) { } } +func predicateforBackfill(ssn *framework.Session, task *api.TaskInfo, node *api.NodeInfo, fe *api.FitErrors) error { + predicateStatus, err := ssn.PredicateFn(task, node) + if err != nil { + fe.SetNodeError(node.Name, err) + return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, node.Name, err) + } + for _, status := range predicateStatus { + if status != nil && status.Code != api.Success { + // Record the rejection in fe as well, exactly like the error path above does. + fe.SetNodeError(node.Name, api.NewFitError(task, node, status.Reason)) + return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %s", + task.Namespace, task.Name, node.Name, status.Reason) + } + } + return nil +} + +func prePredicateforBackfill(ssn *framework.Session, task *api.TaskInfo, job *api.JobInfo) error { + prePredicateStatus, err := ssn.PrePredicateFn(task) + if err != nil { + fitErrors := api.NewFitErrors() + for _, ni := range ssn.Nodes { + fitErrors.SetNodeError(ni.Name, err) + } + job.NodesFitErrors[task.UID] = fitErrors + return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + } + + for _, status := range prePredicateStatus { + if status != nil && status.Code != api.Success { + fitErrors := api.NewFitErrors() + err := fmt.Errorf("PrePredicate for task %s/%s failed, %s", task.Namespace, task.Name, status.Reason) + for _, ni := range ssn.Nodes { + fitErrors.SetNodeError(ni.Name, err) + } + job.NodesFitErrors[task.UID] = fitErrors + return err + } + } + return nil +} + func (backfill *Action) UnInitialize() {} diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 
fd250e1e34..43cfe61c8d 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -203,13 +203,32 @@ func preempt( predicateHelper util.PredicateHelper, ) (bool, error) { assigned := false - allNodes := ssn.NodeList - - if err := ssn.PrePredicateFn(preemptor); err != nil { + prePredicateStatus, err := ssn.PrePredicateFn(preemptor) + if err != nil { return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err) } - predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn, true) + for _, status := range prePredicateStatus { + if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { + return false, fmt.Errorf("PrePredicate for task %s/%s failed, %v", preemptor.Namespace, preemptor.Name, status.Reason) + } + } + + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateStatus, err := ssn.PredicateFn(task, node) + if err != nil { + return nil, err + } + for _, status := range predicateStatus { + if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { + return nil, api.NewFitError(task, node, status.Reason) + } + } + + return nil, nil + } + + predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 8bb96740d1..d31bdca89e 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -17,6 +17,8 @@ limitations under the License. 
package reclaim import ( + "fmt" + "k8s.io/klog/v2" "volcano.sh/volcano/pkg/scheduler/api" @@ -116,15 +118,16 @@ func (ra *Action) Execute(ssn *framework.Session) { continue } - if err := ssn.PrePredicateFn(task); err != nil { - klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + if err := prePredicateforReclaim(ssn, task); err != nil { + klog.V(3).Infof("reclaim %s", err.Error()) continue } assigned := false for _, n := range ssn.Nodes { // If predicates failed, next node. - if err := ssn.PredicateFn(task, n); err != nil { + if err := predicateforReclaim(ssn, task, n); err != nil { + klog.V(3).Infof("reclaim %s", err.Error()) continue } @@ -207,5 +210,34 @@ func (ra *Action) Execute(ssn *framework.Session) { } } +func predicateforReclaim(ssn *framework.Session, task *api.TaskInfo, n *api.NodeInfo) error { + predicateStatus, err := ssn.PredicateFn(task, n) + if err != nil { + return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, n.Name, err) + } + for _, status := range predicateStatus { + if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { + return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, n.Name, status.Reason) + } + } + return nil +} + +func prePredicateforReclaim(ssn *framework.Session, task *api.TaskInfo) error { + prePredicateStatus, err := ssn.PrePredicateFn(task) + if err != nil { + return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + } + + for _, status := range prePredicateStatus { + if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { + return fmt.Errorf("PrePredicate for task %s/%s failed, %v", task.Namespace, task.Name, status.Reason) + } + } + return nil +} + func (ra *Action) UnInitialize() { } diff --git a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go 
b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go index 167139f0b3..3604698364 100644 --- a/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go +++ b/pkg/scheduler/api/devices/nvidia/gpushare/device_info.go @@ -18,6 +18,7 @@ package gpushare import ( "context" + "fmt" "github.com/pkg/errors" v1 "k8s.io/api/core/v1" @@ -26,6 +27,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "volcano.sh/volcano/pkg/scheduler/api/devices" "volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock" ) @@ -146,24 +148,32 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro return nil } -func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) { +func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) { klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name) if GpuSharingEnable { fit, err := checkNodeGPUSharingPredicate(pod, gs) if err != nil { klog.Errorln("deviceSharing err=", err.Error()) - return fit, err + return devices.Unschedulable, fmt.Sprintf("GpuShare %s", err.Error()), err + } + // err may be nil when the pod simply does not fit; report that without dereferencing err. + if !fit { + return devices.Unschedulable, "GpuShare pod does not fit on the node", nil } } if GpuNumberEnable { fit, err := checkNodeGPUNumberPredicate(pod, gs) if err != nil { klog.Errorln("deviceSharing err=", err.Error()) - return fit, err + return devices.Unschedulable, fmt.Sprintf("GpuNumber %s", err.Error()), err + } + // err may be nil when the pod simply does not fit; report that without dereferencing err. + if !fit { + return devices.Unschedulable, "GpuNumber pod does not fit on the node", nil } } klog.V(4).Infoln("DeviceSharing:FitInPod successed") - return true, nil + return devices.Success, "", nil } func (gs *GPUDevices) GetStatus() string { diff --git a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go index e956e1dafa..ced16ee487 100644 --- a/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go +++ b/pkg/scheduler/api/devices/nvidia/vgpu/device_info.go @@ -17,6 +17,7 @@ limitations under the License. 
package vgpu import ( + "fmt" "strconv" "strings" "time" @@ -26,6 +27,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" + "volcano.sh/volcano/pkg/scheduler/api/devices" "volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock" ) @@ -177,17 +179,21 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro return nil } -func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) { +func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) { klog.V(3).Infoln("4pdvgpuDeviceSharing:Into FitInPod", pod.Name) if VGPUEnable { fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true) if err != nil { klog.Errorln("deviceSharing err=", err.Error()) - return fit, err + return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err + } + // err may be nil when the pod simply does not fit; report that without dereferencing err. + if !fit { + return devices.Unschedulable, "4pdvgpuDeviceSharing pod does not fit on the node", nil } } klog.V(3).Infoln("4pdvgpu DeviceSharing:FitInPod successed") - return true, nil + return devices.Success, "", nil } func (gs *GPUDevices) GetStatus() string { diff --git a/pkg/scheduler/api/devices/util.go b/pkg/scheduler/api/devices/util.go new file mode 100644 index 0000000000..783d7bb202 --- /dev/null +++ b/pkg/scheduler/api/devices/util.go @@ -0,0 +1,38 @@ +/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package devices + +// These are predefined codes used in a Status. +const ( + // Success means that plugin ran correctly and found pod schedulable. + // NOTE: A nil status is also considered as "Success". 
+ Success int = iota + // Error is used for internal plugin errors, unexpected input, etc. + Error + // Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to + // preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the + // scheduler skip preemption. + // The accompanying status message should explain why the pod is unschedulable. + Unschedulable + // UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and + // preemption would not change anything. Plugins should return Unschedulable if it is possible + // that the pod can get scheduled with preemption. + // The accompanying status message should explain why the pod is unschedulable. + UnschedulableAndUnresolvable + // Wait is used when a Permit plugin finds a pod scheduling should wait. + Wait + // Skip is used when a Bind plugin chooses to skip binding. + Skip +) diff --git a/pkg/scheduler/api/shared_device_pool.go b/pkg/scheduler/api/shared_device_pool.go index 9156abc3a7..865dba0dcd 100644 --- a/pkg/scheduler/api/shared_device_pool.go +++ b/pkg/scheduler/api/shared_device_pool.go @@ -39,7 +39,25 @@ type Devices interface { //HasDeviceRequest checks if the 'pod' request this device HasDeviceRequest(pod *v1.Pod) bool //FiltreNode checks if the 'pod' fit in current node - FilterNode(pod *v1.Pod) (bool, error) + // The first return value represents the filtering result, and the value range is "0, 1, 2, 3" + // 0: Success + // Success means that plugin ran correctly and found pod schedulable. + + // 1: Error + // Error is used for internal plugin errors, unexpected input, etc. + + // 2: Unschedulable + // Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to + // preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the + // scheduler skip preemption. + // The accompanying status message should explain why the pod is unschedulable. 
+ + // 3: UnschedulableAndUnresolvable + // UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and + // preemption would not change anything. Plugins should return Unschedulable if it is possible + // that the pod can get scheduled with preemption. + // The accompanying status message should explain why the pod is unschedulable. + FilterNode(pod *v1.Pod) (int, string, error) //Allocate action in predicate Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error //Release action in predicate diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 699cf8a918..6729c291d4 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -124,6 +124,34 @@ type ValidateResult struct { Message string } +// These are predefined codes used in a Status. +const ( + // Success means that plugin ran correctly and found pod schedulable. + // NOTE: A nil status is also considered as "Success". + Success int = iota + // Error is used for internal plugin errors, unexpected input, etc. + Error + // Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to + // preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the + // scheduler skip preemption. + // The accompanying status message should explain why the pod is unschedulable. + Unschedulable + // UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and + // preemption would not change anything. Plugins should return Unschedulable if it is possible + // that the pod can get scheduled with preemption. + // The accompanying status message should explain why the pod is unschedulable. + UnschedulableAndUnresolvable + // Wait is used when a Permit plugin finds a pod scheduling should wait. + Wait + // Skip is used when a Bind plugin chooses to skip binding. 
+ Skip +) + +type Status struct { + Code int + Reason string +} + // ValidateExFn is the func declaration used to validate the result. type ValidateExFn func(interface{}) *ValidateResult @@ -134,10 +162,10 @@ type VoteFn func(interface{}) int type JobEnqueuedFn func(interface{}) // PredicateFn is the func declaration used to predicate node for task. -type PredicateFn func(*TaskInfo, *NodeInfo) error +type PredicateFn func(*TaskInfo, *NodeInfo) ([]*Status, error) // PrePredicateFn is the func declaration used to pre-predicate node for task. -type PrePredicateFn func(*TaskInfo) error +type PrePredicateFn func(*TaskInfo) ([]*Status, error) // BestNodeFn is the func declaration used to return the nodeScores to plugins. type BestNodeFn func(*TaskInfo, map[float64][]*NodeInfo) *NodeInfo diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 76a3a41c63..6a3e5d63c0 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -601,7 +601,8 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool { } // PredicateFn invoke predicate function of the plugins -func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { +func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateStatus := make([]*api.Status, 0) for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { if !isEnabled(plugin.EnabledPredicate) { @@ -611,17 +612,19 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error { if !found { continue } - err := pfn(task, node) + status, err := pfn(task, node) + predicateStatus = append(predicateStatus, status...) 
if err != nil { - return err + return predicateStatus, err } } } - return nil + return predicateStatus, nil } // PrePredicateFn invoke predicate function of the plugins -func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error { +func (ssn *Session) PrePredicateFn(task *api.TaskInfo) ([]*api.Status, error) { + prePredicateStatus := make([]*api.Status, 0) for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { // we use same option as predicates for they are @@ -632,13 +635,14 @@ func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error { if !found { continue } - err := pfn(task) + status, err := pfn(task) + prePredicateStatus = append(prePredicateStatus, status...) if err != nil { - return err + return prePredicateStatus, err } } } - return nil + return prePredicateStatus, nil } // BestNodeFn invoke bestNode function of the plugins diff --git a/pkg/scheduler/framework/util.go b/pkg/scheduler/framework/util.go index 0528c9dba5..412472aeb0 100644 --- a/pkg/scheduler/framework/util.go +++ b/pkg/scheduler/framework/util.go @@ -17,12 +17,14 @@ limitations under the License. package framework import ( + "fmt" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/klog/v2" - schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/framework" + k8sframework "k8s.io/kubernetes/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/api" ) @@ -223,10 +225,10 @@ func (pal *PodAffinityLister) FilteredList(podFilter PodFilter, selector labels. 
} // GenerateNodeMapAndSlice returns the nodeMap and nodeSlice generated from ssn -func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) map[string]*schedulernodeinfo.NodeInfo { - nodeMap := make(map[string]*schedulernodeinfo.NodeInfo) +func GenerateNodeMapAndSlice(nodes map[string]*api.NodeInfo) map[string]*k8sframework.NodeInfo { + nodeMap := make(map[string]*k8sframework.NodeInfo) for _, node := range nodes { - nodeInfo := schedulernodeinfo.NewNodeInfo(node.Pods()...) + nodeInfo := k8sframework.NewNodeInfo(node.Pods()...) nodeInfo.SetNode(node.Node) nodeMap[node.Name] = nodeInfo // add imagestate into nodeinfo @@ -263,3 +265,25 @@ func (nl *NodeLister) List() ([]*v1.Node, error) { } return nodes, nil } + +// ConvertPredicateStatus converts a k8s scheduler framework status into volcano's internal api.Status. +func ConvertPredicateStatus(status *k8sframework.Status) (*api.Status, error) { + internalStatus := &api.Status{} + if status.Code() == k8sframework.Success { + internalStatus.Code = api.Success + return internalStatus, nil + } else if status.Code() == k8sframework.Unschedulable { + internalStatus.Code = api.Unschedulable + internalStatus.Reason = status.Message() + return internalStatus, nil + } else if status.Code() == k8sframework.UnschedulableAndUnresolvable { + internalStatus.Code = api.UnschedulableAndUnresolvable + internalStatus.Reason = status.Message() + return internalStatus, nil + } else { + internalStatus.Code = api.Error + internalStatus.Reason = status.Message() + return internalStatus, fmt.Errorf("Convert predicate status error, k8s status code is %d, Reason is %s", + status.Code(), status.Message()) + } +} diff --git a/pkg/scheduler/plugins/extender/argument.go b/pkg/scheduler/plugins/extender/argument.go index 123cd18920..cdd130322e 100644 --- a/pkg/scheduler/plugins/extender/argument.go +++ b/pkg/scheduler/plugins/extender/argument.go @@ -22,7 +22,7 @@ type PredicateRequest struct { } type PredicateResponse struct { - ErrorMessage string 
`json:"errorMessage"` + Status []*api.Status `json:"status"` } type PrioritizeRequest struct { diff --git a/pkg/scheduler/plugins/extender/extender.go b/pkg/scheduler/plugins/extender/extender.go index 43a88139d3..fea755f71c 100644 --- a/pkg/scheduler/plugins/extender/extender.go +++ b/pkg/scheduler/plugins/extender/extender.go @@ -161,22 +161,20 @@ func (ep *extenderPlugin) OnSessionOpen(ssn *framework.Session) { } if ep.config.predicateVerb != "" { - ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { + ssn.AddPredicateFn(ep.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { resp := &PredicateResponse{} err := ep.send(ep.config.predicateVerb, &PredicateRequest{Task: task, Node: node}, resp) if err != nil { klog.Warningf("Predicate failed with error %v", err) if ep.config.ignorable { - return nil + return nil, nil } - return err + return nil, err } - if resp.ErrorMessage == "" { - return nil - } - return errors.New(resp.ErrorMessage) + predicateStatus := resp.Status + return predicateStatus, nil }) } diff --git a/pkg/scheduler/plugins/numaaware/numaaware.go b/pkg/scheduler/plugins/numaaware/numaaware.go index 2a67a67d13..d428c26b63 100644 --- a/pkg/scheduler/plugins/numaaware/numaaware.go +++ b/pkg/scheduler/plugins/numaaware/numaaware.go @@ -112,14 +112,16 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) { }, }) - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateStatus := make([]*api.Status, 0) + numaStatus := &api.Status{} if v1qos.GetPodQOS(task.Pod) != v1.PodQOSGuaranteed { klog.V(3).Infof("task %s isn't Guaranteed pod", task.Name) - return nil + return predicateStatus, nil } if fit, err := filterNodeByPolicy(task, node, pp.nodeResSets); !fit { - return err + return predicateStatus, err } resNumaSets := pp.nodeResSets[node.Name].Clone() @@ -130,7 +132,11 @@ func (pp 
*numaPlugin) OnSessionOpen(ssn *framework.Session) { providersHints := policy.AccumulateProvidersHints(&container, node.NumaSchedulerInfo, resNumaSets, pp.hintProviders) hit, admit := taskPolicy.Predicate(providersHints) if !admit { - return fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s", + numaStatus.Code = api.UnschedulableAndUnresolvable + numaStatus.Reason = fmt.Sprintf("plugin %s predicates failed for task %s container %s on node %s", + pp.Name(), task.Name, container.Name, node.Name) + predicateStatus = append(predicateStatus, numaStatus) + return predicateStatus, fmt.Errorf("plugin %s predicates failed for task %s container %s on node %s", pp.Name(), task.Name, container.Name, node.Name) } @@ -154,7 +160,9 @@ func (pp *numaPlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof(" task %s's on node<%s> resAssignMap: %v", task.Name, node.Name, pp.assignRes[task.UID][node.Name]) - return nil + numaStatus.Code = api.Success + predicateStatus = append(predicateStatus, numaStatus) + return predicateStatus, nil } ssn.AddPredicateFn(pp.Name(), predicateFn) diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index c4d6110851..0c4b00cf83 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -351,7 +351,8 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { state := k8sframework.NewCycleState() - ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error { + ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) ([]*api.Status, error) { + prePredicateStatus := make([]*api.Status, 0) // Check NodePorts if predicate.nodePortEnable { nodePortFilter.PreFilter(context.TODO(), state, task.Pod) @@ -369,8 +370,10 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // the processing logic needs to be added to the return value result. 
if predicate.podAffinityEnable { _, status := podAffinityFilter.PreFilter(context.TODO(), state, task.Pod) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message()) + podAffinityStatus, err := framework.ConvertPredicateStatus(status) + prePredicateStatus = append(prePredicateStatus, podAffinityStatus) + if err != nil { + return prePredicateStatus, fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message()) } } @@ -386,58 +389,74 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // the processing logic needs to be added to the return value result. if predicate.podTopologySpreadEnable { _, status := podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) + podTopologyStatus, err := framework.ConvertPredicateStatus(status) + prePredicateStatus = append(prePredicateStatus, podTopologyStatus) + if err != nil { + return prePredicateStatus, fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) } } - return nil + return prePredicateStatus, nil }) - ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) error { + ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateStatus := make([]*api.Status, 0) nodeInfo, found := nodeMap[node.Name] if !found { - return fmt.Errorf("failed to predicates, node info for %s not found", node.Name) + return predicateStatus, fmt.Errorf("failed to predicates, node info for %s not found", node.Name) } if node.Allocatable.MaxTaskNum <= len(nodeInfo.Pods) { klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed", task.Namespace, task.Name, node.Name) - return api.NewFitError(task, node, api.NodePodNumberExceeded) + podsNumStatus := &api.Status{ + 
Code: api.Unschedulable, + Reason: fmt.Sprintf("Task <%s/%s> on Node <%s> failed, reason: %s", + task.Namespace, task.Name, node.Name, api.NodePodNumberExceeded), + } + predicateStatus = append(predicateStatus, podsNumStatus) } - predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) (bool, error) { + predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) ([]*api.Status, bool, error) { // CheckNodeUnschedulable + predicateStatus := make([]*api.Status, 0) status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return false, fmt.Errorf("plugin %s predicates failed %s", nodeunschedulable.Name, status.Message()) + nodeUnscheduleStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, nodeUnscheduleStatus) + if err != nil { + return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeunschedulable.Name, status.Message()) } // Check NodeAffinity if predicate.nodeAffinityEnable { status := nodeAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return false, fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message()) + nodeAffinityStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, nodeAffinityStatus) + if err != nil { + return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message()) } } // PodToleratesNodeTaints: TaintToleration if predicate.taintTolerationEnable { status := tolerationFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message()) + tolerationStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, tolerationStatus) + if err != nil { + return predicateStatus, false, 
fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message()) } } - return true, nil + return predicateStatus, true, nil } // Check PredicateWithCache var err error var fit bool + predicateCacheStatus := make([]*api.Status, 0) if predicate.cacheEnable { fit, err = pCache.PredicateWithCache(node.Name, task.Pod) if err != nil { - fit, err = predicateByStablefilter(task.Pod, nodeInfo) + predicateCacheStatus, fit, err = predicateByStablefilter(task.Pod, nodeInfo) pCache.UpdateCache(node.Name, task.Pod, fit) } else { if !fit { @@ -445,58 +464,74 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } } } else { - fit, err = predicateByStablefilter(task.Pod, nodeInfo) + predicateCacheStatus, fit, err = predicateByStablefilter(task.Pod, nodeInfo) } + predicateStatus = append(predicateStatus, predicateCacheStatus...) if !fit { - return err + return predicateStatus, err } // Check NodePort if predicate.nodePortEnable { status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message()) + nodePortStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, nodePortStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message()) } } // Check PodAffinity if predicate.podAffinityEnable { status := podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message()) + podAffinityStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, podAffinityStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message()) } } // Check NodeVolumeLimits if predicate.nodeVolumeLimitsEnable { status := 
nodeVolumeLimitsCSIFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) + nodeVolumeStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, nodeVolumeStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) } } // Check VolumeZone if predicate.volumeZoneEnable { status := volumeZoneFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) + volumeZoneStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, volumeZoneStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) } } // Check PodTopologySpread if predicate.podTopologySpreadEnable { status := podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - if !status.IsSuccess() { - return fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) + podTopologyStatus, err := framework.ConvertPredicateStatus(status) + predicateStatus = append(predicateStatus, podTopologyStatus) + if err != nil { + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) } } for _, val := range api.RegisteredDevices { if devices, ok := node.Others[val].(api.Devices); ok { - fit, err = devices.FilterNode(task.Pod) + code, msg, err := devices.FilterNode(task.Pod) + filterNodeStatus := &api.Status{ + Code: code, + Reason: msg, + } + predicateStatus = append(predicateStatus, filterNodeStatus) if err != nil { - return err + return predicateStatus, err } } else { klog.Warningf("Devices %s assertion conversion 
failed, skip", val) @@ -508,14 +543,15 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { if predicate.proportionalEnable { // Check ProportionalPredicate - fit, err := checkNodeResourceIsProportional(task, node, predicate.proportional) + proportionalStatus, err := checkNodeResourceIsProportional(task, node, predicate.proportional) + predicateStatus = append(predicateStatus, proportionalStatus) if err != nil { - return err + return predicateStatus, err } klog.V(4).Infof("checkNodeResourceIsProportional predicates Task <%s/%s> on Node <%s>: fit %v", task.Namespace, task.Name, node.Name, fit) } - return nil + return predicateStatus, nil }) } diff --git a/pkg/scheduler/plugins/predicates/proportional.go b/pkg/scheduler/plugins/predicates/proportional.go index e3ee83ca62..bd83d0f54f 100644 --- a/pkg/scheduler/plugins/predicates/proportional.go +++ b/pkg/scheduler/plugins/predicates/proportional.go @@ -25,10 +25,12 @@ import ( ) // checkNodeResourceIsProportional checks if a gpu:cpu:memory is Proportional -func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (bool, error) { +func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (*api.Status, error) { + status := &api.Status{} for resourceName := range proportional { if value, found := task.Resreq.ScalarResources[resourceName]; found && value > 0 { - return true, nil + status.Code = api.Success + return status, nil } } for resourceName, resourceRate := range proportional { @@ -38,9 +40,11 @@ func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, pro r := node.Idle.Clone() r = r.Sub(task.Resreq) if r.MilliCPU < cpuReserved || r.Memory < memoryReserved { - return false, fmt.Errorf("proportional of resource %s check failed", resourceName) + status.Code = api.Unschedulable + status.Reason = fmt.Sprintf("proportional of resource %s check failed", 
resourceName) + return status, fmt.Errorf("proportional of resource %s check failed", resourceName) } } } - return true, nil + return status, nil } diff --git a/pkg/scheduler/plugins/predicates/proportional_test.go b/pkg/scheduler/plugins/predicates/proportional_test.go index 08b14c3f81..a893b2ac78 100644 --- a/pkg/scheduler/plugins/predicates/proportional_test.go +++ b/pkg/scheduler/plugins/predicates/proportional_test.go @@ -45,7 +45,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) { tests := []struct { name string args args - want bool + want int wantErr bool }{ { @@ -55,7 +55,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) { node: n1, proportional: proportional, }, - true, + api.Success, false, }, { @@ -65,7 +65,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) { node: n1, proportional: proportional, }, - false, + api.Unschedulable, true, }, { @@ -75,7 +75,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) { node: n1, proportional: proportional, }, - true, + api.Success, false, }, { @@ -85,7 +85,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) { node: n2, proportional: proportional, }, - true, + api.Success, false, }, } @@ -96,7 +96,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) { t.Errorf("checkNodeResourceIsProportional() error = %v, wantErr %v", err, tt.wantErr) return } - if got != tt.want { + if got.Code != tt.want { t.Errorf("checkNodeResourceIsProportional() got = %v, want %v", got, tt.want) } }) diff --git a/pkg/scheduler/plugins/tdm/tdm.go b/pkg/scheduler/plugins/tdm/tdm.go index 2bdd139ece..f28aac4d18 100644 --- a/pkg/scheduler/plugins/tdm/tdm.go +++ b/pkg/scheduler/plugins/tdm/tdm.go @@ -143,24 +143,30 @@ func (tp *tdmPlugin) OnSessionOpen(ssn *framework.Session) { }() // tdm plugin just handle revocable node - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + 
predicateStatus := make([]*api.Status, 0) + tdmStatus := &api.Status{} if node.RevocableZone == "" { - return nil + return predicateStatus, nil } if err := tp.availableRevocableZone(node.RevocableZone); err != nil { - return fmt.Errorf("plugin %s predicates %w", tp.Name(), err) + tdmStatus.Code = api.UnschedulableAndUnresolvable + tdmStatus.Reason = fmt.Sprintf("plugin %s predicates %v", tp.Name(), err) + return predicateStatus, fmt.Errorf("plugin %s predicates %v", tp.Name(), err) } klog.V(4).Infof("TDM node %v revocable zone %v:%v is active", node.Name, node.RevocableZone, tp.revocableZone[node.RevocableZone]) if len(task.RevocableZone) == 0 { msg := fmt.Sprintf("task %s/%s is not allow to dispatch to revocable node %s", task.Namespace, task.Name, node.Name) - return fmt.Errorf("plugin %s predicates %s", tp.Name(), msg) + return predicateStatus, fmt.Errorf("plugin %s predicates %s", tp.Name(), msg) } + tdmStatus.Code = api.Success + predicateStatus = append(predicateStatus, tdmStatus) klog.V(4).Infof("TDM filter for Task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name) - return nil + return predicateStatus, nil } // tdm plugin just handle revocable node diff --git a/pkg/scheduler/plugins/tdm/tdm_test.go b/pkg/scheduler/plugins/tdm/tdm_test.go index 88c2248681..c2ec63f1d7 100644 --- a/pkg/scheduler/plugins/tdm/tdm_test.go +++ b/pkg/scheduler/plugins/tdm/tdm_test.go @@ -288,7 +288,18 @@ func Test_TDM(t *testing.T) { predicatedNode := make([]*api.NodeInfo, 0) for _, node := range ssn.Nodes { - if err := ssn.PredicateFn(task, node); err != nil { + predicateStatus, err := ssn.PredicateFn(task, node) + if err != nil { + continue + } + predicateIsSuccess := true + for _, status := range predicateStatus { + if status != nil && status.Code != api.Success { + predicateIsSuccess = false + break + } + } + if predicateIsSuccess == false { continue } predicatedNode = append(predicatedNode, node) diff --git a/pkg/scheduler/plugins/usage/usage.go 
b/pkg/scheduler/plugins/usage/usage.go index 690111f42a..d11388d722 100644 --- a/pkg/scheduler/plugins/usage/usage.go +++ b/pkg/scheduler/plugins/usage/usage.go @@ -124,12 +124,17 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("Threshold arguments :%v", argsValue) } - predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error { + predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { + predicateStatus := make([]*api.Status, 0) + usageStatus := &api.Status{} for period, value := range up.threshold.cpuUsageAvg { klog.V(4).Infof("predicateFn cpuUsageAvg:%v", up.threshold.cpuUsageAvg) if node.ResourceUsage.CPUUsageAvg[period] > value { msg := fmt.Sprintf("Node %s cpu usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.CPUUsageAvg[period], value) - return fmt.Errorf("plugin %s predicates failed %s", up.Name(), msg) + usageStatus.Code = api.Unschedulable + usageStatus.Reason = msg + predicateStatus = append(predicateStatus, usageStatus) + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", up.Name(), msg) } } @@ -137,11 +142,17 @@ func (up *usagePlugin) OnSessionOpen(ssn *framework.Session) { klog.V(4).Infof("predicateFn memUsageAvg:%v", up.threshold.memUsageAvg) if node.ResourceUsage.MEMUsageAvg[period] > value { msg := fmt.Sprintf("Node %s mem usage %f exceeds the threshold %f", node.Name, node.ResourceUsage.MEMUsageAvg[period], value) - return fmt.Errorf("plugin %s memory usage predicates failed %s", up.Name(), msg) + usageStatus.Code = api.Unschedulable + usageStatus.Reason = msg + predicateStatus = append(predicateStatus, usageStatus) + return predicateStatus, fmt.Errorf("plugin %s memory usage predicates failed %s", up.Name(), msg) } } + + usageStatus.Code = api.Success + predicateStatus = append(predicateStatus, usageStatus) klog.V(4).Infof("Usage plugin filter for task %s/%s on node %s pass.", task.Namespace, task.Name, node.Name) - return nil + return 
predicateStatus, nil } nodeOrderFn := func(task *api.TaskInfo, node *api.NodeInfo) (float64, error) { diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index 114851b41d..64430092dc 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -70,7 +70,7 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI } // TODO (k82cn): Enable eCache for performance improvement. - if err := fn(task, node); err != nil { + if _, err := fn(task, node); err != nil { klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) errorLock.Lock() From 24ad50ee260462527c5155f8cd16d8a4ac9d25a9 Mon Sep 17 00:00:00 2001 From: wangyang Date: Mon, 19 Jun 2023 16:49:01 +0800 Subject: [PATCH 2/6] prePredicate does not affect the preemption function, and the modification of this part of adaptation will be rolled back Signed-off-by: wangyang --- pkg/scheduler/actions/allocate/allocate.go | 37 ++++-------------- pkg/scheduler/actions/backfill/backfill.go | 38 +++++-------------- pkg/scheduler/actions/preempt/preempt.go | 7 +--- pkg/scheduler/actions/reclaim/reclaim.go | 18 +-------- pkg/scheduler/api/types.go | 2 +- pkg/scheduler/framework/session_plugins.go | 10 ++--- .../plugins/predicates/predicates.go | 17 +++------ 7 files changed, 30 insertions(+), 99 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 3db2762c64..db5aa057c2 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -17,7 +17,6 @@ package allocate import ( - "fmt" "time" "k8s.io/klog/v2" @@ -177,7 +176,13 @@ func (alloc *Action) Execute(ssn *framework.Session) { klog.V(3).Infof("There are <%d> nodes for Job <%v/%v>", len(ssn.Nodes), job.Namespace, job.Name) - if err := prePredicateforAllocate(ssn, task, allNodes, job); err != nil { + if err := 
ssn.PrePredicateFn(task); err != nil { + klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + fitErrors := api.NewFitErrors() + for _, ni := range allNodes { + fitErrors.SetNodeError(ni.Name, err) + } + job.NodesFitErrors[task.UID] = fitErrors break } @@ -254,32 +259,4 @@ func (alloc *Action) Execute(ssn *framework.Session) { } } -func prePredicateforAllocate(ssn *framework.Session, task *api.TaskInfo, allNodes []*api.NodeInfo, job *api.JobInfo) error { - prePredicateStatus, err := ssn.PrePredicateFn(task) - if err != nil { - klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) - fitErrors := api.NewFitErrors() - for _, ni := range allNodes { - fitErrors.SetNodeError(ni.Name, err) - } - job.NodesFitErrors[task.UID] = fitErrors - return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) - } - - for _, status := range prePredicateStatus { - if status != nil && status.Code != api.Success { - klog.V(3).Infof("PrePredicate for task %s/%s failed, Code is %d, reason is %s", - task.Namespace, task.Name, status.Code, status.Reason) - fitErrors := api.NewFitErrors() - err := fmt.Errorf("PrePredicate for task %s/%s, %s", task.Namespace, task.Name, status.Reason) - for _, ni := range allNodes { - fitErrors.SetNodeError(ni.Name, err) - } - job.NodesFitErrors[task.UID] = fitErrors - return err - } - } - return nil -} - func (alloc *Action) UnInitialize() {} diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 2e464a2d10..eb61341d9a 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -57,12 +57,17 @@ func (backfill *Action) Execute(ssn *framework.Session) { for _, task := range job.TaskStatusIndex[api.Pending] { if task.InitResreq.IsEmpty() { allocated := false - if err := prePredicateforBackfill(ssn, task, job); err != nil { - 
klog.V(3).Infof("backfill %s", err.Error()) - continue + fe := api.NewFitErrors() + + if err := ssn.PrePredicateFn(task); err != nil { + klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) + for _, ni := range ssn.Nodes { + fe.SetNodeError(ni.Name, err) + } + job.NodesFitErrors[task.UID] = fe + break } - fe := api.NewFitErrors() // As task did not request resources, so it only need to meet predicates. // TODO (k82cn): need to prioritize nodes to avoid pod hole. for _, node := range ssn.Nodes { @@ -112,29 +117,4 @@ func predicateforBackfill(ssn *framework.Session, task *api.TaskInfo, node *api. return nil } -func prePredicateforBackfill(ssn *framework.Session, task *api.TaskInfo, job *api.JobInfo) error { - prePredicateStatus, err := ssn.PrePredicateFn(task) - if err != nil { - fitErrors := api.NewFitErrors() - for _, ni := range ssn.Nodes { - fitErrors.SetNodeError(ni.Name, err) - } - job.NodesFitErrors[task.UID] = fitErrors - return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) - } - - for _, status := range prePredicateStatus { - if status != nil && status.Code != api.Success { - fitErrors := api.NewFitErrors() - err := fmt.Errorf("PrePredicate for task %s/%s failed, %s", task.Namespace, task.Name, status.Reason) - for _, ni := range ssn.Nodes { - fitErrors.SetNodeError(ni.Name, err) - } - job.NodesFitErrors[task.UID] = fitErrors - return err - } - } - return nil -} - func (backfill *Action) UnInitialize() {} diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 43cfe61c8d..27e52a146c 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -204,15 +204,10 @@ func preempt( ) (bool, error) { assigned := false allNodes := ssn.NodeList - prePredicateStatus, err := ssn.PrePredicateFn(preemptor) + err := ssn.PrePredicateFn(preemptor) if err != nil { return false, fmt.Errorf("PrePredicate 
for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err) } - for _, status := range prePredicateStatus { - if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { - return false, fmt.Errorf("PrePredicate for task %s/%s failed, %v", preemptor.Namespace, preemptor.Name, status.Reason) - } - } predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { predicateStatus, err := ssn.PredicateFn(task, node) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index d31bdca89e..fcff06f79e 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -118,8 +118,8 @@ func (ra *Action) Execute(ssn *framework.Session) { continue } - if err := prePredicateforReclaim(ssn, task); err != nil { - klog.V(3).Infof("reclaim %s", err.Error()) + if err := ssn.PrePredicateFn(task); err != nil { + klog.V(3).Infof("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) continue } @@ -225,19 +225,5 @@ func predicateforReclaim(ssn *framework.Session, task *api.TaskInfo, n *api.Node return nil } -func prePredicateforReclaim(ssn *framework.Session, task *api.TaskInfo) error { - prePredicateStatus, err := ssn.PrePredicateFn(task) - if err != nil { - return fmt.Errorf("PrePredicate for task %s/%s failed for: %v", task.Namespace, task.Name, err) - } - - for _, status := range prePredicateStatus { - if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { - return fmt.Errorf("PrePredicate for task %s/%s failed, %v", task.Namespace, task.Name, status.Reason) - } - } - return nil -} - func (ra *Action) UnInitialize() { } diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index 6729c291d4..09c5799d00 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -165,7 +165,7 @@ type JobEnqueuedFn func(interface{}) type PredicateFn func(*TaskInfo, *NodeInfo) 
([]*Status, error) // PrePredicateFn is the func declaration used to pre-predicate node for task. -type PrePredicateFn func(*TaskInfo) ([]*Status, error) +type PrePredicateFn func(*TaskInfo) error // BestNodeFn is the func declaration used to return the nodeScores to plugins. type BestNodeFn func(*TaskInfo, map[float64][]*NodeInfo) *NodeInfo diff --git a/pkg/scheduler/framework/session_plugins.go b/pkg/scheduler/framework/session_plugins.go index 6a3e5d63c0..b8bcff7a50 100644 --- a/pkg/scheduler/framework/session_plugins.go +++ b/pkg/scheduler/framework/session_plugins.go @@ -623,8 +623,7 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api. } // PrePredicateFn invoke predicate function of the plugins -func (ssn *Session) PrePredicateFn(task *api.TaskInfo) ([]*api.Status, error) { - prePredicateStatus := make([]*api.Status, 0) +func (ssn *Session) PrePredicateFn(task *api.TaskInfo) error { for _, tier := range ssn.Tiers { for _, plugin := range tier.Plugins { // we use same option as predicates for they are @@ -635,14 +634,13 @@ func (ssn *Session) PrePredicateFn(task *api.TaskInfo) ([]*api.Status, error) { if !found { continue } - status, err := pfn(task) - prePredicateStatus = append(prePredicateStatus, status...) 
+ err := pfn(task) if err != nil { - return prePredicateStatus, err + return err } } } - return prePredicateStatus, nil + return nil } // BestNodeFn invoke bestNode function of the plugins diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index 0c4b00cf83..489a533da4 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -351,8 +351,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { state := k8sframework.NewCycleState() - ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) ([]*api.Status, error) { - prePredicateStatus := make([]*api.Status, 0) + ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error { // Check NodePorts if predicate.nodePortEnable { nodePortFilter.PreFilter(context.TODO(), state, task.Pod) @@ -370,10 +369,8 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // the processing logic needs to be added to the return value result. if predicate.podAffinityEnable { _, status := podAffinityFilter.PreFilter(context.TODO(), state, task.Pod) - podAffinityStatus, err := framework.ConvertPredicateStatus(status) - prePredicateStatus = append(prePredicateStatus, podAffinityStatus) - if err != nil { - return prePredicateStatus, fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message()) + if !status.IsSuccess() { + return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message()) } } @@ -389,13 +386,11 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // the processing logic needs to be added to the return value result. 
if predicate.podTopologySpreadEnable { _, status := podTopologySpreadFilter.PreFilter(context.TODO(), state, task.Pod) - podTopologyStatus, err := framework.ConvertPredicateStatus(status) - prePredicateStatus = append(prePredicateStatus, podTopologyStatus) - if err != nil { - return prePredicateStatus, fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) + if !status.IsSuccess() { + return fmt.Errorf("plugin %s pre-predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) } } - return prePredicateStatus, nil + return nil }) ssn.AddPredicateFn(pp.Name(), func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { From 17eeca5fb7149124306e5ad95c65910325c55836 Mon Sep 17 00:00:00 2001 From: wangyang Date: Tue, 27 Jun 2023 20:17:13 +0800 Subject: [PATCH 3/6] Abstract predicate state judgment logic to reduce duplication of code Signed-off-by: w00568049 --- pkg/scheduler/actions/allocate/allocate.go | 15 +++++-------- pkg/scheduler/actions/backfill/backfill.go | 25 ++++++---------------- pkg/scheduler/actions/preempt/preempt.go | 15 +++++-------- pkg/scheduler/actions/reclaim/reclaim.go | 24 ++++++--------------- pkg/scheduler/util/predicate_helper.go | 19 ++++++++++++++++ 5 files changed, 42 insertions(+), 56 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index db5aa057c2..6771a43147 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -101,17 +101,12 @@ func (alloc *Action) Execute(ssn *framework.Session) { if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok { return nil, api.NewFitError(task, node, reason) } - predicateStatus, err := ssn.PredicateFn(task, node) - if err != nil { - return nil, err - } - for _, status := range predicateStatus { - if status != nil && status.Code != api.Success { - return nil, api.NewFitError(task, node, 
status.Reason) - } - } - return nil, nil + // Only nodes whose status is success after predicate filtering can be scheduled. + admitStatus := map[int]struct{}{ + api.Success: {}, + } + return nil, util.PredicateForAdmitStatus(ssn, task, node, admitStatus) } // To pick tuple for job, we choose to pick namespace firstly. diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index eb61341d9a..b0d14d3e6f 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -17,7 +17,6 @@ limitations under the License. package backfill import ( - "fmt" "time" "k8s.io/klog/v2" @@ -25,6 +24,7 @@ import ( "volcano.sh/volcano/pkg/scheduler/api" "volcano.sh/volcano/pkg/scheduler/framework" "volcano.sh/volcano/pkg/scheduler/metrics" + "volcano.sh/volcano/pkg/scheduler/util" ) type Action struct{} @@ -73,9 +73,14 @@ func (backfill *Action) Execute(ssn *framework.Session) { for _, node := range ssn.Nodes { // TODO (k82cn): predicates did not consider pod number for now, there'll // be ping-pong case here. - err := predicateforBackfill(ssn, task, node, fe) + // Only nodes whose status is success after predicate filtering can be scheduled. 
+ admitStatus := map[int]struct{}{ + api.Success: {}, + } + err := util.PredicateForAdmitStatus(ssn, task, node, admitStatus) if err != nil { klog.V(3).Infof("backfill %s", err.Error()) + fe.SetNodeError(node.Name, err) continue } @@ -101,20 +106,4 @@ func (backfill *Action) Execute(ssn *framework.Session) { } } -func predicateforBackfill(ssn *framework.Session, task *api.TaskInfo, node *api.NodeInfo, fe *api.FitErrors) error { - predicateStatus, err := ssn.PredicateFn(task, node) - if err != nil { - fe.SetNodeError(node.Name, err) - return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) - } - for _, status := range predicateStatus { - if status != nil && status.Code != api.Success { - return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %s", - task.Namespace, task.Name, node.Name, status.Reason) - } - } - return nil -} - func (backfill *Action) UnInitialize() {} diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 27e52a146c..6c9039dac6 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -210,17 +210,12 @@ func preempt( } predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { - predicateStatus, err := ssn.PredicateFn(task, node) - if err != nil { - return nil, err + // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. 
+ admitStatus := map[int]struct{}{ + api.Success: {}, + api.Unschedulable: {}, } - for _, status := range predicateStatus { - if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { - return nil, api.NewFitError(task, node, status.Reason) - } - } - - return nil, nil + return nil, util.PredicateForAdmitStatus(ssn, task, node, admitStatus) } predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index fcff06f79e..390598317c 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -17,8 +17,6 @@ limitations under the License. package reclaim import ( - "fmt" - "k8s.io/klog/v2" "volcano.sh/volcano/pkg/scheduler/api" @@ -125,8 +123,13 @@ func (ra *Action) Execute(ssn *framework.Session) { assigned := false for _, n := range ssn.Nodes { + // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. + admitStatus := map[int]struct{}{ + api.Success: {}, + api.Unschedulable: {}, + } // If predicates failed, next node. 
- if err := predicateforReclaim(ssn, task, n); err != nil { + if err := util.PredicateForAdmitStatus(ssn, task, n, admitStatus); err != nil { klog.V(3).Infof("reclaim %s", err.Error()) continue } @@ -210,20 +213,5 @@ func (ra *Action) Execute(ssn *framework.Session) { } } -func predicateforReclaim(ssn *framework.Session, task *api.TaskInfo, n *api.NodeInfo) error { - predicateStatus, err := ssn.PredicateFn(task, n) - if err != nil { - return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, n.Name, err) - } - for _, status := range predicateStatus { - if status != nil && status.Code != api.Success && status.Code != api.Unschedulable { - return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, n.Name, status.Reason) - } - } - return nil -} - func (ra *Action) UnInitialize() { } diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index 64430092dc..56a0e96be2 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -10,6 +10,7 @@ import ( "k8s.io/klog/v2" "volcano.sh/volcano/pkg/scheduler/api" + "volcano.sh/volcano/pkg/scheduler/framework" ) type PredicateHelper interface { @@ -100,6 +101,24 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI return predicateNodes, fe } +func PredicateForAdmitStatus(ssn *framework.Session, task *api.TaskInfo, n *api.NodeInfo, admitStatus map[int]struct{}) error { + predicateStatus, err := ssn.PredicateFn(task, n) + if err != nil { + return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, n.Name, err) + } + for _, status := range predicateStatus { + if status == nil { + continue + } + if _, ok := admitStatus[status.Code]; !ok { + return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, n.Name, status.Reason) + } + } + return nil +} + func 
taskGroupID(task *api.TaskInfo) string { return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey()) } From 1d22455804b802591f5bccb6f845d7753bd7ee92 Mon Sep 17 00:00:00 2001 From: w00568049 Date: Tue, 27 Jun 2023 21:34:12 +0800 Subject: [PATCH 4/6] Resolve the cyclic dependency problem between framework and util Signed-off-by: w00568049 --- pkg/scheduler/actions/allocate/allocate.go | 8 +++++++- pkg/scheduler/actions/backfill/backfill.go | 12 ++++++++++-- pkg/scheduler/actions/preempt/preempt.go | 10 +++++++--- pkg/scheduler/actions/reclaim/reclaim.go | 11 +++++++++-- pkg/scheduler/util/predicate_helper.go | 12 +++--------- 5 files changed, 36 insertions(+), 17 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 6771a43147..4362719767 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -17,6 +17,7 @@ package allocate import ( + "fmt" "time" "k8s.io/klog/v2" @@ -102,11 +103,16 @@ func (alloc *Action) Execute(ssn *framework.Session) { return nil, api.NewFitError(task, node, reason) } + predicateStatus, err := ssn.PredicateFn(task, node) + if err != nil { + return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, node.Name, err) + } // Only nodes whose status is success after predicate filtering can be scheduled. admitStatus := map[int]struct{}{ api.Success: {}, } - return nil, util.PredicateForAdmitStatus(ssn, task, node, admitStatus) + return nil, util.CheckPredicateStatus(predicateStatus, admitStatus) } // To pick tuple for job, we choose to pick namespace firstly. 
diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index b0d14d3e6f..468e011cde 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -77,9 +77,17 @@ func (backfill *Action) Execute(ssn *framework.Session) { admitStatus := map[int]struct{}{ api.Success: {}, } - err := util.PredicateForAdmitStatus(ssn, task, node, admitStatus) + predicateStatus, err := ssn.PredicateFn(task, node) if err != nil { - klog.V(3).Infof("backfill %s", err.Error()) + klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, node.Name, err) + fe.SetNodeError(node.Name, err) + continue + } + err = util.CheckPredicateStatus(predicateStatus, admitStatus) + if err != nil { + klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, node.Name, err) fe.SetNodeError(node.Name, err) continue } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 6c9039dac6..4850816f9e 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -204,8 +204,7 @@ func preempt( ) (bool, error) { assigned := false allNodes := ssn.NodeList - err := ssn.PrePredicateFn(preemptor) - if err != nil { + if err := ssn.PrePredicateFn(preemptor); err != nil { return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err) } @@ -215,7 +214,12 @@ func preempt( api.Success: {}, api.Unschedulable: {}, } - return nil, util.PredicateForAdmitStatus(ssn, task, node, admitStatus) + predicateStatus, err := ssn.PredicateFn(task, node) + if err != nil { + return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, node.Name, err) + } + return nil, util.CheckPredicateStatus(predicateStatus, admitStatus) } predicateNodes, _ := 
predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 390598317c..d8b17a06df 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -128,9 +128,16 @@ func (ra *Action) Execute(ssn *framework.Session) { api.Success: {}, api.Unschedulable: {}, } + predicateStatus, err := ssn.PredicateFn(task, n) + if err != nil { + klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, n.Name, err) + continue + } // If predicates failed, next node. - if err := util.PredicateForAdmitStatus(ssn, task, n, admitStatus); err != nil { - klog.V(3).Infof("reclaim %s", err.Error()) + if err := util.CheckPredicateStatus(predicateStatus, admitStatus); err != nil { + klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", + task.Namespace, task.Name, n.Name, err) continue } diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index 56a0e96be2..dfc3de39aa 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -10,7 +10,6 @@ import ( "k8s.io/klog/v2" "volcano.sh/volcano/pkg/scheduler/api" - "volcano.sh/volcano/pkg/scheduler/framework" ) type PredicateHelper interface { @@ -101,19 +100,14 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI return predicateNodes, fe } -func PredicateForAdmitStatus(ssn *framework.Session, task *api.TaskInfo, n *api.NodeInfo, admitStatus map[int]struct{}) error { - predicateStatus, err := ssn.PredicateFn(task, n) - if err != nil { - return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, n.Name, err) - } +func CheckPredicateStatus(predicateStatus []*api.Status, admitStatus map[int]struct{}) error { for _, status := range predicateStatus { if status == nil { continue } 
if _, ok := admitStatus[status.Code]; !ok { - return fmt.Errorf("Predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, n.Name, status.Reason) + return fmt.Errorf("Predicates status (code: %d) does not meet the expectation (admit status: %v), message: %s", + status.Code, admitStatus, status.Reason) } } return nil From b3ac59818e1c4fd180e2e543c748095de35bef49 Mon Sep 17 00:00:00 2001 From: wangyang Date: Thu, 29 Jun 2023 16:39:13 +0800 Subject: [PATCH 5/6] Abstract the definition and member method of the predicate status Signed-off-by: wangyang --- pkg/scheduler/actions/allocate/allocate.go | 14 +++--- pkg/scheduler/actions/backfill/backfill.go | 17 ++++---- pkg/scheduler/actions/preempt/preempt.go | 14 +++--- pkg/scheduler/actions/reclaim/reclaim.go | 18 +++----- pkg/scheduler/framework/util.go | 2 +- pkg/scheduler/util/predicate_helper.go | 51 +++++++++++++++++----- 6 files changed, 74 insertions(+), 42 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 4362719767..eb0af30bec 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -102,17 +102,19 @@ func (alloc *Action) Execute(ssn *framework.Session) { if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok { return nil, api.NewFitError(task, node, reason) } - - predicateStatus, err := ssn.PredicateFn(task, node) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) if err != nil { return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) } - // Only nodes whose status is success after predicate filtering can be scheduled. 
- admitStatus := map[int]struct{}{ - api.Success: {}, + + if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || + statusSets.ContainsErrorSkipOrWait() { + return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>, status is not success", + task.Namespace, task.Name, node.Name) } - return nil, util.CheckPredicateStatus(predicateStatus, admitStatus) + return nil, nil } // To pick tuple for job, we choose to pick namespace firstly. diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 468e011cde..780c14ca78 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -17,6 +17,7 @@ limitations under the License. package backfill import ( + "fmt" "time" "k8s.io/klog/v2" @@ -74,20 +75,20 @@ func (backfill *Action) Execute(ssn *framework.Session) { // TODO (k82cn): predicates did not consider pod number for now, there'll // be ping-pong case here. // Only nodes whose status is success after predicate filtering can be scheduled. 
- admitStatus := map[int]struct{}{ - api.Success: {}, - } - predicateStatus, err := ssn.PredicateFn(task, node) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) if err != nil { klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) fe.SetNodeError(node.Name, err) continue } - err = util.CheckPredicateStatus(predicateStatus, admitStatus) - if err != nil { - klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) + + if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || + statusSets.ContainsErrorSkipOrWait() { + err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success", + task.Namespace, task.Name, node.Name) + klog.V(3).Infof("err: %v", err) fe.SetNodeError(node.Name, err) continue } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 4850816f9e..f3be6aa64b 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -210,16 +210,18 @@ func preempt( predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. 
- admitStatus := map[int]struct{}{ - api.Success: {}, - api.Unschedulable: {}, - } - predicateStatus, err := ssn.PredicateFn(task, node) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, node) if err != nil { return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) } - return nil, util.CheckPredicateStatus(predicateStatus, admitStatus) + + if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { + return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable", + task.Namespace, task.Name, node.Name) + } + return nil, nil } predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true) diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index d8b17a06df..8bced3d9ec 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -123,24 +123,20 @@ func (ra *Action) Execute(ssn *framework.Session) { assigned := false for _, n := range ssn.Nodes { - // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. - admitStatus := map[int]struct{}{ - api.Success: {}, - api.Unschedulable: {}, - } - predicateStatus, err := ssn.PredicateFn(task, n) + var statusSets util.StatusSets + statusSets, err := ssn.PredicateFn(task, n) if err != nil { klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, n.Name, err) continue } - // If predicates failed, next node. - if err := util.CheckPredicateStatus(predicateStatus, admitStatus); err != nil { - klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, n.Name, err) + + // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. 
+ if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { + klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.", + task.Namespace, task.Name, n.Name) continue } - klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", task.Namespace, task.Name, n.Name) diff --git a/pkg/scheduler/framework/util.go b/pkg/scheduler/framework/util.go index 412472aeb0..e43ef65e34 100644 --- a/pkg/scheduler/framework/util.go +++ b/pkg/scheduler/framework/util.go @@ -266,7 +266,7 @@ func (nl *NodeLister) List() ([]*v1.Node, error) { return nodes, nil } -// The state of the k8s prefile is converted to the internal state of the volcano +// ConvertPredicateStatus return predicate status from k8sframework status func ConvertPredicateStatus(status *k8sframework.Status) (*api.Status, error) { internalStatus := &api.Status{} if status.Code() == k8sframework.Success { diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index dfc3de39aa..6895b86de4 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -100,23 +100,54 @@ func (ph *predicateHelper) PredicateNodes(task *api.TaskInfo, nodes []*api.NodeI return predicateNodes, fe } -func CheckPredicateStatus(predicateStatus []*api.Status, admitStatus map[int]struct{}) error { - for _, status := range predicateStatus { +func taskGroupID(task *api.TaskInfo) string { + return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey()) +} + +func NewPredicateHelper() PredicateHelper { + return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} +} + +type PredicateStatus interface { + IsContainsUnschedulable() bool + IsContainsUnschedulableAndUnresolvable() bool + IsContainsErrorSkipOrWait() bool +} + +type StatusSets []*api.Status + +func (s StatusSets) ContainsUnschedulable() bool { + for _, status := range s { if status == nil { continue } - if _, 
ok := admitStatus[status.Code]; !ok { - return fmt.Errorf("Predicates status (code: %d) does not meet the expectation (admit status: %v), message: %s", - status.Code, admitStatus, status.Reason) + if status.Code == api.Unschedulable { + return true } } - return nil + return false } -func taskGroupID(task *api.TaskInfo) string { - return fmt.Sprintf("%s/%s", task.Job, task.GetTaskSpecKey()) +func (s StatusSets) ContainsUnschedulableAndUnresolvable() bool { + for _, status := range s { + if status == nil { + continue + } + if status.Code == api.UnschedulableAndUnresolvable { + return true + } + } + return false } -func NewPredicateHelper() PredicateHelper { - return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} +func (s StatusSets) ContainsErrorSkipOrWait() bool { + for _, status := range s { + if status == nil { + continue + } + if status.Code == api.Error || status.Code == api.Skip || status.Code == api.Wait { + return true + } + } + return false } From cb8a18632c9ab1d6e06532529eee29d00e46e53e Mon Sep 17 00:00:00 2001 From: w00568049 Date: Wed, 12 Jul 2023 21:07:00 +0800 Subject: [PATCH 6/6] The invalid interface is deleted and the log information is updated Signed-off-by: w00568049 --- pkg/scheduler/actions/allocate/allocate.go | 4 ++-- pkg/scheduler/actions/backfill/backfill.go | 4 ++-- pkg/scheduler/util/predicate_helper.go | 6 ------ 3 files changed, 4 insertions(+), 10 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index eb0af30bec..1ad5a470a3 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -105,7 +105,7 @@ func (alloc *Action) Execute(ssn *framework.Session) { var statusSets util.StatusSets statusSets, err := ssn.PredicateFn(task, node) if err != nil { - return nil, fmt.Errorf("allocate predicates failed for task <%s/%s> on node <%s>: %v", + return nil, fmt.Errorf("predicates failed in allocate for task 
<%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) } @@ -229,7 +229,7 @@ func (alloc *Action) Execute(ssn *framework.Session) { metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) } } else { - klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources", + klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources", task.Namespace, task.Name, node.Name) // Allocate releasing resource to the task if any. diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 780c14ca78..6f6847778b 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -78,7 +78,7 @@ func (backfill *Action) Execute(ssn *framework.Session) { var statusSets util.StatusSets statusSets, err := ssn.PredicateFn(task, node) if err != nil { - klog.V(3).Infof("backfill predicates failed for task <%s/%s> on node <%s>: %v", + klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, node.Name, err) fe.SetNodeError(node.Name, err) continue @@ -88,7 +88,7 @@ func (backfill *Action) Execute(ssn *framework.Session) { statusSets.ContainsErrorSkipOrWait() { err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success", task.Namespace, task.Name, node.Name) - klog.V(3).Infof("err: %v", err) + klog.V(3).Infof("%v", err) fe.SetNodeError(node.Name, err) continue } diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index 6895b86de4..ef7983f452 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -108,12 +108,6 @@ func NewPredicateHelper() PredicateHelper { return &predicateHelper{taskPredicateErrorCache: map[string]map[string]error{}} } -type PredicateStatus interface { - IsContainsUnschedulable() bool - 
IsContainsUnschedulableAndUnresolvable() bool - IsContainsErrorSkipOrWait() bool -} - type StatusSets []*api.Status func (s StatusSets) ContainsUnschedulable() bool {