From b95d0fd979ae210e1dd6834523f3bb407db86bf8 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Tue, 8 Aug 2023 03:12:45 +0900 Subject: [PATCH] Ensure that the framework is available using RESTMapper instead of getting theCRDs Signed-off-by: Yuki Iwai --- config/components/rbac/role.yaml | 7 -- go.mod | 2 +- main.go | 76 +++++++++---------- .../jobframework/integrationmanager.go | 3 + pkg/controller/jobs/job/job_controller.go | 1 + .../jobs/jobset/jobset_controller.go | 1 + .../jobs/mpijob/mpijob_controller.go | 1 + .../jobs/rayjob/rayjob_controller.go | 1 + 8 files changed, 43 insertions(+), 49 deletions(-) diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml index 7d297749a3..18d6f9f307 100644 --- a/config/components/rbac/role.yaml +++ b/config/components/rbac/role.yaml @@ -56,13 +56,6 @@ rules: - list - update - watch -- apiGroups: - - apiextensions.k8s.io - resources: - - customresourcedefinitions - verbs: - - list - - watch - apiGroups: - batch resources: diff --git a/go.mod b/go.mod index 3bdc3bec05..668f4ccea7 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,6 @@ require ( github.com/ray-project/kuberay/ray-operator v0.0.0-20230613204710-aeed3cdcbdcc go.uber.org/zap v1.24.0 k8s.io/api v0.27.4 - k8s.io/apiextensions-apiserver v0.27.4 k8s.io/apimachinery v0.27.4 k8s.io/apiserver v0.27.4 k8s.io/client-go v0.27.4 @@ -78,6 +77,7 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + k8s.io/apiextensions-apiserver v0.27.4 // indirect k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect diff --git a/main.go b/main.go index b8a651a7df..3e7e0179d0 100644 --- a/main.go +++ b/main.go @@ -18,10 +18,10 @@ package main import ( "context" + "errors" "flag" "fmt" "os" - "strings" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -30,10 +30,9 @@ import ( zaplog "go.uber.org/zap" "go.uber.org/zap/zapcore" schedulingv1 "k8s.io/api/scheduling/v1" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/discovery" @@ -51,7 +50,6 @@ import ( "sigs.k8s.io/kueue/pkg/controller/core" "sigs.k8s.io/kueue/pkg/controller/core/indexer" "sigs.k8s.io/kueue/pkg/controller/jobframework" - "sigs.k8s.io/kueue/pkg/controller/jobs/job" "sigs.k8s.io/kueue/pkg/controller/jobs/noop" "sigs.k8s.io/kueue/pkg/metrics" "sigs.k8s.io/kueue/pkg/queue" @@ -78,7 +76,6 @@ func init() { utilruntime.Must(kueue.AddToScheme(scheme)) utilruntime.Must(configapi.AddToScheme(scheme)) - utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) // Add any additional framework integration types. utilruntime.Must( jobframework.ForEachIntegration(func(_ string, cb jobframework.IntegrationCallbacks) error { @@ -159,7 +156,7 @@ func main() { // Cert won't be ready until manager starts, so start a goroutine here which // will block until the cert is ready before setting up the controllers. // Controllers who register after manager starts will start directly. - go setupControllers(ctx, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher) + go setupControllers(mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher) go func() { queues.CleanUpOnContext(ctx) @@ -196,7 +193,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur } } -func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) { +func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) { // The controllers won't work until the webhooks are operating, and the webhook won't work until the // certs are all in place. setupLog.Info("Waiting for certificate generation to complete") @@ -214,8 +211,6 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache os.Exit(1) } - crds := findCustomResources(ctx, mgr) - opts := []jobframework.Option{ jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName), jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)), @@ -223,25 +218,39 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache } err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error { log := setupLog.WithValues("jobFrameworkName", name) - if isFrameworkEnabled(cfg, name) && crds.Has(name) { - if err := cb.NewReconciler( - mgr.GetClient(), - mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)), - opts..., - ).SetupWithManager(mgr); err != nil { - log.Error(err, "unable to create controller") - return err - } - if err := cb.SetupWebhook(mgr, opts...); err != nil { - log.Error(err, "Unable to create webhook") - return err - } - } else { - if err := noop.SetupWebhook(mgr, cb.JobType); err != nil { - log.Error(err, "Unable to create noop webhook") - return err + + if isFrameworkEnabled(cfg, name) { + if _, err := mgr.GetRESTMapper().RESTMapping(cb.GVK.GroupKind(), cb.GVK.Version); err != nil { + // TODO: If the below PR is released, we need to change a way to check if the GVK is registered. + // REF: https://github.com/kubernetes-sigs/controller-runtime/pull/2425 + // if !meta.IsNoMatchError(err) { + // return err + // } + var NoMatchingErr *discovery.ErrGroupDiscoveryFailed + if !meta.IsNoMatchError(err) && !errors.As(err, &NoMatchingErr) { + return err + } + log.Info("No matching API server for job framework, skip to create controller and webhook") + } else { + if err = cb.NewReconciler( + mgr.GetClient(), + mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)), + opts..., + ).SetupWithManager(mgr); err != nil { + log.Error(err, "Unable to create controller") + return err + } + if err = cb.SetupWebhook(mgr, opts...); err != nil { + log.Error(err, "Unable to create webhook") + return err + } + return nil } } + if err := noop.SetupWebhook(mgr, cb.JobType); err != nil { + log.Error(err, "Unable to create noop webhook") + return err + } return nil }) if err != nil { @@ -340,18 +349,3 @@ func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool { } return false } - -// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=list;watch - -func findCustomResources(ctx context.Context, mgr ctrl.Manager) sets.Set[string] { - var crds = apiextensionsv1.CustomResourceDefinitionList{} - if err := mgr.GetClient().List(ctx, &crds); err != nil { - setupLog.Error(err, "Unable to get crd list") - os.Exit(1) - } - customResources := sets.New[string](job.FrameworkName) - for _, crd := range crds.Items { - customResources.Insert(strings.Join([]string{crd.Spec.Group, crd.Spec.Names.Singular}, "/")) - } - return customResources -} diff --git a/pkg/controller/jobframework/integrationmanager.go b/pkg/controller/jobframework/integrationmanager.go index 190561d281..b8a95cb3ca 100644 --- a/pkg/controller/jobframework/integrationmanager.go +++ b/pkg/controller/jobframework/integrationmanager.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -50,6 +51,8 @@ type IntegrationCallbacks struct { SetupWebhook func(mgr ctrl.Manager, opts ...Option) error // JobType holds an object of the type managed by the integration's webhook JobType runtime.Object + // GVK is the GroupVersionKind for the job. + GVK schema.GroupVersionKind // SetupIndexes registers any additional indexes with the controllers manager // (this callback is optional) SetupIndexes func(ctx context.Context, indexer client.FieldIndexer) error diff --git a/pkg/controller/jobs/job/job_controller.go b/pkg/controller/jobs/job/job_controller.go index c6b2030d1b..c022f545f6 100644 --- a/pkg/controller/jobs/job/job_controller.go +++ b/pkg/controller/jobs/job/job_controller.go @@ -61,6 +61,7 @@ func init() { NewReconciler: NewReconciler, SetupWebhook: SetupWebhook, JobType: &batchv1.Job{}, + GVK: gvk, })) } diff --git a/pkg/controller/jobs/jobset/jobset_controller.go b/pkg/controller/jobs/jobset/jobset_controller.go index a4394e2b4c..c95d54dcd8 100644 --- a/pkg/controller/jobs/jobset/jobset_controller.go +++ b/pkg/controller/jobs/jobset/jobset_controller.go @@ -47,6 +47,7 @@ func init() { NewReconciler: NewReconciler, SetupWebhook: SetupJobSetWebhook, JobType: &jobsetapi.JobSet{}, + GVK: gvk, AddToScheme: jobsetapi.AddToScheme, IsManagingObjectsOwner: isJobSet, })) diff --git a/pkg/controller/jobs/mpijob/mpijob_controller.go b/pkg/controller/jobs/mpijob/mpijob_controller.go index 4637b61e25..15e02dba46 100644 --- a/pkg/controller/jobs/mpijob/mpijob_controller.go +++ b/pkg/controller/jobs/mpijob/mpijob_controller.go @@ -47,6 +47,7 @@ func init() { NewReconciler: NewReconciler, SetupWebhook: SetupMPIJobWebhook, JobType: &kubeflow.MPIJob{}, + GVK: gvk, AddToScheme: kubeflow.AddToScheme, IsManagingObjectsOwner: isMPIJob, })) diff --git a/pkg/controller/jobs/rayjob/rayjob_controller.go b/pkg/controller/jobs/rayjob/rayjob_controller.go index d6c2df01d7..a6465f79b9 100644 --- a/pkg/controller/jobs/rayjob/rayjob_controller.go +++ b/pkg/controller/jobs/rayjob/rayjob_controller.go @@ -47,6 +47,7 @@ func init() { NewReconciler: NewReconciler, SetupWebhook: SetupRayJobWebhook, JobType: &rayjobapi.RayJob{}, + GVK: gvk, AddToScheme: rayjobapi.AddToScheme, })) }