Skip to content

Commit

Permalink
scheduler: extend reservation nominator to support reservation preemp…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: xulinfei.xlf <xulinfei.xlf@alibaba-inc.com>
  • Loading branch information
xulinfei.xlf committed Mar 4, 2024
1 parent 19de595 commit b0f68f6
Show file tree
Hide file tree
Showing 8 changed files with 276 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ func MakeReservationErrorHandler(
return false
}

if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
if extendedHandle, ok := fwk.(frameworkext.ExtendedHandle); ok {
extendedHandle.GetReservationNominator().DeleteNominatedReservationIfExists(pod)
}
}

reservationErrorFn(podInfo, schedulingErr)

rName := reservationutil.GetReservationNameFromReservePod(pod)
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (ext *frameworkExtenderImpl) RunPostFilterPlugins(ctx context.Context, stat
result, status := ext.Framework.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
if result == nil || result.NominatingInfo.NominatedNodeName == "" {
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservationIfExists(pod)
}
return result, status
}
Expand Down Expand Up @@ -461,6 +462,7 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context,
}
status := ext.Framework.RunReservePluginsReserve(ctx, cycleState, pod, nodeName)
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservationIfExists(pod)
return status
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ type ReservationNominator interface {
AddNominatedReservation(pod *corev1.Pod, nodeName string, rInfo *ReservationInfo)
RemoveNominatedReservations(pod *corev1.Pod)
GetNominatedReservation(pod *corev1.Pod, nodeName string) *ReservationInfo
AddNominatedReservationToNode(reservePod *corev1.Pod, nodeName string)
DeleteNominatedReservationIfExists(reservePod *corev1.Pod)
}

const (
Expand Down
48 changes: 47 additions & 1 deletion pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
Expand All @@ -34,7 +35,10 @@ type FakeNominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
reservations map[types.UID]*frameworkext.ReservationInfo
lock sync.RWMutex
// nominatedReservation is map keyed by nodeName, value is the nominated reservations
nominatedReservation map[string][]*framework.PodInfo
nominatedReservationToNode map[types.UID]string
lock sync.RWMutex
}

