Skip to content

Commit

Permalink
fixes nil stop value for source.Channel
Browse files Browse the repository at this point in the history
  • Loading branch information
mhrivnak committed Sep 19, 2018
1 parent 0f4719b commit be88210
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 40 deletions.
87 changes: 49 additions & 38 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type controllerManager struct {
errChan chan error
stop <-chan struct{}

// stopper is the write side of the stop channel. They should have the same value.
stopper chan<- struct{}

startCache func(stop <-chan struct{}) error
}

Expand Down Expand Up @@ -159,9 +162,15 @@ func (cm *controllerManager) GetRESTMapper() meta.RESTMapper {

func (cm *controllerManager) Start(stop <-chan struct{}) error {
if cm.resourceLock == nil {
go cm.start(stop)
// join the passed-in stop channel as an upstream feeding into cm.stopper
go func() {
<-stop
close(cm.stopper)
}()

go cm.start()
select {
case <-stop:
case <-cm.stop:
// we are done
return nil
case err := <-cm.errChan:
Expand All @@ -178,7 +187,19 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: cm.start,
// This type changes in k8s 1.12 to func(context.Context)
OnStartedLeading: func(stopleading <-chan struct{}) {
// join both stop and stopleading so they feed into cm.stopper
go func() {
select {
case <-stop:
close(cm.stopper)
case <-stopleading:
close(cm.stopper)
}
}()
cm.start()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
// Since Start is wrapped in log.Fatal when called, we can just return
Expand All @@ -194,7 +215,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
go l.Run()

select {
case <-stop:
case <-cm.stop:
// We are done
return nil
case err := <-cm.errChan:
Expand All @@ -203,43 +224,33 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start(stop <-chan struct{}) {
func() {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.stop = stop

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(stop); err != nil {
cm.errChan <- err
}
}()
func (cm *controllerManager) start() {
cm.mu.Lock()
defer cm.mu.Unlock()

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(stop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(stop)
}()
// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(cm.stop); err != nil {
cm.errChan <- err
}

cm.started = true
}()

select {
case <-stop:
// We are done
return
// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(cm.stop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(cm.stop)
}()
}

cm.started = true
}
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}

stop := make(chan struct{})

return &controllerManager{
config: config,
scheme: options.Scheme,
Expand All @@ -191,6 +193,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
recorderProvider: recorderProvider,
resourceLock: resourceLock,
mapper: mapper,
stop: stop,
stopper: stop,
}, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ var _ = Describe("manger.Manager", func() {
},
stop: func(stop <-chan struct{}) error {
defer GinkgoRecover()
// Manager stop chan has not been initialized.
Expect(stop).To(BeNil())
Expect(stop).NotTo(BeNil())
return nil
},
f: func(f inject.Func) error {
Expand Down

0 comments on commit be88210

Please sign in to comment.