From 253c4d46971768124bf36eb56934941f15f87428 Mon Sep 17 00:00:00 2001 From: Joseph Date: Thu, 28 Dec 2023 21:07:24 +0800 Subject: [PATCH] scheduler: FilterReservation/ScoreReservation perceives preemptible states (#1803) Signed-off-by: Joseph --- .../plugins/reservation/nominator_test.go | 11 +- pkg/scheduler/plugins/reservation/plugin.go | 52 ++--- .../plugins/reservation/plugin_test.go | 197 ++++++++++-------- pkg/scheduler/plugins/reservation/scoring.go | 21 +- .../plugins/reservation/scoring_test.go | 58 +++--- 5 files changed, 190 insertions(+), 149 deletions(-) diff --git a/pkg/scheduler/plugins/reservation/nominator_test.go b/pkg/scheduler/plugins/reservation/nominator_test.go index 419a999f2..95fca8051 100644 --- a/pkg/scheduler/plugins/reservation/nominator_test.go +++ b/pkg/scheduler/plugins/reservation/nominator_test.go @@ -272,7 +272,14 @@ func TestNominateReservation(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - suit := newPluginTestSuit(t) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{}, + } + + suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node}) plugin, err := suit.pluginFactory() assert.NoError(t, err) pl := plugin.(*Plugin) @@ -292,7 +299,7 @@ func TestNominateReservation(t *testing.T) { pl.reservationCache.updateReservation(reservation) } cycleState.Write(stateKey, state) - nominateRInfo, status := pl.NominateReservation(context.TODO(), cycleState, tt.pod, "test-node") + nominateRInfo, status := pl.NominateReservation(context.TODO(), cycleState, tt.pod, node.Name) if tt.wantReservation == nil { assert.Nil(t, nominateRInfo) } else { diff --git a/pkg/scheduler/plugins/reservation/plugin.go b/pkg/scheduler/plugins/reservation/plugin.go index 27c222d1b..4094c916e 100644 --- a/pkg/scheduler/plugins/reservation/plugin.go +++ b/pkg/scheduler/plugins/reservation/plugin.go @@ -345,33 +345,36 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, } if !reservationutil.IsReservePod(pod) { - return pl.filterWithReservations(ctx, cycleState, pod, nodeInfo) + state := getStateData(cycleState) + nodeRState := state.nodeReservationStates[node.Name] + matchedReservations := nodeRState.matched + if len(matchedReservations) == 0 { + if state.hasAffinity { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAffinity) + } + + if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 { + preemptible := state.preemptible[node.Name] + preemptibleResource := framework.NewResource(preemptible) + nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource) + if !nodeFits { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed) + } + } + return nil + } + + return pl.filterWithReservations(ctx, cycleState, pod, nodeInfo, matchedReservations) } // TODO: handle pre-allocation cases return nil } -func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status { +func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo, matchedReservations []*frameworkext.ReservationInfo) *framework.Status { node := nodeInfo.Node() state := getStateData(cycleState) nodeRState := state.nodeReservationStates[node.Name] - matchedReservations := nodeRState.matched - if len(matchedReservations) == 0 { - if state.hasAffinity { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAffinity) - } - - if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 { - preemptible := state.preemptible[node.Name] - preemptibleResource := framework.NewResource(preemptible) - nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource) - if !nodeFits { - return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed) - } - } - return nil - } var ( totalDefault int @@ -500,20 +503,21 @@ func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.C } if rInfo.IsAllocateOnce() && len(rInfo.AssignedPods) > 0 { - return framework.AsStatus(fmt.Errorf("reservation has allocateOnce enabled and has already been allocated")) + return framework.NewStatus(framework.Unschedulable, "reservation has allocateOnce enabled and has already been allocated") } podRequests, _ := resourceapi.PodRequestsAndLimits(pod) resourceNames := quotav1.Intersection(rInfo.ResourceNames, quotav1.ResourceNames(podRequests)) if len(resourceNames) == 0 { - return framework.AsStatus(fmt.Errorf("no intersection resources")) + return framework.NewStatus(framework.Unschedulable, "no intersection resources") } - remainedResource := quotav1.SubtractWithNonNegativeResult(rInfo.Allocatable, rInfo.Allocated) - if quotav1.IsZero(quotav1.Mask(remainedResource, resourceNames)) { - return framework.AsStatus(fmt.Errorf("insufficient resources in reservation")) + nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, "missing node") } - return nil + + return pl.filterWithReservations(ctx, cycleState, pod, nodeInfo, []*frameworkext.ReservationInfo{rInfo}) } func (pl *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { diff --git a/pkg/scheduler/plugins/reservation/plugin_test.go b/pkg/scheduler/plugins/reservation/plugin_test.go index 3749cc9c4..881ec40ca 100644 --- a/pkg/scheduler/plugins/reservation/plugin_test.go +++ b/pkg/scheduler/plugins/reservation/plugin_test.go @@ -524,6 +524,101 @@ func TestFilter(t *testing.T) { } } +func TestFilterWithPreemption(t *testing.T) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{ + Allocatable: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("32"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + corev1.ResourcePods: resource.MustParse("100"), + }, + }, + } + tests := []struct { + name string + stateData *stateData + wantStatus *framework.Status + }{ + { + name: "successfully filter non-reservations with preemption", + stateData: &stateData{ + podRequestsResources: &framework.Resource{ + MilliCPU: 4 * 1000, + }, + preemptible: map[string]corev1.ResourceList{ + node.Name: { + corev1.ResourceCPU: resource.MustParse("4"), + }, + }, + nodeReservationStates: map[string]nodeReservationState{ + node.Name: { + podRequested: &framework.Resource{ + MilliCPU: 32 * 1000, + }, + }, + }, + }, + wantStatus: nil, + }, + { + name: "failed to filter non-reservations with preemption", + stateData: &stateData{ + podRequestsResources: &framework.Resource{ + MilliCPU: 4 * 1000, + }, + preemptible: map[string]corev1.ResourceList{ + node.Name: { + corev1.ResourceCPU: resource.MustParse("2"), + }, + }, + nodeReservationStates: map[string]nodeReservationState{ + node.Name: { + podRequested: &framework.Resource{ + MilliCPU: 32 * 1000, + }, + }, + }, + }, + wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed), + }, + { + name: "filter non-reservations with preemption but no preemptible resources", + stateData: &stateData{ + podRequestsResources: &framework.Resource{ + MilliCPU: 4 * 1000, + }, + preemptible: map[string]corev1.ResourceList{}, + nodeReservationStates: map[string]nodeReservationState{ + node.Name: { + podRequested: &framework.Resource{ + MilliCPU: 32 * 1000, + }, + }, + }, + }, + wantStatus: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pl := &Plugin{} + cycleState := framework.NewCycleState() + if tt.stateData.podRequestsResources == nil { + resources := framework.NewResource(tt.stateData.podRequests) + tt.stateData.podRequestsResources = resources + } + cycleState.Write(stateKey, tt.stateData) + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(node) + got := pl.Filter(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo) + assert.Equal(t, tt.wantStatus, got) + }) + } +} + func Test_filterWithReservations(t *testing.T) { node := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -722,89 +817,6 @@ func Test_filterWithReservations(t *testing.T) { }, wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReservationInsufficientResources), }, - { - name: "filter non-reservations with preemption", - stateData: &stateData{ - podRequestsResources: &framework.Resource{ - MilliCPU: 4 * 1000, - }, - preemptible: map[string]corev1.ResourceList{ - node.Name: { - corev1.ResourceCPU: resource.MustParse("4"), - }, - }, - nodeReservationStates: map[string]nodeReservationState{ - node.Name: { - podRequested: &framework.Resource{ - MilliCPU: 32 * 1000, - }, - }, - }, - }, - wantStatus: nil, - }, - { - name: "filter non-reservations with preemption but no preemptible resources", - stateData: &stateData{ - podRequestsResources: &framework.Resource{ - MilliCPU: 4 * 1000, - }, - preemptible: map[string]corev1.ResourceList{}, - nodeReservationStates: map[string]nodeReservationState{ - node.Name: { - podRequested: &framework.Resource{ - MilliCPU: 32 * 1000, - }, - }, - }, - }, - wantStatus: nil, - }, - { - name: "filter non-reservations with preemption but no preemptible resources and have preemptibleInRR", - stateData: &stateData{ - podRequestsResources: &framework.Resource{ - MilliCPU: 4 * 1000, - }, - preemptible: map[string]corev1.ResourceList{}, - preemptibleInRRs: map[string]map[types.UID]corev1.ResourceList{ - node.Name: { - "123456": { - corev1.ResourceCPU: resource.MustParse("4"), - }, - }, - }, - nodeReservationStates: map[string]nodeReservationState{ - node.Name: { - podRequested: &framework.Resource{ - MilliCPU: 32 * 1000, - }, - }, - }, - }, - wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed), - }, - { - name: "filter non-reservations with preemption", - stateData: &stateData{ - podRequestsResources: &framework.Resource{ - MilliCPU: 4 * 1000, - }, - preemptible: map[string]corev1.ResourceList{ - node.Name: { - corev1.ResourceCPU: resource.MustParse("2"), - }, - }, - nodeReservationStates: map[string]nodeReservationState{ - node.Name: { - podRequested: &framework.Resource{ - MilliCPU: 32 * 1000, - }, - }, - }, - }, - wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed), - }, { name: "filter default reservations with preemption", stateData: &stateData{ @@ -1213,7 +1225,7 @@ func Test_filterWithReservations(t *testing.T) { cycleState.Write(stateKey, tt.stateData) nodeInfo := framework.NewNodeInfo() nodeInfo.SetNode(node) - got := pl.filterWithReservations(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo) + got := pl.filterWithReservations(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo, tt.stateData.nodeReservationStates[node.Name].matched) assert.Equal(t, tt.wantStatus, got) }) } @@ -1624,7 +1636,7 @@ func TestFilterReservation(t *testing.T) { reservation4C8G, }, targetReservation: reservation2C4G, - wantStatus: framework.AsStatus(fmt.Errorf("no intersection resources")), + wantStatus: framework.NewStatus(framework.Unschedulable, "no intersection resources"), }, { name: "failed with allocateOnce and allocated reservation", @@ -1636,7 +1648,7 @@ func TestFilterReservation(t *testing.T) { allocateOnceAndAllocatedReservation, }, targetReservation: allocateOnceAndAllocatedReservation, - wantStatus: framework.AsStatus(fmt.Errorf("reservation has allocateOnce enabled and has already been allocated")), + wantStatus: framework.NewStatus(framework.Unschedulable, "reservation has allocateOnce enabled and has already been allocated"), }, { name: "missing reservation info but impossible", @@ -1649,12 +1661,18 @@ func TestFilterReservation(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - suit := newPluginTestSuit(t) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{}, + } + + suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node}) p, err := suit.pluginFactory() assert.NoError(t, err) assert.NotNil(t, p) pl := p.(*Plugin) - cycleState := framework.NewCycleState() pod := &corev1.Pod{ Spec: corev1.PodSpec{ Containers: []corev1.Container{ @@ -1681,10 +1699,11 @@ func TestFilterReservation(t *testing.T) { nodeRState.matched = append(nodeRState.matched, rInfo) state.nodeReservationStates[v.Status.NodeName] = nodeRState } + cycleState := framework.NewCycleState() cycleState.Write(stateKey, state) rInfo := frameworkext.NewReservationInfo(tt.targetReservation) - status := pl.FilterReservation(context.TODO(), cycleState, pod, rInfo, "test-node") + status := pl.FilterReservation(context.TODO(), cycleState, pod, rInfo, node.Name) assert.Equal(t, tt.wantStatus, status) }) } diff --git a/pkg/scheduler/plugins/reservation/scoring.go b/pkg/scheduler/plugins/reservation/scoring.go index d188bf9db..a9f40482e 100644 --- a/pkg/scheduler/plugins/reservation/scoring.go +++ b/pkg/scheduler/plugins/reservation/scoring.go @@ -117,7 +117,8 @@ func (pl *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, p if reservationInfo == nil { return framework.MinNodeScore, nil } - return scoreReservation(pod, reservationInfo), nil + + return pl.ScoreReservation(ctx, cycleState, pod, reservationInfo, nodeName) } func (pl *Plugin) ScoreExtensions() framework.ScoreExtensions { @@ -146,7 +147,14 @@ func (pl *Plugin) ScoreReservation(ctx context.Context, cycleState *framework.Cy return 0, framework.AsStatus(fmt.Errorf("impossible, there is no relevant Reservation information")) } - return scoreReservation(pod, rInfo), nil + preemptibleInRR := state.preemptibleInRRs[nodeName][rInfo.UID()] + allocated := rInfo.Allocated + if len(preemptibleInRR) > 0 { + allocated = quotav1.SubtractWithNonNegativeResult(allocated, preemptibleInRR) + allocated = quotav1.Mask(allocated, rInfo.ResourceNames) + } + + return scoreReservation(pod, rInfo, allocated), nil } func (pl *Plugin) ReservationScoreExtensions() frameworkext.ReservationScoreExtensions { @@ -174,14 +182,11 @@ func findMostPreferredReservationByOrder(rOnNode []*frameworkext.ReservationInfo return highOrder, selectOrder } -func scoreReservation(pod *corev1.Pod, reservation *frameworkext.ReservationInfo) int64 { +func scoreReservation(pod *corev1.Pod, rInfo *frameworkext.ReservationInfo, allocated corev1.ResourceList) int64 { // TODO(joseph): we should support zero-request pods requested, _ := resourceapi.PodRequestsAndLimits(pod) - if allocated := reservation.Allocated; allocated != nil { - // consider multi owners sharing one reservation - requested = quotav1.Add(requested, allocated) - } - resources := quotav1.RemoveZeros(reservation.Allocatable) + requested = quotav1.Add(requested, allocated) + resources := quotav1.RemoveZeros(rInfo.Allocatable) w := int64(len(resources)) if w <= 0 { diff --git a/pkg/scheduler/plugins/reservation/scoring_test.go b/pkg/scheduler/plugins/reservation/scoring_test.go index 0d62fe36b..2ebc3cf05 100644 --- a/pkg/scheduler/plugins/reservation/scoring_test.go +++ b/pkg/scheduler/plugins/reservation/scoring_test.go @@ -203,7 +203,14 @@ func TestScore(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - suit := newPluginTestSuit(t) + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + Status: corev1.NodeStatus{}, + } + + suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node}) p, err := suit.pluginFactory() assert.NoError(t, err) assert.NotNil(t, p) @@ -235,7 +242,7 @@ func TestScore(t *testing.T) { }) assert.True(t, status.IsSuccess()) - score, status := pl.Score(context.TODO(), cycleState, tt.pod, "test-node") + score, status := pl.Score(context.TODO(), cycleState, tt.pod, node.Name) assert.True(t, status.IsSuccess()) assert.Equal(t, tt.wantScore, score) }) @@ -296,7 +303,15 @@ func TestScoreWithOrder(t *testing.T) { } } - suit := newPluginTestSuit(t) + var nodes []*corev1.Node + for i := 0; i < 4; i++ { + nodes = append(nodes, &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-node-%d", i+1), + }, + }) + } + suit := newPluginTestSuitWith(t, nil, nodes) p, err := suit.pluginFactory() assert.NoError(t, err) assert.NotNil(t, p) @@ -332,15 +347,6 @@ func TestScoreWithOrder(t *testing.T) { cycleState := framework.NewCycleState() cycleState.Write(stateKey, state) - var nodes []*corev1.Node - for nodeName := range state.nodeReservationStates { - nodes = append(nodes, &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: nodeName, - }, - }) - } - status := pl.PreScore(context.TODO(), cycleState, normalPod, nodes) assert.True(t, status.IsSuccess()) assert.Equal(t, "test-node-4", state.preferredNode) @@ -659,7 +665,20 @@ func TestPreScoreWithNominateReservation(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - suit := newPluginTestSuit(t) + nodes := []*corev1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + }, + } + + suit := newPluginTestSuitWith(t, nil, nodes) plugin, err := suit.pluginFactory() assert.NoError(t, err) pl := plugin.(*Plugin) @@ -680,19 +699,6 @@ func TestPreScoreWithNominateReservation(t *testing.T) { } cycleState.Write(stateKey, state) - nodes := []*corev1.Node{ - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node", - }, - }, - { - ObjectMeta: metav1.ObjectMeta{ - Name: "test-node-1", - }, - }, - } - status := pl.PreScore(context.TODO(), cycleState, tt.pod, nodes) assert.Equal(t, tt.wantStatus, status.IsSuccess())