Skip to content

Commit

Permalink
koord-manager: refactor gpu device resource plugin
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Jan 16, 2024
1 parent 10c2d6b commit ca8c2d8
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 17 deletions.
6 changes: 2 additions & 4 deletions pkg/slo-controller/noderesource/noderesource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/config"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/metrics"
Expand Down Expand Up @@ -123,7 +122,7 @@ func (r *NodeResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{Requeue: true}, err
}

// do other node updates. e.g. update device resources
// do other node updates.
if err := r.updateNodeExtensions(node, nodeMetric, podList); err != nil {
klog.ErrorS(err, "failed to update node extensions for node", "node", node.Name)
return ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -187,8 +186,7 @@ func (r *NodeResourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
Named(Name). // avoid conflict with others reconciling `Node`
For(&corev1.Node{}).
Watches(&source.Kind{Type: &slov1alpha1.NodeMetric{}}, &EnqueueRequestForNodeMetric{syncContext: r.NodeSyncContext}).
Watches(&source.Kind{Type: &corev1.ConfigMap{}}, cfgHandler).
Watches(&source.Kind{Type: &schedulingv1alpha1.Device{}}, &EnqueueRequestForDevice{syncContext: r.GPUSyncContext})
Watches(&source.Kind{Type: &corev1.ConfigMap{}}, cfgHandler)

// setup plugins
// allow plugins to mutate controller via the builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -50,7 +50,7 @@ var (
)

var (
Clock clock.Clock = clock.RealClock{} // for testing
Clock clock.WithTickerAndDelayedExecution = clock.RealClock{} // for testing
client ctrlclient.Client
nrtHandler *NRTHandler
nrtSyncContext *framework.SyncContext
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gpudeviceresource

import (
"reflect"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// Compile-time assertion that *DeviceHandler implements handler.EventHandler.
var _ handler.EventHandler = &DeviceHandler{}

// DeviceHandler is a field-less event handler for Device objects. Its Create,
// Update and Delete methods enqueue a reconcile request whose name is the
// name of the Device; Generic events are ignored.
type DeviceHandler struct{}

// Create enqueues a reconcile request named after the created Device.
//
// Events whose object is not a non-nil *schedulingv1alpha1.Device are ignored
// instead of panicking on the type assertion, which matches how Delete treats
// unexpected objects.
func (d *DeviceHandler) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) {
	device, ok := e.Object.(*schedulingv1alpha1.Device)
	if !ok || device == nil {
		return
	}
	// Only the name is set; the namespace of the request is left empty.
	q.Add(reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name: device.Name,
		},
	})
}

// Update enqueues a reconcile request named after the updated Device when its
// Spec or its labels changed.
//
// Labels are compared in addition to Spec because the plugin derives the node's
// GPU model and driver version labels from the Device's labels, so a label-only
// change must also trigger a reconcile. Events whose old or new object is not a
// non-nil *schedulingv1alpha1.Device are ignored instead of panicking on the
// type assertion, which matches how Delete treats unexpected objects.
func (d *DeviceHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
	newDevice, ok := e.ObjectNew.(*schedulingv1alpha1.Device)
	if !ok || newDevice == nil {
		return
	}
	oldDevice, ok := e.ObjectOld.(*schedulingv1alpha1.Device)
	if !ok || oldDevice == nil {
		return
	}
	// Nothing the plugin consumes has changed; skip the reconcile.
	if reflect.DeepEqual(newDevice.Spec, oldDevice.Spec) &&
		reflect.DeepEqual(newDevice.Labels, oldDevice.Labels) {
		return
	}
	// Only the name is set; the namespace of the request is left empty.
	q.Add(reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name: newDevice.Name,
		},
	})
}

// Delete enqueues a reconcile request named after the deleted Device.
// Events carrying any other object type are silently dropped.
func (d *DeviceHandler) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if deleted, isDevice := e.Object.(*schedulingv1alpha1.Device); isDevice {
		// Only the name is set; the namespace of the request is left empty.
		key := types.NamespacedName{Name: deleted.Name}
		q.Add(reconcile.Request{NamespacedName: key})
	}
}

