Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick for release-1.8] msg information optimization; preemption logic optimization #3082

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package allocate

import (
"fmt"
"time"

"k8s.io/klog/v2"
Expand Down Expand Up @@ -105,14 +104,12 @@ func (alloc *Action) Execute(ssn *framework.Session) {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
return nil, api.NewFitError(task, node, err.Error())
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in allocate for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
return nil, api.NewFitError(task, node, statusSets.Message())
}
return nil, nil
}
Expand Down
6 changes: 1 addition & 5 deletions pkg/scheduler/actions/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,13 @@ func (backfill *Action) Execute(ssn *framework.Session) {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
fe.SetNodeError(node.Name, err)
continue
}

if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() ||
statusSets.ContainsErrorSkipOrWait() {
err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success",
task.Namespace, task.Name, node.Name)
klog.V(3).Infof("%v", err)
err := fmt.Errorf("%s", statusSets.Message())
fe.SetNodeError(node.Name, err)
continue
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,11 @@ func preempt(
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
if err != nil {
return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
return nil, api.NewFitError(task, node, err.Error())
}

if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable",
task.Namespace, task.Name, node.Name)
return nil, api.NewFitError(task, node, statusSets.Message())
}
return nil, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,15 @@ func (ra *Action) Execute(ssn *framework.Session) {
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, n)
if err != nil {
klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
klog.V(5).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, n.Name, err)
continue
}

// Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate.
if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() {
klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.",
task.Namespace, task.Name, n.Name)
klog.V(5).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, reason is %s.",
task.Namespace, task.Name, n.Name, statusSets.Message())
continue
}
klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.",
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ type Status struct {
Reason string
}

// String represents status string
func (s Status) String() string {
return s.Reason
}

// ValidateExFn is the func declaration used to validate the result.
type ValidateExFn func(interface{}) *ValidateResult

