chore: initialization touchups
ellistarn committed Oct 17, 2022
1 parent b22d51f commit 5eb6415
Showing 5 changed files with 49 additions and 60 deletions.
23 changes: 9 additions & 14 deletions cmd/controller/main.go
@@ -27,20 +27,15 @@ import (
 )
 
 func main() {
-	options, manager := operator.NewOptionsWithManagerOrDie()
-	cloudProvider := cloudprovider.CloudProvider(awscloudprovider.NewCloudProvider(options.Ctx, cloudprovider.Options{
-		ClientSet:  options.Clientset,
-		KubeClient: options.KubeClient,
-		StartAsync: options.StartAsync,
-	}))
-	if hp, ok := cloudProvider.(operator.HealthCheck); ok {
-		utilruntime.Must(manager.AddHealthzCheck("cloud-provider", hp.LivenessProbe))
-	}
-	cloudProvider = cloudprovidermetrics.Decorate(cloudProvider)
-	if err := operator.RegisterControllers(options.Ctx,
-		manager,
-		controllers.GetControllers(options, cloudProvider)...,
-	).Start(options.Ctx); err != nil {
+	ctx := operator.NewContextOrDie()
+	cloudProvider := awscloudprovider.NewCloudProvider(ctx, cloudprovider.Options{
+		ClientSet:  ctx.Clientset,
+		KubeClient: ctx.KubeClient,
+		StartAsync: ctx.StartAsync,
+	})
+	utilruntime.Must(ctx.Manager.AddHealthzCheck("cloud-provider", cloudProvider.LivenessProbe))
+	operator.RegisterControllers(ctx, controllers.GetControllers(ctx, cloudprovidermetrics.Decorate(cloudProvider))...)
+	if err := ctx.Manager.Start(ctx); err != nil {
 		panic(fmt.Sprintf("Unable to start manager, %s", err))
 	}
 }
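A note on why this works: the new operator.Context embeds context.Context, so the struct itself satisfies the context.Context interface and can be handed straight to NewCloudProvider and ctx.Manager.Start. A minimal sketch of the embedding trick, with hypothetical names rather than code from this commit:

	package main

	import (
		"context"
		"fmt"
	)

	type ctxKey struct{}

	// Context mirrors the shape of the new operator.Context: embedding
	// context.Context promotes its methods (Deadline, Done, Err, Value),
	// so the struct itself is a valid context.Context.
	type Context struct {
		context.Context
		Name string // stands in for Manager, Recorder, Clientset, etc.
	}

	// takesContext accepts a plain context.Context, as NewCloudProvider does.
	func takesContext(ctx context.Context) {
		fmt.Println(ctx.Value(ctxKey{}))
	}

	func main() {
		ctx := Context{
			Context: context.WithValue(context.Background(), ctxKey{}, "hello"),
			Name:    "operator",
		}
		takesContext(ctx) // prints "hello"
	}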
28 changes: 14 additions & 14 deletions pkg/controllers/controllers.go
@@ -33,22 +33,22 @@ func init() {
 	metrics.MustRegister() // Registers cross-controller metrics
 }
 
-func GetControllers(opts operator.Options, cloudProvider cloudprovider.CloudProvider) []operator.Controller {
-	cluster := state.NewCluster(opts.Clock, opts.Config, opts.KubeClient, cloudProvider)
-	provisioner := provisioning.NewProvisioner(opts.Ctx, opts.Config, opts.KubeClient, opts.Clientset.CoreV1(), opts.Recorder, cloudProvider, cluster)
+func GetControllers(ctx operator.Context, cloudProvider cloudprovider.CloudProvider) []operator.Controller {
+	cluster := state.NewCluster(ctx.Clock, ctx.Config, ctx.KubeClient, cloudProvider)
+	provisioner := provisioning.NewProvisioner(ctx, ctx.Config, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.Recorder, cloudProvider, cluster)
 
-	metricsstate.StartMetricScraper(opts.Ctx, cluster)
+	metricsstate.StartMetricScraper(ctx, cluster)
 
 	return []operator.Controller{
-		provisioning.NewController(opts.KubeClient, provisioner, opts.Recorder),
-		state.NewNodeController(opts.KubeClient, cluster),
-		state.NewPodController(opts.KubeClient, cluster),
-		state.NewProvisionerController(opts.KubeClient, cluster),
-		node.NewController(opts.Clock, opts.KubeClient, cloudProvider, cluster),
-		termination.NewController(opts.Ctx, opts.Clock, opts.KubeClient, opts.Clientset.CoreV1(), opts.Recorder, cloudProvider),
-		metricspod.NewController(opts.KubeClient),
-		metricsprovisioner.NewController(opts.KubeClient),
-		counter.NewController(opts.KubeClient, cluster),
-		consolidation.NewController(opts.Clock, opts.KubeClient, provisioner, cloudProvider, opts.Recorder, cluster),
+		provisioning.NewController(ctx.KubeClient, provisioner, ctx.Recorder),
+		state.NewNodeController(ctx.KubeClient, cluster),
+		state.NewPodController(ctx.KubeClient, cluster),
+		state.NewProvisionerController(ctx.KubeClient, cluster),
+		node.NewController(ctx.Clock, ctx.KubeClient, cloudProvider, cluster),
+		termination.NewController(ctx, ctx.Clock, ctx.KubeClient, ctx.Clientset.CoreV1(), ctx.Recorder, cloudProvider),
+		metricspod.NewController(ctx.KubeClient),
+		metricsprovisioner.NewController(ctx.KubeClient),
+		counter.NewController(ctx.KubeClient, cluster),
+		consolidation.NewController(ctx.Clock, ctx.KubeClient, provisioner, cloudProvider, ctx.Recorder, cluster),
 	}
 }
30 changes: 21 additions & 9 deletions pkg/operator/options.go → pkg/operator/context.go
@@ -22,10 +22,14 @@ import (
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/client-go/kubernetes"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/utils/clock"
 	"knative.dev/pkg/configmap/informer"
+	knativeinjection "knative.dev/pkg/injection"
+	"knative.dev/pkg/injection/sharedmain"
+	"knative.dev/pkg/logging"
+	"knative.dev/pkg/signals"
 	"knative.dev/pkg/system"
 	controllerruntime "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -53,9 +57,10 @@ func init() {
utilruntime.Must(apis.AddToScheme(scheme))
}

// Options exposes shared components that are initialized by the startup.Initialize() call
type Options struct {
Ctx context.Context
// Context is the root context for the operator and exposes shared components used by the entire process
type Context struct {
context.Context
Manager manager.Manager
Recorder events.Recorder
Config config.Config
KubeClient client.Client
@@ -65,7 +70,7 @@ type Options struct {
 	StartAsync <-chan struct{}
 }
 
-func NewOptionsWithManagerOrDie() (Options, manager.Manager) {
+func NewContextOrDie() Context {
 	opts := options.New().MustParse()
 
 	// Setup Client
@@ -74,9 +79,15 @@ func NewOptionsWithManagerOrDie() (Options, manager.Manager) {
 	controllerRuntimeConfig.UserAgent = appName
 	clientSet := kubernetes.NewForConfigOrDie(controllerRuntimeConfig)
 
-	// Set up logger and watch for changes to log level
+	// Set up logger and watch for changes to log level defined in the ConfigMap `config-logging`
 	cmw := informer.NewInformedWatcher(clientSet, system.Namespace())
-	ctx := injection.LoggingContextOrDie(component, controllerRuntimeConfig, cmw)
+	ctx, startinformers := knativeinjection.EnableInjectionOrDie(signals.NewContext(), controllerRuntimeConfig)
+	logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, component)
+	ctx = logging.WithLogger(ctx, logger)
+	rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger})
+	sharedmain.WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)
+	startinformers()
 
 	ctx = injection.WithConfig(ctx, controllerRuntimeConfig)
 	ctx = injection.WithOptions(ctx, *opts)
@@ -101,14 +112,15 @@ func NewOptionsWithManagerOrDie() (Options, manager.Manager) {
 	recorder = events.NewLoadSheddingRecorder(recorder)
 	recorder = events.NewDedupeRecorder(recorder)
 
-	return Options{
-		Ctx:        ctx,
+	return Context{
+		Context:    ctx,
+		Manager:    manager,
 		Recorder:   recorder,
 		Config:     cfg,
 		Clientset:  clientSet,
 		KubeClient: manager.GetClient(),
 		Clock:      clock.RealClock{},
 		Options:    opts,
 		StartAsync: manager.Elected(),
-	}, manager
+	}
 }
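The logging bootstrap inlined above watches the config-logging ConfigMap and live-updates verbosity through a zap.AtomicLevel (via sharedmain.WatchLoggingConfigOrDie). A self-contained sketch of that mechanism using plain zap, without the knative wiring:

	package main

	import (
		"go.uber.org/zap"
		"go.uber.org/zap/zapcore"
	)

	func main() {
		// The atomic level is shared between the logger and whoever watches the config.
		atomicLevel := zap.NewAtomicLevelAt(zapcore.InfoLevel)
		config := zap.NewProductionConfig()
		config.Level = atomicLevel
		logger, _ := config.Build()

		logger.Debug("dropped: level is info")
		atomicLevel.SetLevel(zapcore.DebugLevel) // what the ConfigMap watcher does on an update
		logger.Debug("emitted: level is now debug")
	}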
11 changes: 5 additions & 6 deletions pkg/operator/manager.go
@@ -80,23 +80,22 @@ func NewManagerOrDie(ctx context.Context, config *rest.Config, opts *options.Opt
 }
 
 // RegisterControllers registers a set of controllers to the controller manager
-func RegisterControllers(ctx context.Context, m manager.Manager, controllers ...Controller) manager.Manager {
+func RegisterControllers(ctx Context, controllers ...Controller) {
 	for _, c := range controllers {
-		if err := c.Register(ctx, m); err != nil {
+		if err := c.Register(ctx, ctx.Manager); err != nil {
 			panic(err)
 		}
 		// if the controller implements a liveness check, connect it
 		if lp, ok := c.(HealthCheck); ok {
-			utilruntime.Must(m.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe))
+			utilruntime.Must(ctx.Manager.AddHealthzCheck(fmt.Sprintf("%T", c), lp.LivenessProbe))
 		}
 	}
-	if err := m.AddHealthzCheck("healthz", healthz.Ping); err != nil {
+	if err := ctx.Manager.AddHealthzCheck("healthz", healthz.Ping); err != nil {
 		panic(fmt.Sprintf("Failed to add health probe, %s", err))
 	}
-	if err := m.AddReadyzCheck("readyz", healthz.Ping); err != nil {
+	if err := ctx.Manager.AddReadyzCheck("readyz", healthz.Ping); err != nil {
 		panic(fmt.Sprintf("Failed to add ready probe, %s", err))
 	}
-	return m
 }
 
 func newRunnableContext(config *rest.Config, options *options.Options, logger *zap.SugaredLogger) func() context.Context {
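RegisterControllers relies on an optional-interface assertion: only controllers that also implement HealthCheck get a liveness probe wired up. A minimal sketch of the pattern with hypothetical controller types (the real Controller and HealthCheck interfaces are defined elsewhere in this package, so their shapes here are inferred, not copied):

	package main

	import (
		"fmt"
		"net/http"
	)

	type Controller interface{ Reconcile() }

	// HealthCheck follows the healthz.Checker signature that AddHealthzCheck accepts.
	type HealthCheck interface {
		LivenessProbe(req *http.Request) error
	}

	type nodeController struct{}

	func (nodeController) Reconcile()                        {}
	func (nodeController) LivenessProbe(*http.Request) error { return nil }

	type podController struct{} // no LivenessProbe; skipped below

	func (podController) Reconcile() {}

	func main() {
		for _, c := range []Controller{nodeController{}, podController{}} {
			// Type assertion against the optional interface: only controllers
			// that opt in have a liveness probe registered.
			if lp, ok := c.(HealthCheck); ok {
				fmt.Printf("%T exposes a liveness probe: err=%v\n", c, lp.LivenessProbe(nil))
			}
		}
	}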
17 changes: 0 additions & 17 deletions pkg/utils/injection/injection.go
@@ -19,29 +19,12 @@ import (
 
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/rest"
-	"knative.dev/pkg/configmap/informer"
-	knativeinjection "knative.dev/pkg/injection"
-	"knative.dev/pkg/injection/sharedmain"
-	"knative.dev/pkg/logging"
-	"knative.dev/pkg/signals"
 
 	"github.com/aws/karpenter/pkg/utils/options"
 )
 
 type resourceKey struct{}
 
-// LoggingContextOrDie injects a logger into the returned context. The logger is
-// configured by the ConfigMap `config-logging` and live updates the level.
-func LoggingContextOrDie(componentName string, config *rest.Config, cmw *informer.InformedWatcher) context.Context {
-	ctx, startinformers := knativeinjection.EnableInjectionOrDie(signals.NewContext(), config)
-	logger, atomicLevel := sharedmain.SetupLoggerOrDie(ctx, componentName)
-	ctx = logging.WithLogger(ctx, logger)
-	rest.SetDefaultWarningHandler(&logging.WarningHandler{Logger: logger})
-	sharedmain.WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, componentName)
-	startinformers()
-	return ctx
-}
-
 func WithNamespacedName(ctx context.Context, namespacedname types.NamespacedName) context.Context {
 	return context.WithValue(ctx, resourceKey{}, namespacedname)
 }
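What survives in injection.go is the standard context-value idiom: the unexported resourceKey type makes key collisions with other packages impossible. A hypothetical getter to pair with WithNamespacedName (not part of this diff) would sit alongside it:

	// GetNamespacedName is a hypothetical counterpart to WithNamespacedName;
	// the diff above only shows the setter.
	func GetNamespacedName(ctx context.Context) types.NamespacedName {
		if v, ok := ctx.Value(resourceKey{}).(types.NamespacedName); ok {
			return v
		}
		return types.NamespacedName{}
	}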