Skip to content

Commit

Permalink
scheduler: extend reservation nominator to support reservation preemp…
Browse files Browse the repository at this point in the history
…tion

Signed-off-by: xulinfei.xlf <xulinfei.xlf@alibaba-inc.com>
  • Loading branch information
xulinfei.xlf committed Mar 4, 2024
1 parent 2d34fc6 commit f07c4a5
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func MakeReservationErrorHandler(
return false
}

if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
if extendedHandle, ok := fwk.(frameworkext.ExtendedHandle); ok {
extendedHandle.GetReservationNominator().DeleteNominatedReservationIfExists(pod)
}
}

reservationErrorFn(podInfo, schedulingErr)

rName := reservationutil.GetReservationNameFromReservePod(pod)
Expand Down
5 changes: 4 additions & 1 deletion pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (ext *frameworkExtenderImpl) RunFilterPluginsWithNominatedPods(ctx context.
if transformer == nil {
continue
}
newPod, newNodeInfo, transformed, status := transformer.BeforeFilter(ctx, cycleState, pod, nodeInfo)
newPod, newNodeInfo, newCycleState, transformed, status := transformer.BeforeFilter(ctx, cycleState, pod, nodeInfo)
if !status.IsSuccess() {
klog.ErrorS(status.AsError(), "Failed to run BeforeFilter", "pod", klog.KObj(pod), "plugin", transformer.Name())
return status
Expand All @@ -216,6 +216,7 @@ func (ext *frameworkExtenderImpl) RunFilterPluginsWithNominatedPods(ctx context.
klog.V(5).InfoS("BeforeFilter transformed", "transformer", transformer.Name(), "pod", klog.KObj(pod))
pod = newPod
nodeInfo = newNodeInfo
cycleState = newCycleState
}
}
status := ext.Framework.RunFilterPluginsWithNominatedPods(ctx, cycleState, pod, nodeInfo)
Expand All @@ -229,6 +230,7 @@ func (ext *frameworkExtenderImpl) RunPostFilterPlugins(ctx context.Context, stat
result, status := ext.Framework.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
if result == nil || result.NominatingInfo.NominatedNodeName == "" {
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservationIfExists(pod)
}
return result, status
}
Expand Down Expand Up @@ -461,6 +463,7 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context,
}
status := ext.Framework.RunReservePluginsReserve(ctx, cycleState, pod, nodeName)
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservationIfExists(pod)
return status
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/frameworkext/framework_extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,12 @@ func (h *TestTransformer) AfterPreFilter(ctx context.Context, cycleState *framew
return nil
}

func (h *TestTransformer) BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, bool, *framework.Status) {
func (h *TestTransformer) BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, *framework.CycleState, bool, *framework.Status) {
if pod.Annotations == nil {
pod.Annotations = map[string]string{}
}
pod.Annotations[fmt.Sprintf("BeforeFilter-%d", h.index)] = fmt.Sprintf("%d", h.index)
return pod, nodeInfo, true, nil
return pod, nodeInfo, cycleState, true, nil
}

func (h *TestTransformer) Filter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type PreFilterTransformer interface {
// FilterTransformer is executed before Filter.
type FilterTransformer interface {
SchedulingTransformer
BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, bool, *framework.Status)
BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, *framework.CycleState, bool, *framework.Status)
}

// ScoreTransformer is executed before Score.
Expand Down Expand Up @@ -132,6 +132,8 @@ type ReservationNominator interface {
AddNominatedReservation(pod *corev1.Pod, nodeName string, rInfo *ReservationInfo)
RemoveNominatedReservations(pod *corev1.Pod)
GetNominatedReservation(pod *corev1.Pod, nodeName string) *ReservationInfo
AddNominatedReservationToNode(reservePod *corev1.Pod, nodeName string)
DeleteNominatedReservationIfExists(reservePod *corev1.Pod)
}

const (
Expand Down
48 changes: 47 additions & 1 deletion pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
Expand All @@ -34,7 +35,10 @@ type FakeNominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
reservations map[types.UID]*frameworkext.ReservationInfo
lock sync.RWMutex
// nominatedReservation is map keyed by nodeName, value is the nominated reservations
nominatedReservation map[string][]*framework.PodInfo
nominatedReservationToNode map[types.UID]string
lock sync.RWMutex
}

