Skip to content

Commit

Permalink
scheduler: extend reservation nominator to support reservation preemp…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: xulinfei.xlf <xulinfei.xlf@alibaba-inc.com>
  • Loading branch information
xulinfei.xlf committed Mar 5, 2024
1 parent 2d34fc6 commit 49dcbe3
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 8 deletions.
16 changes: 11 additions & 5 deletions pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ func MakeReservationErrorHandler(
}

// if the pod is not a reserve pod, use the default error handler
if !reservationutil.IsReservePod(pod) {
// If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod.
if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
extendedHandle := fwk.(frameworkext.ExtendedHandle)
extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod)
// If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod.
if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
if extendedHandle, ok := fwk.(frameworkext.ExtendedHandle); ok {
if !reservationutil.IsReservePod(pod) {
extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod)
} else {
extendedHandle.GetReservationNominator().DeleteNominatedReservePod(pod)
}
}
}

if !reservationutil.IsReservePod(pod) {
return false
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (ext *frameworkExtenderImpl) RunPostFilterPlugins(ctx context.Context, stat
result, status := ext.Framework.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
if result == nil || result.NominatingInfo.NominatedNodeName == "" {
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservePod(pod)
}
return result, status
}
Expand Down Expand Up @@ -461,6 +462,7 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context,
}
status := ext.Framework.RunReservePluginsReserve(ctx, cycleState, pod, nodeName)
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservePod(pod)
return status
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ type ReservationNominator interface {
AddNominatedReservation(pod *corev1.Pod, nodeName string, rInfo *ReservationInfo)
RemoveNominatedReservations(pod *corev1.Pod)
GetNominatedReservation(pod *corev1.Pod, nodeName string) *ReservationInfo
AddNominatedReservePod(reservePod *corev1.Pod, nodeName string)
DeleteNominatedReservePod(reservePod *corev1.Pod)
}

const (
Expand Down
48 changes: 47 additions & 1 deletion pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
Expand All @@ -34,7 +35,10 @@ type FakeNominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
reservations map[types.UID]*frameworkext.ReservationInfo
lock sync.RWMutex
// nominatedReservation is map keyed by nodeName, value is the nominated reservations
nominatedReservation map[string][]*framework.PodInfo
nominatedReservationToNode map[types.UID]string
lock sync.RWMutex
}

