Skip to content

Commit

Permalink
⚠️ Support global controller options in component config
Browse files Browse the repository at this point in the history
This change adds support for our v1alpha1 ComponentConfig types to
expose configuration options for controllers. The only two current
exposed options are concurrency (done via a GroupKind map) and the cache
sync timeout option.

This is a breaking change, given that we're adding a new required method
to the Manager's interface.

Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Feb 5, 2021
1 parent a763c9a commit 82fc256
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 1 deletion.
16 changes: 16 additions & 0 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind) string {
}

func (blder *Builder) doController(r reconcile.Reconciler) error {
globalOpts := blder.mgr.GetControllerOptions()

ctrlOptions := blder.ctrlOptions
if ctrlOptions.Reconciler == nil {
ctrlOptions.Reconciler = r
Expand All @@ -299,6 +301,20 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
return err
}

// Setup concurrency.
if ctrlOptions.MaxConcurrentReconciles == 0 {
groupKind := gvk.GroupKind().String()

if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
ctrlOptions.MaxConcurrentReconciles = concurrency
}
}

// Setup cache sync timeout.
if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
}

// Setup the logger.
if ctrlOptions.Log == nil {
ctrlOptions.Log = blder.mgr.GetLogger()
Expand Down
29 changes: 29 additions & 0 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -172,6 +173,34 @@ var _ = Describe("application", func() {
Expect(instance).NotTo(BeNil())
})

It("should override max concurrent reconcilers during creation of controller, when using", func() {
const maxConcurrentReconciles = 10
newController = func(name string, mgr manager.Manager, options controller.Options) (
controller.Controller, error) {
if options.MaxConcurrentReconciles == maxConcurrentReconciles {
return controller.New(name, mgr, options)
}
return nil, fmt.Errorf("max concurrent reconcilers expected %d but found %d", maxConcurrentReconciles, options.MaxConcurrentReconciles)
}

By("creating a controller manager")
m, err := manager.New(cfg, manager.Options{
Controller: v1alpha1.ControllerConfigurationSpec{
GroupKindConcurrency: map[string]int{
"ReplicaSet.apps": maxConcurrentReconciles,
},
},
})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
For(&appsv1.ReplicaSet{}).
Owns(&appsv1.ReplicaSet{}).
Build(noop)
Expect(err).NotTo(HaveOccurred())
Expect(instance).NotTo(BeNil())
})

It("should override rate limiter during creation of controller", func() {
rateLimiter := workqueue.DefaultItemBasedRateLimiter()
newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) {
Expand Down
32 changes: 31 additions & 1 deletion pkg/config/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package v1alpha1

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

configv1alpha1 "k8s.io/component-base/config/v1alpha1"
Expand Down Expand Up @@ -50,9 +52,14 @@ type ControllerManagerConfigurationSpec struct {
// GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop.
// To disable graceful shutdown, set to time.Duration(0)
// To use graceful shutdown without timeout, set to a negative duration, e.G. time.Duration(-1)
// The graceful shutdown is skipped for safety reasons in case the leadere election lease is lost.
// The graceful shutdown is skipped for safety reasons in case the leader election lease is lost.
GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutDown,omitempty"`

// Controller contains global configuration options for controllers
// registered within this manager.
// +optional
Controller *ControllerConfigurationSpec `json:"controller,omitempty"`

// Metrics contains thw controller metrics configuration
// +optional
Metrics ControllerMetrics `json:"metrics,omitempty"`
Expand All @@ -66,6 +73,29 @@ type ControllerManagerConfigurationSpec struct {
Webhook ControllerWebhook `json:"webhook,omitempty"`
}

// ControllerConfigurationSpec defines the global configuration for
// controllers registered with the manager.
type ControllerConfigurationSpec struct {
// GroupKindConcurrency is a map from a Kind to the number of concurrent reconciliation
// allowed for that controller.
//
// When a controller is registered within this manager using the builder utilities,
// users have to specify the type the controller reconciles in the For(...) call.
// If the object's kind passed matches one of the keys in this map, the concurrency
// for that controller is set to the number specified.
//
// The key is expected to be consistent in form with GroupKind.String(),
// e.g. ReplicaSet in apps group (regardless of version) would be `ReplicaSet.apps`.
//
// +optional
GroupKindConcurrency map[string]int `json:"groupKindConcurrency,omitempty"`

// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
// Defaults to 2 minutes if not set.
// +optional
CacheSyncTimeout *time.Duration `json:"cacheSyncTimeout,omitempty"`
}

// ControllerMetrics defines the metrics configs
type ControllerMetrics struct {
// BindAddress is the TCP address that the controller should bind to
Expand Down
33 changes: 33 additions & 0 deletions pkg/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/healthz"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -108,6 +109,9 @@ type controllerManager struct {
healthzStarted bool
errChan chan error

// controllerOptions are the global controller options.
controllerOptions v1alpha1.ControllerConfigurationSpec

// Logger is the logger that should be used by this manager.
// If none is set, it defaults to log.Log global logger.
logger logr.Logger
Expand Down Expand Up @@ -355,6 +359,10 @@ func (cm *controllerManager) GetLogger() logr.Logger {
return cm.logger
}

func (cm *controllerManager) GetControllerOptions() v1alpha1.ControllerConfigurationSpec {
return cm.controllerOptions
}

func (cm *controllerManager) serveMetrics() {
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
Expand Down
19 changes: 19 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ type Manager interface {

// GetLogger returns this manager's logger.
GetLogger() logr.Logger

// GetControllerOptions returns controller global configuration options.
GetControllerOptions() v1alpha1.ControllerConfigurationSpec
}

// Options are the arguments for creating a new Manager
Expand Down Expand Up @@ -230,6 +233,11 @@ type Options struct {
// The graceful shutdown is skipped for safety reasons in case the leader election lease is lost.
GracefulShutdownTimeout *time.Duration

// Controller contains global configuration options for controllers
// registered within this manager.
// +optional
Controller v1alpha1.ControllerConfigurationSpec

// makeBroadcaster allows deferring the creation of the broadcaster to
// avoid leaking goroutines if we never call Start on this manager. It also
// returns whether or not this is a "owned" broadcaster, and as such should be
Expand Down Expand Up @@ -337,6 +345,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
controllerOptions: options.Controller,
logger: options.Logger,
elected: make(chan struct{}),
port: options.Port,
Expand Down Expand Up @@ -407,6 +416,16 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options,
o.CertDir = newObj.Webhook.CertDir
}

if newObj.Controller != nil {
if o.Controller.CacheSyncTimeout == nil && newObj.Controller.CacheSyncTimeout != nil {
o.Controller.CacheSyncTimeout = newObj.Controller.CacheSyncTimeout
}

if len(o.Controller.GroupKindConcurrency) == 0 && len(newObj.Controller.GroupKindConcurrency) > 0 {
o.Controller.GroupKindConcurrency = newObj.Controller.GroupKindConcurrency
}
}

return o, nil
}

Expand Down

0 comments on commit 82fc256

Please sign in to comment.