func NewFakeReservationNominator() *FakeNominator {
Expand Down Expand Up @@ -87,3 +91,45 @@ func (nm *FakeNominator) NominateReservation(ctx context.Context, cycleState *fr
rInfo := nm.GetNominatedReservation(pod, nodeName)
return rInfo, nil
}

func (nm *FakeNominator) AddNominatedReservationToNode(rInfo *corev1.Pod, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservation(rInfo)

nm.nominatedReservationToNode[rInfo.UID] = nodeName
for _, npi := range nm.nominatedReservation[nodeName] {
if npi.Pod.UID == rInfo.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservation[nodeName] = append(nm.nominatedReservation[nodeName], framework.NewPodInfo(rInfo))
}

func (nm *FakeNominator) DeleteNominatedReservationIfExists(rInfo *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservation(rInfo)
}

func (nm *FakeNominator) deleteReservation(rInfo *corev1.Pod) {
nnn, ok := nm.nominatedReservationToNode[rInfo.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservation[nnn] {
if np.Pod.UID == rInfo.UID {
nm.nominatedReservation[nnn] = append(nm.nominatedReservation[nnn][:i], nm.nominatedReservation[nnn][i+1:]...)
if len(nm.nominatedReservation[nnn]) == 0 {
delete(nm.nominatedReservation, nnn)
}
break
}
}
delete(nm.nominatedReservationToNode, rInfo.UID)
}
113 changes: 111 additions & 2 deletions pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

Expand All @@ -34,12 +35,17 @@ import (
type nominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
lock sync.RWMutex
// nominatedReservation is map keyed by nodeName, value is the nominated reservation's PodInfo
nominatedReservation map[string][]*framework.PodInfo
nominatedReservationToNode map[types.UID]string
lock sync.RWMutex
}

func newNominator() *nominator {
return &nominator{
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedReservation: map[string][]*framework.PodInfo{},
nominatedReservationToNode: map[types.UID]string{},
}
}

Expand All @@ -58,6 +64,59 @@ func (nm *nominator) AddNominatedReservation(pod *corev1.Pod, nodeName string, r
nodeToReservation[nodeName] = rInfo.UID()
}

func (nm *nominator) AddNominatedReservationToNode(pi *framework.PodInfo, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservation(pi)

nm.nominatedReservationToNode[pi.Pod.UID] = nodeName
for _, npi := range nm.nominatedReservation[nodeName] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservation[nodeName] = append(nm.nominatedReservation[nodeName], pi)
}

func (nm *nominator) NominatedReservationForNode(nodeName string) []*framework.PodInfo {
nm.lock.RLock()
defer nm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
reservationInfos := make([]*framework.PodInfo, len(nm.nominatedReservation[nodeName]))
for i := 0; i < len(reservationInfos); i++ {
reservationInfos[i] = nm.nominatedReservation[nodeName][i].DeepCopy()
}
return reservationInfos
}

func (nm *nominator) DeleteReservation(pi *framework.PodInfo) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservation(pi)
}

func (nm *nominator) deleteReservation(pi *framework.PodInfo) {
nnn, ok := nm.nominatedReservationToNode[pi.Pod.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservation[nnn] {
if np.Pod.UID == pi.Pod.UID {
nm.nominatedReservation[nnn] = append(nm.nominatedReservation[nnn][:i], nm.nominatedReservation[nnn][i+1:]...)
if len(nm.nominatedReservation[nnn]) == 0 {
delete(nm.nominatedReservation, nnn)
}
break
}
}
delete(nm.nominatedReservationToNode, pi.Pod.UID)
}

func (nm *nominator) RemoveNominatedReservation(pod *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()
Expand Down Expand Up @@ -141,6 +200,14 @@ func (pl *Plugin) RemoveNominatedReservations(pod *corev1.Pod) {
pl.nominator.RemoveNominatedReservation(pod)
}

func (pl *Plugin) AddNominatedReservationToNode(pod *corev1.Pod, nodeName string) {
pl.nominator.AddNominatedReservationToNode(framework.NewPodInfo(pod), nodeName)
}

func (pl *Plugin) DeleteNominatedReservationIfExists(pod *corev1.Pod) {
pl.nominator.DeleteReservation(framework.NewPodInfo(pod))
}

func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *frameworkext.ReservationInfo {
reservationID := pl.nominator.GetNominatedReservation(pod, nodeName)
if reservationID == "" {
Expand All @@ -149,6 +216,48 @@ func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *fra
return pl.reservationCache.getReservationInfoByUID(reservationID)
}

func (pl *Plugin) BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, bool, *framework.Status) {
if !reservationutil.IsReservePod(pod) {
return pod, nodeInfo, false, nil
}

nominatedReservationInfos := pl.nominator.NominatedReservationForNode(nodeInfo.Node().Name)
if len(nominatedReservationInfos) == 0 {
return pod, nodeInfo, false, nil
}

if nodeInfo.Node() == nil {
// This may happen only in tests.
return pod, nodeInfo, false, nil
}

nodeInfoOut := nodeInfo.Clone()
stateOut := cycleState.Clone()

rName := reservationutil.GetReservationNameFromReservePod(pod)
_, err := pl.rLister.Get(rName)
if err != nil {
return pod, nodeInfo, false, framework.NewStatus(framework.Error, "reservation not found")
}

for _, rInfo := range nominatedReservationInfos {
if schedulingcorev1.PodPriority(rInfo.Pod) >= schedulingcorev1.PodPriority(pod) && rInfo.Pod.UID != pod.UID {
pInfo := framework.NewPodInfo(rInfo.Pod)
nodeInfoOut.AddPodInfo(pInfo)
status := pl.handle.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pInfo, nodeInfoOut)
if !status.IsSuccess() {
return pod, nodeInfo, false, status
}
klog.V(4).Infof("toschedule reservation: %s, addReservation: %s",
reservationutil.GetReservationNameFromReservePod(pod),
reservationutil.GetReservationNameFromReservePod(rInfo.Pod))
}
}

cycleState = stateOut

Check failure on line 257 in pkg/scheduler/plugins/reservation/nominator.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4006: this value of `cycleState` is never used (staticcheck)
return pod, nodeInfoOut, true, nil
}

func prioritizeReservations(
ctx context.Context,
fwk frameworkext.FrameworkExtender,
Expand Down
57 changes: 57 additions & 0 deletions pkg/scheduler/plugins/reservation/nominator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,60 @@ func TestMultiReservationsOnSameNode(t *testing.T) {
assert.Equal(t, 1, v)
}
}

func TestReservationsNominator(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("96"),
corev1.ResourceMemory: resource.MustParse("1886495404Ki"),
},
},
}

resourceList := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
}
labels := map[string]string{
"foo": "bar",
}
suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node})
var reservations []*schedulingv1alpha1.Reservation
var pods []*corev1.Pod
for i := 0; i < 3; i++ {
r := newTestReservation(t, fmt.Sprintf("test-r-%d", i), labels, labels, node.Name, resourceList)
reservations = append(reservations, r)

Check failure on line 463 in pkg/scheduler/plugins/reservation/nominator_test.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4010: this result of append is never used, except maybe in other appends (staticcheck)
pods = append(pods, reservationutil.NewReservePod(r))
_, err := suit.extenderFactory.KoordinatorClientSet().SchedulingV1alpha1().Reservations().Create(context.TODO(), r, metav1.CreateOptions{})
assert.NoError(t, err)
}
nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name)
assert.NoError(t, err)
assert.Equal(t, 0, len(nodeInfo.Pods))

p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)

nominatorImpl := pl.handle.(frameworkext.FrameworkExtender).GetReservationNominator()

nominatorImpl.AddNominatedReservationToNode(pods[0], "node-1")
ctx := context.TODO()
state := framework.NewCycleState()
pod, nodeInfoOut, update, status := pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 1, len(nodeInfoOut.Pods))

nominatorImpl.AddNominatedReservationToNode(pods[1], "node-1")
pod, nodeInfoOut, update, status = pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 2, len(nodeInfoOut.Pods))
}
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/reservation/pod_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
Expand Down Expand Up @@ -95,6 +96,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {
}

h.nominator.RemoveNominatedReservation(newPod)
h.nominator.DeleteReservation(framework.NewPodInfo(newPod))

var reservationUID types.UID
if oldPod != nil {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {

func (h *podEventHandler) deletePod(pod *corev1.Pod) {
h.nominator.RemoveNominatedReservation(pod)
h.nominator.DeleteReservation(framework.NewPodInfo(pod))

reservationAllocated, err := apiext.GetReservationAllocated(pod)
if err == nil && reservationAllocated != nil && reservationAllocated.UID != "" {
Expand Down
Loading

0 comments on commit b0f68f6

Please sign in to comment.