Skip to content

Commit

Permalink
Merge pull request #863 from negz/stop
Browse files Browse the repository at this point in the history
✨Allow controllers to be started and stopped separately from the manager
  • Loading branch information
k8s-ci-robot committed Apr 22, 2020
2 parents 6501aeb + f779bdd commit a457e27
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 4 deletions.
16 changes: 14 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ type Controller interface {
// New returns a new Controller registered with the Manager. The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
c, err := NewUnmanaged(name, mgr, options)
if err != nil {
return nil, err
}

// Add the controller as a Manager components
return c, mgr.Add(c)
}

// NewUnmanaged returns a new controller without adding it to the manager. The
// caller is responsible for starting the returned controller.
func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller, error) {
if options.Reconciler == nil {
return nil, fmt.Errorf("must specify Reconciler")
}
Expand Down Expand Up @@ -100,9 +112,9 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
SetFields: mgr.SetFields,
Name: name,
}

// Add the controller as a Manager components
return c, mgr.Add(c)
return c, nil
}
45 changes: 45 additions & 0 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,48 @@ func ExampleController_unstructured() {
os.Exit(1)
}
}

// This example creates a new controller named "pod-controller" to watch Pods
// and call a no-op reconciler. The controller is not added to the provided
// manager, and must thus be started and stopped by the caller.
func ExampleNewUnmanaged() {
// mgr is a manager.Manager

// Configure creates a new controller but does not add it to the supplied
// manager.
c, err := controller.NewUnmanaged("pod-controller", mgr, controller.Options{
Reconciler: reconcile.Func(func(_ reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
}),
})
if err != nil {
log.Error(err, "unable to create pod-controller")
os.Exit(1)
}

if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}

// Create a stop channel for our controller. The controller will stop when
// this channel is closed.
stop := make(chan struct{})

// Start our controller in a goroutine so that we do not block.
go func() {
// Block until our controller manager is elected leader. We presume our
// entire process will terminate if we lose leadership, so we don't need
// to handle that.
<-mgr.Elected()

// Start our controller. This will block until the stop channel is
// closed, or the controller returns an error.
if err := c.Start(stop); err != nil {
log.Error(err, "cannot run experiment controller")
}
}()

// Stop our controller.
close(stop)
}
12 changes: 12 additions & 0 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ type controllerManager struct {
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

// elected is closed when this manager becomes the leader of a group of
// managers, either because it won a leader election or because no leader
// election was configured.
elected chan struct{}

startCache func(stop <-chan struct{}) error

// port is the port that the webhook server serves at.
Expand Down Expand Up @@ -457,6 +462,8 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
return err
}
} else {
// Treat not having leader election enabled the same as being elected.
close(cm.elected)
go cm.startLeaderElectionRunnables()
}

Expand Down Expand Up @@ -545,6 +552,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
RetryPeriod: cm.retryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ context.Context) {
close(cm.elected)
cm.startLeaderElectionRunnables()
},
OnStoppedLeading: func() {
Expand Down Expand Up @@ -572,3 +580,7 @@ func (cm *controllerManager) startLeaderElection() (err error) {
go l.Run(ctx)
return nil
}

func (cm *controllerManager) Elected() <-chan struct{} {
return cm.elected
}
6 changes: 6 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type Manager interface {
// non-leaderelection mode (always running) or leader election mode (managed by leader election if enabled).
Add(Runnable) error

// Elected is closed when this manager is elected leader of a group of
// managers, either because it won a leader election or because no leader
// election was configured.
Elected() <-chan struct{}

// SetFields will set any dependencies on an object for which the object has implemented the inject
// interface - e.g. inject.Client.
SetFields(interface{}) error
Expand Down Expand Up @@ -325,6 +330,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
metricsExtraHandlers: metricsExtraHandlers,
internalStop: stop,
internalStopper: stop,
elected: make(chan struct{}),
port: options.Port,
host: options.Host,
certDir: options.CertDir,
Expand Down
57 changes: 55 additions & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ var _ = Describe("manger.Manager", func() {
Context("with leader election enabled", func() {
It("should default ID to controller-runtime if ID is not set", func() {
var rl resourcelock.Interface
m, err := New(cfg, Options{
m1, err := New(cfg, Options{
LeaderElection: true,
LeaderElectionNamespace: "default",
LeaderElectionID: "test-leader-election-id",
Expand All @@ -152,10 +152,61 @@ var _ = Describe("manger.Manager", func() {
rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
return rl, err
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
})
Expect(err).ToNot(HaveOccurred())
Expect(m).ToNot(BeNil())
Expect(m1).ToNot(BeNil())
Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))

m2, err := New(cfg, Options{
LeaderElection: true,
LeaderElectionNamespace: "default",
LeaderElectionID: "test-leader-election-id",
newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) {
var err error
rl, err = leaderelection.NewResourceLock(config, recorderProvider, options)
return rl, err
},
HealthProbeBindAddress: "0",
MetricsBindAddress: "0",
})

Expect(err).ToNot(HaveOccurred())
Expect(m2).ToNot(BeNil())
Expect(rl.Describe()).To(Equal("default/test-leader-election-id"))

c1 := make(chan struct{})
Expect(m1.Add(RunnableFunc(func(s <-chan struct{}) error {
defer GinkgoRecover()
close(c1)
return nil
}))).To(Succeed())

go func() {
defer GinkgoRecover()
Expect(m1.Elected()).ShouldNot(BeClosed())
Expect(m1.Start(stop)).NotTo(HaveOccurred())
Expect(m1.Elected()).Should(BeClosed())
}()
<-c1

c2 := make(chan struct{})
Expect(m2.Add(RunnableFunc(func(s <-chan struct{}) error {
defer GinkgoRecover()
close(c2)
return nil
}))).To(Succeed())

By("Expect second manager to lose leader election")
go func() {
defer GinkgoRecover()
Expect(m2.Start(stop)).NotTo(HaveOccurred())
Consistently(m2.Elected()).ShouldNot(Receive())
}()

By("Expect controller on manager without leader lease never to run")
Consistently(c2).ShouldNot(Receive())
})

It("should return an error if namespace not set and not running in cluster", func() {
Expand Down Expand Up @@ -260,7 +311,9 @@ var _ = Describe("manger.Manager", func() {

go func() {
defer GinkgoRecover()
Expect(m.Elected()).ShouldNot(BeClosed())
Expect(m.Start(stop)).NotTo(HaveOccurred())
Expect(m.Elected()).Should(BeClosed())
}()
<-c1
<-c2
Expand Down

0 comments on commit a457e27

Please sign in to comment.