// Generic is a no-op: generic events never enqueue a reconcile request.
func (d *DeviceHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {}
226 changes: 226 additions & 0 deletions pkg/slo-controller/noderesource/plugins/gpudeviceresource/plugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gpudeviceresource

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/koordinator-sh/koordinator/apis/configuration"
"github.com/koordinator-sh/koordinator/apis/extension"
schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/noderesource/framework"
"github.com/koordinator-sh/koordinator/pkg/util"
)

// PluginName is the name returned by Plugin.Name. It is also used as the Name
// of the ResourceItem that carries the gpu labels.
const PluginName = "GPUDeviceResource"

// Messages attached to the ResourceItems produced by this plugin.
const (
// ResetResourcesMsg is set on items that reset a gpu resource.
ResetResourcesMsg = "reset node gpu resources"
// UpdateResourcesMsg is set on items that carry a gpu resource quantity.
UpdateResourcesMsg = "node gpu resources from device"
// UpdateLabelsMsg is set on the item that carries the gpu labels.
UpdateLabelsMsg = "node gpu labels from device"
)

var (
// ResourceNames lists the gpu resource names this plugin compares,
// calculates and writes to the node.
ResourceNames = []corev1.ResourceName{
extension.ResourceGPU,
extension.ResourceGPUCore,
extension.ResourceGPUMemory,
extension.ResourceGPUMemoryRatio,
}

// client is assigned in Plugin.Setup and used by Plugin.Calculate to
// fetch the Device object.
client ctrlclient.Client
)

// Plugin derives a node's gpu resources and gpu labels from the Device object
// that has the same name as the node. It has no fields; the API client it uses
// is the package-level client variable.
type Plugin struct{}

// Name returns the plugin's name, PluginName.
func (p *Plugin) Name() string {
	return PluginName
}

// +kubebuilder:rbac:groups=core,resources=configmaps,verbs=get;list;watch
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;patch
// +kubebuilder:rbac:groups=core,resources=nodes/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
// +kubebuilder:rbac:groups=scheduling.koordinator.sh,resources=devices,verbs=get;list;watch
// +kubebuilder:rbac:groups=topology.node.k8s.io,resources=noderesourcetopologies,verbs=get;list;watch;create;update

// Setup stores opt.Client in the package-level client variable and registers a
// watch on Device objects, handled by DeviceHandler, on opt.Builder.
// It always returns nil.
func (p *Plugin) Setup(opt *framework.Option) error {
	client = opt.Client

	// NOTE(review): the Device type is not added to opt.Scheme here (the call
	// schedulingv1alpha1.AddToScheme(opt.Scheme) is left commented out);
	// presumably the scheme is registered elsewhere — confirm.
	deviceKind := &source.Kind{Type: &schedulingv1alpha1.Device{}}
	opt.Builder = opt.Builder.Watches(deviceKind, &DeviceHandler{})

	return nil
}

// NeedSync reports whether any resource in ResourceNames differs between the
// allocatable of oldNode and newNode, as decided by util.IsResourceDiff with
// strategy.ResourceDiffThreshold. On the first differing resource it returns
// true together with a reason message; otherwise it returns false and "".
func (p *Plugin) NeedSync(strategy *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) {
	oldAllocatable, newAllocatable := oldNode.Status.Allocatable, newNode.Status.Allocatable
	for _, name := range ResourceNames {
		threshold := *strategy.ResourceDiffThreshold
		if !util.IsResourceDiff(oldAllocatable, newAllocatable, name, threshold) {
			continue
		}
		klog.V(4).Infof("node %s resource %s diff bigger than %v, need sync",
			newNode.Name, name, threshold)
		return true, "gpu resource diff is big than threshold"
	}

	return false, ""
}

// NeedSyncMeta reports whether the gpu model label or the gpu driver version
// label differs between oldNode and newNode. The model label is checked first.
// It returns true with a reason message on the first difference, otherwise
// false and "".
func (p *Plugin) NeedSyncMeta(_ *configuration.ColocationStrategy, oldNode, newNode *corev1.Node) (bool, string) {
	oldModel, newModel := oldNode.Labels[extension.LabelGPUModel], newNode.Labels[extension.LabelGPUModel]
	if oldModel != newModel {
		klog.V(4).InfoS("need sync node metadata since gpu model change", "node", newNode.Name,
			"old", oldModel, "new", newModel)
		return true, "gpu model changed"
	}

	oldDriver, newDriver := oldNode.Labels[extension.LabelGPUDriverVersion], newNode.Labels[extension.LabelGPUDriverVersion]
	if oldDriver != newDriver {
		klog.V(4).InfoS("need sync node metadata since gpu driver version change", "node", newNode.Name,
			"old", oldDriver, "new", newDriver)
		return true, "gpu driver version changed"
	}

	return false, ""
}

