From c6e3e53f7ff3abb46fd69f700a457647b072196b Mon Sep 17 00:00:00 2001
From: Joseph
Date: Fri, 26 Jan 2024 11:25:08 +0800
Subject: [PATCH] scheduler: fix the method of calculating desiredCount in
 DeviceShare ApplyForAll strategy (#1870)

Signed-off-by: Joseph
---
 .../plugins/deviceshare/device_allocator.go   |   5 +-
 .../deviceshare/devicehandler_default.go      |  44 +++--
 .../deviceshare/devicehandler_default_test.go | 184 ++++++++++++++++++
 .../plugins/deviceshare/devicehandler_gpu.go  |   7 +-
 4 files changed, 221 insertions(+), 19 deletions(-)
 create mode 100644 pkg/scheduler/plugins/deviceshare/devicehandler_default_test.go

diff --git a/pkg/scheduler/plugins/deviceshare/device_allocator.go b/pkg/scheduler/plugins/deviceshare/device_allocator.go
index 0cdc31cb1..15c58c382 100644
--- a/pkg/scheduler/plugins/deviceshare/device_allocator.go
+++ b/pkg/scheduler/plugins/deviceshare/device_allocator.go
@@ -37,7 +37,7 @@ var deviceHandlers = map[schedulingv1alpha1.DeviceType]DeviceHandler{}
 var deviceAllocators = map[schedulingv1alpha1.DeviceType]DeviceAllocator{}
 
 type DeviceHandler interface {
-	CalcDesiredRequestsAndCount(node *corev1.Node, pod *corev1.Pod, podRequests corev1.ResourceList, totalDevices deviceResources, hint *apiext.DeviceHint) (corev1.ResourceList, int, *framework.Status)
+	CalcDesiredRequestsAndCount(node *corev1.Node, pod *corev1.Pod, podRequests corev1.ResourceList, nodeDevice *nodeDevice, hint *apiext.DeviceHint) (corev1.ResourceList, int, *framework.Status)
 }
 
 type DeviceAllocator interface {
@@ -169,8 +169,7 @@ func (a *AutopilotAllocator) calcRequestsAndCountByDeviceType(
 		if handler == nil {
 			continue
 		}
-		totalDevices := nodeDevice.deviceTotal[deviceType]
-		requests, desiredCount, status := handler.CalcDesiredRequestsAndCount(a.node, a.pod, requests, totalDevices, hints[deviceType])
+		requests, desiredCount, status := handler.CalcDesiredRequestsAndCount(a.node, a.pod, requests, nodeDevice, hints[deviceType])
 		if !status.IsSuccess() {
 			if status.Code() == framework.Skip {
 				continue
diff --git a/pkg/scheduler/plugins/deviceshare/devicehandler_default.go b/pkg/scheduler/plugins/deviceshare/devicehandler_default.go
index d8b651484..3a80d3788 100644
--- a/pkg/scheduler/plugins/deviceshare/devicehandler_default.go
+++ b/pkg/scheduler/plugins/deviceshare/devicehandler_default.go
@@ -21,10 +21,12 @@ import (
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 
 	apiext "github.com/koordinator-sh/koordinator/apis/extension"
 	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
+	"github.com/koordinator-sh/koordinator/pkg/util"
 )
 
 func init() {
@@ -39,7 +41,8 @@ type DefaultDeviceHandler struct {
 	resourceName corev1.ResourceName
 }
 
-func (h *DefaultDeviceHandler) CalcDesiredRequestsAndCount(node *corev1.Node, pod *corev1.Pod, podRequests corev1.ResourceList, totalDevices deviceResources, hint *apiext.DeviceHint) (corev1.ResourceList, int, *framework.Status) {
+func (h *DefaultDeviceHandler) CalcDesiredRequestsAndCount(_ *corev1.Node, _ *corev1.Pod, podRequests corev1.ResourceList, nodeDevice *nodeDevice, hint *apiext.DeviceHint) (corev1.ResourceList, int, *framework.Status) {
+	totalDevices := nodeDevice.deviceTotal[h.deviceType]
 	if len(totalDevices) == 0 {
 		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("Insufficient %s devices", h.deviceType))
 	}
@@ -54,20 +57,35 @@ func (h *DefaultDeviceHandler) CalcDesiredRequestsAndCount(node *corev1.Node, po
 		requests = corev1.ResourceList{
 			h.resourceName: *resource.NewQuantity(quantity.Value()/desiredCount, resource.DecimalSI),
 		}
-	} else {
-		if hint != nil {
-			switch hint.AllocateStrategy {
-			case apiext.ApplyForAllDeviceAllocateStrategy:
-				desiredCount = int64(len(totalDevices))
-			case apiext.RequestsAsCountAllocateStrategy:
-				desiredCount = quantity.Value()
-				desiredQuantity := 1
-				if hint.ExclusivePolicy == apiext.DeviceLevelDeviceExclusivePolicy {
-					desiredQuantity = 100
+	} else if hint != nil {
+		switch hint.AllocateStrategy {
+		case apiext.ApplyForAllDeviceAllocateStrategy:
+			desiredCount = int64(len(totalDevices))
+			if hint.Selector != nil {
+				selector, err := util.GetFastLabelSelector(hint.Selector)
+				if err != nil {
+					return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("invalid Selector of DeviceHint, deviceType: %s, err: %s", h.deviceType, err.Error()))
 				}
-				requests = corev1.ResourceList{
-					h.resourceName: *resource.NewQuantity(int64(desiredQuantity), resource.DecimalSI),
+
+				matched := 0
+				for _, v := range nodeDevice.deviceInfos[h.deviceType] {
+					if selector.Matches(labels.Set(v.Labels)) {
+						matched++
+					}
 				}
+				desiredCount = int64(matched)
+			}
+			if desiredCount == 0 {
+				return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("Insufficient %s devices", h.deviceType))
+			}
+		case apiext.RequestsAsCountAllocateStrategy:
+			desiredCount = quantity.Value()
+			desiredQuantity := 1
+			if hint.ExclusivePolicy == apiext.DeviceLevelDeviceExclusivePolicy {
+				desiredQuantity = 100
+			}
+			requests = corev1.ResourceList{
+				h.resourceName: *resource.NewQuantity(int64(desiredQuantity), resource.DecimalSI),
 			}
 		}
 	}
diff --git a/pkg/scheduler/plugins/deviceshare/devicehandler_default_test.go b/pkg/scheduler/plugins/deviceshare/devicehandler_default_test.go
new file mode 100644
index 000000000..22f24d35e
--- /dev/null
+++ b/pkg/scheduler/plugins/deviceshare/devicehandler_default_test.go
@@ -0,0 +1,184 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package deviceshare
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/utils/pointer"
+
+	apiext "github.com/koordinator-sh/koordinator/apis/extension"
+	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
+)
+
+func TestDefaultDeviceHandler_CalcDesiredRequestsAndCount(t *testing.T) {
+	fakeDeviceCRCopy := fakeDeviceCR.DeepCopy()
+	fakeDeviceCRCopy.Spec.Devices = append(fakeDeviceCRCopy.Spec.Devices, schedulingv1alpha1.DeviceInfo{
+		Type: schedulingv1alpha1.RDMA,
+		Labels: map[string]string{
+			"type": "fakeS",
+		},
+		UUID:   "123456789",
+		Minor:  pointer.Int32(5),
+		Health: true,
+		Resources: corev1.ResourceList{
+			apiext.ResourceRDMA: resource.MustParse("100"),
+		},
+		Topology: &schedulingv1alpha1.DeviceTopology{
+			SocketID: 1,
+			NodeID:   1,
+			PCIEID:   "4",
+		},
+	})
+
+	cache := newNodeDeviceCache()
+	cache.updateNodeDevice(fakeDeviceCRCopy.Name, fakeDeviceCRCopy)
+
+	resources := corev1.ResourceList{
+		apiext.ResourceRDMA: resource.MustParse("100"),
+	}
+	tests := []struct {
+		name              string
+		podRequests       corev1.ResourceList
+		hint              *apiext.DeviceHint
+		wantRequests      corev1.ResourceList
+		wantDesiredCount  int
+		wantStatusSuccess bool
+	}{
+		{
+			name:              "general request one NIC device",
+			podRequests:       resources,
+			wantRequests:      resources,
+			wantDesiredCount:  1,
+			wantStatusSuccess: true,
+		},
+		{
+			name: "apply for all NIC devices that type is fakeW",
+			podRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("1"),
+			},
+			hint: &apiext.DeviceHint{
+				Selector:         &metav1.LabelSelector{MatchLabels: map[string]string{"type": "fakeW"}},
+				AllocateStrategy: apiext.ApplyForAllDeviceAllocateStrategy,
+			},
+			wantRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("1"),
+			},
+			wantDesiredCount:  4,
+			wantStatusSuccess: true,
+		},
+		{
+			name: "apply for all NIC devices that type is fakeS",
+			podRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("1"),
+			},
+			hint: &apiext.DeviceHint{
+				Selector:         &metav1.LabelSelector{MatchLabels: map[string]string{"type": "fakeS"}},
+				AllocateStrategy: apiext.ApplyForAllDeviceAllocateStrategy,
+			},
+			wantRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("1"),
+			},
+			wantDesiredCount:  1,
+			wantStatusSuccess: true,
+		},
+		{
+			name: "apply for all NIC devices",
+			podRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("1"),
+			},
+			hint: &apiext.DeviceHint{
+				Selector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
+					{
+						Key:      "type",
+						Operator: metav1.LabelSelectorOpExists,
+					},
+				}},
+				AllocateStrategy: apiext.ApplyForAllDeviceAllocateStrategy,
+			},
+			wantRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("1"),
+			},
+			wantDesiredCount:  5,
+			wantStatusSuccess: true,
+		},
+		{
+			name: "apply for all unmatched NIC devices",
+			podRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("1"),
+			},
+			hint: &apiext.DeviceHint{
+				Selector: &metav1.LabelSelector{MatchExpressions: []metav1.LabelSelectorRequirement{
+					{
+						Key:      "non-exists-label",
+						Operator: metav1.LabelSelectorOpExists,
+					},
+				}},
+				AllocateStrategy: apiext.ApplyForAllDeviceAllocateStrategy,
+			},
+			wantRequests:      nil,
+			wantDesiredCount:  0,
+			wantStatusSuccess: false,
+		},
+		{
+			name: "request as count",
+			podRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("4"),
+			},
+			hint: &apiext.DeviceHint{
+				AllocateStrategy: apiext.RequestsAsCountAllocateStrategy,
+			},
+			wantRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: *resource.NewQuantity(1, resource.DecimalSI),
+			},
+			wantDesiredCount:  4,
+			wantStatusSuccess: true,
+		},
+		{
+			name: "request as count and exclusive",
+			podRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: resource.MustParse("4"),
+			},
+			hint: &apiext.DeviceHint{
+				AllocateStrategy: apiext.RequestsAsCountAllocateStrategy,
+				ExclusivePolicy:  apiext.DeviceLevelDeviceExclusivePolicy,
+			},
+			wantRequests: corev1.ResourceList{
+				apiext.ResourceRDMA: *resource.NewQuantity(100, resource.DecimalSI),
+			},
+			wantDesiredCount:  4,
+			wantStatusSuccess: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			handler := &DefaultDeviceHandler{
+				deviceType:   schedulingv1alpha1.RDMA,
+				resourceName: apiext.ResourceRDMA,
+			}
+			nodeDevice := cache.getNodeDevice(fakeDeviceCRCopy.Name, false)
+			requests, desiredCount, status := handler.CalcDesiredRequestsAndCount(nil, nil, tt.podRequests, nodeDevice, tt.hint)
+			assert.Equal(t, tt.wantRequests, requests)
+			assert.Equal(t, tt.wantDesiredCount, desiredCount)
+			assert.Equal(t, tt.wantStatusSuccess, status.IsSuccess())
+		})
+	}
+}
diff --git a/pkg/scheduler/plugins/deviceshare/devicehandler_gpu.go b/pkg/scheduler/plugins/deviceshare/devicehandler_gpu.go
index 6a10b4c3d..eb50b4fd2 100644
--- a/pkg/scheduler/plugins/deviceshare/devicehandler_gpu.go
+++ b/pkg/scheduler/plugins/deviceshare/devicehandler_gpu.go
@@ -37,13 +37,14 @@ var _ DeviceHandler = &GPUHandler{}
 type GPUHandler struct {
 }
 
-func (h *GPUHandler) CalcDesiredRequestsAndCount(node *corev1.Node, pod *corev1.Pod, podRequests corev1.ResourceList, totalDevices deviceResources, hint *apiext.DeviceHint) (corev1.ResourceList, int, *framework.Status) {
-	if len(totalDevices) == 0 {
+func (h *GPUHandler) CalcDesiredRequestsAndCount(node *corev1.Node, pod *corev1.Pod, podRequests corev1.ResourceList, nodeDevice *nodeDevice, hint *apiext.DeviceHint) (corev1.ResourceList, int, *framework.Status) {
+	totalDevice := nodeDevice.deviceTotal[schedulingv1alpha1.GPU]
+	if len(totalDevice) == 0 {
 		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("Insufficient %s devices", schedulingv1alpha1.GPU))
 	}
 
 	podRequests = podRequests.DeepCopy()
-	if err := fillGPUTotalMem(totalDevices, podRequests); err != nil {
+	if err := fillGPUTotalMem(totalDevice, podRequests); err != nil {
 		return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
 	}
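
Note on the behavioral change, with a self-contained sketch. Before this patch, the ApplyForAll strategy set desiredCount to len(totalDevices) unconditionally, so a DeviceHint Selector had no effect on the count: a selector matching only some (or none) of the node's devices still asked for every device. The patched handler counts only the devices whose labels match the selector and reports the node unschedulable when nothing matches. The sketch below illustrates that counting rule in isolation; it is not the plugin's code: devices are reduced to bare label maps, and apimachinery's metav1.LabelSelectorAsSelector stands in for the util.GetFastLabelSelector helper the patch actually calls.

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

// desiredCountForApplyForAll mirrors the fixed counting rule: with no
// selector, every device on the node is desired; with a selector, only
// the matching devices are counted, and zero matches is an error so the
// caller can mark the node UnschedulableAndUnresolvable.
func desiredCountForApplyForAll(deviceLabels []map[string]string, hintSelector *metav1.LabelSelector) (int64, error) {
	if hintSelector == nil {
		return int64(len(deviceLabels)), nil
	}
	// Stand-in for util.GetFastLabelSelector in the patched handler.
	selector, err := metav1.LabelSelectorAsSelector(hintSelector)
	if err != nil {
		return 0, fmt.Errorf("invalid selector in device hint: %w", err)
	}
	var matched int64
	for _, deviceLabel := range deviceLabels {
		if selector.Matches(labels.Set(deviceLabel)) {
			matched++
		}
	}
	if matched == 0 {
		return 0, fmt.Errorf("no devices match the hint selector")
	}
	return matched, nil
}

func main() {
	// Two fakeW NICs and one fakeS NIC, echoing the labels used by the
	// test fixture above.
	devices := []map[string]string{
		{"type": "fakeW"},
		{"type": "fakeW"},
		{"type": "fakeS"},
	}
	selector := &metav1.LabelSelector{MatchLabels: map[string]string{"type": "fakeW"}}
	count, err := desiredCountForApplyForAll(devices, selector)
	fmt.Println(count, err) // prints: 2 <nil>
}

Against this toy node, the fakeW selector yields 2 rather than the pre-fix answer of 3, which is the same shift the wantDesiredCount expectations in devicehandler_default_test.go pin down.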