Skip to content

Commit

Permalink
scheduler: extend reservation nominator to support reservation preemp…
Browse files Browse the repository at this point in the history
…tion (#1936)

Signed-off-by: xulinfei.xlf <xulinfei.xlf@alibaba-inc.com>
Co-authored-by: xulinfei.xlf <xulinfei.xlf@alibaba-inc.com>
  • Loading branch information
xulinfei1996 and xulinfei.xlf committed Mar 7, 2024
1 parent e0f2d59 commit 3581c3d
Show file tree
Hide file tree
Showing 9 changed files with 277 additions and 8 deletions.
16 changes: 11 additions & 5 deletions pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,18 @@ func MakeReservationErrorHandler(
}

// if the pod is not a reserve pod, use the default error handler
if !reservationutil.IsReservePod(pod) {
// If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod.
if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
extendedHandle := fwk.(frameworkext.ExtendedHandle)
extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod)
// If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod.
if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() {
if extendedHandle, ok := fwk.(frameworkext.ExtendedHandle); ok {
if !reservationutil.IsReservePod(pod) {
extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod)
} else {
extendedHandle.GetReservationNominator().DeleteNominatedReservePod(pod)
}
}
}

if !reservationutil.IsReservePod(pod) {
return false
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ func (ext *frameworkExtenderImpl) RunPostFilterPlugins(ctx context.Context, stat
result, status := ext.Framework.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap)
if result == nil || result.NominatingInfo.NominatedNodeName == "" {
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservePod(pod)
}
return result, status
}
Expand Down Expand Up @@ -461,6 +462,7 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context,
}
status := ext.Framework.RunReservePluginsReserve(ctx, cycleState, pod, nodeName)
ext.GetReservationNominator().RemoveNominatedReservations(pod)
ext.GetReservationNominator().DeleteNominatedReservePod(pod)
return status
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ type ReservationNominator interface {
AddNominatedReservation(pod *corev1.Pod, nodeName string, rInfo *ReservationInfo)
RemoveNominatedReservations(pod *corev1.Pod)
GetNominatedReservation(pod *corev1.Pod, nodeName string) *ReservationInfo
AddNominatedReservePod(reservePod *corev1.Pod, nodeName string)
DeleteNominatedReservePod(reservePod *corev1.Pod)
}

const (
Expand Down
48 changes: 47 additions & 1 deletion pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
Expand All @@ -34,7 +35,10 @@ type FakeNominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
reservations map[types.UID]*frameworkext.ReservationInfo
lock sync.RWMutex
// nominatedReservePod is map keyed by nodeName, value is the nominated reservations
nominatedReservePod map[string][]*framework.PodInfo
nominatedReservePodToNode map[types.UID]string
lock sync.RWMutex
}