Expand Down
6 changes: 0 additions & 6 deletions pkg/scheduler/framework/job_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (

const (
jobUpdaterWorker = 16

jobConditionUpdateTime = time.Minute
jobConditionUpdateTimeJitter = 30 * time.Second
)

// TimeJitterAfter means: new after old + duration + jitter
Expand Down Expand Up @@ -61,9 +58,6 @@ func isPodGroupConditionsUpdated(newCondition, oldCondition []scheduling.PodGrou

newTime := newCond.LastTransitionTime
oldTime := oldCond.LastTransitionTime
if TimeJitterAfter(newTime.Time, oldTime.Time, jobConditionUpdateTime, jobConditionUpdateTimeJitter) {
return true
}

// if newCond is not new enough, we treat it the same as the old one
newCond.LastTransitionTime = oldTime
Expand Down
13 changes: 5 additions & 8 deletions pkg/scheduler/framework/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ limitations under the License.
package framework

import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -267,23 +265,22 @@ func (nl *NodeLister) List() ([]*v1.Node, error) {
}

// ConvertPredicateStatus return predicate status from k8sframework status
func ConvertPredicateStatus(status *k8sframework.Status) (*api.Status, error) {
func ConvertPredicateStatus(status *k8sframework.Status) *api.Status {
internalStatus := &api.Status{}
if status.Code() == k8sframework.Success {
internalStatus.Code = api.Success
return internalStatus, nil
return internalStatus
} else if status.Code() == k8sframework.Unschedulable {
internalStatus.Code = api.Unschedulable
internalStatus.Reason = status.Message()
return internalStatus, nil
return internalStatus
} else if status.Code() == k8sframework.UnschedulableAndUnresolvable {
internalStatus.Code = api.UnschedulableAndUnresolvable
internalStatus.Reason = status.Message()
return internalStatus, nil
return internalStatus
} else {
internalStatus.Code = api.Error
internalStatus.Reason = status.Message()
return internalStatus, fmt.Errorf("Convert predicate status error, k8s status code is %d, Reason is %s",
status.Code(), status.Message())
return internalStatus
}
}
82 changes: 46 additions & 36 deletions pkg/scheduler/plugins/predicates/predicates.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,10 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
ssn.AddPrePredicateFn(pp.Name(), func(task *api.TaskInfo) error {
// Check NodePorts
if predicate.nodePortEnable {
nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
_, status := nodePortFilter.PreFilter(context.TODO(), state, task.Pod)
if !status.IsSuccess() {
return fmt.Errorf("plugin %s pre-predicates failed %s", interpodaffinity.Name, status.Message())
}
}

// InterPodAffinity Predicate
Expand Down Expand Up @@ -404,40 +407,44 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
klog.V(4).Infof("NodePodNumber predicates Task <%s/%s> on Node <%s> failed",
task.Namespace, task.Name, node.Name)
podsNumStatus := &api.Status{
Code: api.Unschedulable,
Reason: fmt.Sprintf("Task <%s/%s> on Node <%s> failed, reason: %s",
task.Namespace, task.Name, node.Name, api.NodePodNumberExceeded),
// TODO(wangyang0616): When the number of pods of a node reaches the upper limit, preemption is not supported for now.
// Record details in #3079 (volcano.sh/volcano)
// In the preempt stage, the pipeline of the pod number is not considered,
// the preemption of the pod number is released directly, which will cause the pods in the node to be cyclically evicted.
Code: api.UnschedulableAndUnresolvable,
Reason: api.NodePodNumberExceeded,
}
predicateStatus = append(predicateStatus, podsNumStatus)
return predicateStatus, fmt.Errorf("%s", api.NodePodNumberExceeded)
}

predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) ([]*api.Status, bool, error) {
// CheckNodeUnschedulable
predicateStatus := make([]*api.Status, 0)
status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
nodeUnscheduleStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, nodeUnscheduleStatus)
if err != nil {
return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeunschedulable.Name, status.Message())
nodeUnscheduleStatus := framework.ConvertPredicateStatus(status)
if nodeUnscheduleStatus.Code != api.Success {
predicateStatus = append(predicateStatus, nodeUnscheduleStatus)
return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeUnscheduleFilter.Name(), status.Message())
}

// Check NodeAffinity
if predicate.nodeAffinityEnable {
status := nodeAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
nodeAffinityStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, nodeAffinityStatus)
if err != nil {
return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message())
nodeAffinityStatus := framework.ConvertPredicateStatus(status)
if nodeAffinityStatus.Code != api.Success {
predicateStatus = append(predicateStatus, nodeAffinityStatus)
return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeAffinityFilter.Name(), status.Message())
}
}

// PodToleratesNodeTaints: TaintToleration
if predicate.taintTolerationEnable {
status := tolerationFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
tolerationStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, tolerationStatus)
if err != nil {
return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message())
tolerationStatus := framework.ConvertPredicateStatus(status)
if tolerationStatus.Code != api.Success {
predicateStatus = append(predicateStatus, tolerationStatus)
return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", tolerationFilter.Name(), status.Message())
}
}

Expand Down Expand Up @@ -470,49 +477,49 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
// Check NodePort
if predicate.nodePortEnable {
status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo)
nodePortStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, nodePortStatus)
if err != nil {
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message())
nodePortStatus := framework.ConvertPredicateStatus(status)
if nodePortStatus.Code != api.Success {
predicateStatus = append(predicateStatus, nodePortStatus)
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodePortFilter.Name(), status.Message())
}
}

// Check PodAffinity
if predicate.podAffinityEnable {
status := podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
podAffinityStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, podAffinityStatus)
if err != nil {
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message())
podAffinityStatus := framework.ConvertPredicateStatus(status)
if podAffinityStatus.Code != api.Success {
predicateStatus = append(predicateStatus, podAffinityStatus)
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podAffinityFilter.Name(), status.Message())
}
}

// Check NodeVolumeLimits
if predicate.nodeVolumeLimitsEnable {
status := nodeVolumeLimitsCSIFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
nodeVolumeStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, nodeVolumeStatus)
if err != nil {
nodeVolumeStatus := framework.ConvertPredicateStatus(status)
if nodeVolumeStatus.Code != api.Success {
predicateStatus = append(predicateStatus, nodeVolumeStatus)
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message())
}
}

