Skip to content

Commit

Permalink
koord-manager: refactor gpu device resource plugin
Browse files Browse the repository at this point in the history
Signed-off-by: saintube <saintube@foxmail.com>
  • Loading branch information
saintube committed Jan 17, 2024
1 parent 9ecac4b commit 9b26e92
Show file tree
Hide file tree
Showing 9 changed files with 1,164 additions and 18 deletions.
6 changes: 2 additions & 4 deletions pkg/slo-controller/noderesource/noderesource_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/config"
"github.com/koordinator-sh/koordinator/pkg/slo-controller/metrics"
Expand Down Expand Up @@ -123,7 +122,7 @@ func (r *NodeResourceReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{Requeue: true}, err
}

// do other node updates. e.g. update device resources
// do other node updates.
if err := r.updateNodeExtensions(node, nodeMetric, podList); err != nil {
klog.ErrorS(err, "failed to update node extensions for node", "node", node.Name)
return ctrl.Result{Requeue: true}, err
Expand Down Expand Up @@ -187,8 +186,7 @@ func (r *NodeResourceReconciler) SetupWithManager(mgr ctrl.Manager) error {
Named(Name). // avoid conflict with others reconciling `Node`
For(&corev1.Node{}).
Watches(&source.Kind{Type: &slov1alpha1.NodeMetric{}}, &EnqueueRequestForNodeMetric{syncContext: r.NodeSyncContext}).
Watches(&source.Kind{Type: &corev1.ConfigMap{}}, cfgHandler).
Watches(&source.Kind{Type: &schedulingv1alpha1.Device{}}, &EnqueueRequestForDevice{syncContext: r.GPUSyncContext})
Watches(&source.Kind{Type: &corev1.ConfigMap{}}, cfgHandler)

// setup plugins
// allow plugins to mutate controller via the builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
quotav1 "k8s.io/apiserver/pkg/quota/v1"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"

Expand All @@ -50,7 +50,7 @@ var (
)

var (
Clock clock.Clock = clock.RealClock{} // for testing
Clock clock.WithTickerAndDelayedExecution = clock.RealClock{} // for testing
client ctrlclient.Client
nrtHandler *NRTHandler
nrtSyncContext *framework.SyncContext
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gpudeviceresource

import (
"reflect"

"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// Compile-time check that *DeviceHandler implements handler.EventHandler.
var _ handler.EventHandler = (*DeviceHandler)(nil)

// DeviceHandler is a stateless event handler that turns Device events into
// reconcile requests whose name is the name of the Device.
type DeviceHandler struct{}

// Create enqueues a reconcile request named after the created Device.
// The request carries no namespace, only the Device's name.
// Objects that are not *schedulingv1alpha1.Device are ignored instead of
// panicking, matching the behavior of Delete.
func (d *DeviceHandler) Create(e event.CreateEvent, q workqueue.RateLimitingInterface) {
	device, ok := e.Object.(*schedulingv1alpha1.Device)
	if !ok {
		return
	}
	q.Add(reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name: device.Name,
		},
	})
}

// Update enqueues a reconcile request named after the updated Device, but
// only when the Device spec actually changed; updates that leave the spec
// untouched (e.g. metadata-only or status-only changes) are dropped.
// If either the old or the new object is not a *schedulingv1alpha1.Device the
// event is ignored instead of panicking, matching the behavior of Delete.
func (d *DeviceHandler) Update(e event.UpdateEvent, q workqueue.RateLimitingInterface) {
	newDevice, newOK := e.ObjectNew.(*schedulingv1alpha1.Device)
	oldDevice, oldOK := e.ObjectOld.(*schedulingv1alpha1.Device)
	if !newOK || !oldOK {
		return
	}
	// Skip no-op updates so the node is not reconciled needlessly.
	if reflect.DeepEqual(newDevice.Spec, oldDevice.Spec) {
		return
	}
	q.Add(reconcile.Request{
		NamespacedName: types.NamespacedName{
			Name: newDevice.Name,
		},
	})
}

// Delete enqueues a reconcile request named after the deleted Device.
// Events whose object is not a *schedulingv1alpha1.Device are silently ignored.
func (d *DeviceHandler) Delete(e event.DeleteEvent, q workqueue.RateLimitingInterface) {
	if deleted, isDevice := e.Object.(*schedulingv1alpha1.Device); isDevice {
		key := types.NamespacedName{Name: deleted.Name}
		q.Add(reconcile.Request{NamespacedName: key})
	}
}

// Generic is intentionally a no-op: generic events never enqueue a request.
// It exists so that DeviceHandler satisfies handler.EventHandler.
func (d *DeviceHandler) Generic(e event.GenericEvent, q workqueue.RateLimitingInterface) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gpudeviceresource

import (
"testing"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
)

// Test_EnqueueRequestForNodeMetricMetric verifies which events DeviceHandler
// turns into reconcile requests and that each request is named after the
// Device. (The function name is historical; the subject under test is
// DeviceHandler.)
func Test_EnqueueRequestForNodeMetricMetric(t *testing.T) {
	tests := []struct {
		name      string
		fn        func(h handler.EventHandler, q workqueue.RateLimitingInterface)
		hasEvent  bool
		eventName string
	}{
		{
			name: "create device event",
			fn: func(h handler.EventHandler, q workqueue.RateLimitingInterface) {
				h.Create(event.CreateEvent{
					Object: &schedulingv1alpha1.Device{
						ObjectMeta: metav1.ObjectMeta{
							Name: "node1",
						},
					},
				}, q)
			},
			hasEvent:  true,
			eventName: "node1",
		},
		{
			name: "delete device event",
			fn: func(h handler.EventHandler, q workqueue.RateLimitingInterface) {
				h.Delete(event.DeleteEvent{
					Object: &schedulingv1alpha1.Device{
						ObjectMeta: metav1.ObjectMeta{
							Name: "node1",
						},
					},
				}, q)
			},
			hasEvent:  true,
			eventName: "node1",
		},
		{
			name: "delete event not device",
			fn: func(h handler.EventHandler, q workqueue.RateLimitingInterface) {
				h.Delete(event.DeleteEvent{
					Object: &corev1.Node{
						ObjectMeta: metav1.ObjectMeta{
							Name: "node1",
						},
					},
				}, q)
			},
			hasEvent: false,
		},
		{
			name: "generic event ignore",
			fn: func(h handler.EventHandler, q workqueue.RateLimitingInterface) {
				h.Generic(event.GenericEvent{}, q)
			},
			hasEvent: false,
		},
		{
			name: "update device event",
			fn: func(h handler.EventHandler, q workqueue.RateLimitingInterface) {
				h.Update(event.UpdateEvent{
					ObjectOld: &schedulingv1alpha1.Device{
						ObjectMeta: metav1.ObjectMeta{
							Name:            "node1",
							ResourceVersion: "100",
						},
						Spec: schedulingv1alpha1.DeviceSpec{
							Devices: []schedulingv1alpha1.DeviceInfo{
								{},
							},
						},
					},
					ObjectNew: &schedulingv1alpha1.Device{
						ObjectMeta: metav1.ObjectMeta{
							Name:            "node1",
							ResourceVersion: "101",
						},
						Spec: schedulingv1alpha1.DeviceSpec{
							Devices: []schedulingv1alpha1.DeviceInfo{
								{},
								{},
							},
						},
					},
				}, q)
			},
			hasEvent:  true,
			eventName: "node1",
		},
		{
			name: "update device event ignore",
			fn: func(h handler.EventHandler, q workqueue.RateLimitingInterface) {
				h.Update(event.UpdateEvent{
					ObjectOld: &schedulingv1alpha1.Device{
						ObjectMeta: metav1.ObjectMeta{
							Name:            "node1",
							ResourceVersion: "100",
						},
						Spec: schedulingv1alpha1.DeviceSpec{
							Devices: []schedulingv1alpha1.DeviceInfo{
								{},
							},
						},
					},
					ObjectNew: &schedulingv1alpha1.Device{
						ObjectMeta: metav1.ObjectMeta{
							Name:            "node1",
							ResourceVersion: "100",
						},
						Spec: schedulingv1alpha1.DeviceSpec{
							Devices: []schedulingv1alpha1.DeviceInfo{
								{},
							},
						},
					},
				}, q)
			},
			hasEvent: false,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
			// Stop the queue's background goroutine once the subtest is done.
			defer queue.ShutDown()

			h := &DeviceHandler{}
			tt.fn(h, queue)

			assert.Equal(t, tt.hasEvent, queue.Len() > 0, "unexpected event")
			// Get blocks on an empty queue, so only dequeue when an item is
			// really present; the assertion above already reported a mismatch.
			if !tt.hasEvent || queue.Len() == 0 {
				return
			}

			item, shutdown := queue.Get()
			assert.False(t, shutdown, "queue should not be shut down")
			req, ok := item.(reconcile.Request)
			if assert.True(t, ok, "expected a reconcile.Request in the queue") {
				assert.Equal(t, tt.eventName, req.Name)
			}
		})
	}
}
Loading

0 comments on commit 9b26e92

Please sign in to comment.