Skip to content

Commit

Permalink
* Set jobSet's ManagedBy field by default if it's managed by MultiKueue
Browse files Browse the repository at this point in the history
  • Loading branch information
vladikkuzn committed Apr 25, 2024
1 parent 92baacd commit 90fb7b4
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 1 deletion.
2 changes: 2 additions & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
jobframework.WithEnabledFrameworks(cfg.Integrations),
jobframework.WithManagerName(constants.KueueName),
jobframework.WithLabelKeysToCopy(cfg.Integrations.LabelKeysToCopy),
jobframework.WithCache(cCache),
jobframework.WithQueues(queues),
}
if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
Expand Down
14 changes: 14 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ func (c *Cache) DeleteAdmissionCheck(ac *kueue.AdmissionCheck) sets.Set[string]
return c.updateClusterQueues()
}

func (c *Cache) GetAdmissionCheck(name string) (AdmissionCheck, bool) {
c.RLock()
defer c.RUnlock()
ac, ok := c.admissionChecks[name]
return ac, ok
}

func (c *Cache) ClusterQueueActive(name string) bool {
return c.clusterQueueInStatus(name, active)
}
Expand Down Expand Up @@ -371,6 +378,13 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
metrics.ClearCacheMetrics(cq.Name)
}

func (c *Cache) GetClusterQueue(name string) (*ClusterQueue, bool) {
c.RLock()
defer c.RUnlock()
cq, ok := c.clusterQueues[name]
return cq, ok
}

func (c *Cache) AddLocalQueue(q *kueue.LocalQueue) error {
c.Lock()
defer c.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import (

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
Expand Down Expand Up @@ -78,6 +80,8 @@ type Options struct {
EnabledFrameworks sets.Set[string]
ManagerName string
LabelKeysToCopy []string
Queues *queue.Manager
Cache *cache.Cache
}

// Option configures the reconciler.
Expand Down Expand Up @@ -149,6 +153,20 @@ func WithLabelKeysToCopy(n []string) Option {
}
}

// WithQueues adds the queue manager.
func WithQueues(q *queue.Manager) Option {
return func(o *Options) {
o.Queues = q
}
}

// WithCache adds the cache manager.
func WithCache(c *cache.Cache) Option {
return func(o *Options) {
o.Cache = c
}
}

var defaultOptions = Options{}

func NewReconciler(
Expand Down
35 changes: 34 additions & 1 deletion pkg/controller/jobs/jobset/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,27 @@ package jobset

import (
"context"

"fmt"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
jobsetapi "sigs.k8s.io/jobset/api/jobset/v1alpha2"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
)

type JobSetWebhook struct {
manageJobsWithoutQueueName bool
queues *queue.Manager
cache *cache.Cache
}

// SetupJobSetWebhook configures the webhook for kubeflow JobSet.
Expand All @@ -57,6 +65,31 @@ func (w *JobSetWebhook) Default(ctx context.Context, obj runtime.Object) error {
log.V(5).Info("Applying defaults", "jobset", klog.KObj(jobSet))

jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName)
if features.Enabled(features.MultiKueue) {
localQueueName, found := jobSet.Labels[constants.QueueLabel]
if !found {
return nil
}
clusterQueueName, err := w.queues.ClusterQueueFromLocalQueue(fmt.Sprintf("%s/%s", jobSet.ObjectMeta.Namespace, localQueueName))
if err != nil {
return err
}
clusterQueue, found := w.cache.GetClusterQueue(clusterQueueName)
if !found {
return nil
}
for admissionCheckName := range clusterQueue.AdmissionChecks {
admissionCheck, ok := w.cache.GetAdmissionCheck(admissionCheckName)
if !ok {
continue
}
if admissionCheck.Controller == multikueue.ControllerName {
jobSet.Spec.ManagedBy = ptr.To(multikueue.ControllerName)
break
}
}
}

return nil
}

Expand Down
186 changes: 186 additions & 0 deletions pkg/controller/jobs/jobset/jobset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@ package jobset

import (
"context"
"errors"
"testing"

"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/validation/field"
ctrl "sigs.k8s.io/controller-runtime"
jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
testingutil "sigs.k8s.io/kueue/pkg/util/testingjobs/jobset"
)

Expand Down Expand Up @@ -71,3 +81,179 @@ func TestValidateCreate(t *testing.T) {
})
}
}