func NewFakeReservationNominator() *FakeNominator {
Expand Down Expand Up @@ -87,3 +91,45 @@ func (nm *FakeNominator) NominateReservation(ctx context.Context, cycleState *fr
rInfo := nm.GetNominatedReservation(pod, nodeName)
return rInfo, nil
}

func (nm *FakeNominator) AddNominatedReservePod(rInfo *corev1.Pod, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservePod(rInfo)

nm.nominatedReservePodToNode[rInfo.UID] = nodeName
for _, npi := range nm.nominatedReservePod[nodeName] {
if npi.Pod.UID == rInfo.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservePod[nodeName] = append(nm.nominatedReservePod[nodeName], framework.NewPodInfo(rInfo))
}

func (nm *FakeNominator) DeleteNominatedReservePod(rInfo *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservePod(rInfo)
}

func (nm *FakeNominator) deleteReservePod(rInfo *corev1.Pod) {
nnn, ok := nm.nominatedReservePodToNode[rInfo.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservePod[nnn] {
if np.Pod.UID == rInfo.UID {
nm.nominatedReservePod[nnn] = append(nm.nominatedReservePod[nnn][:i], nm.nominatedReservePod[nnn][i+1:]...)
if len(nm.nominatedReservePod[nnn]) == 0 {
delete(nm.nominatedReservePod, nnn)
}
break
}
}
delete(nm.nominatedReservePodToNode, rInfo.UID)
}
70 changes: 68 additions & 2 deletions pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,17 @@ import (
type nominator struct {
// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
nominatedPodToNode map[types.UID]map[string]types.UID
lock sync.RWMutex
// nominatedReservePod is map keyed by nodeName, value is the nominated reservation's PodInfo
nominatedReservePod map[string][]*framework.PodInfo
nominatedReservePodToNode map[types.UID]string
lock sync.RWMutex
}

func newNominator() *nominator {
return &nominator{
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedPodToNode: map[types.UID]map[string]types.UID{},
nominatedReservePod: map[string][]*framework.PodInfo{},
nominatedReservePodToNode: map[types.UID]string{},
}
}

Expand All @@ -58,6 +63,59 @@ func (nm *nominator) AddNominatedReservation(pod *corev1.Pod, nodeName string, r
nodeToReservation[nodeName] = rInfo.UID()
}

func (nm *nominator) AddNominatedReservePod(pi *framework.PodInfo, nodeName string) {
nm.lock.Lock()
defer nm.lock.Unlock()

// Always delete the reservation if it already exists, to ensure we never store more than
// one instance of the reservation.
nm.deleteReservePod(pi)

nm.nominatedReservePodToNode[pi.Pod.UID] = nodeName
for _, npi := range nm.nominatedReservePod[nodeName] {
if npi.Pod.UID == pi.Pod.UID {
klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
return
}
}
nm.nominatedReservePod[nodeName] = append(nm.nominatedReservePod[nodeName], pi)
}

func (nm *nominator) NominatedReservePodForNode(nodeName string) []*framework.PodInfo {
nm.lock.RLock()
defer nm.lock.RUnlock()
// Make a copy of the nominated Pods so the caller can mutate safely.
reservePods := make([]*framework.PodInfo, len(nm.nominatedReservePod[nodeName]))
for i := 0; i < len(reservePods); i++ {
reservePods[i] = nm.nominatedReservePod[nodeName][i].DeepCopy()
}
return reservePods
}

func (nm *nominator) DeleteReservePod(pi *framework.PodInfo) {
nm.lock.Lock()
defer nm.lock.Unlock()

nm.deleteReservePod(pi)
}

func (nm *nominator) deleteReservePod(pi *framework.PodInfo) {
nnn, ok := nm.nominatedReservePodToNode[pi.Pod.UID]
if !ok {
return
}
for i, np := range nm.nominatedReservePod[nnn] {
if np.Pod.UID == pi.Pod.UID {
nm.nominatedReservePod[nnn] = append(nm.nominatedReservePod[nnn][:i], nm.nominatedReservePod[nnn][i+1:]...)
if len(nm.nominatedReservePod[nnn]) == 0 {
delete(nm.nominatedReservePod, nnn)
}
break
}
}
delete(nm.nominatedReservePodToNode, pi.Pod.UID)
}

func (nm *nominator) RemoveNominatedReservation(pod *corev1.Pod) {
nm.lock.Lock()
defer nm.lock.Unlock()
Expand Down Expand Up @@ -141,6 +199,14 @@ func (pl *Plugin) RemoveNominatedReservations(pod *corev1.Pod) {
pl.nominator.RemoveNominatedReservation(pod)
}

func (pl *Plugin) AddNominatedReservePod(pod *corev1.Pod, nodeName string) {
pl.nominator.AddNominatedReservePod(framework.NewPodInfo(pod), nodeName)
}

func (pl *Plugin) DeleteNominatedReservePod(pod *corev1.Pod) {
pl.nominator.DeleteReservePod(framework.NewPodInfo(pod))
}

func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *frameworkext.ReservationInfo {
reservationID := pl.nominator.GetNominatedReservation(pod, nodeName)
if reservationID == "" {
Expand Down
55 changes: 55 additions & 0 deletions pkg/scheduler/plugins/reservation/nominator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,3 +434,58 @@ func TestMultiReservationsOnSameNode(t *testing.T) {
assert.Equal(t, 1, v)
}
}

func TestReservationsNominator(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-1",
},
Status: corev1.NodeStatus{
Allocatable: map[corev1.ResourceName]resource.Quantity{
corev1.ResourceCPU: resource.MustParse("96"),
corev1.ResourceMemory: resource.MustParse("1886495404Ki"),
},
},
}

resourceList := corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("16"),
corev1.ResourceMemory: resource.MustParse("32Gi"),
}
labels := map[string]string{
"foo": "bar",
}
suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node})
var pods []*corev1.Pod
for i := 0; i < 3; i++ {
r := newTestReservation(t, fmt.Sprintf("test-r-%d", i), labels, labels, node.Name, resourceList)
pods = append(pods, reservationutil.NewReservePod(r))
_, err := suit.extenderFactory.KoordinatorClientSet().SchedulingV1alpha1().Reservations().Create(context.TODO(), r, metav1.CreateOptions{})
assert.NoError(t, err)
}
nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name)
assert.NoError(t, err)
assert.Equal(t, 0, len(nodeInfo.Pods))

p, err := suit.pluginFactory()
assert.NoError(t, err)
pl := p.(*Plugin)

nominatorImpl := pl.handle.(frameworkext.FrameworkExtender).GetReservationNominator()

nominatorImpl.AddNominatedReservePod(pods[0], "node-1")
ctx := context.TODO()
state := framework.NewCycleState()
pod, nodeInfoOut, update, status := pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 1, len(nodeInfoOut.Pods))