// Check VolumeZone
if predicate.volumeZoneEnable {
status := volumeZoneFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
volumeZoneStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, volumeZoneStatus)
if err != nil {
volumeZoneStatus := framework.ConvertPredicateStatus(status)
if volumeZoneStatus.Code != api.Success {
predicateStatus = append(predicateStatus, volumeZoneStatus)
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message())
}
}

// Check PodTopologySpread
if predicate.podTopologySpreadEnable {
status := podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo)
podTopologyStatus, err := framework.ConvertPredicateStatus(status)
predicateStatus = append(predicateStatus, podTopologyStatus)
if err != nil {
podTopologyStatus := framework.ConvertPredicateStatus(status)
if podTopologyStatus.Code != api.Success {
predicateStatus = append(predicateStatus, podTopologyStatus)
return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message())
}
}
Expand All @@ -524,10 +531,13 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
Code: code,
Reason: msg,
}
predicateStatus = append(predicateStatus, filterNodeStatus)
if err != nil {
return predicateStatus, err
}
if filterNodeStatus.Code != api.Success {
predicateStatus = append(predicateStatus, filterNodeStatus)
return predicateStatus, fmt.Errorf("plugin device filternode predicates failed %s", msg)
}
} else {
klog.Warningf("Devices %s assertion conversion failed, skip", val)
}
Expand All @@ -539,8 +549,8 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) {
if predicate.proportionalEnable {
// Check ProportionalPredicate
proportionalStatus, err := checkNodeResourceIsProportional(task, node, predicate.proportional)
predicateStatus = append(predicateStatus, proportionalStatus)
if err != nil {
if proportionalStatus.Code != api.Success {
predicateStatus = append(predicateStatus, proportionalStatus)
return predicateStatus, err
}
klog.V(4).Infof("checkNodeResourceIsProportional predicates Task <%s/%s> on Node <%s>: fit %v",
Expand Down
7 changes: 4 additions & 3 deletions pkg/scheduler/plugins/predicates/proportional.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ import (

// checkNodeResourceIsProportional checks if a gpu:cpu:memory is Proportional
func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (*api.Status, error) {
status := &api.Status{}
status := &api.Status{
Code: api.Success,
}
for resourceName := range proportional {
if value, found := task.Resreq.ScalarResources[resourceName]; found && value > 0 {
status.Code = api.Success
return status, nil
}
}
Expand All @@ -40,7 +41,7 @@ func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, pro
memoryReserved := value * resourceRate.Memory * 1000 * 1000

if node.Idle.MilliCPU-task.Resreq.MilliCPU < cpuReserved || node.Idle.Memory-task.Resreq.Memory < memoryReserved {
status.Code = api.Unschedulable
status.Code = api.UnschedulableAndUnresolvable
status.Reason = fmt.Sprintf("proportional of resource %s check failed", resourceName)
return status, fmt.Errorf("proportional of resource %s check failed", resourceName)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/predicates/proportional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) {
node: n1,
proportional: proportional,
},
api.Unschedulable,
api.UnschedulableAndUnresolvable,
true,
},
{
Expand Down
31 changes: 31 additions & 0 deletions pkg/scheduler/util/predicate_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -145,3 +146,33 @@ func (s StatusSets) ContainsErrorSkipOrWait() bool {
}
return false
}

// Message return the message generated from StatusSets
func (s StatusSets) Message() string {
if s == nil {
return ""
}
all := make([]string, 0, len(s))
for _, status := range s {
if status.Reason == "" {
continue
}
all = append(all, status.Reason)
}
return strings.Join(all, ",")
}

// Reasons return the reasons list
func (s StatusSets) Reasons() []string {
if s == nil {
return nil
}
all := make([]string, 0, len(s))
for _, status := range s {
if status.Reason == "" {
continue
}
all = append(all, status.Reason)
}
return all
}
Loading