Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure that the framework is available using RESTMapper instead of getting CRDs #1046

Merged
merged 1 commit into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ rules:
- list
- update
- watch
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- list
- watch
- apiGroups:
- batch
resources:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ require (
github.com/ray-project/kuberay/ray-operator v0.0.0-20230613204710-aeed3cdcbdcc
go.uber.org/zap v1.24.0
k8s.io/api v0.27.4
k8s.io/apiextensions-apiserver v0.27.4
k8s.io/apimachinery v0.27.4
k8s.io/apiserver v0.27.4
k8s.io/client-go v0.27.4
Expand Down Expand Up @@ -78,6 +77,7 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.27.4 // indirect
k8s.io/gengo v0.0.0-20220902162205-c0856e24416d // indirect
k8s.io/kube-openapi v0.0.0-20230501164219-8b0f38b5fd1f // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
Expand Down
77 changes: 38 additions & 39 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package main

import (
"context"
"errors"
"flag"
"fmt"
"os"
"strings"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
Expand All @@ -30,16 +30,16 @@ import (
zaplog "go.uber.org/zap"
"go.uber.org/zap/zapcore"
schedulingv1 "k8s.io/api/scheduling/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/validation/field"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/discovery"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

Expand All @@ -51,7 +51,6 @@ import (
"sigs.k8s.io/kueue/pkg/controller/core"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/controller/jobs/job"
"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
Expand All @@ -78,7 +77,6 @@ func init() {

utilruntime.Must(kueue.AddToScheme(scheme))
utilruntime.Must(configapi.AddToScheme(scheme))
utilruntime.Must(apiextensionsv1.AddToScheme(scheme))
// Add any additional framework integration types.
utilruntime.Must(
jobframework.ForEachIntegration(func(_ string, cb jobframework.IntegrationCallbacks) error {
Expand Down Expand Up @@ -159,7 +157,7 @@ func main() {
// Cert won't be ready until manager starts, so start a goroutine here which
// will block until the cert is ready before setting up the controllers.
// Controllers who register after manager starts will start directly.
go setupControllers(ctx, mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)
go setupControllers(mgr, cCache, queues, certsReady, &cfg, serverVersionFetcher)

go func() {
queues.CleanUpOnContext(ctx)
Expand Down Expand Up @@ -196,7 +194,7 @@ func setupIndexes(ctx context.Context, mgr ctrl.Manager, cfg *configapi.Configur
}
}

func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manager, certsReady chan struct{}, cfg *configapi.Configuration, serverVersionFetcher *kubeversion.ServerVersionFetcher) {
// The controllers won't work until the webhooks are operating, and the webhook won't work until the
// certs are all in place.
setupLog.Info("Waiting for certificate generation to complete")
Expand All @@ -214,34 +212,50 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *cache.Cache
os.Exit(1)
}

crds := findCustomResources(ctx, mgr)

opts := []jobframework.Option{
jobframework.WithManageJobsWithoutQueueName(manageJobsWithoutQueueName),
jobframework.WithWaitForPodsReady(waitForPodsReady(cfg)),
jobframework.WithKubeServerVersion(serverVersionFetcher),
}
err := jobframework.ForEachIntegration(func(name string, cb jobframework.IntegrationCallbacks) error {
log := setupLog.WithValues("jobFrameworkName", name)
if isFrameworkEnabled(cfg, name) && crds.Has(name) {
if err := cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "unable to create controller")
return err
}
if err := cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")

if isFrameworkEnabled(cfg, name) {
gvk, err := apiutil.GVKForObject(cb.JobType, mgr.GetScheme())
if err != nil {
return err
}
} else {
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
log.Error(err, "Unable to create noop webhook")
return err
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
// TODO: If the below PR is released, we need to change a way to check if the GVK is registered.
Copy link
Contributor

@mimowo mimowo Aug 10, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, one more thought. There is a risk that once the PR is released, say in controller-runtime 0.16, we may not remember to check this when bumping the version and break this. I think it is unlikely a person doing a bump of controller-runtime would look at this comment without prior knowledge.
Some ideas to mitigate this risk:

  1. prepare the check in advance, if !meta.IsNoMatchError(err) && !errors.As(err, &NoMatchingErr) { return err }. Then leave the TODO comment to cleanup up once released
  2. coordinate with the release of the PR and bump controller-runtime (but might be not necessarily involving)
  3. create an issue in advance in kueue to increase visibility, giving a title like "Bump controller-runtime and adjust RESTMapper usage"

I'm leaning towards (1.) or (2.), but (2.) only if it is to happen with Kueue 0.5 release, because I don't like keeping PRs in a freezer for long :). (1.) seems safe to do.

EDIT: 4. integration tests for the scenario.
(4.) would be great, but might be overkill

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 sounds good, but also leave a TODO and issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mimowo It's a great suggestion. I think 1 would be nice.

1 sounds good, but also leave a TODO and issue.

I agree. We should create an issue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed logic and created an issue: #1054

// REF: https://github.com/kubernetes-sigs/controller-runtime/pull/2425
// if !meta.IsNoMatchError(err) {
// return err
// }
var NoMatchingErr *discovery.ErrGroupDiscoveryFailed
if !meta.IsNoMatchError(err) && !errors.As(err, &NoMatchingErr) {
return err
}
log.Info("No matching API server for job framework, skip to create controller and webhook")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include which framework, by adding a key and a value

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The framework name is already set in

kueue/main.go

Line 219 in ed880c7

log := setupLog.WithValues("jobFrameworkName", name)
.

} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, constants.KueueName)),
opts...,
).SetupWithManager(mgr); err != nil {
log.Error(err, "Unable to create controller")
return err
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
log.Error(err, "Unable to create webhook")
return err
}
return nil
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should think not be under else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the current behavior, the noop.SetupWebhook is run even when isFrameworkEnabled==true && crds.Has==false.
So I think we need to run here even when isFrameworkEnabled==true && errors.As(err, &NoMatchingErr)==true.

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I missed that in the happy path you now do return nil. I was thinking the noop.SetupWebhook is called now in that case.

log.Error(err, "Unable to create noop webhook")
return err
}
return nil
})
if err != nil {
Expand Down Expand Up @@ -340,18 +354,3 @@ func isFrameworkEnabled(cfg *configapi.Configuration, name string) bool {
}
return false
}

// +kubebuilder:rbac:groups=apiextensions.k8s.io,resources=customresourcedefinitions,verbs=list;watch

func findCustomResources(ctx context.Context, mgr ctrl.Manager) sets.Set[string] {
var crds = apiextensionsv1.CustomResourceDefinitionList{}
if err := mgr.GetClient().List(ctx, &crds); err != nil {
setupLog.Error(err, "Unable to get crd list")
os.Exit(1)
}
customResources := sets.New[string](job.FrameworkName)
for _, crd := range crds.Items {
customResources.Insert(strings.Join([]string{crd.Spec.Group, crd.Spec.Names.Singular}, "/"))
}
return customResources
}