func NewFakeReservationNominator() *FakeNominator {
Expand Down Expand Up @@ -87,3 +91,45 @@ func (nm *FakeNominator) NominateReservation(ctx context.Context, cycleState *fr
rInfo := nm.GetNominatedReservation(pod, nodeName)
return rInfo, nil
}

func (nm *FakeNominator) AddNominatedReservationToNode(rInfo *corev1.Pod, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservation(rInfo)

nm.nominatedReservationToNode[rInfo.UID] = nodeName
for _, npi := range nm.nominatedReservation[nodeName] {
if npi.Pod.UID == rInfo.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservation[nodeName] = append(nm.nominatedReservation[nodeName], framework.NewPodInfo(rInfo))
}

func (nm *FakeNominator) DeleteNominatedReservationIfExists(rInfo *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservation(rInfo)
}

func (nm *FakeNominator) deleteReservation(rInfo *corev1.Pod) {
nnn, ok := nm.nominatedReservationToNode[rInfo.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservation[nnn] {
if np.Pod.UID == rInfo.UID {
nm.nominatedReservation[nnn] = append(nm.nominatedReservation[nnn][:i], nm.nominatedReservation[nnn][i+1:]...)
if len(nm.nominatedReservation[nnn]) == 0 {
delete(nm.nominatedReservation, nnn)
}
break
}
}
delete(nm.nominatedReservationToNode, rInfo.UID)
}
112 changes: 110 additions & 2 deletions pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

Expand All @@ -34,12 +35,17 @@ import (
type nominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
lock sync.RWMutex
// nominatedReservation is map keyed by nodeName, value is the nominated reservation's PodInfo
nominatedReservation map[string][]*framework.PodInfo
nominatedReservationToNode map[types.UID]string
lock sync.RWMutex
}

func newNominator() *nominator {
return &nominator{
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedReservation: map[string][]*framework.PodInfo{},
nominatedReservationToNode: map[types.UID]string{},
}
}

Expand All @@ -58,6 +64,59 @@ func (nm *nominator) AddNominatedReservation(pod *corev1.Pod, nodeName string, r
nodeToReservation[nodeName] = rInfo.UID()
}

func (nm *nominator) AddNominatedReservationToNode(pi *framework.PodInfo, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservation(pi)

nm.nominatedReservationToNode[pi.Pod.UID] = nodeName
for _, npi := range nm.nominatedReservation[nodeName] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservation[nodeName] = append(nm.nominatedReservation[nodeName], pi)
}

func (nm *nominator) NominatedReservationForNode(nodeName string) []*framework.PodInfo {
nm.lock.RLock()
defer nm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
reservationInfos := make([]*framework.PodInfo, len(nm.nominatedReservation[nodeName]))
for i := 0; i < len(reservationInfos); i++ {
reservationInfos[i] = nm.nominatedReservation[nodeName][i].DeepCopy()
}
return reservationInfos
}

func (nm *nominator) DeleteReservation(pi *framework.PodInfo) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservation(pi)
}

func (nm *nominator) deleteReservation(pi *framework.PodInfo) {
nnn, ok := nm.nominatedReservationToNode[pi.Pod.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservation[nnn] {
if np.Pod.UID == pi.Pod.UID {
nm.nominatedReservation[nnn] = append(nm.nominatedReservation[nnn][:i], nm.nominatedReservation[nnn][i+1:]...)
if len(nm.nominatedReservation[nnn]) == 0 {
delete(nm.nominatedReservation, nnn)
}
break
}
}
delete(nm.nominatedReservationToNode, pi.Pod.UID)
}

func (nm *nominator) RemoveNominatedReservation(pod *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()
Expand Down Expand Up @@ -141,6 +200,14 @@ func (pl *Plugin) RemoveNominatedReservations(pod *corev1.Pod) {
pl.nominator.RemoveNominatedReservation(pod)
}

func (pl *Plugin) AddNominatedReservationToNode(pod *corev1.Pod, nodeName string) {
pl.nominator.AddNominatedReservationToNode(framework.NewPodInfo(pod), nodeName)
}

func (pl *Plugin) DeleteNominatedReservationIfExists(pod *corev1.Pod) {
pl.nominator.DeleteReservation(framework.NewPodInfo(pod))
}

func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *frameworkext.ReservationInfo {
reservationID := pl.nominator.GetNominatedReservation(pod, nodeName)
if reservationID == "" {
Expand All @@ -149,6 +216,47 @@ func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *fra
return pl.reservationCache.getReservationInfoByUID(reservationID)
}

func (pl *Plugin) BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, *framework.CycleState, bool, *framework.Status) {
if !reservationutil.IsReservePod(pod) {
return pod, nodeInfo, cycleState, false, nil
}

nominatedReservationInfos := pl.nominator.NominatedReservationForNode(nodeInfo.Node().Name)
if len(nominatedReservationInfos) == 0 {
return pod, nodeInfo, cycleState, false, nil
}

if nodeInfo.Node() == nil {
// This may happen only in tests.
return pod, nodeInfo, cycleState, false, nil
}

nodeInfoOut := nodeInfo.Clone()
stateOut := cycleState.Clone()

rName := reservationutil.GetReservationNameFromReservePod(pod)
_, err := pl.rLister.Get(rName)
if err != nil {
return pod, nodeInfo, cycleState, false, framework.NewStatus(framework.Error, "reservation not found")
}

for _, rInfo := range nominatedReservationInfos {
if schedulingcorev1.PodPriority(rInfo.Pod) >= schedulingcorev1.PodPriority(pod) && rInfo.Pod.UID != pod.UID {
pInfo := framework.NewPodInfo(rInfo.Pod)
nodeInfoOut.AddPodInfo(pInfo)
status := pl.handle.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pInfo, nodeInfoOut)
if !status.IsSuccess() {
return pod, nodeInfo, cycleState, false, status
}
klog.V(4).Infof("toschedule reservation: %s, addReservation: %s",
reservationutil.GetReservationNameFromReservePod(pod),
reservationutil.GetReservationNameFromReservePod(rInfo.Pod))
}
}

return pod, nodeInfoOut, stateOut, true, nil
}

func prioritizeReservations(
ctx context.Context,
fwk frameworkext.FrameworkExtender,
Expand Down
55 changes: 55 additions & 0 deletions pkg/scheduler/plugins/reservation/nominator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,58 @@ func TestMultiReservationsOnSameNode(t *testing.T) {
assert.Equal(t, 1, v)
}
}

func TestReservationsNominator(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("96"),
corev1.ResourceMemory: resource.MustParse("1886495404Ki"),
},
},
}

resourceList := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
}
labels := map[string]string{
"foo": "bar",
}
suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node})
var pods []*corev1.Pod
for i := 0; i < 3; i++ {
r := newTestReservation(t, fmt.Sprintf("test-r-%d", i), labels, labels, node.Name, resourceList)
pods = append(pods, reservationutil.NewReservePod(r))
_, err := suit.extenderFactory.KoordinatorClientSet().SchedulingV1alpha1().Reservations().Create(context.TODO(), r, metav1.CreateOptions{})
assert.NoError(t, err)
}
nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name)
assert.NoError(t, err)
assert.Equal(t, 0, len(nodeInfo.Pods))

p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)

nominatorImpl := pl.handle.(frameworkext.FrameworkExtender).GetReservationNominator()

nominatorImpl.AddNominatedReservationToNode(pods[0], "node-1")
ctx := context.TODO()
state := framework.NewCycleState()
pod, nodeInfoOut, _, update, status := pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 1, len(nodeInfoOut.Pods))

nominatorImpl.AddNominatedReservationToNode(pods[1], "node-1")
pod, nodeInfoOut, _, update, status = pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 2, len(nodeInfoOut.Pods))
}
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/reservation/pod_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
Expand Down Expand Up @@ -95,6 +96,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {
}

h.nominator.RemoveNominatedReservation(newPod)
h.nominator.DeleteReservation(framework.NewPodInfo(newPod))

var reservationUID types.UID
if oldPod != nil {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {

func (h *podEventHandler) deletePod(pod *corev1.Pod) {
h.nominator.RemoveNominatedReservation(pod)
h.nominator.DeleteReservation(framework.NewPodInfo(pod))

reservationAllocated, err := apiext.GetReservationAllocated(pod)
if err == nil && reservationAllocated != nil && reservationAllocated.UID != "" {
Expand Down
Loading

0 comments on commit f07c4a5

Please sign in to comment.