Skip to content

Commit

Permalink
Stop channel commenting/renaming for clarity
Browse files Browse the repository at this point in the history
This cleans up, adds comments, and renames things in the stop channel
fix commits for clarity.
  • Loading branch information
DirectXMan12 committed Nov 9, 2018
1 parent 732aa29 commit 83939b1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
30 changes: 18 additions & 12 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type controllerManager struct {
// (and EventHandlers, Sources and Predicates).
recorderProvider recorder.Provider

// resourceLock
// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface

// mapper is used to map resources to kind, and map kind and version.
Expand All @@ -81,10 +81,16 @@ type controllerManager struct {
mu sync.Mutex
started bool
errChan chan error
stop <-chan struct{}

// stopper is the write side of the stop channel. They should have the same value.
stopper chan<- struct{}
// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
// to things that need it off the bat (like the Channel source). It can
// be closed via `internalStopper` (by being the same underlying channel).
internalStop <-chan struct{}

// internalStopper is the write side of the internal stop channel, allowing us to close it.
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

startCache func(stop <-chan struct{}) error
}
Expand All @@ -104,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error {
if cm.started {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.stop)
cm.errChan <- r.Start(cm.internalStop)
}()
}

Expand All @@ -127,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
return err
}
if _, err := inject.StopChannelInto(cm.stop, i); err != nil {
if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
return err
}
if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
Expand Down Expand Up @@ -195,8 +201,8 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
// join the passed-in stop channel as an upstream feeding into cm.stopper
defer close(cm.stopper)
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
defer close(cm.internalStopper)

if cm.resourceLock != nil {
err := cm.startLeaderElection()
Expand Down Expand Up @@ -226,27 +232,27 @@ func (cm *controllerManager) start() {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.stop); err != nil {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()

// Start the metrics server
if cm.metricsListener != nil {
go cm.serveMetrics(cm.stop)
go cm.serveMetrics(cm.internalStop)
}

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.stop)
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.stop)
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
stop: stop,
stopper: stop,
internalStop: stop,
internalStopper: stop,
}, nil
}

Expand Down

0 comments on commit 83939b1

Please sign in to comment.