Skip to content

Commit

Permalink
scheduler: FilterReservation/ScoreReservation perceives preemptible s…
Browse files Browse the repository at this point in the history
…tates (#1803)

Signed-off-by: Joseph <joseph.t.lee@outlook.com>
  • Loading branch information
eahydra committed Dec 28, 2023
1 parent 24b00f1 commit 253c4d4
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 149 deletions.
11 changes: 9 additions & 2 deletions pkg/scheduler/plugins/reservation/nominator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,14 @@ func TestNominateReservation(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
suit := newPluginTestSuit(t)
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
},
Status: corev1.NodeStatus{},
}

suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node})
plugin, err := suit.pluginFactory()
assert.NoError(t, err)
pl := plugin.(*Plugin)
Expand All @@ -292,7 +299,7 @@ func TestNominateReservation(t *testing.T) {
pl.reservationCache.updateReservation(reservation)
}
cycleState.Write(stateKey, state)
nominateRInfo, status := pl.NominateReservation(context.TODO(), cycleState, tt.pod, "test-node")
nominateRInfo, status := pl.NominateReservation(context.TODO(), cycleState, tt.pod, node.Name)
if tt.wantReservation == nil {
assert.Nil(t, nominateRInfo)
} else {
Expand Down
52 changes: 28 additions & 24 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,33 +345,36 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState,
}

if !reservationutil.IsReservePod(pod) {
return pl.filterWithReservations(ctx, cycleState, pod, nodeInfo)
state := getStateData(cycleState)
nodeRState := state.nodeReservationStates[node.Name]
matchedReservations := nodeRState.matched
if len(matchedReservations) == 0 {
if state.hasAffinity {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAffinity)
}

if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 {
preemptible := state.preemptible[node.Name]
preemptibleResource := framework.NewResource(preemptible)
nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
if !nodeFits {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed)
}
}
return nil
}

return pl.filterWithReservations(ctx, cycleState, pod, nodeInfo, matchedReservations)
}

// TODO: handle pre-allocation cases
return nil
}

func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo, matchedReservations []*frameworkext.ReservationInfo) *framework.Status {
node := nodeInfo.Node()
state := getStateData(cycleState)
nodeRState := state.nodeReservationStates[node.Name]
matchedReservations := nodeRState.matched
if len(matchedReservations) == 0 {
if state.hasAffinity {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAffinity)
}

if len(state.preemptible[node.Name]) > 0 || len(state.preemptibleInRRs[node.Name]) > 0 {
preemptible := state.preemptible[node.Name]
preemptibleResource := framework.NewResource(preemptible)
nodeFits := fitsNode(state.podRequestsResources, nodeInfo, &nodeRState, nil, preemptibleResource)
if !nodeFits {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed)
}
}
return nil
}

var (
totalDefault int
Expand Down Expand Up @@ -500,20 +503,21 @@ func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.C
}

if rInfo.IsAllocateOnce() && len(rInfo.AssignedPods) > 0 {
return framework.AsStatus(fmt.Errorf("reservation has allocateOnce enabled and has already been allocated"))
return framework.NewStatus(framework.Unschedulable, "reservation has allocateOnce enabled and has already been allocated")
}

podRequests, _ := resourceapi.PodRequestsAndLimits(pod)
resourceNames := quotav1.Intersection(rInfo.ResourceNames, quotav1.ResourceNames(podRequests))
if len(resourceNames) == 0 {
return framework.AsStatus(fmt.Errorf("no intersection resources"))
return framework.NewStatus(framework.Unschedulable, "no intersection resources")
}

remainedResource := quotav1.SubtractWithNonNegativeResult(rInfo.Allocatable, rInfo.Allocated)
if quotav1.IsZero(quotav1.Mask(remainedResource, resourceNames)) {
return framework.AsStatus(fmt.Errorf("insufficient resources in reservation"))
nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
if err != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "missing node")
}
return nil

return pl.filterWithReservations(ctx, cycleState, pod, nodeInfo, []*frameworkext.ReservationInfo{rInfo})
}

func (pl *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
Expand Down
197 changes: 108 additions & 89 deletions pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,101 @@ func TestFilter(t *testing.T) {
}
}

func TestFilterWithPreemption(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("32"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
corev1.ResourcePods: resource.MustParse("100"),
},
},
}
tests := []struct {
name string
stateData *stateData
wantStatus *framework.Status
}{
{
name: "successfully filter non-reservations with preemption",
stateData: &stateData{
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{
node.Name: {
corev1.ResourceCPU: resource.MustParse("4"),
},
},
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 32 * 1000,
},
},
},
},
wantStatus: nil,
},
{
name: "failed to filter non-reservations with preemption",
stateData: &stateData{
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{
node.Name: {
corev1.ResourceCPU: resource.MustParse("2"),
},
},
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 32 * 1000,
},
},
},
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed),
},
{
name: "filter non-reservations with preemption but no preemptible resources",
stateData: &stateData{
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{},
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 32 * 1000,
},
},
},
},
wantStatus: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pl := &Plugin{}
cycleState := framework.NewCycleState()
if tt.stateData.podRequestsResources == nil {
resources := framework.NewResource(tt.stateData.podRequests)
tt.stateData.podRequestsResources = resources
}
cycleState.Write(stateKey, tt.stateData)
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(node)
got := pl.Filter(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo)
assert.Equal(t, tt.wantStatus, got)
})
}
}

