Skip to content

Commit

Permalink
scheduler: remove the restriction that reservation cannot be preempted (
Browse files Browse the repository at this point in the history
koordinator-sh#1859)

Signed-off-by: Joseph <joseph.t.lee@outlook.com>
  • Loading branch information
eahydra authored and ls-2018 committed Mar 25, 2024
1 parent ba7a386 commit 9d148bd
Show file tree
Hide file tree
Showing 7 changed files with 31 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package eventhandlers

import (
"context"
"math"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -30,7 +29,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/utils/pointer"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -274,8 +272,6 @@ func addReservationToSchedulerCache(sched frameworkext.Scheduler, obj interface{

// update pod cache and trigger pod assigned event for scheduling queue
reservePod := reservationutil.NewReservePod(r)
// Forces priority to be set to maximum to prevent preemption.
reservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
if err = sched.GetCache().AddPod(reservePod); err != nil {
klog.ErrorS(err, "Failed to add reservation into SchedulerCache", "reservation", klog.KObj(reservePod))
} else {
Expand Down Expand Up @@ -338,9 +334,6 @@ func updateReservationInSchedulerCache(sched frameworkext.Scheduler, oldObj, new
}
oldReservePod := reservationutil.NewReservePod(oldR)
newReservePod := reservationutil.NewReservePod(newR)
// Forces priority to be set to maximum to prevent preemption.
oldReservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
newReservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
if err := sched.GetCache().UpdatePod(oldReservePod, newReservePod); err != nil {
klog.ErrorS(err, "Failed to update reservation into SchedulerCache", "reservation", klog.KObj(newR))
} else {
Expand Down Expand Up @@ -380,8 +373,6 @@ func deleteReservationFromSchedulerCache(sched frameworkext.Scheduler, obj inter
}

reservePod := reservationutil.NewReservePod(r)
// Forces priority to be set to maximum to prevent preemption.
reservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
if _, err = sched.GetCache().GetPod(reservePod); err == nil {
if len(rInfo.AllocatedPorts) > 0 {
allocatablePorts := util.RequestedHostPorts(reservePod)
Expand Down
8 changes: 0 additions & 8 deletions pkg/scheduler/plugins/deviceshare/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,6 @@ func (p *Plugin) PreFilterExtensions() framework.PreFilterExtensions {
}

func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToAdd.Pod) {
return nil
}

state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down Expand Up @@ -220,10 +216,6 @@ func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, p
}

func (p *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToRemove.Pod) {
return nil
}

state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down
9 changes: 0 additions & 9 deletions pkg/scheduler/plugins/elasticquota/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/elasticquota/core"
reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation"
"github.com/koordinator-sh/koordinator/pkg/util/transformer"
)

Expand Down Expand Up @@ -262,10 +261,6 @@ func (g *Plugin) PreFilterExtensions() framework.PreFilterExtensions {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
func (g *Plugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToAdd.Pod) {
return nil
}

postFilterState, err := getPostFilterState(state)
if err != nil {
klog.ErrorS(err, "Failed to read postFilterState from cycleState", "elasticQuotaSnapshotKey", postFilterState)
Expand All @@ -286,10 +281,6 @@ func (g *Plugin) AddPod(ctx context.Context, state *framework.CycleState, podToS
// RemovePod is called by the framework while trying to evaluate the impact
// of removing podToRemove from the node while scheduling podToSchedule.
func (g *Plugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToRemove.Pod) {
return nil
}

postFilterState, err := getPostFilterState(state)
if err != nil {
klog.ErrorS(err, "Failed to read postFilterState from cycleState", "elasticQuotaSnapshotKey", postFilterState)
Expand Down
12 changes: 1 addition & 11 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,6 @@ func (pl *Plugin) PreFilterExtensions() framework.PreFilterExtensions {
}

func (pl *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToAdd.Pod) || nodeInfo.Node() == nil {
return nil
}
podRequests, _ := resourceapi.PodRequestsAndLimits(podInfoToAdd.Pod)
if quotav1.IsZero(podRequests) {
return nil
Expand Down Expand Up @@ -283,10 +280,6 @@ func (pl *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState,
}

func (pl *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToRemove.Pod) || nodeInfo.Node() == nil {
return nil
}

podRequests, _ := resourceapi.PodRequestsAndLimits(podInfoToRemove.Pod)
if quotav1.IsZero(podRequests) {
return nil
Expand Down Expand Up @@ -482,10 +475,7 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node
}

// PostFilter always returns a nil PostFilterResult. As shown here, a pod for
// which reservationutil.IsReservePod reports true gets a framework.Error
// status, and every other pod gets framework.Unschedulable.
//
// NOTE(review): these lines come from a rendered diff without +/- markers. The
// file header reports 1 addition and 11 deletions, so the reserve-pod branch
// below is presumably the removed side of the change and only the final
// comment and return remain afterwards — confirm against the actual file.
func (pl *Plugin) PostFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
if reservationutil.IsReservePod(pod) {
// return err to stop default preemption
return nil, framework.NewStatus(framework.Error)
}
// Implement an empty function to be compatible with existing configurations
return nil, framework.NewStatus(framework.Unschedulable)
}

Expand Down
43 changes: 0 additions & 43 deletions pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,49 +1556,6 @@ func TestPreFilterExtensionRemovePod(t *testing.T) {
}
}

// TestPostFilter is a table-driven test of Plugin.PostFilter. It asserts that
// the returned result is always nil, that a regular pod yields an
// Unschedulable status, and that a reserve pod yields an Error status.
//
// NOTE(review): the file header for this hunk reports 0 additions and 43
// deletions, so this test appears to be removed by the commit — confirm
// against the actual file.
func TestPostFilter(t *testing.T) {
// Presumably testGetReservePod turns the given pod into a reserve pod; its
// definition is not visible here — verify.
reservePod := testGetReservePod(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "reserve-pod-0",
Name: "reserve-pod-0",
},
Spec: corev1.PodSpec{
NodeName: "node1",
},
})
tests := []struct {
name string
pod *corev1.Pod
wantStatus *framework.Status
}{
{
name: "not reserve pod",
pod: &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "not-reserve",
},
},
wantStatus: framework.NewStatus(framework.Unschedulable),
},
{
name: "reserve pod",
pod: reservePod,
wantStatus: framework.NewStatus(framework.Error),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Each case builds a fresh suite seeded with the reserve pod and a single node named "node1".
suit := newPluginTestSuitWith(t, []*corev1.Pod{reservePod}, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}})
p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)
// The cycle state and node-to-status map are passed as nil.
gotResult, status := pl.PostFilter(context.TODO(), nil, tt.pod, nil)
assert.Nil(t, gotResult)
assert.Equal(t, tt.wantStatus, status)
})
}
}

