From bd92106363eb783e9906f2852a4817eac746b31a Mon Sep 17 00:00:00 2001 From: wangyang Date: Wed, 26 Apr 2023 14:26:07 +0800 Subject: [PATCH] Prioritize scheduling to machines that are satisfied with current resources, and then consider machines that are satisfied with future resources Signed-off-by: wangyang --- pkg/scheduler/actions/allocate/allocate.go | 74 +++++++++++++++------- 1 file changed, 50 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 1b1a01cd84..d2c7bcf510 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -195,50 +195,76 @@ func (alloc *Action) Execute(ssn *framework.Session) { break } - var candidateNodes []*api.NodeInfo + // Candidate nodes are divided into two gradients: + // - the first gradient node: a list of free nodes that satisfy the task resource request; + // - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request; + // Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list, + // otherwise, score the second gradient node and select the appropriate node. + var candidateNodes [][]*api.NodeInfo + var idleCandidateNodes []*api.NodeInfo + var futureIdleCandidateNodes []*api.NodeInfo for _, n := range predicateNodes { - if task.InitResreq.LessEqual(n.Idle, api.Zero) || task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) { - candidateNodes = append(candidateNodes, n) + if task.InitResreq.LessEqual(n.Idle, api.Zero) { + idleCandidateNodes = append(idleCandidateNodes, n) + } else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) { + futureIdleCandidateNodes = append(futureIdleCandidateNodes, n) + } else { + klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v", + n.Name, n.Idle, n.FutureIdle(), task.Name) } } + candidateNodes = append(candidateNodes, idleCandidateNodes) + candidateNodes = append(candidateNodes, futureIdleCandidateNodes) + + var bestNode *api.NodeInfo + for index, nodes := range candidateNodes { + if klog.V(5).Enabled() { + for _, node := range nodes { + klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle()) + } + } + switch { + case len(nodes) == 0: + klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index) + case len(nodes) == 1: // If only one node after predicate, just use it. + bestNode = nodes[0] + case len(nodes) > 1: // If more than one node after predicate, using "the best" one + nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) + + bestNode = ssn.BestNodeFn(task, nodeScores) + if bestNode == nil { + bestNode = util.SelectBestNode(nodeScores) + } + } - var node *api.NodeInfo - switch { - case len(candidateNodes) == 0: // If not candidate nodes for this task, skip it. - continue - case len(candidateNodes) == 1: // If only one node after predicate, just use it. - node = candidateNodes[0] - case len(candidateNodes) > 1: // If more than one node after predicate, using "the best" one - nodeScores := util.PrioritizeNodes(task, candidateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn) - - node = ssn.BestNodeFn(task, nodeScores) - if node == nil { - node = util.SelectBestNode(nodeScores) + // If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information. + if bestNode != nil { + break } } // Allocate idle resource to the task. - if task.InitResreq.LessEqual(node.Idle, api.Zero) { + if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) { klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", - task.Namespace, task.Name, node.Name) - if err := stmt.Allocate(task, node); err != nil { + task.Namespace, task.Name, bestNode.Name) + if err := stmt.Allocate(task, bestNode); err != nil { klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v", - task.UID, node.Name, ssn.UID, err) + task.UID, bestNode.Name, ssn.UID, err) } else { metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now()) } } else { klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources", - task.Namespace, task.Name, node.Name) + task.Namespace, task.Name, bestNode.Name) // Allocate releasing resource to the task if any. - if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { + if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) { klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>", - task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing) - if err := stmt.Pipeline(task, node.Name); err != nil { + task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing) + if err := stmt.Pipeline(task, bestNode.Name); err != nil { klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.", - task.UID, node.Name, ssn.UID, err) + task.UID, bestNode.Name, ssn.UID, err) } else { metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time)) metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())