Skip to content

Commit

Permalink
Merge pull request #204 from DirectXMan12/bug/stop-channel-borked
Browse files Browse the repository at this point in the history
fix nil stop value for source.Channel
  • Loading branch information
k8s-ci-robot committed Nov 13, 2018
2 parents 902ff11 + 071c3a2 commit 010d65c
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 30 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ os:
- osx

go:
- "1.10"
- "1.11"

git:
depth: 3
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out runtime.O
}

// List lists items out of the indexer and writes them to out
func (c *CacheReader) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
func (c *CacheReader) List(_ context.Context, opts *client.ListOptions, out runtime.Object) error {
var objs []interface{}
var err error

Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)

// Create controller with dependencies set
c := &controller.Controller{
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetRecorder(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetRecorder(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Name: name,
Name: name,
}

// Add the controller as a Manager components
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ var _ = Describe("controller", func() {
informers = &informertest.FakeInformers{}
ctrl = &Controller{
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
Queue: queue,
Cache: informers,
Do: fakeReconcile,
Queue: queue,
Cache: informers,
}
ctrl.InjectFunc(func(interface{}) error { return nil })
})
Expand Down
40 changes: 25 additions & 15 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type controllerManager struct {
// (and EventHandlers, Sources and Predicates).
recorderProvider recorder.Provider

// resourceLock
// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface

// mapper is used to map resources to kind, and map kind and version.
Expand All @@ -81,7 +81,16 @@ type controllerManager struct {
mu sync.Mutex
started bool
errChan chan error
stop <-chan struct{}

// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
// to things that need it off the bat (like the Channel source). It can
// be closed via `internalStopper` (by being the same underlying channel).
internalStop <-chan struct{}

// internalStopper is the write side of the internal stop channel, allowing us to close it.
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

startCache func(stop <-chan struct{}) error
}
Expand All @@ -101,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error {
if cm.started {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.stop)
cm.errChan <- r.Start(cm.internalStop)
}()
}

Expand All @@ -124,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
return err
}
if _, err := inject.StopChannelInto(cm.stop, i); err != nil {
if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
return err
}
if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
Expand Down Expand Up @@ -192,13 +201,16 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
defer close(cm.internalStopper)

if cm.resourceLock != nil {
err := cm.startLeaderElection(stop)
err := cm.startLeaderElection()
if err != nil {
return err
}
} else {
go cm.start(stop)
go cm.start()
}

select {
Expand All @@ -211,45 +223,43 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start(stop <-chan struct{}) {
func (cm *controllerManager) start() {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.stop = stop

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(stop); err != nil {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()

// Start the metrics server
if cm.metricsListener != nil {
go cm.serveMetrics(stop)
go cm.serveMetrics(cm.internalStop)
}

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(stop)
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(stop)
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.started = true
}

func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err error) {
func (cm *controllerManager) startLeaderElection() (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
Expand All @@ -259,7 +269,7 @@ func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err erro
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ <-chan struct{}) {
cm.start(stop)
cm.start()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}

stop := make(chan struct{})

return &controllerManager{
config: config,
scheme: options.Scheme,
Expand All @@ -219,6 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
}, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,7 @@ var _ = Describe("manger.Manager", func() {
},
stop: func(stop <-chan struct{}) error {
defer GinkgoRecover()
// Manager stop chan has not been initialized.
Expect(stop).To(BeNil())
Expect(stop).NotTo(BeNil())
return nil
},
f: func(f inject.Func) error {
Expand Down

0 comments on commit 010d65c

Please sign in to comment.