From e374ba6363a1ba80a79125f58258020488341715 Mon Sep 17 00:00:00 2001 From: "xingbao.zy" Date: Mon, 18 Mar 2024 17:37:31 +0800 Subject: [PATCH] scheduler : fix deviceshare plugin of add\remove pod (#1959) Signed-off-by: xingbao.zy --- .../plugins/deviceshare/device_resources.go | 15 ++ .../deviceshare/device_resources_test.go | 16 ++ .../plugins/deviceshare/nominator.go | 69 +++++ .../plugins/deviceshare/nominator_test.go | 52 ++++ pkg/scheduler/plugins/deviceshare/plugin.go | 130 +++++++--- .../plugins/deviceshare/plugin_test.go | 239 +++++++++++++++++- 6 files changed, 487 insertions(+), 34 deletions(-) create mode 100644 pkg/scheduler/plugins/deviceshare/nominator.go create mode 100644 pkg/scheduler/plugins/deviceshare/nominator_test.go diff --git a/pkg/scheduler/plugins/deviceshare/device_resources.go b/pkg/scheduler/plugins/deviceshare/device_resources.go index 8eae69465..ee107be76 100644 --- a/pkg/scheduler/plugins/deviceshare/device_resources.go +++ b/pkg/scheduler/plugins/deviceshare/device_resources.go @@ -19,6 +19,8 @@ package deviceshare import ( "sort" + apiext "github.com/koordinator-sh/koordinator/apis/extension" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" quotav1 "k8s.io/apiserver/pkg/quota/v1" @@ -33,6 +35,19 @@ import ( // "1": {koordinator.sh/gpu-core:100, koordinator.sh/gpu-memory-ratio:100, koordinator.sh/gpu-memory: 16GB} type deviceResources map[int]corev1.ResourceList +func TransDeviceAllocationsToDeviceResources(allocation apiext.DeviceAllocations) map[schedulingv1alpha1.DeviceType]deviceResources { + result := make(map[schedulingv1alpha1.DeviceType]deviceResources) + + for deviceType, deviceDetails := range allocation { + result[deviceType] = make(deviceResources) + for _, deviceDetail := range deviceDetails { + result[deviceType][int(deviceDetail.Minor)] = deviceDetail.Resources.DeepCopy() + } + } + + return result +} + func (r deviceResources) DeepCopy() deviceResources { if r == nil { return nil diff --git a/pkg/scheduler/plugins/deviceshare/device_resources_test.go b/pkg/scheduler/plugins/deviceshare/device_resources_test.go index 8d0f74553..9e9c30dea 100644 --- a/pkg/scheduler/plugins/deviceshare/device_resources_test.go +++ b/pkg/scheduler/plugins/deviceshare/device_resources_test.go @@ -19,6 +19,8 @@ package deviceshare import ( "testing" + corev1 "k8s.io/api/core/v1" + "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/sets" @@ -27,6 +29,20 @@ import ( schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" ) +func TestTransDeviceAllocationsToDeviceResources(t *testing.T) { + allocation := make(apiext.DeviceAllocations) + allocation[schedulingv1alpha1.GPU] = append(allocation[schedulingv1alpha1.GPU], &apiext.DeviceAllocation{ + Minor: 0, + Resources: corev1.ResourceList{ + apiext.ResourceGPUCore: resource.MustParse("100"), + }, + }) + + result := TransDeviceAllocationsToDeviceResources(allocation) + res := result[schedulingv1alpha1.GPU][0][apiext.ResourceGPUCore] + assert.Equal(t, "100", res.String()) +} + func Test_sortDeviceResourcesByMinor(t *testing.T) { tests := []struct { name string diff --git a/pkg/scheduler/plugins/deviceshare/nominator.go b/pkg/scheduler/plugins/deviceshare/nominator.go new file mode 100644 index 000000000..fe71318f1 --- /dev/null +++ b/pkg/scheduler/plugins/deviceshare/nominator.go @@ -0,0 +1,69 @@ +/* +Copyright 2022 The Koordinator Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package deviceshare + +import ( + "sync" + + corev1 "k8s.io/api/core/v1" + + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" +) + +type Nominator struct { + sync.Mutex + nominateMap map[string]map[schedulingv1alpha1.DeviceType]deviceResources +} + +func NewNominator() *Nominator { + return &Nominator{ + nominateMap: make(map[string]map[schedulingv1alpha1.DeviceType]deviceResources), + } +} + +func (nominator *Nominator) AddPod(pod *corev1.Pod, used map[schedulingv1alpha1.DeviceType]deviceResources) { + nominator.Lock() + defer nominator.Unlock() + + podNamespacedName := pod.Namespace + "/" + pod.Name + nominator.nominateMap[podNamespacedName] = used +} + +func (nominator *Nominator) RemovePod(pod *corev1.Pod) { + nominator.Lock() + defer nominator.Unlock() + + podNamespacedName := pod.Namespace + "/" + pod.Name + delete(nominator.nominateMap, podNamespacedName) +} + +func (nominator *Nominator) GetPodAllocated(pod *corev1.Pod) map[schedulingv1alpha1.DeviceType]deviceResources { + nominator.Lock() + defer nominator.Unlock() + + podNamespacedName := pod.Namespace + "/" + pod.Name + return nominator.nominateMap[podNamespacedName] +} + +func (nominator *Nominator) IsPodExist(pod *corev1.Pod) bool { + nominator.Lock() + defer nominator.Unlock() + + podNamespacedName := pod.Namespace + "/" + pod.Name + _, exist := nominator.nominateMap[podNamespacedName] + return exist +} diff --git a/pkg/scheduler/plugins/deviceshare/nominator_test.go b/pkg/scheduler/plugins/deviceshare/nominator_test.go new file mode 100644 index 000000000..05a3546ab --- /dev/null +++ b/pkg/scheduler/plugins/deviceshare/nominator_test.go @@ -0,0 +1,52 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package deviceshare
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+
+	"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
+)
+
+func TestNominator(t *testing.T) {
+	nominator := NewNominator()
+	assert.Equal(t, 0, len(nominator.nominateMap))
+
+	pod := &corev1.Pod{}
+	pod.Namespace = "test"
+	pod.Name = "job1"
+
+	used := make(map[v1alpha1.DeviceType]deviceResources)
+	used[v1alpha1.GPU] = make(deviceResources)
+	used[v1alpha1.GPU][0] = corev1.ResourceList{
+		corev1.ResourceCPU: resource.MustParse("10"),
+	}
+
+	nominator.AddPod(pod, used)
+	assert.Equal(t, 1, len(nominator.nominateMap))
+	used = nominator.GetPodAllocated(pod)
+	usedCPU := used[v1alpha1.GPU][0][corev1.ResourceCPU]
+	assert.Equal(t, usedCPU.String(), "10")
+	assert.Equal(t, true, nominator.IsPodExist(pod))
+
+	nominator.RemovePod(pod)
+	assert.Equal(t, 0, len(nominator.nominateMap))
+}
diff --git a/pkg/scheduler/plugins/deviceshare/plugin.go b/pkg/scheduler/plugins/deviceshare/plugin.go
index c388820eb..361a8c2fa 100644
--- a/pkg/scheduler/plugins/deviceshare/plugin.go
+++ b/pkg/scheduler/plugins/deviceshare/plugin.go
@@ -70,6 +70,7 @@ type Plugin struct {
 	handle          frameworkext.ExtendedHandle
 	nodeDeviceCache *nodeDeviceCache
 	scorer          *resourceAllocationScorer
+	nominator       *Nominator
 }
 
 type preFilterState struct {
@@ -178,12 +179,40 @@ func (p *Plugin) AddPod(ctx context.Context, cycleState *framework.CycleState, p
 	nd.lock.RLock()
 	defer nd.lock.RUnlock()
 
-	podAllocated := nd.getUsed(podInfoToAdd.Pod.Namespace, podInfoToAdd.Pod.Name)
+	var podAllocated map[schedulingv1alpha1.DeviceType]deviceResources
+	podAllocated = nd.getUsed(podInfoToAdd.Pod.Namespace, podInfoToAdd.Pod.Name)
+
+	// Normally, when AddPod runs here, the nominated pod is still being scheduled and therefore does
+	// not yet appear in nodeDeviceCache's used map. Without special handling, when the framework runs
+	// RunFilterPluginsWithNominatedPods and calls AddPod for higher-priority pods, the plugin cannot
+	// reserve resources for them; in RDMA/VF scenarios this can cause higher-priority pods to fail
+	// assignment because some resources are already assigned to lower-priority pods. We therefore reuse
+	// the Reserve logic to generate an assignment placement and save it in the nominator. The nominator
+	// cache is cleared in both Reserve and Unreserve, so the cleanup happens whether or not the
+	// assignment succeeds, matching the nomination process of the upstream Kubernetes framework.
+ if len(podAllocated) == 0 && p.nominator != nil { + if p.nominator.IsPodExist(podInfoToAdd.Pod) { + podAllocated = p.nominator.GetPodAllocated(podInfoToAdd.Pod) + } else { + assignFunc := func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations) { + //do nothing + } + result, _ := p.reserveInternal(ctx, cycleState, state, podInfoToAdd.Pod, nodeInfo, nd, assignFunc) + podAllocated = TransDeviceAllocationsToDeviceResources(result) + + p.nominator.AddPod(podInfoToAdd.Pod, podAllocated) + } + } + if len(podAllocated) == 0 { return nil } - rInfo := reservation.GetReservationCache().GetReservationInfoByPod(podInfoToAdd.Pod, node.Name) + var rInfo *frameworkext.ReservationInfo + if reservation.GetReservationCache() != nil { + rInfo = reservation.GetReservationCache().GetReservationInfoByPod(podInfoToAdd.Pod, node.Name) + } + if rInfo == nil { nominator := p.handle.GetReservationNominator() if nominator != nil { @@ -233,12 +262,25 @@ func (p *Plugin) RemovePod(ctx context.Context, cycleState *framework.CycleState nd.lock.RLock() defer nd.lock.RUnlock() - podAllocated := nd.getUsed(podInfoToRemove.Pod.Namespace, podInfoToRemove.Pod.Name) + var podAllocated map[schedulingv1alpha1.DeviceType]deviceResources + podAllocated = nd.getUsed(podInfoToRemove.Pod.Namespace, podInfoToRemove.Pod.Name) + + if len(podAllocated) == 0 && p.nominator != nil { + if p.nominator.IsPodExist(podInfoToRemove.Pod) { + podAllocated = p.nominator.GetPodAllocated(podInfoToRemove.Pod) + p.nominator.RemovePod(podInfoToRemove.Pod) + } + } + if len(podAllocated) == 0 { return nil } - rInfo := reservation.GetReservationCache().GetReservationInfoByPod(podInfoToRemove.Pod, node.Name) + var rInfo *frameworkext.ReservationInfo + if reservation.GetReservationCache() != nil { + rInfo = reservation.GetReservationCache().GetReservationInfoByPod(podInfoToRemove.Pod, node.Name) + } + if rInfo == nil { nominator := p.handle.GetReservationNominator() if nominator != nil { @@ -374,25 +416,8 @@ func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.Cy return status } -func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { - state, status := getPreFilterState(cycleState) - if !status.IsSuccess() { - return status - } - if state.skip { - return nil - } - - nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) - if err != nil { - return framework.AsStatus(err) - } - - nodeDeviceInfo := p.nodeDeviceCache.getNodeDevice(nodeName, false) - if nodeDeviceInfo == nil { - return nil - } - +func (p *Plugin) reserveInternal(ctx context.Context, cycleState *framework.CycleState, state *preFilterState, pod *corev1.Pod, + nodeInfo *framework.NodeInfo, nodeDeviceInfo *nodeDevice, assignFunc func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations)) (apiext.DeviceAllocations, *framework.Status) { store := topologymanager.GetStore(cycleState) affinity := store.GetAffinity(nodeInfo.Node().Name) @@ -406,30 +431,68 @@ func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, } reservationRestoreState := getReservationRestoreState(cycleState) - restoreState := reservationRestoreState.getNodeState(nodeName) - preemptible := appendAllocated(nil, restoreState.mergedUnmatchedUsed, state.preemptibleDevices[nodeName]) - - nodeDeviceInfo.lock.Lock() - defer nodeDeviceInfo.lock.Unlock() + restoreState := 
reservationRestoreState.getNodeState(nodeInfo.Node().Name) + preemptible := appendAllocated(nil, restoreState.mergedUnmatchedUsed, state.preemptibleDevices[nodeInfo.Node().Name]) result, status := p.allocateWithNominatedReservation( allocator, cycleState, state, restoreState, nodeInfo.Node(), pod, preemptible) if !status.IsSuccess() { - return status + return nil, status } if len(result) == 0 { preemptible = appendAllocated(preemptible, restoreState.mergedMatchedAllocatable) result, status = allocator.Allocate(nil, nil, nil, preemptible) if !status.IsSuccess() { - return status + return nil, status } } - nodeDeviceInfo.updateCacheUsed(result, pod, true) - state.allocationResult = result - return nil + + assignFunc(state, nodeDeviceInfo, pod, result) + + return result, nil +} + +func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status { + if p.nominator != nil && p.nominator.IsPodExist(pod) { + p.nominator.RemovePod(pod) + } + + state, status := getPreFilterState(cycleState) + if !status.IsSuccess() { + return status + } + if state.skip { + return nil + } + + nodeInfo, err := p.handle.SnapshotSharedLister().NodeInfos().Get(nodeName) + if err != nil { + return framework.AsStatus(err) + } + + nodeDeviceInfo := p.nodeDeviceCache.getNodeDevice(nodeName, false) + if nodeDeviceInfo == nil { + return nil + } + + nodeDeviceInfo.lock.Lock() + defer nodeDeviceInfo.lock.Unlock() + + assignFunc := func(state *preFilterState, nodeDeviceInfo *nodeDevice, pod *corev1.Pod, assignResult apiext.DeviceAllocations) { + nodeDeviceInfo.updateCacheUsed(assignResult, pod, true) + state.allocationResult = assignResult + } + + _, status = p.reserveInternal(ctx, cycleState, state, pod, nodeInfo, nodeDeviceInfo, assignFunc) + + return status } func (p *Plugin) Unreserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) { + if p.nominator != nil && p.nominator.IsPodExist(pod) { + p.nominator.RemovePod(pod) + } + state, status := getPreFilterState(cycleState) if !status.IsSuccess() { return @@ -562,5 +625,6 @@ func New(obj runtime.Object, handle framework.Handle) (framework.Plugin, error) handle: extendedHandle, nodeDeviceCache: deviceCache, scorer: scorePlugin(args), + nominator: NewNominator(), }, nil } diff --git a/pkg/scheduler/plugins/deviceshare/plugin_test.go b/pkg/scheduler/plugins/deviceshare/plugin_test.go index f27160d42..15a7d7622 100644 --- a/pkg/scheduler/plugins/deviceshare/plugin_test.go +++ b/pkg/scheduler/plugins/deviceshare/plugin_test.go @@ -2046,6 +2046,233 @@ func Test_Plugin_FilterReservation(t *testing.T) { assert.Equal(t, framework.NewStatus(framework.Unschedulable, "node(s) no reservation(s) to meet the device requirements"), status) } +func Test_Plugin_AddPod_RemovePod_With_Nominator(t *testing.T) { + type args struct { + nodeDeviceCache *nodeDeviceCache + state *preFilterState + reserved apiext.DeviceAllocations + pod *corev1.Pod + nodeName string + } + type wants struct { + allocationResult apiext.DeviceAllocations + status *framework.Status + } + tests := []struct { + name string + args args + wants wants + }{ + { + name: "sufficient device resource 1", + args: args{ + nodeDeviceCache: &nodeDeviceCache{ + nodeDeviceInfos: map[string]*nodeDevice{ + "test-node": { + deviceFree: map[schedulingv1alpha1.DeviceType]deviceResources{ + schedulingv1alpha1.RDMA: { + 0: corev1.ResourceList{ + apiext.ResourceRDMA: resource.MustParse("100"), + }, + }, + schedulingv1alpha1.FPGA: { + 0: 
corev1.ResourceList{ + apiext.ResourceFPGA: resource.MustParse("100"), + }, + }, + schedulingv1alpha1.GPU: { + 0: corev1.ResourceList{ + apiext.ResourceGPUCore: resource.MustParse("100"), + apiext.ResourceGPUMemoryRatio: resource.MustParse("100"), + apiext.ResourceGPUMemory: resource.MustParse("16Gi"), + }, + }, + }, + deviceTotal: map[schedulingv1alpha1.DeviceType]deviceResources{ + schedulingv1alpha1.RDMA: { + 0: corev1.ResourceList{ + apiext.ResourceRDMA: resource.MustParse("100"), + }, + }, + schedulingv1alpha1.FPGA: { + 0: corev1.ResourceList{ + apiext.ResourceFPGA: resource.MustParse("100"), + }, + }, + schedulingv1alpha1.GPU: { + 0: corev1.ResourceList{ + apiext.ResourceGPUCore: resource.MustParse("100"), + apiext.ResourceGPUMemoryRatio: resource.MustParse("100"), + apiext.ResourceGPUMemory: resource.MustParse("16Gi"), + }, + }, + }, + deviceUsed: map[schedulingv1alpha1.DeviceType]deviceResources{}, + allocateSet: make(map[schedulingv1alpha1.DeviceType]map[types.NamespacedName]deviceResources), + vfAllocations: map[schedulingv1alpha1.DeviceType]*VFAllocation{}, + numaTopology: &NUMATopology{}, + deviceInfos: map[schedulingv1alpha1.DeviceType][]*schedulingv1alpha1.DeviceInfo{ + schedulingv1alpha1.GPU: { + { + Type: schedulingv1alpha1.GPU, + Health: true, + UUID: "123456-1", + Minor: pointer.Int32(0), + }, + }, + schedulingv1alpha1.RDMA: { + { + Type: schedulingv1alpha1.RDMA, + Health: true, + UUID: "123456-1", + Minor: pointer.Int32(0), + }, + }, + schedulingv1alpha1.FPGA: { + { + Type: schedulingv1alpha1.FPGA, + Health: true, + UUID: "123456-1", + Minor: pointer.Int32(0), + }, + }, + }, + }, + }, + }, + pod: &corev1.Pod{}, + state: &preFilterState{ + skip: false, + podRequests: map[schedulingv1alpha1.DeviceType]corev1.ResourceList{ + schedulingv1alpha1.GPU: { + apiext.ResourceGPUCore: resource.MustParse("100"), + apiext.ResourceGPUMemoryRatio: resource.MustParse("100"), + }, + schedulingv1alpha1.RDMA: { + apiext.ResourceRDMA: resource.MustParse("100"), + }, + schedulingv1alpha1.FPGA: { + apiext.ResourceFPGA: resource.MustParse("100"), + }, + }, + }, + nodeName: "test-node", + }, + wants: wants{ + allocationResult: apiext.DeviceAllocations{ + schedulingv1alpha1.GPU: { + { + Minor: 0, + Resources: corev1.ResourceList{ + apiext.ResourceGPUCore: resource.MustParse("100"), + apiext.ResourceGPUMemoryRatio: resource.MustParse("100"), + apiext.ResourceGPUMemory: resource.MustParse("16Gi"), + }, + }, + }, + schedulingv1alpha1.FPGA: { + { + Minor: 0, + Resources: corev1.ResourceList{ + apiext.ResourceFPGA: resource.MustParse("100"), + }, + }, + }, + schedulingv1alpha1.RDMA: { + { + Minor: 0, + Resources: corev1.ResourceList{ + apiext.ResourceRDMA: resource.MustParse("100"), + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node", + }, + } + suit := newPluginTestSuit(t, []*corev1.Node{node}) + p, err := suit.proxyNew(getDefaultArgs(), suit.Framework) + assert.NoError(t, err) + pl := p.(*Plugin) + pl.nominator = NewNominator() + pl.nodeDeviceCache = tt.args.nodeDeviceCache + cycleState := framework.NewCycleState() + if tt.args.state != nil { + cycleState.Write(stateKey, tt.args.state) + } + + if len(tt.args.reserved) > 0 { + reservation := &schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + Name: "reservation-1", + }, + Spec: schedulingv1alpha1.ReservationSpec{ + Template: &corev1.PodTemplateSpec{}, + }, + Status: 
schedulingv1alpha1.ReservationStatus{ + NodeName: "test-node", + }, + } + err := apiext.SetDeviceAllocations(reservation, tt.args.reserved) + assert.NoError(t, err) + + tt.args.nodeDeviceCache.updatePod(nil, reservationutil.NewReservePod(reservation)) + + namespacedName := reservationutil.GetReservePodNamespacedName(reservation) + allocatable := tt.args.nodeDeviceCache.getNodeDevice("test-node", false).getUsed(namespacedName.Namespace, namespacedName.Name) + + restoreState := &reservationRestoreStateData{ + skip: false, + nodeToState: frameworkext.NodeReservationRestoreStates{ + "test-node": &nodeReservationRestoreStateData{ + mergedMatchedAllocatable: allocatable, + matched: []reservationAlloc{ + { + rInfo: frameworkext.NewReservationInfo(reservation), + allocatable: allocatable, + remained: allocatable, + }, + }, + }, + }, + } + cycleState.Write(reservationRestoreStateKey, restoreState) + rInfo := frameworkext.NewReservationInfo(reservation) + pl.handle.GetReservationNominator().AddNominatedReservation(tt.args.pod, "test-node", rInfo) + } + + reservation.SetReservationCache(&fakeReservationCache{}) + + nodeInfo := framework.NewNodeInfo() + nodeInfo.SetNode(node) + + tt.args.state.preemptibleDevices = make(map[string]map[schedulingv1alpha1.DeviceType]deviceResources) + cycleState.Write(stateKey, tt.args.state) + pl.AddPod(context.TODO(), cycleState, nil, framework.NewPodInfo(tt.args.pod), nodeInfo) + + assert.Equal(t, 1, len(pl.nominator.nominateMap)) + used := pl.nominator.GetPodAllocated(tt.args.pod) + gpuUsed := used[schedulingv1alpha1.GPU][0][apiext.ResourceGPUCore] + assert.Equal(t, "100", gpuUsed.String()) + gpuUsed = tt.args.state.preemptibleDevices[node.Name][schedulingv1alpha1.GPU][0][apiext.ResourceGPUCore] + assert.Equal(t, "-100", gpuUsed.String()) + + pl.RemovePod(context.TODO(), cycleState, nil, framework.NewPodInfo(tt.args.pod), nodeInfo) + assert.Equal(t, 0, len(pl.nominator.nominateMap)) + gpuUsed = tt.args.state.preemptibleDevices[node.Name][schedulingv1alpha1.GPU][0][apiext.ResourceGPUCore] + assert.Equal(t, "0", gpuUsed.String()) + }) + } +} + func Test_Plugin_Reserve(t *testing.T) { type args struct { nodeDeviceCache *nodeDeviceCache @@ -3019,6 +3246,7 @@ func Test_Plugin_Reserve(t *testing.T) { p, err := suit.proxyNew(getDefaultArgs(), suit.Framework) assert.NoError(t, err) pl := p.(*Plugin) + pl.nominator = NewNominator() pl.nodeDeviceCache = tt.args.nodeDeviceCache cycleState := framework.NewCycleState() if tt.args.state != nil { @@ -3066,7 +3294,11 @@ func Test_Plugin_Reserve(t *testing.T) { pl.handle.GetReservationNominator().AddNominatedReservation(tt.args.pod, "test-node", rInfo) } + pl.nominator.AddPod(tt.args.pod, nil) + assert.Equal(t, 1, len(pl.nominator.nominateMap)) + status := pl.Reserve(context.TODO(), cycleState, tt.args.pod, tt.args.nodeName) + assert.Equal(t, 0, len(pl.nominator.nominateMap)) assert.Equal(t, tt.wants.status.Code(), status.Code()) assert.True(t, strings.Contains(status.Message(), tt.wants.status.Message())) if tt.wants.allocationResult != nil { @@ -3400,12 +3632,17 @@ func Test_Plugin_Unreserve(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p := &Plugin{nodeDeviceCache: tt.args.nodeDeviceCache} + p := &Plugin{nodeDeviceCache: tt.args.nodeDeviceCache, nominator: NewNominator()} cycleState := framework.NewCycleState() if tt.args.state != nil { cycleState.Write(stateKey, tt.args.state) } + + p.nominator.AddPod(tt.args.pod, nil) + assert.Equal(t, 1, len(p.nominator.nominateMap)) + 
p.Unreserve(context.TODO(), cycleState, tt.args.pod, "test-node") + assert.Equal(t, 0, len(p.nominator.nominateMap)) if tt.changed { assert.Empty(t, tt.args.state.allocationResult) stateCmpOpts := []cmp.Option{
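
For context, a minimal sketch (not part of the patch itself) of the nominator lifecycle that the change above introduces: AddPod records a simulated placement for a still-scheduling nominated pod, later calls read the cached placement, and Reserve/Unreserve (or RemovePod) clear it again. The sketch assumes it sits in the deviceshare package alongside the files added above; the function name nominatorLifecycleSketch and the concrete resource values are illustrative only.

package deviceshare

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	apiext "github.com/koordinator-sh/koordinator/apis/extension"
	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// nominatorLifecycleSketch (illustrative name) mirrors how the plugin drives the nominator cache:
// a simulated placement is stored while a nominated pod is still being scheduled, and the entry is
// dropped again once Reserve/Unreserve (or RemovePod) runs, so nothing leaks across scheduling cycles.
func nominatorLifecycleSketch() {
	nominator := NewNominator()

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "high-priority-pod"},
	}

	// A placement as produced by reserveInternal, converted to deviceResources
	// the same way AddPod converts it before caching.
	simulated := apiext.DeviceAllocations{
		schedulingv1alpha1.GPU: {
			{
				Minor: 0,
				Resources: corev1.ResourceList{
					apiext.ResourceGPUCore:        resource.MustParse("100"),
					apiext.ResourceGPUMemoryRatio: resource.MustParse("100"),
				},
			},
		},
	}
	nominator.AddPod(pod, TransDeviceAllocationsToDeviceResources(simulated))

	// Subsequent AddPod/RemovePod calls for the same pod reuse the cached placement
	// instead of re-running the allocation.
	if nominator.IsPodExist(pod) {
		_ = nominator.GetPodAllocated(pod)
	}

	// Reserve and Unreserve both remove the entry, so cleanup happens whether or not
	// the assignment ultimately succeeds.
	nominator.RemovePod(pod)
}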