func TestFilterReservation(t *testing.T) {
reservation4C8G := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Expand Down
15 changes: 14 additions & 1 deletion pkg/util/reservation/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package reservation
import (
"encoding/json"
"fmt"
"math"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -95,8 +97,19 @@ func NewReservePod(r *schedulingv1alpha1.Reservation) *corev1.Pod {
}

if reservePod.Spec.Priority == nil {
reservePod.Spec.Priority = pointer.Int32(0)
if priorityVal, ok := r.Labels[extension.LabelPodPriority]; ok && priorityVal != "" {
priority, err := strconv.ParseInt(priorityVal, 10, 32)
if err == nil {
reservePod.Spec.Priority = pointer.Int32(int32(priority))
}
}
}

if reservePod.Spec.Priority == nil {
// Forces priority to be set to maximum to prevent preemption.
reservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
}

reservePod.Spec.SchedulerName = GetReservationSchedulerName(r)

if IsReservationSucceeded(r) {
Expand Down
17 changes: 16 additions & 1 deletion pkg/util/reservation/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package reservation

import (
"math"
"testing"
"time"

Expand Down Expand Up @@ -624,7 +625,7 @@ func TestReservePod(t *testing.T) {
},
},
},
Priority: pointer.Int32(0),
Priority: pointer.Int32(math.MaxInt32),
InitContainers: []corev1.Container{
{
Name: "test-init-container",
Expand All @@ -645,6 +646,20 @@ func TestReservePod(t *testing.T) {
reservation: reservation,
wantReservePod: expectReservePod,
},
{
name: "custom priority",
reservation: func() *schedulingv1alpha1.Reservation {
r := reservation.DeepCopy()
r.Labels[apiext.LabelPodPriority] = "1000"
return r
}(),
wantReservePod: func() *corev1.Pod {
p := expectReservePod.DeepCopy()
p.Labels[apiext.LabelPodPriority] = "1000"
p.Spec.Priority = pointer.Int32(1000)
return p
}(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 9d148bd

Please sign in to comment.