Skip to content

Commit

Permalink
* Set jobSet's ManagedBy field by default if it's managed by MultiKueue
Browse files Browse the repository at this point in the history
  • Loading branch information
vladikkuzn committed Apr 24, 2024
1 parent 92baacd commit 6ccf00e
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 0 deletions.
2 changes: 2 additions & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
jobframework.WithEnabledFrameworks(cfg.Integrations),
jobframework.WithManagerName(constants.KueueName),
jobframework.WithLabelKeysToCopy(cfg.Integrations.LabelKeysToCopy),
jobframework.WithCache(cCache),
jobframework.WithQueues(queues),
}
if err := jobframework.SetupControllers(mgr, setupLog, opts...); err != nil {
setupLog.Error(err, "Unable to create controller or webhook", "kubernetesVersion", serverVersionFetcher.GetServerVersion())
Expand Down
14 changes: 14 additions & 0 deletions pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ func (c *Cache) DeleteAdmissionCheck(ac *kueue.AdmissionCheck) sets.Set[string]
return c.updateClusterQueues()
}

// GetAdmissionCheck looks up the cached admission check registered under
// name. The boolean result is false when the cache has no entry for that
// name. The lookup runs while holding the cache's read lock.
func (c *Cache) GetAdmissionCheck(name string) (AdmissionCheck, bool) {
	c.RLock()
	defer c.RUnlock()

	check, found := c.admissionChecks[name]
	return check, found
}

// ClusterQueueActive reports whether the cluster queue called name is in the
// active status, as decided by clusterQueueInStatus.
func (c *Cache) ClusterQueueActive(name string) bool {
	isActive := c.clusterQueueInStatus(name, active)
	return isActive
}
Expand Down Expand Up @@ -371,6 +378,13 @@ func (c *Cache) DeleteClusterQueue(cq *kueue.ClusterQueue) {
metrics.ClearCacheMetrics(cq.Name)
}

// GetClusterQueue looks up the cached cluster queue registered under name.
// The boolean result is false when the cache has no entry for that name.
// The lookup runs while holding the cache's read lock.
//
// NOTE(review): the returned pointer is the entry stored in the cache, not a
// copy, and callers use it after the read lock has been released — confirm
// that reading its fields without further locking is safe.
func (c *Cache) GetClusterQueue(name string) (*ClusterQueue, bool) {
	c.RLock()
	defer c.RUnlock()

	if entry, found := c.clusterQueues[name]; found {
		return entry, true
	}
	return nil, false
}

func (c *Cache) AddLocalQueue(q *kueue.LocalQueue) error {
c.Lock()
defer c.Unlock()
Expand Down
18 changes: 18 additions & 0 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ import (

configapi "sigs.k8s.io/kueue/apis/config/v1beta1"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
controllerconsts "sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
"sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/kubeversion"
Expand Down Expand Up @@ -78,6 +80,8 @@ type Options struct {
EnabledFrameworks sets.Set[string]
ManagerName string
LabelKeysToCopy []string
Queues *queue.Manager
Cache *cache.Cache
}

// Option configures the reconciler.
Expand Down Expand Up @@ -149,6 +153,20 @@ func WithLabelKeysToCopy(n []string) Option {
}
}

// WithQueues returns an Option that stores the given queue manager in
// Options.Queues.
func WithQueues(q *queue.Manager) Option {
	return func(opts *Options) {
		opts.Queues = q
	}
}

// WithCache returns an Option that stores the given cache in Options.Cache.
func WithCache(c *cache.Cache) Option {
	return func(opts *Options) {
		opts.Cache = c
	}
}

// defaultOptions is the zero-value Options: every field left unset.
var defaultOptions Options

func NewReconciler(
Expand Down
33 changes: 33 additions & 0 deletions pkg/controller/jobs/jobset/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,24 @@ import (

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
jobsetapi "sigs.k8s.io/jobset/api/jobset/v1alpha2"

"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/controller/admissionchecks/multikueue"
"sigs.k8s.io/kueue/pkg/controller/constants"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/queue"
)

// JobSetWebhook holds the state used when defaulting JobSet objects.
type JobSetWebhook struct {
	// manageJobsWithoutQueueName is forwarded to
	// jobframework.ApplyDefaultForSuspend when defaulting a JobSet.
	manageJobsWithoutQueueName bool
	// queues resolves a local queue name to its cluster queue name.
	queues *queue.Manager
	// cache is consulted for cluster queues and their admission checks.
	cache *cache.Cache
}

// SetupJobSetWebhook configures the webhook for JobSet.
Expand All @@ -57,6 +65,31 @@ func (w *JobSetWebhook) Default(ctx context.Context, obj runtime.Object) error {
log.V(5).Info("Applying defaults", "jobset", klog.KObj(jobSet))

jobframework.ApplyDefaultForSuspend(jobSet, w.manageJobsWithoutQueueName)
if features.Enabled(features.MultiKueue) {
localQueueName, found := jobSet.Labels[constants.QueueLabel]
if !found {
return nil
}
clusterQueueName, err := w.queues.ClusterQueueFromLocalQueue(localQueueName)
if err != nil {
return err
}
clusterQueue, found := w.cache.GetClusterQueue(clusterQueueName)
if !found {
return nil
}
for admissionCheckName := range clusterQueue.AdmissionChecks {
admissionCheck, ok := w.cache.GetAdmissionCheck(admissionCheckName)
if !ok {
continue
}
if admissionCheck.Controller == multikueue.ControllerName {
jobSet.Spec.ManagedBy = ptr.To(multikueue.ControllerName)
break
}
}
}

return nil
}

Expand Down

0 comments on commit 6ccf00e

Please sign in to comment.