// Prepare applies the calculated gpu resources and gpu labels in nr to node.
//
// For every name in ResourceNames: when nr has no quantity for it or nr marks
// it as reset, the resource is removed from both node.Status.Allocatable and
// node.Status.Capacity; otherwise both are set to the quantity from nr.
//
// For the gpu model and gpu driver version labels: when nr.Labels contains the
// key, the node label is set to that value; otherwise the label is removed from
// the node. A nil nr.Labels therefore removes both labels.
//
// The node's Allocatable, Capacity and Labels maps are created on first write
// when they are nil, since assigning into a nil map panics. It always returns
// nil.
func (p *Plugin) Prepare(_ *configuration.ColocationStrategy, node *corev1.Node, nr *framework.NodeResource) error {
	// prepare node resources
	for _, resourceName := range ResourceNames {
		q := nr.Resources[resourceName]
		// FIXME: shall we remove the resource when some of resources is missing
		if q == nil || nr.Resets[resourceName] {
			// delete on a nil map is a no-op, so no guard is needed here.
			delete(node.Status.Allocatable, resourceName)
			delete(node.Status.Capacity, resourceName)
			continue
		}

		if node.Status.Allocatable == nil {
			node.Status.Allocatable = corev1.ResourceList{}
		}
		if node.Status.Capacity == nil {
			node.Status.Capacity = corev1.ResourceList{}
		}
		node.Status.Allocatable[resourceName] = *q
		node.Status.Capacity[resourceName] = *q
	}

	// prepare node labels
	// FIXME: do we need reset labels
	for _, key := range []string{extension.LabelGPUModel, extension.LabelGPUDriverVersion} {
		// Looking up a key in a nil nr.Labels yields ok == false, so the nil
		// case needs no separate branch.
		value, ok := nr.Labels[key]
		if !ok {
			delete(node.Labels, key)
			continue
		}
		if node.Labels == nil {
			node.Labels = map[string]string{}
		}
		node.Labels[key] = value
	}

	return nil
}

// Reset returns no reset items: it ignores both arguments and always returns
// nil.
func (p *Plugin) Reset(_ *corev1.Node, _ string) []framework.ResourceItem {
	// FIXME: shall we reset koord-gpu resources when the colocation disabled?
	return nil
}

// Calculate fetches the Device object keyed by the node's name and namespace
// through the package-level client and turns it into resource items.
//
// It returns an error when node is nil or has a nil Status.Allocatable, or
// when fetching the Device fails with anything other than a not-found error.
// When the Device is not found, the items from resetGPUNodeResource are
// returned; otherwise the result of calculate is returned.
func (p *Plugin) Calculate(_ *configuration.ColocationStrategy, node *corev1.Node, _ *corev1.PodList, _ *framework.ResourceMetrics) ([]framework.ResourceItem, error) {
	if node == nil || node.Status.Allocatable == nil {
		return nil, fmt.Errorf("missing essential arguments")
	}

	// calculate device resources
	key := types.NamespacedName{Name: node.Name, Namespace: node.Namespace}
	device := &schedulingv1alpha1.Device{}
	err := client.Get(context.TODO(), key, device)
	switch {
	case err == nil:
		// TODO: also handle NUMA-level resources NRT
		return p.calculate(node, device)
	case errors.IsNotFound(err):
		// device not found, reset gpu resources on node
		return p.resetGPUNodeResource()
	default:
		klog.V(4).InfoS("failed to get device for node", "node", node.Name, "err", err)
		return nil, fmt.Errorf("failed to get device resources: %w", err)
	}
}

