Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: remove the restriction that reservation cannot be preempted #1859

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package eventhandlers

import (
"context"
"math"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -30,7 +29,6 @@ import (
"k8s.io/kubernetes/pkg/scheduler"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/profile"
"k8s.io/utils/pointer"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
koordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned"
Expand Down Expand Up @@ -274,8 +272,6 @@ func addReservationToSchedulerCache(sched frameworkext.Scheduler, obj interface{

// update pod cache and trigger pod assigned event for scheduling queue
reservePod := reservationutil.NewReservePod(r)
// Forces priority to be set to maximum to prevent preemption.
reservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
if err = sched.GetCache().AddPod(reservePod); err != nil {
klog.ErrorS(err, "Failed to add reservation into SchedulerCache", "reservation", klog.KObj(reservePod))
} else {
Expand Down Expand Up @@ -338,9 +334,6 @@ func updateReservationInSchedulerCache(sched frameworkext.Scheduler, oldObj, new
}
oldReservePod := reservationutil.NewReservePod(oldR)
newReservePod := reservationutil.NewReservePod(newR)
// Forces priority to be set to maximum to prevent preemption.
oldReservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
newReservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
if err := sched.GetCache().UpdatePod(oldReservePod, newReservePod); err != nil {
klog.ErrorS(err, "Failed to update reservation into SchedulerCache", "reservation", klog.KObj(newR))
} else {
Expand Down Expand Up @@ -380,8 +373,6 @@ func deleteReservationFromSchedulerCache(sched frameworkext.Scheduler, obj inter
}

reservePod := reservationutil.NewReservePod(r)
// Forces priority to be set to maximum to prevent preemption.
reservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
if _, err = sched.GetCache().GetPod(reservePod); err == nil {
if len(rInfo.AllocatedPorts) > 0 {
allocatablePorts := util.RequestedHostPorts(reservePod)
Expand Down
8 changes: 0 additions & 8 deletions pkg/scheduler/plugins/deviceshare/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,6 @@ func (p *Plugin) PreFilterExtensions() framework.PreFilterExtensions {
}

func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToAdd.Pod) {
return nil
}

state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down Expand Up @@ -220,10 +216,6 @@ func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, p
}

func (p *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToRemove.Pod) {
return nil
}

state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down
9 changes: 0 additions & 9 deletions pkg/scheduler/plugins/elasticquota/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
"github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/elasticquota/core"
reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation"
"github.com/koordinator-sh/koordinator/pkg/util/transformer"
)

Expand Down Expand Up @@ -262,10 +261,6 @@ func (g *Plugin) PreFilterExtensions() framework.PreFilterExtensions {
// AddPod is called by the framework while trying to evaluate the impact
// of adding podToAdd to the node while scheduling podToSchedule.
func (g *Plugin) AddPod(ctx context.Context, state *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToAdd.Pod) {
return nil
}

postFilterState, err := getPostFilterState(state)
if err != nil {
klog.ErrorS(err, "Failed to read postFilterState from cycleState", "elasticQuotaSnapshotKey", postFilterState)
Expand All @@ -286,10 +281,6 @@ func (g *Plugin) AddPod(ctx context.Context, state *framework.CycleState, podToS
// RemovePod is called by the framework while trying to evaluate the impact
// of removing podToRemove from the node while scheduling podToSchedule.
func (g *Plugin) RemovePod(ctx context.Context, state *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToRemove.Pod) {
return nil
}

postFilterState, err := getPostFilterState(state)
if err != nil {
klog.ErrorS(err, "Failed to read postFilterState from cycleState", "elasticQuotaSnapshotKey", postFilterState)
Expand Down
12 changes: 1 addition & 11 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,6 @@ func (pl *Plugin) PreFilterExtensions() framework.PreFilterExtensions {
}

func (pl *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToAdd *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToAdd.Pod) || nodeInfo.Node() == nil {
return nil
}
podRequests, _ := resourceapi.PodRequestsAndLimits(podInfoToAdd.Pod)
if quotav1.IsZero(podRequests) {
return nil
Expand Down Expand Up @@ -283,10 +280,6 @@ func (pl *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState,
}

func (pl *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState, podToSchedule *corev1.Pod, podInfoToRemove *framework.PodInfo, nodeInfo *framework.NodeInfo) *framework.Status {
if reservationutil.IsReservePod(podInfoToRemove.Pod) || nodeInfo.Node() == nil {
return nil
}

podRequests, _ := resourceapi.PodRequestsAndLimits(podInfoToRemove.Pod)
if quotav1.IsZero(podRequests) {
return nil
Expand Down Expand Up @@ -482,10 +475,7 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node
}

// PostFilter always returns a nil *framework.PostFilterResult; only the status
// code varies. When reservationutil.IsReservePod reports true for pod the
// status code is framework.Error, otherwise it is framework.Unschedulable.
// The ctx and cycleState arguments are not read, and the NodeToStatusMap
// argument is discarded.
// NOTE(review): this span looks like diff output with the +/- markers removed;
// the reserve-pod branch and the "empty function" comment below appear to
// belong to different revisions — confirm which version is current.
func (pl *Plugin) PostFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, _ framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
if reservationutil.IsReservePod(pod) {
// return err to stop default preemption
return nil, framework.NewStatus(framework.Error)
}
// Implement an empty function to be compatible with existing configurations
return nil, framework.NewStatus(framework.Unschedulable)
}

Expand Down
43 changes: 0 additions & 43 deletions pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1556,49 +1556,6 @@ func TestPreFilterExtensionRemovePod(t *testing.T) {
}
}

// TestPostFilter runs Plugin.PostFilter against an ordinary pod and against a
// reserve pod built by testGetReservePod. In both cases the returned result
// must be nil; the expected status is Unschedulable for the ordinary pod and
// Error for the reserve pod.
func TestPostFilter(t *testing.T) {
	const nodeName = "node1"

	// The reserve pod doubles as the only pod pre-loaded into every test suite.
	reservePod := testGetReservePod(&corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			UID:  "reserve-pod-0",
			Name: "reserve-pod-0",
		},
		Spec: corev1.PodSpec{
			NodeName: nodeName,
		},
	})
	plainPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "not-reserve",
		},
	}

	type testCase struct {
		name       string
		pod        *corev1.Pod
		wantStatus *framework.Status
	}
	cases := []testCase{
		{
			name:       "not reserve pod",
			pod:        plainPod,
			wantStatus: framework.NewStatus(framework.Unschedulable),
		},
		{
			name:       "reserve pod",
			pod:        reservePod,
			wantStatus: framework.NewStatus(framework.Error),
		},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			// Build a fresh suite per subtest so cases do not share plugin state.
			pods := []*corev1.Pod{reservePod}
			nodes := []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}}
			suit := newPluginTestSuitWith(t, pods, nodes)

			p, err := suit.pluginFactory()
			assert.NoError(t, err)

			gotResult, status := p.(*Plugin).PostFilter(context.TODO(), nil, tc.pod, nil)
			assert.Nil(t, gotResult)
			assert.Equal(t, tc.wantStatus, status)
		})
	}
}

func TestFilterReservation(t *testing.T) {
reservation4C8G := &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Expand Down
15 changes: 14 additions & 1 deletion pkg/util/reservation/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package reservation
import (
"encoding/json"
"fmt"
"math"
"strconv"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -95,8 +97,19 @@ func NewReservePod(r *schedulingv1alpha1.Reservation) *corev1.Pod {
}

if reservePod.Spec.Priority == nil {
reservePod.Spec.Priority = pointer.Int32(0)
if priorityVal, ok := r.Labels[extension.LabelPodPriority]; ok && priorityVal != "" {
priority, err := strconv.ParseInt(priorityVal, 10, 32)
if err == nil {
reservePod.Spec.Priority = pointer.Int32(int32(priority))
}
}
}

if reservePod.Spec.Priority == nil {
// Forces priority to be set to maximum to prevent preemption.
reservePod.Spec.Priority = pointer.Int32(math.MaxInt32)
}

reservePod.Spec.SchedulerName = GetReservationSchedulerName(r)

if IsReservationSucceeded(r) {
Expand Down
17 changes: 16 additions & 1 deletion pkg/util/reservation/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package reservation

import (
"math"
"testing"
"time"

Expand Down Expand Up @@ -624,7 +625,7 @@ func TestReservePod(t *testing.T) {
},
},
},
Priority: pointer.Int32(0),
Priority: pointer.Int32(math.MaxInt32),
InitContainers: []corev1.Container{
{
Name: "test-init-container",
Expand All @@ -645,6 +646,20 @@ func TestReservePod(t *testing.T) {
reservation: reservation,
wantReservePod: expectReservePod,
},
{
name: "custom priority",
reservation: func() *schedulingv1alpha1.Reservation {
r := reservation.DeepCopy()
r.Labels[apiext.LabelPodPriority] = "1000"
return r
}(),
wantReservePod: func() *corev1.Pod {
p := expectReservePod.DeepCopy()
p.Labels[apiext.LabelPodPriority] = "1000"
p.Spec.Priority = pointer.Int32(1000)
return p
}(),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down
Loading