nominatorImpl.AddNominatedReservePod(pods[1], "node-1")
pod, nodeInfoOut, update, status = pl.BeforeFilter(ctx, state, pods[2], nodeInfo)
assert.Equal(t, pod, pods[2])
assert.True(t, update)
assert.True(t, status.IsSuccess())
assert.Equal(t, 2, len(nodeInfoOut.Pods))
}
3 changes: 3 additions & 0 deletions pkg/scheduler/plugins/reservation/pod_eventhandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper"
Expand Down Expand Up @@ -95,6 +96,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {
}

h.nominator.RemoveNominatedReservation(newPod)
h.nominator.DeleteReservePod(framework.NewPodInfo(newPod))

var reservationUID types.UID
if oldPod != nil {
Expand Down Expand Up @@ -128,6 +130,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) {

func (h *podEventHandler) deletePod(pod *corev1.Pod) {
h.nominator.RemoveNominatedReservation(pod)
h.nominator.DeleteReservePod(framework.NewPodInfo(pod))

reservationAllocated, err := apiext.GetReservationAllocated(pod)
if err == nil && reservationAllocated != nil && reservationAllocated.UID != "" {
Expand Down
48 changes: 48 additions & 0 deletions pkg/scheduler/plugins/reservation/pod_eventhandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/scheduler/framework"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
Expand Down Expand Up @@ -75,20 +76,38 @@ func TestPodEventHandler(t *testing.T) {
},
}

handler.nominator.AddNominatedReservePod(framework.NewPodInfo(pod), "test-node-1")
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(pod),
}, handler.nominator.nominatedReservePod["test-node-1"])
handler.OnAdd(pod)
rInfo := handler.cache.getReservationInfoByUID(reservationUID)
assert.Empty(t, rInfo.AssignedPods)
// pod not assigned, no need to delete reservation nominated node
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(pod),
}, handler.nominator.nominatedReservePod["test-node-1"])

newPod := pod.DeepCopy()
apiext.SetReservationAllocated(newPod, reservation)
handler.OnUpdate(pod, newPod)
rInfo = handler.cache.getReservationInfoByUID(reservationUID)
assert.Len(t, rInfo.AssignedPods, 0)
// pod not assigned, no need to delete reservation nominated node
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(pod),
}, handler.nominator.nominatedReservePod["test-node-1"])

newPod.Spec.NodeName = reservation.Status.NodeName
handler.OnUpdate(pod, newPod)
rInfo = handler.cache.getReservationInfoByUID(reservationUID)
assert.Len(t, rInfo.AssignedPods, 1)
// pod assigned, delete reservation nominated node
assert.Equal(t, "", handler.nominator.nominatedReservePodToNode[pod.UID])
assert.Equal(t, []*framework.PodInfo(nil), handler.nominator.nominatedReservePod["test-node-1"])

expectPodRequirement := &frameworkext.PodRequirement{
Name: pod.Name,
Expand All @@ -100,9 +119,38 @@ func TestPodEventHandler(t *testing.T) {
}
assert.Equal(t, expectPodRequirement, rInfo.AssignedPods[pod.UID])

handler.nominator.nominatedReservePod["test-node-1"] = []*framework.PodInfo{
framework.NewPodInfo(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
UID: "test-1",
},
}),
}
handler.nominator.AddNominatedReservePod(framework.NewPodInfo(newPod), "test-node-1")
assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[newPod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
UID: "test-1",
},
}),
framework.NewPodInfo(newPod),
}, handler.nominator.nominatedReservePod["test-node-1"])

handler.OnDelete(newPod)
rInfo = handler.cache.getReservationInfoByUID(reservationUID)
assert.Empty(t, rInfo.AssignedPods)
assert.Equal(t, "", handler.nominator.nominatedReservePodToNode[newPod.UID])
assert.Equal(t, []*framework.PodInfo{
framework.NewPodInfo(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test-1",
UID: "test-1",
},
}),
}, handler.nominator.nominatedReservePod["test-node-1"])
}

func TestPodEventHandlerWithOperatingPod(t *testing.T) {
Expand Down
Loading

0 comments on commit 3581c3d

Please sign in to comment.