func TestDefault(t *testing.T) {
testCases := []struct {
name string
jobSet *jobset.JobSet
queues []kueue.LocalQueue
clusterQueues []kueue.ClusterQueue
admissionCheck *kueue.AdmissionCheck
multiKueueEnabled bool
expectedManagedBy string
expectedError error
}{
{
name: "TestDefault_WithQueueLabel",
jobSet: &jobset.JobSet{
ObjectMeta: ctrl.ObjectMeta{
Labels: map[string]string{
constants.QueueLabel: "local-queue",
},
Namespace: "default",
},
},
queues: []kueue.LocalQueue{
*utiltesting.MakeLocalQueue("local-queue", "default").
ClusterQueue("cluster-queue").
Obj(),
},
clusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("cluster-queue").
AdmissionChecks("admission-check").
Obj(),
},
admissionCheck: utiltesting.MakeAdmissionCheck("admission-check").
ControllerName(multikueue.ControllerName).
Active(metav1.ConditionTrue).
Obj(),
multiKueueEnabled: true,
expectedManagedBy: multikueue.ControllerName,
},
{
name: "TestDefault_WithoutQueueLabel",
jobSet: &jobset.JobSet{
ObjectMeta: ctrl.ObjectMeta{Namespace: "default"},
},
multiKueueEnabled: true,
expectedManagedBy: "",
},
{
name: "TestDefault_InvalidQueueName",
jobSet: &jobset.JobSet{
ObjectMeta: ctrl.ObjectMeta{
Labels: map[string]string{constants.QueueLabel: "invalid-queue"},
Namespace: "default",
},
},
multiKueueEnabled: true,
expectedError: errors.New("queue doesn't exist"),
},
{
name: "TestDefault_QueueNotFound",
jobSet: &jobset.JobSet{
ObjectMeta: ctrl.ObjectMeta{
Labels: map[string]string{
constants.QueueLabel: "non-existent-queue",
},
Namespace: "default",
},
},
multiKueueEnabled: true,
expectedError: errors.New("queue doesn't exist"),
},
{
name: "TestDefault_AdmissionCheckNotFound",
jobSet: &jobset.JobSet{
ObjectMeta: ctrl.ObjectMeta{
Labels: map[string]string{
constants.QueueLabel: "local-queue",
},
Namespace: "default",
},
},
queues: []kueue.LocalQueue{
*utiltesting.MakeLocalQueue("local-queue", "default").
ClusterQueue("cluster-queue").
Obj(),
},
clusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("cluster-queue").
AdmissionChecks("non-existent-admission-check").
Obj(),
},
multiKueueEnabled: true,
expectedManagedBy: "",
},
{
name: "TestDefault_MultiKueueFeatureDisabled",
jobSet: &jobset.JobSet{
ObjectMeta: ctrl.ObjectMeta{
Labels: map[string]string{
constants.QueueLabel: "local-queue",
},
Namespace: "default",
},
},
queues: []kueue.LocalQueue{
*utiltesting.MakeLocalQueue("local-queue", "default").
ClusterQueue("cluster-queue").
Obj(),
},
clusterQueues: []kueue.ClusterQueue{
*utiltesting.MakeClusterQueue("cluster-queue").
AdmissionChecks("admission-check").
Obj(),
},
admissionCheck: utiltesting.MakeAdmissionCheck("admission-check").
ControllerName(multikueue.ControllerName).
Active(metav1.ConditionTrue).
Obj(),
multiKueueEnabled: false,
expectedManagedBy: "",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
defer features.SetFeatureGateDuringTest(t, features.MultiKueue, tc.multiKueueEnabled)()

ctx, _ := utiltesting.ContextWithLog(t)

clientBuilder := utiltesting.NewClientBuilder().
WithObjects(
&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}},
)
cl := clientBuilder.Build()
cqCache := cache.New(cl)
queueManager := queue.NewManager(cl, cqCache)

for _, q := range tc.queues {
if err := queueManager.AddLocalQueue(ctx, &q); err != nil {
t.Fatalf("Inserting queue %s/%s in manager: %v", q.Namespace, q.Name, err)
}
}
for _, cq := range tc.clusterQueues {
if err := cqCache.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in cache: %v", cq.Name, err)
}
if tc.admissionCheck != nil {
cqCache.AddOrUpdateAdmissionCheck(tc.admissionCheck)
if err := queueManager.AddClusterQueue(ctx, &cq); err != nil {
t.Fatalf("Inserting clusterQueue %s in manager: %v", cq.Name, err)
}
}
}
webhook := &JobSetWebhook{
manageJobsWithoutQueueName: false,
queues: queueManager,
cache: cqCache,
}

err := webhook.Default(ctx, tc.jobSet)
if err != nil && tc.expectedError == nil {
t.Errorf("Unexpected error: %v", err)
} else if err == nil && tc.expectedError != nil {
t.Errorf("Expected error %v, but got nil", tc.expectedError)
} else if err != nil && err.Error() != tc.expectedError.Error() {
t.Errorf("Expected error %v, but got %v", tc.expectedError, err)
}

if tc.jobSet.Spec.ManagedBy == nil && tc.expectedManagedBy != "" {
t.Errorf("Expected jobSet.Spec.ManagedBy to be %s, but got nil", tc.expectedManagedBy)
} else if tc.jobSet.Spec.ManagedBy != nil && *tc.jobSet.Spec.ManagedBy != tc.expectedManagedBy {
t.Errorf("Expected jobSet.Spec.ManagedBy to be %s, but got %v", tc.expectedManagedBy, *tc.jobSet.Spec.ManagedBy)
}
})
}
}

0 comments on commit 90fb7b4

Please sign in to comment.