diff --git a/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go b/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go index 2869fd487..5ca5ced7a 100644 --- a/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go +++ b/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go @@ -71,6 +71,12 @@ func MakeReservationErrorHandler( return false } + if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() { + if extendedHandle, ok := fwk.(frameworkext.ExtendedHandle); ok { + extendedHandle.GetReservationNominator().DeleteNominatedReservationIfExists(pod) + } + } + reservationErrorFn(podInfo, schedulingErr) rName := reservationutil.GetReservationNameFromReservePod(pod) diff --git a/pkg/scheduler/frameworkext/framework_extender.go b/pkg/scheduler/frameworkext/framework_extender.go index 7ee2df40a..d3269559e 100644 --- a/pkg/scheduler/frameworkext/framework_extender.go +++ b/pkg/scheduler/frameworkext/framework_extender.go @@ -229,6 +229,7 @@ func (ext *frameworkExtenderImpl) RunPostFilterPlugins(ctx context.Context, stat result, status := ext.Framework.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap) if result == nil || result.NominatingInfo.NominatedNodeName == "" { ext.GetReservationNominator().RemoveNominatedReservations(pod) + ext.GetReservationNominator().DeleteNominatedReservationIfExists(pod) } return result, status } @@ -461,6 +462,7 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context, } status := ext.Framework.RunReservePluginsReserve(ctx, cycleState, pod, nodeName) ext.GetReservationNominator().RemoveNominatedReservations(pod) + ext.GetReservationNominator().DeleteNominatedReservationIfExists(pod) return status } diff --git a/pkg/scheduler/frameworkext/interface.go b/pkg/scheduler/frameworkext/interface.go index 89aea0a53..3704afbf9 100644 --- a/pkg/scheduler/frameworkext/interface.go +++ b/pkg/scheduler/frameworkext/interface.go @@ -132,6 +132,8 @@ type ReservationNominator interface { AddNominatedReservation(pod *corev1.Pod, nodeName string, rInfo *ReservationInfo) RemoveNominatedReservations(pod *corev1.Pod) GetNominatedReservation(pod *corev1.Pod, nodeName string) *ReservationInfo + AddNominatedReservationToNode(reservePod *corev1.Pod, nodeName string) + DeleteNominatedReservationIfExists(reservePod *corev1.Pod) } const ( diff --git a/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go b/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go index ce81fb49c..aa9e7c31d 100644 --- a/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go +++ b/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" @@ -34,7 +35,10 @@ type FakeNominator struct { // nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated. nominatedPodToNode map[types.UID]map[string]types.UID reservations map[types.UID]*frameworkext.ReservationInfo - lock sync.RWMutex + // nominatedReservation is map keyed by nodeName, value is the nominated reservations + nominatedReservation map[string][]*framework.PodInfo + nominatedReservationToNode map[types.UID]string + lock sync.RWMutex } func NewFakeReservationNominator() *FakeNominator { @@ -87,3 +91,45 @@ func (nm *FakeNominator) NominateReservation(ctx context.Context, cycleState *fr rInfo := nm.GetNominatedReservation(pod, nodeName) return rInfo, nil } + +func (nm *FakeNominator) AddNominatedReservationToNode(rInfo *corev1.Pod, nodeName string) { + nm.lock.Lock() + defer nm.lock.Unlock() + + // Always delete the reservation if it already exists, to ensure we never store more than + // one instance of the reservation. + nm.deleteReservation(rInfo) + + nm.nominatedReservationToNode[rInfo.UID] = nodeName + for _, npi := range nm.nominatedReservation[nodeName] { + if npi.Pod.UID == rInfo.UID { + klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod)) + return + } + } + nm.nominatedReservation[nodeName] = append(nm.nominatedReservation[nodeName], framework.NewPodInfo(rInfo)) +} + +func (nm *FakeNominator) DeleteNominatedReservationIfExists(rInfo *corev1.Pod) { + nm.lock.Lock() + defer nm.lock.Unlock() + + nm.deleteReservation(rInfo) +} + +func (nm *FakeNominator) deleteReservation(rInfo *corev1.Pod) { + nnn, ok := nm.nominatedReservationToNode[rInfo.UID] + if !ok { + return + } + for i, np := range nm.nominatedReservation[nnn] { + if np.Pod.UID == rInfo.UID { + nm.nominatedReservation[nnn] = append(nm.nominatedReservation[nnn][:i], nm.nominatedReservation[nnn][i+1:]...) + if len(nm.nominatedReservation[nnn]) == 0 { + delete(nm.nominatedReservation, nnn) + } + break + } + } + delete(nm.nominatedReservationToNode, rInfo.UID) +} diff --git a/pkg/scheduler/plugins/reservation/nominator.go b/pkg/scheduler/plugins/reservation/nominator.go index f2e779bbb..ff8549a9b 100644 --- a/pkg/scheduler/plugins/reservation/nominator.go +++ b/pkg/scheduler/plugins/reservation/nominator.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -34,12 +35,17 @@ import ( type nominator struct { // nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated. nominatedPodToNode map[types.UID]map[string]types.UID - lock sync.RWMutex + // nominatedReservation is map keyed by nodeName, value is the nominated reservation's PodInfo + nominatedReservation map[string][]*framework.PodInfo + nominatedReservationToNode map[types.UID]string + lock sync.RWMutex } func newNominator() *nominator { return &nominator{ - nominatedPodToNode: map[types.UID]map[string]types.UID{}, + nominatedPodToNode: map[types.UID]map[string]types.UID{}, + nominatedReservation: map[string][]*framework.PodInfo{}, + nominatedReservationToNode: map[types.UID]string{}, } } @@ -58,6 +64,59 @@ func (nm *nominator) AddNominatedReservation(pod *corev1.Pod, nodeName string, r nodeToReservation[nodeName] = rInfo.UID() } +func (nm *nominator) AddNominatedReservationToNode(pi *framework.PodInfo, nodeName string) { + nm.lock.Lock() + defer nm.lock.Unlock() + + // Always delete the reservation if it already exists, to ensure we never store more than + // one instance of the reservation. + nm.deleteReservation(pi) + + nm.nominatedReservationToNode[pi.Pod.UID] = nodeName + for _, npi := range nm.nominatedReservation[nodeName] { + if npi.Pod.UID == pi.Pod.UID { + klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod)) + return + } + } + nm.nominatedReservation[nodeName] = append(nm.nominatedReservation[nodeName], pi) +} + +func (nm *nominator) NominatedReservationForNode(nodeName string) []*framework.PodInfo { + nm.lock.RLock() + defer nm.lock.RUnlock() + // Make a copy of the nominated Pods so the caller can mutate safely. + reservationInfos := make([]*framework.PodInfo, len(nm.nominatedReservation[nodeName])) + for i := 0; i < len(reservationInfos); i++ { + reservationInfos[i] = nm.nominatedReservation[nodeName][i].DeepCopy() + } + return reservationInfos +} + +func (nm *nominator) DeleteReservation(pi *framework.PodInfo) { + nm.lock.Lock() + defer nm.lock.Unlock() + + nm.deleteReservation(pi) +} + +func (nm *nominator) deleteReservation(pi *framework.PodInfo) { + nnn, ok := nm.nominatedReservationToNode[pi.Pod.UID] + if !ok { + return + } + for i, np := range nm.nominatedReservation[nnn] { + if np.Pod.UID == pi.Pod.UID { + nm.nominatedReservation[nnn] = append(nm.nominatedReservation[nnn][:i], nm.nominatedReservation[nnn][i+1:]...) + if len(nm.nominatedReservation[nnn]) == 0 { + delete(nm.nominatedReservation, nnn) + } + break + } + } + delete(nm.nominatedReservationToNode, pi.Pod.UID) +} + func (nm *nominator) RemoveNominatedReservation(pod *corev1.Pod) { nm.lock.Lock() defer nm.lock.Unlock() @@ -141,6 +200,14 @@ func (pl *Plugin) RemoveNominatedReservations(pod *corev1.Pod) { pl.nominator.RemoveNominatedReservation(pod) } +func (pl *Plugin) AddNominatedReservationToNode(pod *corev1.Pod, nodeName string) { + pl.nominator.AddNominatedReservationToNode(framework.NewPodInfo(pod), nodeName) +} + +func (pl *Plugin) DeleteNominatedReservationIfExists(pod *corev1.Pod) { + pl.nominator.DeleteReservation(framework.NewPodInfo(pod)) +} + func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *frameworkext.ReservationInfo { reservationID := pl.nominator.GetNominatedReservation(pod, nodeName) if reservationID == "" { @@ -149,6 +216,48 @@ func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *fra return pl.reservationCache.getReservationInfoByUID(reservationID) } +func (pl *Plugin) BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, bool, *framework.Status) { + if !reservationutil.IsReservePod(pod) { + return pod, nodeInfo, false, nil + } + + nominatedReservationInfos := pl.nominator.NominatedReservationForNode(nodeInfo.Node().Name) + if len(nominatedReservationInfos) == 0 { + return pod, nodeInfo, false, nil + } + + if nodeInfo.Node() == nil { + // This may happen only in tests. + return pod, nodeInfo, false, nil + } + + nodeInfoOut := nodeInfo.Clone() + stateOut := cycleState.Clone() + + rName := reservationutil.GetReservationNameFromReservePod(pod) + _, err := pl.rLister.Get(rName) + if err != nil { + return pod, nodeInfo, false, framework.NewStatus(framework.Error, "reservation not found") + } + + for _, rInfo := range nominatedReservationInfos { + if schedulingcorev1.PodPriority(rInfo.Pod) >= schedulingcorev1.PodPriority(pod) && rInfo.Pod.UID != pod.UID { + pInfo := framework.NewPodInfo(rInfo.Pod) + nodeInfoOut.AddPodInfo(pInfo) + status := pl.handle.RunPreFilterExtensionAddPod(ctx, stateOut, pod, pInfo, nodeInfoOut) + if !status.IsSuccess() { + return pod, nodeInfo, false, status + } + klog.V(4).Infof("toschedule reservation: %s, addReservation: %s", + reservationutil.GetReservationNameFromReservePod(pod), + reservationutil.GetReservationNameFromReservePod(rInfo.Pod)) + } + } + + cycleState = stateOut + return pod, nodeInfoOut, true, nil +} + func prioritizeReservations( ctx context.Context, fwk frameworkext.FrameworkExtender, diff --git a/pkg/scheduler/plugins/reservation/nominator_test.go b/pkg/scheduler/plugins/reservation/nominator_test.go index aec86f6dd..089053856 100644 --- a/pkg/scheduler/plugins/reservation/nominator_test.go +++ b/pkg/scheduler/plugins/reservation/nominator_test.go @@ -434,3 +434,60 @@ func TestMultiReservationsOnSameNode(t *testing.T) { assert.Equal(t, 1, v) } } + +func TestReservationsNominator(t *testing.T) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("1886495404Ki"), + }, + }, + } + + resourceList := corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + } + labels := map[string]string{ + "foo": "bar", + } + suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node}) + var reservations []*schedulingv1alpha1.Reservation + var pods []*corev1.Pod + for i := 0; i < 3; i++ { + r := newTestReservation(t, fmt.Sprintf("test-r-%d", i), labels, labels, node.Name, resourceList) + reservations = append(reservations, r) + pods = append(pods, reservationutil.NewReservePod(r)) + _, err := suit.extenderFactory.KoordinatorClientSet().SchedulingV1alpha1().Reservations().Create(context.TODO(), r, metav1.CreateOptions{}) + assert.NoError(t, err) + } + nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name) + assert.NoError(t, err) + assert.Equal(t, 0, len(nodeInfo.Pods)) + + p, err := suit.pluginFactory() + assert.NoError(t, err) + pl := p.(*Plugin) + + nominatorImpl := pl.handle.(frameworkext.FrameworkExtender).GetReservationNominator() + + nominatorImpl.AddNominatedReservationToNode(pods[0], "node-1") + ctx := context.TODO() + state := framework.NewCycleState() + pod, nodeInfoOut, update, status := pl.BeforeFilter(ctx, state, pods[2], nodeInfo) + assert.Equal(t, pod, pods[2]) + assert.True(t, update) + assert.True(t, status.IsSuccess()) + assert.Equal(t, 1, len(nodeInfoOut.Pods)) + + nominatorImpl.AddNominatedReservationToNode(pods[1], "node-1") + pod, nodeInfoOut, update, status = pl.BeforeFilter(ctx, state, pods[2], nodeInfo) + assert.Equal(t, pod, pods[2]) + assert.True(t, update) + assert.True(t, status.IsSuccess()) + assert.Equal(t, 2, len(nodeInfoOut.Pods)) +} diff --git a/pkg/scheduler/plugins/reservation/pod_eventhandler.go b/pkg/scheduler/plugins/reservation/pod_eventhandler.go index 089ad8420..a3e882af0 100644 --- a/pkg/scheduler/plugins/reservation/pod_eventhandler.go +++ b/pkg/scheduler/plugins/reservation/pod_eventhandler.go @@ -24,6 +24,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" apiext "github.com/koordinator-sh/koordinator/apis/extension" frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper" @@ -95,6 +96,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) { } h.nominator.RemoveNominatedReservation(newPod) + h.nominator.DeleteReservation(framework.NewPodInfo(newPod)) var reservationUID types.UID if oldPod != nil { @@ -128,6 +130,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) { func (h *podEventHandler) deletePod(pod *corev1.Pod) { h.nominator.RemoveNominatedReservation(pod) + h.nominator.DeleteReservation(framework.NewPodInfo(pod)) reservationAllocated, err := apiext.GetReservationAllocated(pod) if err == nil && reservationAllocated != nil && reservationAllocated.UID != "" { diff --git a/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go b/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go index 54f4efded..c24040ef1 100644 --- a/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go +++ b/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/pkg/scheduler/framework" apiext "github.com/koordinator-sh/koordinator/apis/extension" schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" @@ -75,20 +76,38 @@ func TestPodEventHandler(t *testing.T) { }, } + handler.nominator.AddNominatedReservationToNode(framework.NewPodInfo(pod), "test-node-1") + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservationToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(pod), + }, handler.nominator.nominatedReservation["test-node-1"]) handler.OnAdd(pod) rInfo := handler.cache.getReservationInfoByUID(reservationUID) assert.Empty(t, rInfo.AssignedPods) + // pod not assigned, no need to delete reservation nominated node + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservationToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(pod), + }, handler.nominator.nominatedReservation["test-node-1"]) newPod := pod.DeepCopy() apiext.SetReservationAllocated(newPod, reservation) handler.OnUpdate(pod, newPod) rInfo = handler.cache.getReservationInfoByUID(reservationUID) assert.Len(t, rInfo.AssignedPods, 0) + // pod not assigned, no need to delete reservation nominated node + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservationToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(pod), + }, handler.nominator.nominatedReservation["test-node-1"]) newPod.Spec.NodeName = reservation.Status.NodeName handler.OnUpdate(pod, newPod) rInfo = handler.cache.getReservationInfoByUID(reservationUID) assert.Len(t, rInfo.AssignedPods, 1) + // pod assigned, delete reservation nominated node + assert.Equal(t, "", handler.nominator.nominatedReservationToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo(nil), handler.nominator.nominatedReservation["test-node-1"]) expectPodRequirement := &frameworkext.PodRequirement{ Name: pod.Name, @@ -100,9 +119,38 @@ func TestPodEventHandler(t *testing.T) { } assert.Equal(t, expectPodRequirement, rInfo.AssignedPods[pod.UID]) + handler.nominator.nominatedReservation["test-node-1"] = []*framework.PodInfo{ + framework.NewPodInfo(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + UID: "test-1", + }, + }), + } + handler.nominator.AddNominatedReservationToNode(framework.NewPodInfo(newPod), "test-node-1") + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservationToNode[newPod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + UID: "test-1", + }, + }), + framework.NewPodInfo(newPod), + }, handler.nominator.nominatedReservation["test-node-1"]) + handler.OnDelete(newPod) rInfo = handler.cache.getReservationInfoByUID(reservationUID) assert.Empty(t, rInfo.AssignedPods) + assert.Equal(t, "", handler.nominator.nominatedReservationToNode[newPod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + UID: "test-1", + }, + }), + }, handler.nominator.nominatedReservation["test-node-1"]) } func TestPodEventHandlerWithOperatingPod(t *testing.T) {