Skip to content

Commit

Permalink
Merge pull request volcano-sh#2916 from wangyang0616/feature_predicat…
Browse files Browse the repository at this point in the history
…e_suport_preempt

Refactor PredicateFn for allocate and preempt actions
  • Loading branch information
volcano-sh-bot authored Jul 13, 2023
2 parents 0debbc4 + cb8a186 commit 5302995
Show file tree
Hide file tree
Showing 22 changed files with 385 additions and 95 deletions.
21 changes: 20 additions & 1 deletion docs/design/device-sharing.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,26 @@ type Devices interface {
//HasDeviceRequest checks if the 'pod' request this device
HasDeviceRequest(pod *v1.Pod) bool
//FiltreNode checks if the 'pod' fit in current node
FilterNode(pod *v1.Pod) (bool, error)
// The first return value represents the filtering result, and the value range is "0, 1, 2, 3"
// 0: Success
// Success means that plugin ran correctly and found pod schedulable.
// 1: Error
// Error is used for internal plugin errors, unexpected input, etc.
// 2: Unschedulable
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
// 3: UnschedulableAndUnresolvable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
FilterNode(pod *v1.Pod) (int, string, error)
//Allocate action in predicate
Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
//Release action in predicate
Expand Down
20 changes: 16 additions & 4 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package allocate

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -96,13 +97,24 @@ func (alloc *Action) Execute(ssn *framework.Session) {
pendingTasks := map[api.JobID]*util.PriorityQueue{}

allNodes := ssn.NodeList
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) error {
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Check for Resource Predicate
if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
return api.NewFitError(task, node, reason)
return nil, api.NewFitError(task, node, reason)
}
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}

return ssn.PredicateFn(task, node)
if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
}
return nil, nil
}

// To pick <namespace, queue> tuple for job, we choose to pick namespace firstly.
Expand Down Expand Up @@ -217,7 +229,7 @@ func (alloc *Action) Execute(ssn *framework.Session) {
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
} else {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)

// Allocate releasing resource to the task if any.
Expand Down
18 changes: 16 additions & 2 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ limitations under the License.
package backfill

import (
"fmt"
"time"

"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api"
"volcano.sh/volcano/pkg/scheduler/framework"
"volcano.sh/volcano/pkg/scheduler/metrics"
"volcano.sh/volcano/pkg/scheduler/util"
)

type Action struct{}
Expand Down Expand Up @@ -72,13 +74,25 @@ func (backfill *Action) Execute(ssn *framework.Session) {
for _, node := range ssn.Nodes {
// TODO (k82cn): predicates did not consider pod number for now, there'll
// be ping-pong case here.
if err := ssn.PredicateFn(task, node); err != nil {
klog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
// Only nodes whose status is success after predicate filtering can be scheduled.
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
klog.V(3).Infof("%v", err)
fe.SetNodeError(node.Name, err)
continue
}

klog.V(3).Infof("Binding Task <%v/%v> to node <%v>", task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v", task.UID, node.Name, ssn.UID)
Expand Down
21 changes: 18 additions & 3 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,13 +203,28 @@ func preempt(
predicateHelper util.PredicateHelper,
) (bool, error) {
assigned := false

allNodes := ssn.NodeList

if err := ssn.PrePredicateFn(preemptor); err != nil {
return false, fmt.Errorf("PrePredicate for task %s/%s failed for: %v", preemptor.Namespace, preemptor.Name, err)
}
predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, ssn.PredicateFn, true)

predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
}

if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable",
task.Namespace, task.Name, node.Name)
}
return nil, nil
}

predicateNodes, _ := predicateHelper.PredicateNodes(preemptor, allNodes, predicateFn, true)

nodeScores := util.PrioritizeNodes(preemptor, predicateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

Expand Down
13 changes: 11 additions & 2 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,20 @@ func (ra *Action) Execute(ssn *framework.Session) {

assigned := false
for _, n := range ssn.Nodes {
// If predicates failed, next node.
if err := ssn.PredicateFn(task, n); err != nil {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, n)
if err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
continue
}

// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.",
task.Namespace, task.Name, n.Name)
continue
}
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
task.Namespace, task.Name, n.Name)

Expand Down
14 changes: 8 additions & 6 deletions pkg/scheduler/api/devices/nvidia/gpushare/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package gpushare

import (
"context"
"fmt"

"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api/devices"
"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
)

Expand Down Expand Up @@ -146,24 +148,24 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
klog.V(4).Infoln("DeviceSharing:Into FitInPod", pod.Name)
if GpuSharingEnable {
fit, err := checkNodeGPUSharingPredicate(pod, gs)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("GpuShare %s", err.Error()), err
}
}
if GpuNumberEnable {
fit, err := checkNodeGPUNumberPredicate(pod, gs)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("GpuNumber %s", err.Error()), err
}
}
klog.V(4).Infoln("DeviceSharing:FitInPod successed")
return true, nil
return devices.Success, "", nil
}

