Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix nil stop value for source.Channel #204

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ os:
- osx

go:
- "1.10"
- "1.11"

git:
depth: 3
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out runtime.O
}

// List lists items out of the indexer and writes them to out
func (c *CacheReader) List(ctx context.Context, opts *client.ListOptions, out runtime.Object) error {
func (c *CacheReader) List(_ context.Context, opts *client.ListOptions, out runtime.Object) error {
var objs []interface{}
var err error

Expand Down
16 changes: 8 additions & 8 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,15 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)

// Create controller with dependencies set
c := &controller.Controller{
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetRecorder(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetRecorder(name),
Queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
Name: name,
Name: name,
}

// Add the controller as a Manager components
Expand Down
6 changes: 3 additions & 3 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ var _ = Describe("controller", func() {
informers = &informertest.FakeInformers{}
ctrl = &Controller{
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
Queue: queue,
Cache: informers,
Do: fakeReconcile,
Queue: queue,
Cache: informers,
}
ctrl.InjectFunc(func(interface{}) error { return nil })
})
Expand Down
40 changes: 25 additions & 15 deletions pkg/manager/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type controllerManager struct {
// (and EventHandlers, Sources and Predicates).
recorderProvider recorder.Provider

// resourceLock
// resourceLock forms the basis for leader election
resourceLock resourcelock.Interface

// mapper is used to map resources to kind, and map kind and version.
Expand All @@ -81,7 +81,16 @@ type controllerManager struct {
mu sync.Mutex
started bool
errChan chan error
stop <-chan struct{}

// internalStop is the stop channel *actually* used by everything involved
// with the manager as a stop channel, so that we can pass a stop channel
// to things that need it off the bat (like the Channel source). It can
// be closed via `internalStopper` (by being the same underlying channel).
internalStop <-chan struct{}

// internalStopper is the write side of the internal stop channel, allowing us to close it.
// It and `internalStop` should point to the same channel.
internalStopper chan<- struct{}

startCache func(stop <-chan struct{}) error
}
Expand All @@ -101,7 +110,7 @@ func (cm *controllerManager) Add(r Runnable) error {
if cm.started {
// If already started, start the controller
go func() {
cm.errChan <- r.Start(cm.stop)
cm.errChan <- r.Start(cm.internalStop)
}()
}

Expand All @@ -124,7 +133,7 @@ func (cm *controllerManager) SetFields(i interface{}) error {
if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
return err
}
if _, err := inject.StopChannelInto(cm.stop, i); err != nil {
if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
return err
}
if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
Expand Down Expand Up @@ -192,13 +201,16 @@ func (cm *controllerManager) serveMetrics(stop <-chan struct{}) {
}

func (cm *controllerManager) Start(stop <-chan struct{}) error {
// join the passed-in stop channel as an upstream feeding into cm.internalStopper
defer close(cm.internalStopper)

if cm.resourceLock != nil {
err := cm.startLeaderElection(stop)
err := cm.startLeaderElection()
if err != nil {
return err
}
} else {
go cm.start(stop)
go cm.start()
}

select {
Expand All @@ -211,45 +223,43 @@ func (cm *controllerManager) Start(stop <-chan struct{}) error {
}
}

func (cm *controllerManager) start(stop <-chan struct{}) {
func (cm *controllerManager) start() {
cm.mu.Lock()
defer cm.mu.Unlock()

cm.stop = stop

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cache.Start
}
go func() {
if err := cm.startCache(stop); err != nil {
if err := cm.startCache(cm.internalStop); err != nil {
cm.errChan <- err
}
}()

// Start the metrics server
if cm.metricsListener != nil {
go cm.serveMetrics(stop)
go cm.serveMetrics(cm.internalStop)
}

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cache.WaitForCacheSync(stop)
cm.cache.WaitForCacheSync(cm.internalStop)

// Start the runnables after the cache has synced
for _, c := range cm.runnables {
// Controllers block, but we want to return an error if any have an error starting.
// Write any Start errors to a channel so we can return them
ctrl := c
go func() {
cm.errChan <- ctrl.Start(stop)
cm.errChan <- ctrl.Start(cm.internalStop)
}()
}

cm.started = true
}

func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err error) {
func (cm *controllerManager) startLeaderElection() (err error) {
l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{
Lock: cm.resourceLock,
// Values taken from: https://github.com/kubernetes/apiserver/blob/master/pkg/apis/config/v1alpha1/defaults.go
Expand All @@ -259,7 +269,7 @@ func (cm *controllerManager) startLeaderElection(stop <-chan struct{}) (err erro
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(_ <-chan struct{}) {
cm.start(stop)
cm.start()
},
OnStoppedLeading: func() {
// Most implementations of leader election log.Fatal() here.
Expand Down
4 changes: 4 additions & 0 deletions pkg/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
return nil, err
}

stop := make(chan struct{})

return &controllerManager{
config: config,
scheme: options.Scheme,
Expand All @@ -219,6 +221,8 @@ func New(config *rest.Config, options Options) (Manager, error) {
resourceLock: resourceLock,
mapper: mapper,
metricsListener: metricsListener,
internalStop: stop,
internalStopper: stop,
}, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/manager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,7 @@ var _ = Describe("manger.Manager", func() {
},
stop: func(stop <-chan struct{}) error {
defer GinkgoRecover()
// Manager stop chan has not been initialized.
Expect(stop).To(BeNil())
Expect(stop).NotTo(BeNil())
return nil
},
f: func(f inject.Func) error {
Expand Down