// calculate builds the resource items for node from device.
//
// Resources of every healthy GPU-type entry in device.Spec.Devices are summed
// with util.AddResourceList; extension.ResourceGPU is then set to the sum of
// their extension.ResourceGPUCore quantities. One item is emitted for each name
// in ResourceNames that is present in the sum. When device.Labels is non-nil,
// one more item named PluginName carries the device's gpu model and gpu driver
// version label values. It returns an error when device is nil.
func (p *Plugin) calculate(node *corev1.Node, device *schedulingv1alpha1.Device) ([]framework.ResourceItem, error) {
	if device == nil {
		return nil, fmt.Errorf("invalid device")
	}

	// calculate gpu resources
	summed := make(corev1.ResourceList)
	gpuCoreTotal := resource.NewQuantity(0, resource.DecimalSI)
	for _, info := range device.Spec.Devices {
		// Only healthy GPU devices contribute.
		if info.Type == schedulingv1alpha1.GPU && info.Health {
			util.AddResourceList(summed, info.Resources)
			gpuCoreTotal.Add(info.Resources[extension.ResourceGPUCore])
		}
	}
	summed[extension.ResourceGPU] = *gpuCoreTotal

	// FIXME: shall we update the node gpu resources when the sum of koord-gpu resources is zero?
	var items []framework.ResourceItem
	for _, name := range ResourceNames {
		// quantity is declared per iteration, so each item points at its own copy.
		if quantity, found := summed[name]; found {
			items = append(items, framework.ResourceItem{
				Name:     name,
				Quantity: &quantity,
				Message:  UpdateResourcesMsg,
			})
		}
	}
	klog.V(5).InfoS("calculate gpu resources", "node", node.Name, "resources", summed)

	// calculate labels about gpu driver and model
	if device.Labels != nil {
		gpuLabels := map[string]string{
			extension.LabelGPUModel:         device.Labels[extension.LabelGPUModel],
			extension.LabelGPUDriverVersion: device.Labels[extension.LabelGPUDriverVersion],
		}
		items = append(items, framework.ResourceItem{
			Name:    PluginName,
			Labels:  gpuLabels,
			Message: UpdateLabelsMsg,
		})
	}
	klog.V(5).InfoS("calculate gpu labels", "node", node.Name, "labels", device.Labels)

	return items, nil
}

// resetGPUNodeResource returns one reset item (Reset set to true, Message set
// to ResetResourcesMsg) for every name in ResourceNames. The error is always
// nil.
func (p *Plugin) resetGPUNodeResource() ([]framework.ResourceItem, error) {
	// The slice has length 0 and only its capacity is pre-sized, so items must
	// be appended; indexing into it would panic with index out of range.
	items := make([]framework.ResourceItem, 0, len(ResourceNames))
	for _, resourceName := range ResourceNames {
		items = append(items, framework.ResourceItem{
			Name:    resourceName,
			Reset:   true,
			Message: ResetResourcesMsg,
		})
	}
	return items, nil
}
4 changes: 2 additions & 2 deletions pkg/slo-controller/noderesource/plugins/midresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

"github.com/koordinator-sh/koordinator/apis/configuration"
"github.com/koordinator-sh/koordinator/apis/extension"
Expand All @@ -38,7 +38,7 @@ const PluginName = "MidResource"
// ResourceNames defines the Mid-tier extended resource names to update.
var ResourceNames = []corev1.ResourceName{extension.MidCPU, extension.MidMemory}

var clk clock.Clock = clock.RealClock{} // for testing
var clk clock.WithTickerAndDelayedExecution = clock.RealClock{} // for testing

type Plugin struct{}

Expand Down
9 changes: 0 additions & 9 deletions pkg/slo-controller/noderesource/resource_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,6 @@ func (r *NodeResourceReconciler) updateNodeMeta(node *corev1.Node, strategy *con

// updateNodeExtensions is an extension point for updating node other than node metric resources.
func (r *NodeResourceReconciler) updateNodeExtensions(node *corev1.Node, nodeMetric *slov1alpha1.NodeMetric, podList *corev1.PodList) error {
// update device resources
if err := r.updateDeviceResources(node); err != nil {
metrics.RecordNodeResourceReconcileCount(false, "updateDeviceResources")
klog.V(4).InfoS("failed to update device resources for node", "node", node.Name,
"err", err)
return err
}
metrics.RecordNodeResourceReconcileCount(true, "updateDeviceResources")

return nil
}

Expand Down

0 comments on commit ca8c2d8

Please sign in to comment.