func (gs *GPUDevices) GetStatus() string {
Expand Down
10 changes: 6 additions & 4 deletions pkg/scheduler/api/devices/nvidia/vgpu/device_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package vgpu

import (
"fmt"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"volcano.sh/volcano/pkg/scheduler/api/devices"
"volcano.sh/volcano/pkg/scheduler/plugins/util/nodelock"
)

Expand Down Expand Up @@ -177,17 +179,17 @@ func (gs *GPUDevices) Release(kubeClient kubernetes.Interface, pod *v1.Pod) erro
return nil
}

func (gs *GPUDevices) FilterNode(pod *v1.Pod) (bool, error) {
func (gs *GPUDevices) FilterNode(pod *v1.Pod) (int, string, error) {
klog.V(3).Infoln("4pdvgpuDeviceSharing:Into FitInPod", pod.Name)
if VGPUEnable {
fit, _, err := checkNodeGPUSharingPredicate(pod, gs, true)
if err != nil {
if err != nil || !fit {
klog.Errorln("deviceSharing err=", err.Error())
return fit, err
return devices.Unschedulable, fmt.Sprintf("4pdvgpuDeviceSharing %s", err.Error()), err
}
}
klog.V(3).Infoln("4pdvgpu DeviceSharing:FitInPod successed")
return true, nil
return devices.Success, "", nil
}

func (gs *GPUDevices) GetStatus() string {
Expand Down
38 changes: 38 additions & 0 deletions pkg/scheduler/api/devices/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package devices

// These are predefined codes used in a Status.
const (
// Success means that plugin ran correctly and found pod schedulable.
// NOTE: A nil status is also considered as "Success".
Success int = iota
// Error is used for internal plugin errors, unexpected input, etc.
Error
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
UnschedulableAndUnresolvable
// Wait is used when a Permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a Bind plugin chooses to skip binding.
Skip
)
20 changes: 19 additions & 1 deletion pkg/scheduler/api/shared_device_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,25 @@ type Devices interface {
//HasDeviceRequest checks if the 'pod' request this device
HasDeviceRequest(pod *v1.Pod) bool
//FiltreNode checks if the 'pod' fit in current node
FilterNode(pod *v1.Pod) (bool, error)
// The first return value represents the filtering result, and the value range is "0, 1, 2, 3"
// 0: Success
// Success means that plugin ran correctly and found pod schedulable.

// 1: Error
// Error is used for internal plugin errors, unexpected input, etc.

// 2: Unschedulable
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.

// 3: UnschedulableAndUnresolvable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
FilterNode(pod *v1.Pod) (int, string, error)
//Allocate action in predicate
Allocate(kubeClient kubernetes.Interface, pod *v1.Pod) error
//Release action in predicate
Expand Down
30 changes: 29 additions & 1 deletion pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,34 @@ type ValidateResult struct {
Message string
}

// These are predefined codes used in a Status.
const (
// Success means that plugin ran correctly and found pod schedulable.
// NOTE: A nil status is also considered as "Success".
Success int = iota
// Error is used for internal plugin errors, unexpected input, etc.
Error
// Unschedulable is used when a plugin finds a pod unschedulable. The scheduler might attempt to
// preempt other pods to get this pod scheduled. Use UnschedulableAndUnresolvable to make the
// scheduler skip preemption.
// The accompanying status message should explain why the pod is unschedulable.
Unschedulable
// UnschedulableAndUnresolvable is used when a plugin finds a pod unschedulable and
// preemption would not change anything. Plugins should return Unschedulable if it is possible
// that the pod can get scheduled with preemption.
// The accompanying status message should explain why the pod is unschedulable.
UnschedulableAndUnresolvable
// Wait is used when a Permit plugin finds a pod scheduling should wait.
Wait
// Skip is used when a Bind plugin chooses to skip binding.
Skip
)

type Status struct {
Code int
Reason string
}

// ValidateExFn is the func declaration used to validate the result.
type ValidateExFn func(interface{}) *ValidateResult

Expand All @@ -134,7 +162,7 @@ type VoteFn func(interface{}) int
type JobEnqueuedFn func(interface{})

// PredicateFn is the func declaration used to predicate node for task.
type PredicateFn func(*TaskInfo, *NodeInfo) error
type PredicateFn func(*TaskInfo, *NodeInfo) ([]*Status, error)

// PrePredicateFn is the func declaration used to pre-predicate node for task.
type PrePredicateFn func(*TaskInfo) error
Expand Down
10 changes: 6 additions & 4 deletions pkg/scheduler/framework/session_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ func (ssn *Session) TaskOrderFn(l, r interface{}) bool {
}

// PredicateFn invoke predicate function of the plugins
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
predicateStatus := make([]*api.Status, 0)
for _, tier := range ssn.Tiers {
for _, plugin := range tier.Plugins {
if !isEnabled(plugin.EnabledPredicate) {
Expand All @@ -611,13 +612,14 @@ func (ssn *Session) PredicateFn(task *api.TaskInfo, node *api.NodeInfo) error {
if !found {
continue
}
err := pfn(task, node)
status, err := pfn(task, node)
predicateStatus = append(predicateStatus, status...)
if err != nil {
return err
return predicateStatus, err
}
}
}
return nil
return predicateStatus, nil
}

// PrePredicateFn invoke predicate function of the plugins
Expand Down
Loading

0 comments on commit 5302995

Please sign in to comment.