diff --git a/pkg/slo-controller/noderesource/noderesource_controller.go b/pkg/slo-controller/noderesource/noderesource_controller.go index da978c2d6..b0d9c002d 100644 --- a/pkg/slo-controller/noderesource/noderesource_controller.go +++ b/pkg/slo-controller/noderesource/noderesource_controller.go @@ -34,7 +34,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/source" - schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1" "github.com/koordinator-sh/koordinator/pkg/slo-controller/config" "github.com/koordinator-sh/koordinator/pkg/slo-controller/metrics" @@ -123,7 +122,7 @@ func (r *NodeResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request return ctrl.Result{Requeue: true}, err } - // do other node updates. e.g. update device resources + // do other node updates. if err := r.updateNodeExtensions(node, nodeMetric, podList); err != nil { klog.ErrorS(err, "failed to update node extensions for node", "node", node.Name) return ctrl.Result{Requeue: true}, err @@ -187,8 +186,7 @@ func (r *NodeResourceReconciler) SetupWithManager(mgr ctrl.Manager) error { Named(Name). // avoid conflict with others reconciling `Node` For(&corev1.Node{}). Watches(&source.Kind{Type: &slov1alpha1.NodeMetric{}}, &EnqueueRequestForNodeMetric{syncContext: r.NodeSyncContext}). - Watches(&source.Kind{Type: &corev1.ConfigMap{}}, cfgHandler). - Watches(&source.Kind{Type: &schedulingv1alpha1.Device{}}, &EnqueueRequestForDevice{syncContext: r.GPUSyncContext}) + Watches(&source.Kind{Type: &corev1.ConfigMap{}}, cfgHandler) // setup plugins // allow plugins to mutate controller via the builder diff --git a/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go b/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go index f3ae5882a..68db3167a 100644 --- a/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go +++ b/pkg/slo-controller/noderesource/plugins/batchresource/plugin.go @@ -26,11 +26,11 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/sets" quotav1 "k8s.io/apiserver/pkg/quota/v1" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" + "k8s.io/utils/clock" ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/source" @@ -50,7 +50,7 @@ var ( ) var ( - Clock clock.Clock = clock.RealClock{} // for testing + Clock clock.WithTickerAndDelayedExecution = clock.RealClock{} // for testing client ctrlclient.Client nrtHandler *NRTHandler nrtSyncContext *framework.SyncContext diff --git a/pkg/slo-controller/noderesource/plugins/gpudeviceresource/device_event_handler.go b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/device_event_handler.go new file mode 100644 index 000000000..e48075185 --- /dev/null +++ b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/device_event_handler.go @@ -0,0 +1,70 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gpudeviceresource + +import ( + "reflect" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" +) + +var _ handler.EventHandler = &DeviceHandler{} + +type DeviceHandler struct{} + +func (d *DeviceHandler) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) { + device := e.Object.(*schedulingv1alpha1.Device) + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: device.Name, + }, + }) +} + +func (d *DeviceHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) { + newDevice := e.ObjectNew.(*schedulingv1alpha1.Device) + oldDevice := e.ObjectOld.(*schedulingv1alpha1.Device) + if reflect.DeepEqual(newDevice.Spec, oldDevice.Spec) { + return + } + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: newDevice.Name, + }, + }) +} + +func (d *DeviceHandler) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) { + device, ok := e.Object.(*schedulingv1alpha1.Device) + if !ok { + return + } + q.Add(reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: device.Name, + }, + }) +} + +func (d *DeviceHandler) Generic(e event.GenericEvent, q workqueue.RateLimitingInterface) { +} diff --git a/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go new file mode 100644 index 000000000..0f826ac0d --- /dev/null +++ b/pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go @@ -0,0 +1,226 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gpudeviceresource + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + ctrlclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/koordinator-sh/koordinator/apis/configuration" + "github.com/koordinator-sh/koordinator/apis/extension" + schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/framework" + "github.com/koordinator-sh/koordinator/pkg/util" +) + +const PluginName = "GPUDeviceResource" + +const ( + ResetResourcesMsg = "reset node gpu resources" + UpdateResourcesMsg = "node gpu resources from device" + UpdateLabelsMsg = "node gpu labels from device" +) + +var ( + ResourceNames = []corev1.ResourceName{ + extension.ResourceGPU, + extension.ResourceGPUCore, + extension.ResourceGPUMemory, + extension.ResourceGPUMemoryRatio, + } + + client ctrlclient.Client +) + +type Plugin struct{} + +func (p *Plugin) Name() string { + return PluginName +} + +// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch +// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;patch +// +kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch +// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch +// +kubebuilder:rbac:groups=scheduling.koordinator.sh,resources=devices,verbs=get;list;watch +// +kubebuilder:rbac:groups=topology.node.k8s.io,resources=noderesourcetopologies,verbs=get;list;watch;create;update + +func (p *Plugin) Setup(opt *framework.Option) error { + client = opt.Client + + // schedulingv1alpha1.AddToScheme(opt.Scheme) + opt.Builder = opt.Builder.Watches(&source.Kind{Type: &schedulingv1alpha1.Device{}}, &DeviceHandler{}) + + return nil +} + +func (p *Plugin) NeedSync(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) { + for _, resourceName := range ResourceNames { + if util.IsResourceDiff(oldNode.Status.Allocatable, newNode.Status.Allocatable, resourceName, + *strategy.ResourceDiffThreshold) { + klog.V(4).Infof("node %s resource %s diff bigger than %v, need sync", + newNode.Name, resourceName, *strategy.ResourceDiffThreshold) + return true, "gpu resource diff is big than threshold" + } + } + + return false, "" +} + +func (p *Plugin) NeedSyncMeta(_ *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) { + if oldNode.Labels[extension.LabelGPUModel] != newNode.Labels[extension.LabelGPUModel] { + klog.V(4).InfoS("need sync node metadata since gpu model change", "node", newNode.Name, + "old", oldNode.Labels[extension.LabelGPUModel], "new", newNode.Labels[extension.LabelGPUModel]) + return true, "gpu model changed" + } + if oldNode.Labels[extension.LabelGPUDriverVersion] != newNode.Labels[extension.LabelGPUDriverVersion] { + klog.V(4).InfoS("need sync node metadata since gpu driver version change", "node", newNode.Name, + "old", oldNode.Labels[extension.LabelGPUDriverVersion], "new", newNode.Labels[extension.LabelGPUDriverVersion]) + return true, "gpu driver version changed" + } + + return false, "" +} + +func (p *Plugin) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error { + // prepare node resources + for _, resourceName := range ResourceNames { + q := nr.Resources[resourceName] + // FIXME: shall we remove the resource when some of resources is missing + if q == nil || nr.Resets[resourceName] { + delete(node.Status.Allocatable, resourceName) + delete(node.Status.Capacity, resourceName) + continue + } + + node.Status.Allocatable[resourceName] = *q + node.Status.Capacity[resourceName] = *q + } + + // prepare node labels + // FIXME: do we need reset labels + if nr.Labels == nil { + delete(node.Labels, extension.LabelGPUModel) + delete(node.Labels, extension.LabelGPUDriverVersion) + } else { + if _, ok := nr.Labels[extension.LabelGPUModel]; ok { + node.Labels[extension.LabelGPUModel] = nr.Labels[extension.LabelGPUModel] + } else { + delete(node.Labels, extension.LabelGPUModel) + } + if _, ok := nr.Labels[extension.LabelGPUDriverVersion]; ok { + node.Labels[extension.LabelGPUDriverVersion] = nr.Labels[extension.LabelGPUDriverVersion] + } else { + delete(node.Labels, extension.LabelGPUDriverVersion) + } + } + + return nil +} + +func (p *Plugin) Reset(node *corev1.Node, message string) []framework.ResourceItem { + // FIXME: shall we reset koord-gpu resources when the colocation disabled? + return nil +} + +func (p *Plugin) Calculate(_ *configuration.ColocationStrategy, node *corev1.Node, _ *corev1.PodList, _ *framework.ResourceMetrics) ([]framework.ResourceItem, error) { + if node == nil || node.Status.Allocatable == nil { + return nil, fmt.Errorf("missing essential arguments") + } + + // calculate device resources + device := &schedulingv1alpha1.Device{} + if err := client.Get(context.TODO(), types.NamespacedName{Name: node.Name, Namespace: node.Namespace}, device); err != nil { + if !errors.IsNotFound(err) { + klog.V(4).InfoS("failed to get device for node", "node", node.Name, "err", err) + return nil, fmt.Errorf("failed to get device resources: %w", err) + } + + // device not found, reset gpu resources on node + return p.resetGPUNodeResource() + } + + // TODO: also handle NUMA-level resources NRT + return p.calculate(node, device) +} + +func (p *Plugin) calculate(node *corev1.Node, device *schedulingv1alpha1.Device) ([]framework.ResourceItem, error) { + if device == nil { + return nil, fmt.Errorf("invalid device") + } + + // calculate gpu resources + gpuResources := make(corev1.ResourceList) + totalKoordGPU := resource.NewQuantity(0, resource.DecimalSI) + for _, d := range device.Spec.Devices { + if d.Type != schedulingv1alpha1.GPU || !d.Health { + continue + } + util.AddResourceList(gpuResources, d.Resources) + totalKoordGPU.Add(d.Resources[extension.ResourceGPUCore]) + } + gpuResources[extension.ResourceGPU] = *totalKoordGPU + // FIXME: shall we update the node gpu resources when the sum of koord-gpu resources is zero? + var items []framework.ResourceItem + for _, resourceName := range ResourceNames { + q, ok := gpuResources[resourceName] + if !ok { + continue + } + items = append(items, framework.ResourceItem{ + Name: resourceName, + Quantity: &q, + Message: UpdateResourcesMsg, + }) + } + klog.V(5).InfoS("calculate gpu resources", "node", node.Name, "resources", gpuResources) + + // calculate labels about gpu driver and model + if device.Labels != nil { + items = append(items, framework.ResourceItem{ + Name: PluginName, + Labels: map[string]string{ + extension.LabelGPUModel: device.Labels[extension.LabelGPUModel], + extension.LabelGPUDriverVersion: device.Labels[extension.LabelGPUDriverVersion], + }, + Message: UpdateLabelsMsg, + }) + } + klog.V(5).InfoS("calculate gpu labels", "node", node.Name, "labels", device.Labels) + + return items, nil +} + +func (p *Plugin) resetGPUNodeResource() ([]framework.ResourceItem, error) { + items := make([]framework.ResourceItem, 0, len(ResourceNames)) + for i := range ResourceNames { + items[i] = framework.ResourceItem{ + Name: ResourceNames[i], + Reset: true, + Message: ResetResourcesMsg, + } + } + return items, nil +} diff --git a/pkg/slo-controller/noderesource/plugins/midresource/plugin.go b/pkg/slo-controller/noderesource/plugins/midresource/plugin.go index 50bbf7490..01c8e2268 100644 --- a/pkg/slo-controller/noderesource/plugins/midresource/plugin.go +++ b/pkg/slo-controller/noderesource/plugins/midresource/plugin.go @@ -22,8 +22,8 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/clock" "k8s.io/klog/v2" + "k8s.io/utils/clock" "github.com/koordinator-sh/koordinator/apis/configuration" "github.com/koordinator-sh/koordinator/apis/extension" @@ -38,7 +38,7 @@ const PluginName = "MidResource" // ResourceNames defines the Mid-tier extended resource names to update. var ResourceNames = []corev1.ResourceName{extension.MidCPU, extension.MidMemory} -var clk clock.Clock = clock.RealClock{} // for testing +var clk clock.WithTickerAndDelayedExecution = clock.RealClock{} // for testing type Plugin struct{} diff --git a/pkg/slo-controller/noderesource/resource_calculator.go b/pkg/slo-controller/noderesource/resource_calculator.go index bc29d6ec0..bb50742ff 100644 --- a/pkg/slo-controller/noderesource/resource_calculator.go +++ b/pkg/slo-controller/noderesource/resource_calculator.go @@ -168,15 +168,6 @@ func (r *NodeResourceReconciler) updateNodeMeta(node *corev1.Node, strategy *con // updateNodeExtensions is an extension point for updating node other than node metric resources. func (r *NodeResourceReconciler) updateNodeExtensions(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, podList *corev1.PodList) error { - // update device resources - if err := r.updateDeviceResources(node); err != nil { - metrics.RecordNodeResourceReconcileCount(false, "updateDeviceResources") - klog.V(4).InfoS("failed to update device resources for node", "node", node.Name, - "err", err) - return err - } - metrics.RecordNodeResourceReconcileCount(true, "updateDeviceResources") - return nil }