diff --git a/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go b/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go index 6ae949e16..564e3933b 100644 --- a/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go +++ b/pkg/scheduler/frameworkext/eventhandlers/reservation_handler.go @@ -60,12 +60,18 @@ func MakeReservationErrorHandler( } // if the pod is not a reserve pod, use the default error handler - if !reservationutil.IsReservePod(pod) { - // If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod. - if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() { - extendedHandle := fwk.(frameworkext.ExtendedHandle) - extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod) + // If the Pod failed to schedule or no post-filter plugins, should remove exist NominatedReservation of the Pod. + if _, ok := schedulingErr.(*framework.FitError); !ok || !fwk.HasPostFilterPlugins() { + if extendedHandle, ok := fwk.(frameworkext.ExtendedHandle); ok { + if !reservationutil.IsReservePod(pod) { + extendedHandle.GetReservationNominator().RemoveNominatedReservations(pod) + } else { + extendedHandle.GetReservationNominator().DeleteNominatedReservePod(pod) + } } + } + + if !reservationutil.IsReservePod(pod) { return false } diff --git a/pkg/scheduler/frameworkext/framework_extender.go b/pkg/scheduler/frameworkext/framework_extender.go index 7ee2df40a..ff61e2432 100644 --- a/pkg/scheduler/frameworkext/framework_extender.go +++ b/pkg/scheduler/frameworkext/framework_extender.go @@ -229,6 +229,7 @@ func (ext *frameworkExtenderImpl) RunPostFilterPlugins(ctx context.Context, stat result, status := ext.Framework.RunPostFilterPlugins(ctx, state, pod, filteredNodeStatusMap) if result == nil || result.NominatingInfo.NominatedNodeName == "" { ext.GetReservationNominator().RemoveNominatedReservations(pod) + 
ext.GetReservationNominator().DeleteNominatedReservePod(pod) } return result, status } @@ -461,6 +462,7 @@ func (ext *frameworkExtenderImpl) RunReservePluginsReserve(ctx context.Context, } status := ext.Framework.RunReservePluginsReserve(ctx, cycleState, pod, nodeName) ext.GetReservationNominator().RemoveNominatedReservations(pod) + ext.GetReservationNominator().DeleteNominatedReservePod(pod) return status } diff --git a/pkg/scheduler/frameworkext/interface.go b/pkg/scheduler/frameworkext/interface.go index 89aea0a53..ec9f35c1a 100644 --- a/pkg/scheduler/frameworkext/interface.go +++ b/pkg/scheduler/frameworkext/interface.go @@ -132,6 +132,8 @@ type ReservationNominator interface { AddNominatedReservation(pod *corev1.Pod, nodeName string, rInfo *ReservationInfo) RemoveNominatedReservations(pod *corev1.Pod) GetNominatedReservation(pod *corev1.Pod, nodeName string) *ReservationInfo + AddNominatedReservePod(reservePod *corev1.Pod, nodeName string) + DeleteNominatedReservePod(reservePod *corev1.Pod) } const ( diff --git a/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go b/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go index ce81fb49c..5b13c2e3a 100644 --- a/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go +++ b/pkg/scheduler/frameworkext/testing/fake_reservation_nominator.go @@ -22,6 +22,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/scheduler/framework" "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" @@ -34,7 +35,10 @@ type FakeNominator struct { // nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated. 
nominatedPodToNode map[types.UID]map[string]types.UID
 	reservations       map[types.UID]*frameworkext.ReservationInfo
-	lock               sync.RWMutex
+	// nominatedReservePod is map keyed by nodeName, value is the nominated reservations
+	nominatedReservePod       map[string][]*framework.PodInfo
+	nominatedReservePodToNode map[types.UID]string
+	lock                      sync.RWMutex
 }
 
 func NewFakeReservationNominator() *FakeNominator {
@@ -87,3 +91,58 @@ func (nm *FakeNominator) NominateReservation(ctx context.Context, cycleState *fr
 	rInfo := nm.GetNominatedReservation(pod, nodeName)
 	return rInfo, nil
 }
+
+// AddNominatedReservePod records the reserve pod as nominated to nodeName. Any nomination
+// already stored for the same pod UID is dropped first, so a pod is tracked on one node only.
+func (nm *FakeNominator) AddNominatedReservePod(rInfo *corev1.Pod, nodeName string) {
+	nm.lock.Lock()
+	defer nm.lock.Unlock()
+
+	// Create the reserve-pod maps on first use: writing to a nil map panics, and nothing
+	// visible here guarantees that every FakeNominator is built with them populated.
+	if nm.nominatedReservePod == nil {
+		nm.nominatedReservePod = map[string][]*framework.PodInfo{}
+	}
+	if nm.nominatedReservePodToNode == nil {
+		nm.nominatedReservePodToNode = map[types.UID]string{}
+	}
+
+	// Always delete the reservation if it already exists, to ensure we never store more than
+	// one instance of the reservation.
+	nm.deleteReservePod(rInfo)
+
+	nm.nominatedReservePodToNode[rInfo.UID] = nodeName
+	for _, npi := range nm.nominatedReservePod[nodeName] {
+		if npi.Pod.UID == rInfo.UID {
+			klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod))
+			return
+		}
+	}
+	nm.nominatedReservePod[nodeName] = append(nm.nominatedReservePod[nodeName], framework.NewPodInfo(rInfo))
+}
+
+// DeleteNominatedReservePod forgets the nomination stored for the reserve pod, if any.
+func (nm *FakeNominator) DeleteNominatedReservePod(rInfo *corev1.Pod) {
+	nm.lock.Lock()
+	defer nm.lock.Unlock()
+
+	nm.deleteReservePod(rInfo)
+}
+
+// deleteReservePod removes the pod from both reserve-pod indexes. It does not lock by
+// itself; both callers in this file hold nm.lock while calling it.
+func (nm *FakeNominator) deleteReservePod(rInfo *corev1.Pod) {
+	nnn, ok := nm.nominatedReservePodToNode[rInfo.UID]
+	if !ok {
+		return
+	}
+	for i, np := range nm.nominatedReservePod[nnn] {
+		if np.Pod.UID == rInfo.UID {
+			nm.nominatedReservePod[nnn] = append(nm.nominatedReservePod[nnn][:i], nm.nominatedReservePod[nnn][i+1:]...)
+			if len(nm.nominatedReservePod[nnn]) == 0 {
+				delete(nm.nominatedReservePod, nnn)
+			}
+			break
+		}
+	}
+	delete(nm.nominatedReservePodToNode, rInfo.UID)
+}
diff --git a/pkg/scheduler/plugins/reservation/nominator.go b/pkg/scheduler/plugins/reservation/nominator.go
index f2e779bbb..6feb6289c 100644
--- a/pkg/scheduler/plugins/reservation/nominator.go
+++ b/pkg/scheduler/plugins/reservation/nominator.go
@@ -34,12 +34,17 @@ import (
 type nominator struct {
 	// nominatedPodToNode is map keyed by a Pod UID to the node name where it is nominated.
 	nominatedPodToNode map[types.UID]map[string]types.UID
-	lock               sync.RWMutex
+	// nominatedReservePod is map keyed by nodeName, value is the nominated reservation's PodInfo
+	nominatedReservePod       map[string][]*framework.PodInfo
+	nominatedReservePodToNode map[types.UID]string
+	lock                      sync.RWMutex
 }
 
 func newNominator() *nominator {
 	return &nominator{
-		nominatedPodToNode: map[types.UID]map[string]types.UID{},
+		nominatedPodToNode:        map[types.UID]map[string]types.UID{},
+		nominatedReservePod:       map[string][]*framework.PodInfo{},
+		nominatedReservePodToNode: map[types.UID]string{},
 	}
 }
 
@@ -58,6 +63,59 @@ func (nm *nominator) AddNominatedReservation(pod *corev1.Pod, nodeName string, r
 	nodeToReservation[nodeName] = rInfo.UID()
 }
 
+func (nm *nominator) AddNominatedReservePod(pi *framework.PodInfo, nodeName string) {
+	nm.lock.Lock()
+	defer nm.lock.Unlock()
+
+	// Always delete the reservation if it already exists, to ensure we never store more than
+	// one instance of the reservation.
+ nm.deleteReservePod(pi) + + nm.nominatedReservePodToNode[pi.Pod.UID] = nodeName + for _, npi := range nm.nominatedReservePod[nodeName] { + if npi.Pod.UID == pi.Pod.UID { + klog.V(4).InfoS("reservation already exists in the nominator", "pod", klog.KObj(npi.Pod)) + return + } + } + nm.nominatedReservePod[nodeName] = append(nm.nominatedReservePod[nodeName], pi) +} + +func (nm *nominator) NominatedReservePodForNode(nodeName string) []*framework.PodInfo { + nm.lock.RLock() + defer nm.lock.RUnlock() + // Make a copy of the nominated Pods so the caller can mutate safely. + reservePods := make([]*framework.PodInfo, len(nm.nominatedReservePod[nodeName])) + for i := 0; i < len(reservePods); i++ { + reservePods[i] = nm.nominatedReservePod[nodeName][i].DeepCopy() + } + return reservePods +} + +func (nm *nominator) DeleteReservePod(pi *framework.PodInfo) { + nm.lock.Lock() + defer nm.lock.Unlock() + + nm.deleteReservePod(pi) +} + +func (nm *nominator) deleteReservePod(pi *framework.PodInfo) { + nnn, ok := nm.nominatedReservePodToNode[pi.Pod.UID] + if !ok { + return + } + for i, np := range nm.nominatedReservePod[nnn] { + if np.Pod.UID == pi.Pod.UID { + nm.nominatedReservePod[nnn] = append(nm.nominatedReservePod[nnn][:i], nm.nominatedReservePod[nnn][i+1:]...) 
+ if len(nm.nominatedReservePod[nnn]) == 0 { + delete(nm.nominatedReservePod, nnn) + } + break + } + } + delete(nm.nominatedReservePodToNode, pi.Pod.UID) +} + func (nm *nominator) RemoveNominatedReservation(pod *corev1.Pod) { nm.lock.Lock() defer nm.lock.Unlock() @@ -141,6 +199,14 @@ func (pl *Plugin) RemoveNominatedReservations(pod *corev1.Pod) { pl.nominator.RemoveNominatedReservation(pod) } +func (pl *Plugin) AddNominatedReservePod(pod *corev1.Pod, nodeName string) { + pl.nominator.AddNominatedReservePod(framework.NewPodInfo(pod), nodeName) +} + +func (pl *Plugin) DeleteNominatedReservePod(pod *corev1.Pod) { + pl.nominator.DeleteReservePod(framework.NewPodInfo(pod)) +} + func (pl *Plugin) GetNominatedReservation(pod *corev1.Pod, nodeName string) *frameworkext.ReservationInfo { reservationID := pl.nominator.GetNominatedReservation(pod, nodeName) if reservationID == "" { diff --git a/pkg/scheduler/plugins/reservation/nominator_test.go b/pkg/scheduler/plugins/reservation/nominator_test.go index aec86f6dd..ecf3d1f6f 100644 --- a/pkg/scheduler/plugins/reservation/nominator_test.go +++ b/pkg/scheduler/plugins/reservation/nominator_test.go @@ -434,3 +434,58 @@ func TestMultiReservationsOnSameNode(t *testing.T) { assert.Equal(t, 1, v) } } + +func TestReservationsNominator(t *testing.T) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-1", + }, + Status: corev1.NodeStatus{ + Allocatable: map[corev1.ResourceName]resource.Quantity{ + corev1.ResourceCPU: resource.MustParse("96"), + corev1.ResourceMemory: resource.MustParse("1886495404Ki"), + }, + }, + } + + resourceList := corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("16"), + corev1.ResourceMemory: resource.MustParse("32Gi"), + } + labels := map[string]string{ + "foo": "bar", + } + suit := newPluginTestSuitWith(t, nil, []*corev1.Node{node}) + var pods []*corev1.Pod + for i := 0; i < 3; i++ { + r := newTestReservation(t, fmt.Sprintf("test-r-%d", i), labels, labels, node.Name, 
resourceList) + pods = append(pods, reservationutil.NewReservePod(r)) + _, err := suit.extenderFactory.KoordinatorClientSet().SchedulingV1alpha1().Reservations().Create(context.TODO(), r, metav1.CreateOptions{}) + assert.NoError(t, err) + } + nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name) + assert.NoError(t, err) + assert.Equal(t, 0, len(nodeInfo.Pods)) + + p, err := suit.pluginFactory() + assert.NoError(t, err) + pl := p.(*Plugin) + + nominatorImpl := pl.handle.(frameworkext.FrameworkExtender).GetReservationNominator() + + nominatorImpl.AddNominatedReservePod(pods[0], "node-1") + ctx := context.TODO() + state := framework.NewCycleState() + pod, nodeInfoOut, update, status := pl.BeforeFilter(ctx, state, pods[2], nodeInfo) + assert.Equal(t, pod, pods[2]) + assert.True(t, update) + assert.True(t, status.IsSuccess()) + assert.Equal(t, 1, len(nodeInfoOut.Pods)) + + nominatorImpl.AddNominatedReservePod(pods[1], "node-1") + pod, nodeInfoOut, update, status = pl.BeforeFilter(ctx, state, pods[2], nodeInfo) + assert.Equal(t, pod, pods[2]) + assert.True(t, update) + assert.True(t, status.IsSuccess()) + assert.Equal(t, 2, len(nodeInfoOut.Pods)) +} diff --git a/pkg/scheduler/plugins/reservation/pod_eventhandler.go b/pkg/scheduler/plugins/reservation/pod_eventhandler.go index 089ad8420..f42846215 100644 --- a/pkg/scheduler/plugins/reservation/pod_eventhandler.go +++ b/pkg/scheduler/plugins/reservation/pod_eventhandler.go @@ -24,6 +24,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/scheduler/framework" apiext "github.com/koordinator-sh/koordinator/apis/extension" frameworkexthelper "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper" @@ -95,6 +96,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) { } h.nominator.RemoveNominatedReservation(newPod) + h.nominator.DeleteReservePod(framework.NewPodInfo(newPod)) var reservationUID types.UID if oldPod 
!= nil { @@ -128,6 +130,7 @@ func (h *podEventHandler) updatePod(oldPod, newPod *corev1.Pod) { func (h *podEventHandler) deletePod(pod *corev1.Pod) { h.nominator.RemoveNominatedReservation(pod) + h.nominator.DeleteReservePod(framework.NewPodInfo(pod)) reservationAllocated, err := apiext.GetReservationAllocated(pod) if err == nil && reservationAllocated != nil && reservationAllocated.UID != "" { diff --git a/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go b/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go index 54f4efded..b8cb9b01e 100644 --- a/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go +++ b/pkg/scheduler/plugins/reservation/pod_eventhandler_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/pkg/scheduler/framework" apiext "github.com/koordinator-sh/koordinator/apis/extension" schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" @@ -75,20 +76,38 @@ func TestPodEventHandler(t *testing.T) { }, } + handler.nominator.AddNominatedReservePod(framework.NewPodInfo(pod), "test-node-1") + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(pod), + }, handler.nominator.nominatedReservePod["test-node-1"]) handler.OnAdd(pod) rInfo := handler.cache.getReservationInfoByUID(reservationUID) assert.Empty(t, rInfo.AssignedPods) + // pod not assigned, no need to delete reservation nominated node + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(pod), + }, handler.nominator.nominatedReservePod["test-node-1"]) newPod := pod.DeepCopy() apiext.SetReservationAllocated(newPod, reservation) handler.OnUpdate(pod, newPod) rInfo = handler.cache.getReservationInfoByUID(reservationUID) assert.Len(t, 
rInfo.AssignedPods, 0) + // pod not assigned, no need to delete reservation nominated node + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(pod), + }, handler.nominator.nominatedReservePod["test-node-1"]) newPod.Spec.NodeName = reservation.Status.NodeName handler.OnUpdate(pod, newPod) rInfo = handler.cache.getReservationInfoByUID(reservationUID) assert.Len(t, rInfo.AssignedPods, 1) + // pod assigned, delete reservation nominated node + assert.Equal(t, "", handler.nominator.nominatedReservePodToNode[pod.UID]) + assert.Equal(t, []*framework.PodInfo(nil), handler.nominator.nominatedReservePod["test-node-1"]) expectPodRequirement := &frameworkext.PodRequirement{ Name: pod.Name, @@ -100,9 +119,38 @@ func TestPodEventHandler(t *testing.T) { } assert.Equal(t, expectPodRequirement, rInfo.AssignedPods[pod.UID]) + handler.nominator.nominatedReservePod["test-node-1"] = []*framework.PodInfo{ + framework.NewPodInfo(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + UID: "test-1", + }, + }), + } + handler.nominator.AddNominatedReservePod(framework.NewPodInfo(newPod), "test-node-1") + assert.Equal(t, "test-node-1", handler.nominator.nominatedReservePodToNode[newPod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + UID: "test-1", + }, + }), + framework.NewPodInfo(newPod), + }, handler.nominator.nominatedReservePod["test-node-1"]) + handler.OnDelete(newPod) rInfo = handler.cache.getReservationInfoByUID(reservationUID) assert.Empty(t, rInfo.AssignedPods) + assert.Equal(t, "", handler.nominator.nominatedReservePodToNode[newPod.UID]) + assert.Equal(t, []*framework.PodInfo{ + framework.NewPodInfo(&corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-1", + UID: "test-1", + }, + }), + }, handler.nominator.nominatedReservePod["test-node-1"]) } func TestPodEventHandlerWithOperatingPod(t 
*testing.T) {
diff --git a/pkg/scheduler/plugins/reservation/transformer.go b/pkg/scheduler/plugins/reservation/transformer.go
index e59eabd8d..033600e52 100644
--- a/pkg/scheduler/plugins/reservation/transformer.go
+++ b/pkg/scheduler/plugins/reservation/transformer.go
@@ -25,6 +25,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
 	quotav1 "k8s.io/apiserver/pkg/quota/v1"
+	schedulingcorev1 "k8s.io/component-helpers/scheduling/corev1"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	"k8s.io/klog/v2"
 	resourceapi "k8s.io/kubernetes/pkg/api/v1/resource"
@@ -411,3 +412,56 @@ func parseSpecificNodesFromAffinity(pod *corev1.Pod) (sets.String, *framework.St
 	}
 	return nodeNames, nil
 }
