Skip to content

Commit

Permalink
scheduler: enhance reservation error message
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed May 23, 2024
1 parent f5b611f commit 7708b19
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 36 deletions.
52 changes: 46 additions & 6 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package reservation
import (
"context"
"fmt"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -158,8 +160,11 @@ type stateData struct {
}

type nodeReservationState struct {
nodeName string
matched []*frameworkext.ReservationInfo
nodeName string
matched []*frameworkext.ReservationInfo // owner and affinity matched, not unschedulable
isUnschedulableUnmatched int // owner matched but unmatched due to unschedulable
affinityUnmatched int // owner matched but unmatched due to affinity

// podRequested represents all Pods(including matched reservation) requested resources
// but excluding the already allocated from unmatched reservations
podRequested *framework.Resource
Expand Down Expand Up @@ -382,7 +387,7 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew

var hasSatisfiedReservation bool
allInsufficientResourcesByNode := sets.NewString()
allInsufficientResourcesByReservation := sets.NewString()
var allInsufficientResourcesByReservation []string
for _, rInfo := range matchedReservations {
resourceNames := quotav1.Intersection(rInfo.ResourceNames, podRequestsResourceNames)
if len(resourceNames) == 0 {
Expand Down Expand Up @@ -417,7 +422,7 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
}
allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...)
for _, insufficientResourceByReservation := range insufficientResourcesByReservation {
allInsufficientResourcesByReservation.Insert(string(insufficientResourceByReservation))
allInsufficientResourcesByReservation = append(allInsufficientResourcesByReservation, string(insufficientResourceByReservation))
}
}
}
Expand All @@ -428,7 +433,7 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew
for insufficientResourceByNode := range allInsufficientResourcesByNode {
failureReasons = append(failureReasons, fmt.Sprintf("Insufficient %s by node", insufficientResourceByNode))
}
for insufficientResourceByReservation := range allInsufficientResourcesByReservation {
for _, insufficientResourceByReservation := range allInsufficientResourcesByReservation {
failureReasons = append(failureReasons, fmt.Sprintf("Insufficient %s by reservation", insufficientResourceByReservation))
}
if len(failureReasons) == 0 {
Expand Down Expand Up @@ -497,7 +502,42 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node

func (pl *Plugin) PostFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
// Implement an empty function to be compatible with existing configurations
return nil, framework.NewStatus(framework.Unschedulable)
state := getStateData(cycleState)
reasons := pl.makePostFilterReasons(state)
return nil, framework.NewStatus(framework.Unschedulable, reasons...)
}

func (pl *Plugin) makePostFilterReasons(state *stateData) []string {
ownerMatched, affinityUnmatched, isUnSchedulableUnmatched := 0, 0, 0
for _, nodeState := range state.nodeReservationStates {
isUnSchedulableUnmatched += nodeState.isUnschedulableUnmatched
affinityUnmatched += nodeState.affinityUnmatched
ownerMatched += len(nodeState.matched) + nodeState.isUnschedulableUnmatched + nodeState.affinityUnmatched
}
if ownerMatched <= 0 {
return nil
}

// Make the error messages: The framework does not aggregate the same reasons for the PostFilter, so we need
// to prepare the exact messages by ourselves.
var reasons []string
var b strings.Builder
if affinityUnmatched > 0 {
b.WriteString(strconv.Itoa(affinityUnmatched))
b.WriteString(" Reservation(s) owner match but bad affinity")
reasons = append(reasons, b.String())
b.Reset()
}
if isUnSchedulableUnmatched > 0 {
b.WriteString(strconv.Itoa(isUnSchedulableUnmatched))
b.WriteString(" Reservation(s) is unschedulable")
reasons = append(reasons, b.String())
b.Reset()
}
b.WriteString(strconv.Itoa(ownerMatched))
b.WriteString(" Reservation(s) owner matched total")
reasons = append(reasons, b.String())
return reasons
}

func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
Expand Down
143 changes: 143 additions & 0 deletions pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,73 @@ func Test_filterWithReservations(t *testing.T) {
},
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by reservation"),
},
{
name: "failed to filter multiple restricted reservations with preempt from node",
stateData: &stateData{
hasAffinity: true,
podRequests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
},
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{
node.Name: {
corev1.ResourceCPU: resource.MustParse("4"),
},
},
preemptibleInRRs: nil,
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 38 * 1000,
},
rAllocated: &framework.Resource{
MilliCPU: 10000,
},
matched: []*frameworkext.ReservationInfo{
{
Reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-r",
UID: "123456",
},
Spec: schedulingv1alpha1.ReservationSpec{
AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted,
},
},
ResourceNames: []corev1.ResourceName{corev1.ResourceCPU},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
},
Allocated: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("6"),
},
},
{
Reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-r-1",
UID: "7891011",
},
Spec: schedulingv1alpha1.ReservationSpec{
AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted,
},
},
ResourceNames: []corev1.ResourceName{corev1.ResourceCPU},
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("4"),
},
Allocated: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("1"),
},
},
},
},
},
},
wantStatus: framework.NewStatus(framework.Unschedulable, "Insufficient cpu by reservation", "Insufficient cpu by reservation"),
},
{
name: "failed to filter restricted reservations with preempt from reservation and node",
stateData: &stateData{
Expand Down Expand Up @@ -1766,6 +1833,82 @@ func TestFilterReservation(t *testing.T) {
}
}

func TestPostFilter(t *testing.T) {
type args struct {
hasStateData bool
nodeReservationStates map[string]nodeReservationState
}
tests := []struct {
name string
args args
want *framework.PostFilterResult
want1 *framework.Status
}{
{
name: "no reservation filtering",
args: args{
hasStateData: false,
},
want: nil,
want1: framework.NewStatus(framework.Unschedulable),
},
{
name: "show reservation owner matched, unschedulable unmatched",
args: args{
hasStateData: true,
nodeReservationStates: map[string]nodeReservationState{
"test-node-0": {
matched: make([]*frameworkext.ReservationInfo, 0),
isUnschedulableUnmatched: 3,
affinityUnmatched: 0,
},
"test-node-1": {
matched: make([]*frameworkext.ReservationInfo, 0),
isUnschedulableUnmatched: 1,
affinityUnmatched: 0,
},
},
},
want: nil,
want1: framework.NewStatus(framework.Unschedulable, "4 Reservation(s) is unschedulable", "4 Reservation(s) owner matched total"),
},
{
name: "show reservation owner matched, unschedulable and affinity unmatched",
args: args{
hasStateData: true,
nodeReservationStates: map[string]nodeReservationState{
"test-node-0": {
matched: make([]*frameworkext.ReservationInfo, 0),
isUnschedulableUnmatched: 0,
affinityUnmatched: 3,
},
"test-node-1": {
matched: make([]*frameworkext.ReservationInfo, 0),
isUnschedulableUnmatched: 1,
affinityUnmatched: 1,
},
},
},
want: nil,
want1: framework.NewStatus(framework.Unschedulable, "4 Reservation(s) owner match but bad affinity", "1 Reservation(s) is unschedulable", "5 Reservation(s) owner matched total"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pl := &Plugin{}
cycleState := framework.NewCycleState()
if tt.args.hasStateData {
cycleState.Write(stateKey, &stateData{
nodeReservationStates: tt.args.nodeReservationStates,
})
}
got, got1 := pl.PostFilter(context.TODO(), cycleState, nil, nil)
assert.Equal(t, tt.want, got)
assert.Equal(t, tt.want1, got1)
})
}
}

func TestReserve(t *testing.T) {
reservation2C4G := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Expand Down
27 changes: 16 additions & 11 deletions pkg/scheduler/plugins/reservation/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (pl *Plugin) prepareMatchReservationState(ctx context.Context, cycleState *
}

var unmatched, matched []*frameworkext.ReservationInfo
isUnschedulableUnmatched, affinityUnmatched := 0, 0
status := pl.reservationCache.forEachAvailableReservationOnNode(node.Name, func(rInfo *frameworkext.ReservationInfo) (bool, *framework.Status) {
if !rInfo.IsAvailable() || rInfo.ParseError != nil {
return true, nil
Expand All @@ -111,8 +112,14 @@ func (pl *Plugin) prepareMatchReservationState(ctx context.Context, cycleState *
return true, nil
}

if !isReservedPod && !rInfo.IsUnschedulable() && matchReservation(pod, node, rInfo, reservationAffinity) {
matched = append(matched, rInfo.Clone())
if !isReservedPod && rInfo.Match(pod) { // owner matched
if rInfo.IsUnschedulable() {
isUnschedulableUnmatched++
} else if !matchReservationAffinity(node, rInfo, reservationAffinity) {
affinityUnmatched++
} else {
matched = append(matched, rInfo.Clone())
}

} else if len(rInfo.AssignedPods) > 0 {
unmatched = append(unmatched, rInfo.Clone())
Expand Down Expand Up @@ -182,10 +189,12 @@ func (pl *Plugin) prepareMatchReservationState(ctx context.Context, cycleState *
if len(matched) > 0 || len(unmatched) > 0 {
index := atomic.AddInt32(&stateIndex, 1)
allNodeReservationStates[index-1] = &nodeReservationState{
nodeName: node.Name,
matched: matched,
podRequested: podRequested,
rAllocated: framework.NewResource(rAllocated),
nodeName: node.Name,
matched: matched,
isUnschedulableUnmatched: isUnschedulableUnmatched,
affinityUnmatched: affinityUnmatched,
podRequested: podRequested,
rAllocated: framework.NewResource(rAllocated),
}
allPluginToRestoreState[index-1] = pluginToRestoreState
}
Expand Down Expand Up @@ -347,11 +356,7 @@ func calculateResource(pod *corev1.Pod) (res framework.Resource, non0CPU int64,
return
}

func matchReservation(pod *corev1.Pod, node *corev1.Node, reservation *frameworkext.ReservationInfo, reservationAffinity *reservationutil.RequiredReservationAffinity) bool {
if !reservation.Match(pod) {
return false
}

func matchReservationAffinity(node *corev1.Node, reservation *frameworkext.ReservationInfo, reservationAffinity *reservationutil.RequiredReservationAffinity) bool {
if reservationAffinity != nil {
// NOTE: There are some special scenarios.
// For example, the AZ where the Pod wants to select the Reservation is cn-hangzhou, but the Reservation itself
Expand Down
Loading

0 comments on commit 7708b19

Please sign in to comment.