func NewFakeReservationNominator() *FakeNominator {
Expand Down Expand Up @@ -87,3 +91,45 @@ func (nm *FakeNominator) NominateReservation(ctx context.Context, cycleState *fr
rInfo := nm.GetNominatedReservation(pod, nodeName)
return rInfo, nil
}

func (nm *FakeNominator) AddNominatedReservePod(rInfo *corev1.Pod, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservation(rInfo)

nm.nominatedReservationToNode[rInfo.UID] = nodeName
for _, npi := range nm.nominatedReservation[nodeName] {
if npi.Pod.UID == rInfo.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservation[nodeName] = append(nm.nominatedReservation[nodeName], framework.NewPodInfo(rInfo))
}

func (nm *FakeNominator) DeleteNominatedReservePod(rInfo *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservation(rInfo)
}

func (nm *FakeNominator) deleteReservation(rInfo *corev1.Pod) {
nnn, ok := nm.nominatedReservationToNode[rInfo.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservation[nnn] {
if np.Pod.UID == rInfo.UID {
nm.nominatedReservation[nnn] = append(nm.nominatedReservation[nnn][:i], nm.nominatedReservation[nnn][i+1:]...)
if len(nm.nominatedReservation[nnn]) == 0 {
delete(nm.nominatedReservation, nnn)
}
break
}
}
delete(nm.nominatedReservationToNode, rInfo.UID)
}
111 changes: 109 additions & 2 deletions pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

Expand All @@ -34,12 +35,17 @@ import (
type nominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
lock sync.RWMutex
// nominatedReservePod is map keyed by nodeName, value is the nominated reservation's PodInfo
nominatedReservePod map[string][]*framework.PodInfo
nominatedReservePodToNode map[types.UID]string
lock sync.RWMutex
}

func newNominator() *nominator {
return &nominator{
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedReservePod: map[string][]*framework.PodInfo{},
nominatedReservePodToNode: map[types.UID]string{},
}
}

Expand All @@ -58,6 +64,59 @@ func (nm *nominator) AddNominatedReservation(pod *corev1.Pod, nodeName string, r
nodeToReservation[nodeName] = rInfo.UID()
}

func (nm *nominator) AddNominatedReservePod(pi *framework.PodInfo, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservePod(pi)

nm.nominatedReservePodToNode[pi.Pod.UID] = nodeName
for _, npi := range nm.nominatedReservePod[nodeName] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservePod[nodeName] = append(nm.nominatedReservePod[nodeName], pi)
}

func (nm *nominator) NominatedReservePodForNode(nodeName string) []*framework.PodInfo {
nm.lock.RLock()
defer nm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
reservePods := make([]*framework.PodInfo, len(nm.nominatedReservePod[nodeName]))
for i := 0; i < len(reservePods); i++ {
reservePods[i] = nm.nominatedReservePod[nodeName][i].DeepCopy()
}
return reservePods
}

func (nm *nominator) DeleteReservePod(pi *framework.PodInfo) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservePod(pi)
}

func (nm *nominator) deleteReservePod(pi *framework.PodInfo) {
nnn, ok := nm.nominatedReservePodToNode[pi.Pod.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservePod[nnn] {
if np.Pod.UID == pi.Pod.UID {
nm.nominatedReservePod[nnn] = append(nm.nominatedReservePod[nnn][:i], nm.nominatedReservePod[nnn][i+1:]...)
if len(nm.nominatedReservePod[nnn]) == 0 {
delete(nm.nominatedReservePod, nnn)
}
break
}
}
delete(nm.nominatedReservePodToNode, pi.Pod.UID)
}

func (nm *nominator) RemoveNominatedReservation(pod *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()
Expand Down Expand Up @@ -141,6 +200,14 @@ func (pl *Plugin) RemoveNominatedReservations(pod *corev1.Pod) {
pl.nominator.RemoveNominatedReservation(pod)
}

func (pl *Plugin) AddNominatedReservePod(pod *corev1.Pod, nodeName string) {
pl.nominator.AddNominatedReservePod(framework.NewPodInfo(pod), nodeName)
}

func (pl *Plugin) DeleteNominatedReservePod(pod *corev1.Pod) {
pl.nominator.DeleteReservePod(framework.NewPodInfo(pod))
}

func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *frameworkext.ReservationInfo {
reservationID := pl.nominator.GetNominatedReservation(pod, nodeName)
if reservationID == "" {
Expand All @@ -149,6 +216,46 @@ func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *fra
return pl.reservationCache.getReservationInfoByUID(reservationID)
}

func (pl *Plugin) BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, bool, *framework.Status) {
if !reservationutil.IsReservePod(pod) {
return pod, nodeInfo, false, nil
}

nominatedReservationInfos := pl.nominator.NominatedReservePodForNode(nodeInfo.Node().Name)
if len(nominatedReservationInfos) == 0 {
return pod, nodeInfo, false, nil
}

if nodeInfo.Node() == nil {
// This may happen only in tests.
return pod, nodeInfo, false, nil
}

nodeInfoOut := nodeInfo.Clone()

rName := reservationutil.GetReservationNameFromReservePod(pod)
_, err := pl.rLister.Get(rName)
if err != nil {
return pod, nodeInfo, false, framework.NewStatus(framework.Error, "reservation not found")
}

for _, rInfo := range nominatedReservationInfos {
if schedulingcorev1.PodPriority(rInfo.Pod) >= schedulingcorev1.PodPriority(pod) && rInfo.Pod.UID != pod.UID {
pInfo := framework.NewPodInfo(rInfo.Pod)
nodeInfoOut.AddPodInfo(pInfo)
status := pl.handle.RunPreFilterExtensionAddPod(ctx, cycleState, pod, pInfo, nodeInfoOut)
if !status.IsSuccess() {
return pod, nodeInfo, false, status
}
klog.V(4).Infof("toschedule reservation: %s, added reservation: %s",
reservationutil.GetReservationNameFromReservePod(pod),
reservationutil.GetReservationNameFromReservePod(rInfo.Pod))
}
}

return pod, nodeInfoOut, true, nil
}

func prioritizeReservations(
ctx context.Context,
fwk frameworkext.FrameworkExtender,
Expand Down
55 changes: 55 additions & 0 deletions pkg/scheduler/plugins/reservation/nominator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,58 @@ func TestMultiReservationsOnSameNode(t *testing.T) {
assert.Equal(t, 1, v)
}
}

func TestReservationsNominator(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("96"),
corev1.ResourceMemory: resource.MustParse("1886495404Ki"),
},
},
}

resourceList := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
}
labels := map[string]string{
"foo": "bar",
}
suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node})
var pods []*corev1.Pod
for i := 0; i < 3; i++ {
r := newTestReservation(t, fmt.Sprintf("test-r-%d", i), labels, labels, node.Name, resourceList)
pods = append(pods, reservationutil.NewReservePod(r))
_, err := suit.extenderFactory.KoordinatorClientSet().SchedulingV1alpha1().Reservations().Create(context.TODO(), r, metav1.CreateOptions{})
assert.NoError(t, err)
}
nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name)
assert.NoError(t, err)
assert.Equal(t, 0, len(nodeInfo.Pods))

p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)

nominatorImpl := pl.handle.(frameworkext.FrameworkExtender).GetReservationNominator()

nominatorImpl.AddNominatedReservePod(pods[0], "node-1")
ctx := context.TODO()
state := framework.NewCycleState()
pod, nodeInfoOut, update, status := pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 1, len(nodeInfoOut.Pods))

nominatorImpl.AddNominatedReservePod(pods[1], "node-1")
pod, nodeInfoOut, update, status = pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 2, len(nodeInfoOut.Pods))
}
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/reservation/pod_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
Expand Down Expand Up @@ -95,6 +96,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {
}

h.nominator.RemoveNominatedReservation(newPod)
h.nominator.DeleteReservePod(framework.NewPodInfo(newPod))

var reservationUID types.UID
if oldPod != nil {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {

func (h *podEventHandler) deletePod(pod *corev1.Pod) {
h.nominator.RemoveNominatedReservation(pod)
h.nominator.DeleteReservePod(framework.NewPodInfo(pod))

reservationAllocated, err := apiext.GetReservationAllocated(pod)
if err == nil && reservationAllocated != nil && reservationAllocated.UID != "" {
Expand Down
Loading

0 comments on commit 49dcbe3

Please sign in to comment.