+
+// BeforeFilter, for a reserve pod, returns a copy of nodeInfo extended with the reserve
+// pods that are already nominated to that node.
+//
+// The inputs are returned unchanged, with the update flag false, when pod is not a reserve
+// pod, when nodeInfo carries no node, or when the node has no nominated reserve pods.
+// Otherwise a clone of nodeInfo is returned with the update flag true; the clone additionally
+// holds every nominated reserve pod whose priority is not lower than pod's, except pod itself.
+// An error status is returned when the reservation of pod cannot be fetched from the lister,
+// and the failing status is passed through when a PreFilter AddPod extension does not succeed.
+func (pl *Plugin) BeforeFilter(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeInfo *framework.NodeInfo) (*corev1.Pod, *framework.NodeInfo, bool, *framework.Status) {
+	if !reservationutil.IsReservePod(pod) {
+		return pod, nodeInfo, false, nil
+	}
+
+	// Check for a missing node before reading its name; Node() can return nil.
+	node := nodeInfo.Node()
+	if node == nil {
+		// This may happen only in tests.
+		return pod, nodeInfo, false, nil
+	}
+
+	nominatedReservationInfos := pl.nominator.NominatedReservePodForNode(node.Name)
+	if len(nominatedReservationInfos) == 0 {
+		return pod, nodeInfo, false, nil
+	}
+
+	rName := reservationutil.GetReservationNameFromReservePod(pod)
+	if _, err := pl.rLister.Get(rName); err != nil {
+		return pod, nodeInfo, false, framework.NewStatus(framework.Error, "reservation not found")
+	}
+
+	// Clone once every early exit is behind us; nominated pods are added to the copy,
+	// never to the caller's nodeInfo.
+	nodeInfoOut := nodeInfo.Clone()
+	podPriority := schedulingcorev1.PodPriority(pod)
+	for _, rInfo := range nominatedReservationInfos {
+		if rInfo.Pod.UID == pod.UID || schedulingcorev1.PodPriority(rInfo.Pod) < podPriority {
+			continue
+		}
+		pInfo := framework.NewPodInfo(rInfo.Pod)
+		nodeInfoOut.AddPodInfo(pInfo)
+		status := pl.handle.RunPreFilterExtensionAddPod(ctx, cycleState, pod, pInfo, nodeInfoOut)
+		if !status.IsSuccess() {
+			return pod, nodeInfo, false, status
+		}
+		klog.V(4).Infof("toschedule reservation: %s, added reservation: %s",
+			rName,
+			reservationutil.GetReservationNameFromReservePod(rInfo.Pod))
+	}
+
+	return pod, nodeInfoOut, true, nil
+}