From ad7f84d4aa7d3bf603f109baff3db83951aea7b5 Mon Sep 17 00:00:00 2001 From: Joseph Date: Fri, 19 Jan 2024 13:36:06 +0800 Subject: [PATCH] scheduler: remove the restriction that reservation cannot be preempted Signed-off-by: Joseph --- .../eventhandlers/reservation_handler.go | 9 ---- pkg/scheduler/plugins/deviceshare/plugin.go | 8 ---- pkg/scheduler/plugins/elasticquota/plugin.go | 9 ---- pkg/scheduler/plugins/reservation/plugin.go | 12 +----- .../plugins/reservation/plugin_test.go | 43 ------------------- pkg/util/reservation/reservation.go | 15 ++++++- pkg/util/reservation/reservation_test.go | 17 +++++++- 7 files changed, 31 insertions(+), 82 deletions(-) diff --git a/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go b/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go index 2869fd487..6ae949e16 100644 --- a/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go +++ b/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go @@ -18,7 +18,6 @@ package eventhandlers import ( "context" - "math" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -30,7 +29,6 @@ import ( "k8s.io/kubernetes/pkg/scheduler" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/profile" - "k8s.io/utils/pointer" schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned" @@ -274,8 +272,6 @@ func addReservationToSchedulerCache(sched frameworkext.Scheduler, obj interface{ // update pod cache and trigger pod assigned event for scheduling queue reservePod := reservationutil.NewReservePod(r) - // Forces priority to be set to maximum to prevent preemption. - reservePod.Spec.Priority = pointer.Int32(math.MaxInt32) if err = sched.GetCache().AddPod(reservePod); err != nil { klog.ErrorS(err, "Failed to add reservation into SchedulerCache", "reservation", klog.KObj(reservePod)) } else { @@ -338,9 +334,6 @@ func updateReservationInSchedulerCache(sched frameworkext.Scheduler, oldObj, new } oldReservePod := reservationutil.NewReservePod(oldR) newReservePod := reservationutil.NewReservePod(newR) - // Forces priority to be set to maximum to prevent preemption. - oldReservePod.Spec.Priority = pointer.Int32(math.MaxInt32) - newReservePod.Spec.Priority = pointer.Int32(math.MaxInt32) if err := sched.GetCache().UpdatePod(oldReservePod, newReservePod); err != nil { klog.ErrorS(err, "Failed to update reservation into SchedulerCache", "reservation", klog.KObj(newR)) } else { @@ -380,8 +373,6 @@ func deleteReservationFromSchedulerCache(sched frameworkext.Scheduler, obj inter } reservePod := reservationutil.NewReservePod(r) - // Forces priority to be set to maximum to prevent preemption. - reservePod.Spec.Priority = pointer.Int32(math.MaxInt32) if _, err = sched.GetCache().GetPod(reservePod); err == nil { if len(rInfo.AllocatedPorts) > 0 { allocatablePorts := util.RequestedHostPorts(reservePod) diff --git a/pkg/scheduler/plugins/deviceshare/plugin.go b/pkg/scheduler/plugins/deviceshare/plugin.go index 52aef8fe6..c388820eb 100644 --- a/pkg/scheduler/plugins/deviceshare/plugin.go +++ b/pkg/scheduler/plugins/deviceshare/plugin.go @@ -161,10 +161,6 @@ func (p *Plugin) PreFilterExtensions() framework.PreFilterExtensions { } func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { - if reservationutil.IsReservePod(podInfoToAdd.Pod) { - return nil - } - state, status := getPreFilterState(cycleState) if !status.IsSuccess() { return status @@ -220,10 +216,6 @@ func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, p } func (p *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { - if reservationutil.IsReservePod(podInfoToRemove.Pod) { - return nil - } - state, status := getPreFilterState(cycleState) if !status.IsSuccess() { return status diff --git a/pkg/scheduler/plugins/elasticquota/plugin.go b/pkg/scheduler/plugins/elasticquota/plugin.go index 3f4805361..3e84ada17 100644 --- a/pkg/scheduler/plugins/elasticquota/plugin.go +++ b/pkg/scheduler/plugins/elasticquota/plugin.go @@ -45,7 +45,6 @@ import ( "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/elasticquota/core" - reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation" "github.com/koordinator-sh/koordinator/pkg/util/transformer" ) @@ -262,10 +261,6 @@ func (g *Plugin) PreFilterExtensions() framework.PreFilterExtensions { // AddPod is called by the framework while trying to evaluate the impact // of adding podToAdd to the node while scheduling podToSchedule. func (g *Plugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { - if reservationutil.IsReservePod(podInfoToAdd.Pod) { - return nil - } - postFilterState, err := getPostFilterState(state) if err != nil { klog.ErrorS(err, "Failed to read postFilterState from cycleState", "elasticQuotaSnapshotKey", postFilterState) @@ -286,10 +281,6 @@ func (g *Plugin) AddPod(ctx context.Context, state *framework.CycleState, podToS // RemovePod is called by the framework while trying to evaluate the impact // of removing podToRemove from the node while scheduling podToSchedule. func (g *Plugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { - if reservationutil.IsReservePod(podInfoToRemove.Pod) { - return nil - } - postFilterState, err := getPostFilterState(state) if err != nil { klog.ErrorS(err, "Failed to read postFilterState from cycleState", "elasticQuotaSnapshotKey", postFilterState) diff --git a/pkg/scheduler/plugins/reservation/plugin.go b/pkg/scheduler/plugins/reservation/plugin.go index 5b66b516e..38284a7c2 100644 --- a/pkg/scheduler/plugins/reservation/plugin.go +++ b/pkg/scheduler/plugins/reservation/plugin.go @@ -252,9 +252,6 @@ func (pl *Plugin) PreFilterExtensions() framework.PreFilterExtensions { } func (pl *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { - if reservationutil.IsReservePod(podInfoToAdd.Pod) || nodeInfo.Node() == nil { - return nil - } podRequests, _ := resourceapi.PodRequestsAndLimits(podInfoToAdd.Pod) if quotav1.IsZero(podRequests) { return nil @@ -283,10 +280,6 @@ func (pl *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, } func (pl *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status { - if reservationutil.IsReservePod(podInfoToRemove.Pod) || nodeInfo.Node() == nil { - return nil - } - podRequests, _ := resourceapi.PodRequestsAndLimits(podInfoToRemove.Pod) if quotav1.IsZero(podRequests) { return nil @@ -482,10 +475,7 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node } func (pl *Plugin) PostFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) { - if reservationutil.IsReservePod(pod) { - // return err to stop default preemption - return nil, framework.NewStatus(framework.Error) - } + // Implement an empty function to be compatible with existing configurations return nil, framework.NewStatus(framework.Unschedulable) } diff --git a/pkg/scheduler/plugins/reservation/plugin_test.go b/pkg/scheduler/plugins/reservation/plugin_test.go index cbcca657b..d068b1402 100644 --- a/pkg/scheduler/plugins/reservation/plugin_test.go +++ b/pkg/scheduler/plugins/reservation/plugin_test.go @@ -1556,49 +1556,6 @@ func TestPreFilterExtensionRemovePod(t *testing.T) { } } -func TestPostFilter(t *testing.T) { - reservePod := testGetReservePod(&corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - UID: "reserve-pod-0", - Name: "reserve-pod-0", - }, - Spec: corev1.PodSpec{ - NodeName: "node1", - }, - }) - tests := []struct { - name string - pod *corev1.Pod - wantStatus *framework.Status - }{ - { - name: "not reserve pod", - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "not-reserve", - }, - }, - wantStatus: framework.NewStatus(framework.Unschedulable), - }, - { - name: "reserve pod", - pod: reservePod, - wantStatus: framework.NewStatus(framework.Error), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - suit := newPluginTestSuitWith(t, []*corev1.Pod{reservePod}, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node1"}}}) - p, err := suit.pluginFactory() - assert.NoError(t, err) - pl := p.(*Plugin) - gotResult, status := pl.PostFilter(context.TODO(), nil, tt.pod, nil) - assert.Nil(t, gotResult) - assert.Equal(t, tt.wantStatus, status) - }) - } -} - func TestFilterReservation(t *testing.T) { reservation4C8G := &schedulingv1alpha1.Reservation{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/util/reservation/reservation.go b/pkg/util/reservation/reservation.go index 2cad2ff31..096481ff9 100644 --- a/pkg/util/reservation/reservation.go +++ b/pkg/util/reservation/reservation.go @@ -19,6 +19,8 @@ package reservation import ( "encoding/json" "fmt" + "math" + "strconv" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -95,8 +97,19 @@ func NewReservePod(r *schedulingv1alpha1.Reservation) *corev1.Pod { } if reservePod.Spec.Priority == nil { - reservePod.Spec.Priority = pointer.Int32(0) + if priorityVal, ok := r.Labels[extension.LabelPodPriority]; ok && priorityVal != "" { + priority, err := strconv.ParseInt(priorityVal, 10, 32) + if err == nil { + reservePod.Spec.Priority = pointer.Int32(int32(priority)) + } + } + } + + if reservePod.Spec.Priority == nil { + // Forces priority to be set to maximum to prevent preemption. + reservePod.Spec.Priority = pointer.Int32(math.MaxInt32) } + reservePod.Spec.SchedulerName = GetReservationSchedulerName(r) if IsReservationSucceeded(r) { diff --git a/pkg/util/reservation/reservation_test.go b/pkg/util/reservation/reservation_test.go index 2cfd6048d..3d8dfb205 100644 --- a/pkg/util/reservation/reservation_test.go +++ b/pkg/util/reservation/reservation_test.go @@ -17,6 +17,7 @@ limitations under the License. package reservation import ( + "math" "testing" "time" @@ -624,7 +625,7 @@ func TestReservePod(t *testing.T) { }, }, }, - Priority: pointer.Int32(0), + Priority: pointer.Int32(math.MaxInt32), InitContainers: []corev1.Container{ { Name: "test-init-container", @@ -645,6 +646,20 @@ func TestReservePod(t *testing.T) { reservation: reservation, wantReservePod: expectReservePod, }, + { + name: "custom priority", + reservation: func() *schedulingv1alpha1.Reservation { + r := reservation.DeepCopy() + r.Labels[apiext.LabelPodPriority] = "1000" + return r + }(), + wantReservePod: func() *corev1.Pod { + p := expectReservePod.DeepCopy() + p.Labels[apiext.LabelPodPriority] = "1000" + p.Spec.Priority = pointer.Int32(1000) + return p + }(), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {