Skip to content

Commit

Permalink
Stop channel commenting/renaming for clarity
Browse files Browse the repository at this point in the history
This cleans up, adds comments, and renames things in the stop channel
fix commits for clarity.
  • Loading branch information
DirectXMan12 committed Nov 9, 2018
1 parent 576c309 commit 977e1fa
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 27 deletions.
16 changes: 8 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)

// Create controller with dependencies set
c := &controller.Controller{
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetRecorder(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetRecorder(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Name: name,
Name: name,
}

// Add the controller as a Manager components
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ var _ = Describe("controller", func() {
informers = &informertest.FakeInformers{}
ctrl = &Controller{
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
Queue: queue,
Cache: informers,
Do: fakeReconcile,
Queue: queue,
Cache: informers,
}
ctrl.InjectFunc(func(interface{}) error { return nil })
})
Expand Down
37 changes: 23 additions & 14 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type controllerManager struct {
// (and EventHandlers, Sources and Predicates).
recorderProvider recorder.Provider

// resourceLock
// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface

// mapper is used to map resources to kind, and map kind and version.
Expand All @@ -73,10 +73,16 @@ type controllerManager struct {
mu sync.Mutex
started bool
errChan chan error
stop <-chan struct{}

// stopper is the write side of the stop channel. They should have the same value.
stopper chan<- struct{}
// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
// to things that need it off the bat (like the Channel source). It can
// be closed via `internalStopper` (by being the same underlying channel).
internalStop <-chan struct{}

// internalStopper is the write side of the internal stop channel, allowing us to close it.
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

startCache func(stop <-chan struct{}) error
}
Expand All @@ -96,7 +102,7 @@ func (cm *controllerManager) Add(r Runnable) error {
if cm.started {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.stop)
cm.errChan <- r.Start(cm.internalStop)
}()
}

Expand All @@ -119,7 +125,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
return err
}
if _, err := inject.StopChannelInto(cm.stop, i); err != nil {
if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
return err
}
if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
Expand Down Expand Up @@ -161,13 +167,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
defer close(cm.stopper)
// stop everything we start when we exit this method for any reason
defer close(cm.internalStopper)

// if we're not using resource election, start directly
if cm.resourceLock == nil {
go cm.start()
cm.start()
select {
// Only this function should receive from stop, and everything else
// should receive from cm.stop.
// should receive from cm.internalStop.
case <-stop:
// we are done
return nil
Expand All @@ -177,7 +185,8 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
// otherwise, start when we acquire a resource election lock
leaderElector, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
// TODO(joelspeed): These timings should be configurable
Expand All @@ -204,7 +213,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
return err
}

go l.Run()
go leaderElector.Run()

select {
case <-stop:
Expand All @@ -225,22 +234,22 @@ func (cm *controllerManager) start() {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.stop); err != nil {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.stop)
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.stop)
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
stop: stop,
stopper: stop,
internalStop: stop,
internalStopper: stop,
}, nil
}

Expand Down

0 comments on commit 977e1fa

Please sign in to comment.