diff --git a/cmd/controller/main.go b/cmd/controller/main.go
index 67777ca260f0..588f2fcf149a 100644
--- a/cmd/controller/main.go
+++ b/cmd/controller/main.go
@@ -27,20 +27,15 @@ import (
 )
 
 func main() {
-	options, manager := operator.NewOptionsWithManagerOrDie()
-	cloudProvider := cloudprovider.CloudProvider(awscloudprovider.NewCloudProvider(options.Ctx, cloudprovider.Options{
-		ClientSet:  options.Clientset,
-		KubeClient: options.KubeClient,
-		StartAsync: options.StartAsync,
-	}))
-	if hp, ok := cloudProvider.(operator.HealthCheck); ok {
-		utilruntime.Must(manager.AddHealthzCheck("cloud-provider", hp.LivenessProbe))
-	}
-	cloudProvider = cloudprovidermetrics.Decorate(cloudProvider)
-	if err := operator.RegisterControllers(options.Ctx,
-		manager,
-		controllers.GetControllers(options, cloudProvider)...,
-	).Start(options.Ctx); err != nil {
+	ctx := operator.NewContextOrDie()
+	cloudProvider := awscloudprovider.NewCloudProvider(ctx, cloudprovider.Options{
+		ClientSet:  ctx.Clientset,
+		KubeClient: ctx.KubeClient,
+		StartAsync: ctx.StartAsync,
+	})
+	utilruntime.Must(ctx.Manager.AddHealthzCheck("cloud-provider", cloudProvider.LivenessProbe))
+	operator.RegisterControllers(ctx, controllers.GetControllers(ctx, cloudprovidermetrics.Decorate(cloudProvider))...)
+	if err := ctx.Manager.Start(ctx); err != nil {
 		panic(fmt.Sprintf("Unable to start manager, %s", err))
 	}
 }
diff --git a/pkg/controllers/controllers.go b/pkg/controllers/controllers.go
index 389b3ece7f2d..5c927d806745 100644
--- a/pkg/controllers/controllers.go
+++ b/pkg/controllers/controllers.go
@@ -33,22 +33,22 @@ func init() {
 	metrics.MustRegister() // Registers cross-controller metrics
 }
 
-func GetControllers(opts operator.Options, cloudProvider cloudprovider.CloudProvider) []operator.Controller {
-	cluster := state.NewCluster(opts.Clock, opts.Config, opts.KubeClient, cloudProvider)
-	provisioner := provisioning.NewProvisioner(opts.Ctx, opts.Config, opts.KubeClient, opts.Clientset.CoreV1(), opts.Recorder, cloudProvider, cluster)
+func GetControllers(ctx operator.Context, cloudProvider cloudprovider.CloudProvider) []operator.Controller {
+	cluster := state.NewCluster(ctx.Clock, ctx.Config, ctx.KubeClient, cloudProvider)
+	provisioner := provisioning.NewProvisioner(ctx, ctx.Config, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.Recorder, cloudProvider, cluster)
 
-	metricsstate.StartMetricScraper(opts.Ctx, cluster)
+	metricsstate.StartMetricScraper(ctx, cluster)
 
 	return []operator.Controller{
-		provisioning.NewController(opts.KubeClient, provisioner, opts.Recorder),
-		state.NewNodeController(opts.KubeClient, cluster),
-		state.NewPodController(opts.KubeClient, cluster),
-		state.NewProvisionerController(opts.KubeClient, cluster),
-		node.NewController(opts.Clock, opts.KubeClient, cloudProvider, cluster),
-		termination.NewController(opts.Ctx, opts.Clock, opts.KubeClient, opts.Clientset.CoreV1(), opts.Recorder, cloudProvider),
-		metricspod.NewController(opts.KubeClient),
-		metricsprovisioner.NewController(opts.KubeClient),
-		counter.NewController(opts.KubeClient, cluster),
-		consolidation.NewController(opts.Clock, opts.KubeClient, provisioner, cloudProvider, opts.Recorder, cluster),
+		provisioning.NewController(ctx.KubeClient, provisioner, ctx.Recorder),
+		state.NewNodeController(ctx.KubeClient, cluster),
+		state.NewPodController(ctx.KubeClient, cluster),
+		state.NewProvisionerController(ctx.KubeClient, cluster),
+		node.NewController(ctx.Clock, ctx.KubeClient, cloudProvider, cluster),
+		termination.NewController(ctx, ctx.Clock, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.Recorder, cloudProvider),
+		metricspod.NewController(ctx.KubeClient),
+		metricsprovisioner.NewController(ctx.KubeClient),
+		counter.NewController(ctx.KubeClient, cluster),
+		consolidation.NewController(ctx.Clock, ctx.KubeClient, provisioner, cloudProvider, ctx.Recorder, cluster),
 	}
 }
diff --git a/pkg/operator/options.go b/pkg/operator/context.go
similarity index 79%
rename from pkg/operator/options.go
rename to pkg/operator/context.go
index fd3f05e6e0e9..00c9a2b17041 100644
--- a/pkg/operator/options.go
+++ b/pkg/operator/context.go
@@ -22,10 +22,14 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/kubernetes"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/rest"
 	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/utils/clock"
 	"knative.dev/pkg/configmap/informer"
+	knativeinjection "knative.dev/pkg/injection"
+	"knative.dev/pkg/injection/sharedmain"
 	"knative.dev/pkg/logging"
+	"knative.dev/pkg/signals"
 	"knative.dev/pkg/system"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -53,9 +57,10 @@ func init() {
 	utilruntime.Must(apis.AddToScheme(scheme))
 }
 
-// Options exposes shared components that are initialized by the startup.Initialize() call
-type Options struct {
-	Ctx        context.Context
+// Context is the root context for the operator and exposes shared components used by the entire process
+type Context struct {
+	context.Context
+	Manager    manager.Manager
 	Recorder   events.Recorder
 	Config     config.Config
 	KubeClient client.Client
@@ -65,7 +70,7 @@ type Options struct {
 	StartAsync <-chan struct{}
 }
 
-func NewOptionsWithManagerOrDie() (Options, manager.Manager) {
+func NewContextOrDie() Context {
 	opts := options.New().MustParse()
 
 	// Setup Client
@@ -74,9 +79,15 @@ func NewOptionsWithManagerOrDie() (Options, manager.Manager) {
 	controllerRuntimeConfig.UserAgent = appName
 	clientSet := kubernetes.NewForConfigOrDie(controllerRuntimeConfig)
 
-	// Set up logger and watch for changes to log level
+	// Set up logger and watch for changes to log level defined in the ConfigMap `config-logging`
 	cmw := informer.NewInformedWatcher(clientSet, system.Namespace())
-	ctx := injection.LoggingContextOrDie(component, controllerRuntimeConfig, cmw)
+	ctx, startinformers := knativeinjection.EnableInjectionOrDie(signals.NewContext(), controllerRuntimeConfig)
+	logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, component)
+	ctx = logging.WithLogger(ctx, logger)
+	rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger})
+	sharedmain.WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
+	startinformers()
+
 	ctx = injection.WithConfig(ctx, controllerRuntimeConfig)
 	ctx = injection.WithOptions(ctx, *opts)
 
@@ -101,8 +112,9 @@ func NewOptionsWithManagerOrDie() (Options, manager.Manager) {
 	recorder = events.NewLoadSheddingRecorder(recorder)
 	recorder = events.NewDedupeRecorder(recorder)
 
-	return Options{
-		Ctx:        ctx,
+	return Context{
+		Context:    ctx,
+		Manager:    manager,
 		Recorder:   recorder,
 		Config:     cfg,
 		Clientset:  clientSet,
@@ -110,5 +122,5 @@
 		Clock:      clock.RealClock{},
 		Options:    opts,
 		StartAsync: manager.Elected(),
-	}, manager
+	}
 }
diff --git a/pkg/operator/manager.go b/pkg/operator/manager.go
index d3dc62241ffa..33e9b741cbd8 100644
--- a/pkg/operator/manager.go
+++ b/pkg/operator/manager.go
@@ -80,23 +80,22 @@ func NewManagerOrDie(ctx context.Context, config *rest.Config, opts *options.Opt
 }
 
 // RegisterControllers registers a set of controllers to the controller manager
-func RegisterControllers(ctx context.Context, m manager.Manager, controllers ...Controller) manager.Manager {
+func RegisterControllers(ctx Context, controllers ...Controller) {
 	for _, c := range controllers {
-		if err := c.Register(ctx, m); err != nil {
+		if err := c.Register(ctx, ctx.Manager); err != nil {
 			panic(err)
 		}
 		// if the controller implements a liveness check, connect it
 		if lp, ok := c.(HealthCheck); ok {
-			utilruntime.Must(m.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe))
+			utilruntime.Must(ctx.Manager.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe))
 		}
 	}
-	if err := m.AddHealthzCheck("healthz", healthz.Ping); err != nil {
+	if err := ctx.Manager.AddHealthzCheck("healthz", healthz.Ping); err != nil {
 		panic(fmt.Sprintf("Failed to add health probe, %s", err))
 	}
-	if err := m.AddReadyzCheck("readyz", healthz.Ping); err != nil {
+	if err := ctx.Manager.AddReadyzCheck("readyz", healthz.Ping); err != nil {
 		panic(fmt.Sprintf("Failed to add ready probe, %s", err))
 	}
-	return m
 }
 
 func newRunnableContext(config *rest.Config, options *options.Options, logger *zap.SugaredLogger) func() context.Context {
diff --git a/pkg/utils/injection/injection.go b/pkg/utils/injection/injection.go
index 2b2f12b3077e..eb2ee0e745e8 100644
--- a/pkg/utils/injection/injection.go
+++ b/pkg/utils/injection/injection.go
@@ -19,29 +19,12 @@ import (
 
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
-	"knative.dev/pkg/configmap/informer"
-	knativeinjection "knative.dev/pkg/injection"
-	"knative.dev/pkg/injection/sharedmain"
-	"knative.dev/pkg/logging"
-	"knative.dev/pkg/signals"
 
 	"github.com/aws/karpenter/pkg/utils/options"
 )
 
 type resourceKey struct{}
 
-// LoggingContextOrDie injects a logger into the returned context. The logger is
-// configured by the ConfigMap `config-logging` and live updates the level.
-func LoggingContextOrDie(componentName string, config *rest.Config, cmw *informer.InformedWatcher) context.Context {
-	ctx, startinformers := knativeinjection.EnableInjectionOrDie(signals.NewContext(), config)
-	logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, componentName)
-	ctx = logging.WithLogger(ctx, logger)
-	rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger})
-	sharedmain.WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, componentName)
-	startinformers()
-	return ctx
-}
-
 func WithNamespacedName(ctx context.Context, namespacedname types.NamespacedName) context.Context {
 	return context.WithValue(ctx, resourceKey{}, namespacedname)
 }
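
The pivot of this refactor is Go struct embedding: because the new operator.Context embeds context.Context, a Context value satisfies the context.Context interface directly, which is why call sites such as provisioning.NewProvisioner(ctx, ...) and c.Register(ctx, ctx.Manager) can hand the same value both to APIs that expect a plain context and to code that needs the attached dependencies. A minimal, self-contained sketch of the pattern follows; the type and field names are illustrative, not Karpenter's.

package main

import (
	"context"
	"fmt"
)

// OperatorContext embeds context.Context, so the struct itself is a
// context.Context; the extra field stands in for process-wide dependencies
// such as Manager or Recorder. (Hypothetical names, not the actual types.)
type OperatorContext struct {
	context.Context
	Component string
}

// needsContext accepts any context.Context, including OperatorContext.
func needsContext(ctx context.Context) {
	select {
	case <-ctx.Done():
		fmt.Println("context cancelled:", ctx.Err())
	default:
		fmt.Println("context still live")
	}
}

func main() {
	base, cancel := context.WithCancel(context.Background())
	defer cancel()

	ctx := OperatorContext{Context: base, Component: "controller"}
	needsContext(ctx)          // usable wherever context.Context is expected
	fmt.Println(ctx.Component) // dependencies travel with the same value
}

Note that the context package documentation generally advises against storing contexts inside struct types; embedding one at the process root, as this change and knative's injection packages do, is a deliberate exception that trades that guideline for a single dependency carrier threaded through every controller.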