-
Notifications
You must be signed in to change notification settings - Fork 228
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ensure that the framework is available using RESTMapper instead of getting CRDs #1046
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -18,10 +18,10 @@ package main | |||
|
||||
import ( | ||||
"context" | ||||
"errors" | ||||
"flag" | ||||
"fmt" | ||||
"os" | ||||
"strings" | ||||
|
||||
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) | ||||
// to ensure that exec-entrypoint and run can make use of them. | ||||
|
@@ -30,16 +30,16 @@ import ( | |||
zaplog "go.uber.org/zap" | ||||
"go.uber.org/zap/zapcore" | ||||
schedulingv1 "k8s.io/api/scheduling/v1" | ||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" | ||||
"k8s.io/apimachinery/pkg/api/meta" | ||||
"k8s.io/apimachinery/pkg/runtime" | ||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||||
"k8s.io/apimachinery/pkg/util/sets" | ||||
"k8s.io/apimachinery/pkg/util/validation/field" | ||||
utilfeature "k8s.io/apiserver/pkg/util/feature" | ||||
"k8s.io/client-go/discovery" | ||||
clientgoscheme "k8s.io/client-go/kubernetes/scheme" | ||||
"k8s.io/client-go/rest" | ||||
ctrl "sigs.k8s.io/controller-runtime" | ||||
"sigs.k8s.io/controller-runtime/pkg/client/apiutil" | ||||
"sigs.k8s.io/controller-runtime/pkg/healthz" | ||||
"sigs.k8s.io/controller-runtime/pkg/log/zap" | ||||
|
||||
|
@@ -51,7 +51,6 @@ import ( | |||
"sigs.k8s.io/kueue/pkg/controller/core" | ||||
"sigs.k8s.io/kueue/pkg/controller/core/indexer" | ||||
"sigs.k8s.io/kueue/pkg/controller/jobframework" | ||||
"sigs.k8s.io/kueue/pkg/controller/jobs/job" | ||||
"sigs.k8s.io/kueue/pkg/controller/jobs/noop" | ||||
"sigs.k8s.io/kueue/pkg/metrics" | ||||
"sigs.k8s.io/kueue/pkg/queue" | ||||
|
@@ -78,7 +77,6 @@ func init() { | |||
|
||||
utilruntime.Must(kueue.AddToScheme(scheme)) | ||||
utilruntime.Must(configapi.AddToScheme(scheme)) | ||||
utilruntime.Must(apiextensionsv1.AddToScheme(scheme)) | ||||
// Add any additional framework integration types. | ||||
utilruntime.Must( | ||||
jobframework.ForEachIntegration(func(_ string, cb jobframework.IntegrationCallbacks) error { | ||||
|
@@ -159,7 +157,7 @@ func main() { | |||
// Cert won't be ready until manager starts, so start a goroutine here which | ||||
// will block until the cert is ready before setting up the controllers. | ||||
// Controllers who register after manager starts will start directly. | ||||
go setupControllers(ctx, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher) | ||||
go setupControllers(mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher) | ||||
|
||||
go func() { | ||||
queues.CleanUpOnContext(ctx) | ||||
|
@@ -196,7 +194,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur | |||
} | ||||
} | ||||
|
||||
func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) { | ||||
func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) { | ||||
// The controllers won't work until the webhooks are operating, and the webhook won't work until the | ||||
// certs are all in place. | ||||
setupLog.Info("Waiting for certificate generation to complete") | ||||
|
@@ -214,34 +212,50 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache | |||
os.Exit(1) | ||||
} | ||||
|
||||
crds := findCustomResources(ctx, mgr) | ||||
|
||||
opts := []jobframework.Option{ | ||||
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName), | ||||
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)), | ||||
jobframework.WithKubeServerVersion(serverVersionFetcher), | ||||
} | ||||
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error { | ||||
log := setupLog.WithValues("jobFrameworkName", name) | ||||
if isFrameworkEnabled(cfg, name) && crds.Has(name) { | ||||
if err := cb.NewReconciler( | ||||
mgr.GetClient(), | ||||
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)), | ||||
opts..., | ||||
).SetupWithManager(mgr); err != nil { | ||||
log.Error(err, "unable to create controller") | ||||
return err | ||||
} | ||||
if err := cb.SetupWebhook(mgr, opts...); err != nil { | ||||
log.Error(err, "Unable to create webhook") | ||||
|
||||
if isFrameworkEnabled(cfg, name) { | ||||
gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme()) | ||||
if err != nil { | ||||
return err | ||||
} | ||||
} else { | ||||
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil { | ||||
log.Error(err, "Unable to create noop webhook") | ||||
return err | ||||
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil { | ||||
// TODO: If the below PR is released, we need to change a way to check if the GVK is registered. | ||||
// REF: https://github.com/kubernetes-sigs/controller-runtime/pull/2425 | ||||
// if !meta.IsNoMatchError(err) { | ||||
// return err | ||||
// } | ||||
var NoMatchingErr *discovery.ErrGroupDiscoveryFailed | ||||
if !meta.IsNoMatchError(err) && !errors.As(err, &NoMatchingErr) { | ||||
return err | ||||
} | ||||
log.Info("No matching API server for job framework, skip to create controller and webhook") | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. include which framework, by adding a key and a value There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The framework name is already set in Line 219 in ed880c7
|
||||
} else { | ||||
if err = cb.NewReconciler( | ||||
mgr.GetClient(), | ||||
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)), | ||||
opts..., | ||||
).SetupWithManager(mgr); err != nil { | ||||
log.Error(err, "Unable to create controller") | ||||
return err | ||||
} | ||||
if err = cb.SetupWebhook(mgr, opts...); err != nil { | ||||
log.Error(err, "Unable to create webhook") | ||||
return err | ||||
} | ||||
return nil | ||||
} | ||||
} | ||||
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil { | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this not be under else? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the current behavior, the WDYT? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I missed that in the happy path you now do |
||||
log.Error(err, "Unable to create noop webhook") | ||||
return err | ||||
} | ||||
return nil | ||||
}) | ||||
if err != nil { | ||||
|
@@ -340,18 +354,3 @@ func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool { | |||
} | ||||
return false | ||||
} | ||||
|
||||
// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=list;watch | ||||
|
||||
// findCustomResources lists the CustomResourceDefinitions visible through the
// manager's client and returns a set with one "<group>/<singular name>" key
// per CRD. The set is seeded with job.FrameworkName, so that entry is present
// no matter which CRDs exist. If the list call fails, the error is logged and
// the process exits with status 1.
func findCustomResources(ctx context.Context, mgr ctrl.Manager) sets.Set[string] { | ||||
var crds = apiextensionsv1.CustomResourceDefinitionList{} | ||||
if err := mgr.GetClient().List(ctx, &crds); err != nil { | ||||
setupLog.Error(err, "Unable to get crd list") | ||||
os.Exit(1) | ||||
} | ||||
customResources := sets.New[string](job.FrameworkName) | ||||
for _, crd := range crds.Items { | ||||
customResources.Insert(strings.Join([]string{crd.Spec.Group, crd.Spec.Names.Singular}, "/")) | ||||
} | ||||
return customResources | ||||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, one more thought. There is a risk that once the PR is released, say in controller-runtime 0.16, we may not remember to check this when bumping the version and break this. I think it is unlikely a person doing a bump of controller-runtime would look at this comment without prior knowledge.
Some ideas to mitigate this risk:
if !meta.IsNoMatchError(err) && !errors.As(err, &NoMatchingErr) { return err }
. Then leave the TODO comment to clean up once released.
I'm leaning towards (1.) or (2.), but (2.) only if it is to happen with Kueue 0.5 release, because I don't like keeping PRs in a freezer for long :). (1.) seems safe to do.
EDIT: 4. integration tests for the scenario.
(4.) would be great, but might be overkill
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(1.) sounds good, but also leave a TODO and open an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimowo It's a great suggestion. I think (1.) would be nice.
I agree. We should create an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed logic and created an issue: #1054