scheduler: improve the failed event of Reservation's scheduling
Signed-off-by: wangjianyu.wjy <wangjianyu.wjy@alibaba-inc.com>
wangjianyu.wjy committed Mar 12, 2024
1 parent 3bf7ba8 commit b9b54f4
Showing 2 changed files with 39 additions and 26 deletions.
43 changes: 25 additions & 18 deletions pkg/scheduler/plugins/reservation/plugin.go
@@ -19,6 +19,7 @@ package reservation
import (
"context"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -55,9 +56,9 @@ const (
// ErrReasonReservationInactive is the reason for the reservation is failed/succeeded and should not be used.
ErrReasonReservationInactive = "reservation is not active"
// ErrReasonNoReservationsMeetRequirements is the reason for no reservation(s) to meet the requirements.
ErrReasonNoReservationsMeetRequirements = "node(s) no reservation(s) to meet the requirements"
ErrReasonNoReservationsMeetRequirements = "node(s) no matched reservation(s) to meet the resource requirements, %v"
// ErrReasonPreemptionFailed is the reason for preemption failed
ErrReasonPreemptionFailed = "node(s) preemption failed due to insufficient resources"
ErrReasonPreemptionFailed = "node(s) preemption failed due to insufficient %v resources"
)

var (
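(Aside, not part of the diff: the two reasons above are now fmt format strings rather than fixed sentences. A minimal, self-contained sketch of how they render; the local constant copies and sample values are illustrative only.)

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// Local copies of the new reason formats introduced by this commit.
const (
	errNoReservationsMeetRequirements = "node(s) no matched reservation(s) to meet the resource requirements, %v"
	errPreemptionFailed               = "node(s) preemption failed due to insufficient %v resources"
)

func main() {
	// A []corev1.ResourceName prints under %v as "[cpu memory ...]".
	fmt.Printf(errPreemptionFailed+"\n", []corev1.ResourceName{corev1.ResourceCPU})
	// Output: node(s) preemption failed due to insufficient [cpu] resources

	fmt.Printf(errNoReservationsMeetRequirements+"\n", "default/test-r insufficient [cpu]")
	// Output: node(s) no matched reservation(s) to meet the resource requirements, default/test-r insufficient [cpu]
}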
@@ -359,9 +360,9 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState,
if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 {
preemptible := state.preemptible[node.Name]
preemptibleResource := framework.NewResource(preemptible)
- nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
- if !nodeFits {
- return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed)
+ insufficientResources := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
+ if len(insufficientResources) != 0 {
+ return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf(ErrReasonPreemptionFailed, insufficientResources))
}
}
return nil
@@ -378,27 +379,30 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
node := nodeInfo.Node()
state := getStateData(cycleState)
nodeRState := state.nodeReservationStates[node.Name]
- podRequests, _ := resourceapi.PodRequestsAndLimits(pod)
- podRequestsResourceNames := quotav1.ResourceNames(podRequests)
+ podRequestsResourceNames := quotav1.ResourceNames(state.podRequests)

var hasSatisfiedReservation bool
+ reservationInsufficientResources := make([]string, 0, len(matchedReservations))
for _, rInfo := range matchedReservations {
resourceNames := quotav1.Intersection(rInfo.ResourceNames, podRequestsResourceNames)
if len(resourceNames) == 0 {
+ reservationInsufficientResources = append(reservationInsufficientResources, fmt.Sprintf("%s/%s insufficient %v", rInfo.GetNamespace(), rInfo.GetName(), podRequestsResourceNames))
continue
}

preemptibleInRR := state.preemptibleInRRs[node.Name][rInfo.UID()]
preemptible := framework.NewResource(preemptibleInRR)
preemptible.Add(state.preemptible[node.Name])
- nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, rInfo, preemptible)
+ insufficientResources := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, rInfo, preemptible)
+ nodeFits := len(insufficientResources) == 0
allocatePolicy := rInfo.GetAllocatePolicy()
if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyAligned {
if nodeFits {
hasSatisfiedReservation = true
break
}
+ reservationInsufficientResources = append(reservationInsufficientResources, fmt.Sprintf("%s/%s insufficient %v", rInfo.GetNamespace(), rInfo.GetName(), insufficientResources))
} else if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted {
allocated := rInfo.Allocated
if len(preemptibleInRR) > 0 {
@@ -407,34 +411,37 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
allocated = quotav1.Mask(allocated, rInfo.ResourceNames)
rRemained := quotav1.SubtractWithNonNegativeResult(rInfo.Allocatable, allocated)
requests := quotav1.Mask(state.podRequests, rInfo.ResourceNames)
- fits, _ := quotav1.LessThanOrEqual(requests, rRemained)
+ fits, insufficientResourcesInReservation := quotav1.LessThanOrEqual(requests, rRemained)
if fits && nodeFits {
hasSatisfiedReservation = true
break
}
+ reservationInsufficientResources = append(reservationInsufficientResources, fmt.Sprintf("%s/%s insufficient %v", rInfo.GetNamespace(), rInfo.GetName(), quotav1.ToSet(insufficientResources).Union(quotav1.ToSet(insufficientResourcesInReservation)).List()))
}
}
// The Pod requirement must be allocated from Reservation, but currently no Reservation meets the requirement
if !hasSatisfiedReservation && requiredFromReservation {
- return framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements)
+ return framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, strings.Join(reservationInsufficientResources, ",")))
}
return nil
}

var dummyResource = framework.NewResource(nil)

// fitsNode checks if node have enough resources to host the pod.
- func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, nodeRState *nodeReservationState, rInfo *frameworkext.ReservationInfo, preemptible *framework.Resource) bool {
+ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, nodeRState *nodeReservationState, rInfo *frameworkext.ReservationInfo, preemptible *framework.Resource) []corev1.ResourceName {
+ insufficientResources := make([]corev1.ResourceName, 0, 4)
+
allowedPodNumber := nodeInfo.Allocatable.AllowedPodNumber
if len(nodeInfo.Pods)-len(nodeRState.matched)+1 > allowedPodNumber {
- return false
+ insufficientResources = append(insufficientResources, corev1.ResourcePods)
}

if podRequest.MilliCPU == 0 &&
podRequest.Memory == 0 &&
podRequest.EphemeralStorage == 0 &&
len(podRequest.ScalarResources) == 0 {
- return true
+ return insufficientResources
}

var rRemained *framework.Resource
@@ -457,22 +464,22 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node
}

if podRequest.MilliCPU > (nodeInfo.Allocatable.MilliCPU - (podRequested.MilliCPU - rRemained.MilliCPU - allRAllocated.MilliCPU - preemptible.MilliCPU)) {
- return false
+ insufficientResources = append(insufficientResources, corev1.ResourceCPU)
}
if podRequest.Memory > (nodeInfo.Allocatable.Memory - (podRequested.Memory - rRemained.Memory - allRAllocated.Memory - preemptible.Memory)) {
- return false
+ insufficientResources = append(insufficientResources, corev1.ResourceMemory)
}
if podRequest.EphemeralStorage > (nodeInfo.Allocatable.EphemeralStorage - (podRequested.EphemeralStorage - rRemained.EphemeralStorage - allRAllocated.EphemeralStorage - preemptible.EphemeralStorage)) {
- return false
+ insufficientResources = append(insufficientResources, corev1.ResourceEphemeralStorage)
}

for rName, rQuant := range podRequest.ScalarResources {
if rQuant > (nodeInfo.Allocatable.ScalarResources[rName] - (podRequested.ScalarResources[rName] - rRemained.ScalarResources[rName] - allRAllocated.ScalarResources[rName] - preemptible.ScalarResources[rName])) {
- return false
+ insufficientResources = append(insufficientResources, rName)
}
}

- return true
+ return insufficientResources
}

func (pl *Plugin) PostFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
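(The shape of the change in filterWithReservations and fitsNode is a common one: return the names of the insufficient resources instead of a bare bool, then join the per-reservation failures into one event message. A simplified, self-contained sketch of that pattern follows; the types and helpers are stand-ins, not the plugin's own.)

package main

import (
	"fmt"
	"strings"
)

// checkFit is a simplified stand-in for fitsNode after this commit: it
// reports which resources do not fit instead of a bare boolean.
func checkFit(requests, free map[string]int64) []string {
	var insufficient []string
	for _, name := range []string{"cpu", "memory"} { // fixed order for a stable message
		if requests[name] > free[name] {
			insufficient = append(insufficient, name)
		}
	}
	return insufficient
}

func main() {
	requests := map[string]int64{"cpu": 4000, "memory": 1 << 30}

	// One "ns/name insufficient [...]" fragment per rejected reservation,
	// mirroring reservationInsufficientResources in the diff above.
	var failures []string
	for _, r := range []struct {
		name string
		free map[string]int64
	}{
		{"default/test-r", map[string]int64{"cpu": 2000, "memory": 4 << 30}},
		{"default/other-r", map[string]int64{"cpu": 8000, "memory": 0}},
	} {
		if insufficient := checkFit(requests, r.free); len(insufficient) != 0 {
			failures = append(failures, fmt.Sprintf("%s insufficient %v", r.name, insufficient))
		}
	}
	if len(failures) != 0 {
		fmt.Printf("node(s) no matched reservation(s) to meet the resource requirements, %s\n",
			strings.Join(failures, ","))
	}
	// Output: node(s) no matched reservation(s) to meet the resource requirements,
	//         default/test-r insufficient [cpu],default/other-r insufficient [memory]
}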
22 changes: 14 additions & 8 deletions pkg/scheduler/plugins/reservation/plugin_test.go
@@ -630,7 +630,7 @@ func TestFilterWithPreemption(t *testing.T) {
},
},
},
- wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed),
+ wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf(ErrReasonPreemptionFailed, []corev1.ResourceName{corev1.ResourceCPU})),
},
{
name: "filter non-reservations with preemption but no preemptible resources",
@@ -772,7 +772,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
- wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
+ wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, fmt.Sprintf("%s/%s insufficient %v", "", "test-r", []corev1.ResourceName{corev1.ResourceCPU}))),
},
{
name: "filter restricted reservation with nodeInfo",
@@ -861,7 +861,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
- wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
+ wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, fmt.Sprintf("%s/%s insufficient %v", "", "test-r", []corev1.ResourceName{corev1.ResourceCPU}))),
},
{
name: "filter default reservations with preemption",
@@ -962,6 +962,9 @@ func Test_filterWithReservations(t *testing.T) {
name: "failed to filter default reservations with preempt from reservation",
stateData: &stateData{
hasAffinity: true,
+ podRequests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("4"),
+ },
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
@@ -1002,12 +1005,15 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
- wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
+ wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, fmt.Sprintf("%s/%s insufficient %v", "", "test-r", []corev1.ResourceName{corev1.ResourceCPU}))),
},
{
name: "failed to filter default reservations with preempt from node",
stateData: &stateData{
hasAffinity: true,
+ podRequests: corev1.ResourceList{
+ corev1.ResourceCPU: resource.MustParse("4"),
+ },
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
@@ -1046,7 +1052,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
- wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
+ wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, fmt.Sprintf("%s/%s insufficient %v", "", "test-r", []corev1.ResourceName{corev1.ResourceCPU}))),
},
{
name: "filter restricted reservations with preempt from reservation",
@@ -1142,7 +1148,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
- wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
+ wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, fmt.Sprintf("%s/%s insufficient %v", "", "test-r", []corev1.ResourceName{corev1.ResourceCPU}))),
},
{
name: "failed to filter restricted reservations with preempt from reservation and node",
@@ -1199,7 +1205,7 @@ func Test_filterWithReservations(t *testing.T) {
},
},
},
- wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
+ wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, fmt.Sprintf("%s/%s insufficient %v", "", "test-r", []corev1.ResourceName{corev1.ResourceCPU}))),
},
{
name: "filter restricted reservations with preempt from reservation and node",
@@ -1673,7 +1679,7 @@ func TestFilterReservation(t *testing.T) {
reservation4C8G,
},
targetReservation: reservation2C4G,
- wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonNoReservationsMeetRequirements),
+ wantStatus: framework.NewStatus(framework.Unschedulable, fmt.Sprintf(ErrReasonNoReservationsMeetRequirements, fmt.Sprintf("%s/%s insufficient %v", reservation2C4G.Namespace, reservation2C4G.Name, []corev1.ResourceName{corev1.ResourceEphemeralStorage}))),
},
{
name: "failed with allocateOnce and allocated reservation",
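(A note on the expectations above: most test fixtures use a reservation named test-r with an empty namespace, so the rendered reason in those cases reads, for example, "node(s) no matched reservation(s) to meet the resource requirements, /test-r insufficient [cpu]".)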
