Skip to content

Commit

Permalink
Merge pull request #1046 from tenzen-y/use-mapper-provider
Browse files Browse the repository at this point in the history
Ensure that the framework is available using RESTMapper instead of getting CRDs
  • Loading branch information
k8s-ci-robot committed Aug 11, 2023
2 parents b83c25f + 76c693d commit acec9e1
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 47 deletions.
7 changes: 0 additions & 7 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ rules:
- list
- update
- watch
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- list
- watch
- apiGroups:
- batch
resources:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/ray-project/kuberay/ray-operator v0.0.0-20230613204710-aeed3cdcbdcc
go.uber.org/zap v1.25.0
k8s.io/api v0.27.4
k8s.io/apiextensions-apiserver v0.27.4
k8s.io/apimachinery v0.27.4
k8s.io/apiserver v0.27.4
k8s.io/client-go v0.27.4
Expand Down Expand Up @@ -78,6 +77,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.27.4 // indirect
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
77 changes: 38 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
"strings"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -30,16 +30,16 @@ import (
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
schedulingv1 "k8s.io/api/scheduling/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

Expand All @@ -51,7 +51,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/job"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
Expand All @@ -78,7 +77,6 @@ func init() {

utilruntime.Must(kueue.AddToScheme(scheme))
utilruntime.Must(configapi.AddToScheme(scheme))
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
// Add any additional framework integration types.
utilruntime.Must(
jobframework.ForEachIntegration(func(_ string, cb jobframework.IntegrationCallbacks) error {
Expand Down Expand Up @@ -159,7 +157,7 @@ func main() {
// Cert won't be ready until manager starts, so start a goroutine here which
// will block until the cert is ready before setting up the controllers.
// Controllers who register after manager starts will start directly.
go setupControllers(ctx, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)
go setupControllers(mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)

go func() {
queues.CleanUpOnContext(ctx)
Expand Down Expand Up @@ -196,7 +194,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}
}

func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
setupLog.Info("Waiting for certificate generation to complete")
Expand All @@ -214,34 +212,50 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
os.Exit(1)
}

crds := findCustomResources(ctx, mgr)

opts := []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)),
jobframework.WithKubeServerVersion(serverVersionFetcher),
}
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
log := setupLog.WithValues("jobFrameworkName", name)
if isFrameworkEnabled(cfg, name) && crds.Has(name) {
if err := cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller")
return err
}
if err := cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")

if isFrameworkEnabled(cfg, name) {
gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
if err != nil {
return err
}
} else {
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
// TODO: If the below PR is released, we need to change a way to check if the GVK is registered.
// REF: https://github.com/kubernetes-sigs/controller-runtime/pull/2425
// if !meta.IsNoMatchError(err) {
// return err
// }
var NoMatchingErr *discovery.ErrGroupDiscoveryFailed
if !meta.IsNoMatchError(err) && !errors.As(err, &NoMatchingErr) {
return err
}
log.Info("No matching API server for job framework, skip to create controller and webhook")
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
return err
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
}
return nil
})
if err != nil {
Expand Down Expand Up @@ -340,18 +354,3 @@ func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool {
}
return false
}

// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=list;watch

func findCustomResources(ctx context.Context, mgr ctrl.Manager) sets.Set[string] {
var crds = apiextensionsv1.CustomResourceDefinitionList{}
if err := mgr.GetClient().List(ctx, &crds); err != nil {
setupLog.Error(err, "Unable to get crd list")
os.Exit(1)
}
customResources := sets.New[string](job.FrameworkName)
for _, crd := range crds.Items {
customResources.Insert(strings.Join([]string{crd.Spec.Group, crd.Spec.Names.Singular}, "/"))
}
return customResources
}

0 comments on commit acec9e1

Please sign in to comment.