func Test_filterWithReservations(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -722,89 +817,6 @@ func Test_filterWithReservations(t *testing.T) {
},
wantStatus: framework.NewStatus(framework.Unschedulable, ErrReasonReservationInsufficientResources),
},
{
name: "filter non-reservations with preemption",
stateData: &stateData{
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{
node.Name: {
corev1.ResourceCPU: resource.MustParse("4"),
},
},
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 32 * 1000,
},
},
},
},
wantStatus: nil,
},
{
name: "filter non-reservations with preemption but no preemptible resources",
stateData: &stateData{
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{},
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 32 * 1000,
},
},
},
},
wantStatus: nil,
},
{
name: "filter non-reservations with preemption but no preemptible resources and have preemptibleInRR",
stateData: &stateData{
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{},
preemptibleInRRs: map[string]map[types.UID]corev1.ResourceList{
node.Name: {
"123456": {
corev1.ResourceCPU: resource.MustParse("4"),
},
},
},
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 32 * 1000,
},
},
},
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed),
},
{
name: "filter non-reservations with preemption",
stateData: &stateData{
podRequestsResources: &framework.Resource{
MilliCPU: 4 * 1000,
},
preemptible: map[string]corev1.ResourceList{
node.Name: {
corev1.ResourceCPU: resource.MustParse("2"),
},
},
nodeReservationStates: map[string]nodeReservationState{
node.Name: {
podRequested: &framework.Resource{
MilliCPU: 32 * 1000,
},
},
},
},
wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPreemptionFailed),
},
{
name: "filter default reservations with preemption",
stateData: &stateData{
Expand Down Expand Up @@ -1213,7 +1225,7 @@ func Test_filterWithReservations(t *testing.T) {
cycleState.Write(stateKey, tt.stateData)
nodeInfo := framework.NewNodeInfo()
nodeInfo.SetNode(node)
got := pl.filterWithReservations(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo)
got := pl.filterWithReservations(context.TODO(), cycleState, &corev1.Pod{}, nodeInfo, tt.stateData.nodeReservationStates[node.Name].matched)
assert.Equal(t, tt.wantStatus, got)
})
}
Expand Down Expand Up @@ -1624,7 +1636,7 @@ func TestFilterReservation(t *testing.T) {
reservation4C8G,
},
targetReservation: reservation2C4G,
wantStatus: framework.AsStatus(fmt.Errorf("no intersection resources")),
wantStatus: framework.NewStatus(framework.Unschedulable, "no intersection resources"),
},
{
name: "failed with allocateOnce and allocated reservation",
Expand All @@ -1636,7 +1648,7 @@ func TestFilterReservation(t *testing.T) {
allocateOnceAndAllocatedReservation,
},
targetReservation: allocateOnceAndAllocatedReservation,
wantStatus: framework.AsStatus(fmt.Errorf("reservation has allocateOnce enabled and has already been allocated")),
wantStatus: framework.NewStatus(framework.Unschedulable, "reservation has allocateOnce enabled and has already been allocated"),
},
{
name: "missing reservation info but impossible",
Expand All @@ -1649,12 +1661,18 @@ func TestFilterReservation(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
suit := newPluginTestSuit(t)
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
},
Status: corev1.NodeStatus{},
}

suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node})
p, err := suit.pluginFactory()
assert.NoError(t, err)
assert.NotNil(t, p)
pl := p.(*Plugin)
cycleState := framework.NewCycleState()
pod := &corev1.Pod{
Spec: corev1.PodSpec{
Containers: []corev1.Container{
Expand All @@ -1681,10 +1699,11 @@ func TestFilterReservation(t *testing.T) {
nodeRState.matched = append(nodeRState.matched, rInfo)
state.nodeReservationStates[v.Status.NodeName] = nodeRState
}
cycleState := framework.NewCycleState()
cycleState.Write(stateKey, state)

rInfo := frameworkext.NewReservationInfo(tt.targetReservation)
status := pl.FilterReservation(context.TODO(), cycleState, pod, rInfo, "test-node")
status := pl.FilterReservation(context.TODO(), cycleState, pod, rInfo, node.Name)
assert.Equal(t, tt.wantStatus, status)
})
}
Expand Down
Loading

0 comments on commit 253c4d4

Please sign in to comment.