Skip to content

Commit

Permalink
Merge pull request #960 from vincepri/backport-054
Browse files Browse the repository at this point in the history
Backports for v0.5.4
  • Loading branch information
k8s-ci-robot authored May 22, 2020
2 parents f31eaf7 + 55bed9b commit c45adcf
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 36 deletions.
2 changes: 0 additions & 2 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ func New(name string, mgr manager.Manager, options Options) (Controller, error)
// Create controller with dependencies set
c := &controller.Controller{
Do: options.Reconciler,
Cache: mgr.GetCache(),
Config: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(name),
Expand Down
35 changes: 12 additions & 23 deletions pkg/internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrlmetrics "sigs.k8s.io/controller-runtime/pkg/internal/controller/metrics"
Expand Down Expand Up @@ -61,13 +59,6 @@ type Controller struct {
// Scheme is injected by the controllerManager when controllerManager.Start is called
Scheme *runtime.Scheme

// informers are injected by the controllerManager when controllerManager.Start is called
Cache cache.Cache

// Config is the rest.Config used to talk to the apiserver. Defaults to one of in-cluster, environment variable
// specified, or the ~/.kube/Config.
Config *rest.Config

// MakeQueue constructs the queue for this controller once the controller is ready to start.
// This exists because the standard Kubernetes workqueues start themselves immediately, which
// leads to goroutine leaks if something calls controller.New repeatedly.
Expand All @@ -86,10 +77,6 @@ type Controller struct {
// JitterPeriod allows tests to reduce the JitterPeriod so they complete faster
JitterPeriod time.Duration

// WaitForCacheSync allows tests to mock out the WaitForCacheSync function to return an error
// defaults to Cache.WaitForCacheSync
WaitForCacheSync func(stopCh <-chan struct{}) bool

// Started is true if the Controller has been Started
Started bool

Expand Down Expand Up @@ -170,16 +157,18 @@ func (c *Controller) Start(stop <-chan struct{}) error {
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
log.Info("Starting Controller", "controller", c.Name)

// Wait for the caches to be synced before starting workers
if c.WaitForCacheSync == nil {
c.WaitForCacheSync = c.Cache.WaitForCacheSync
}
if ok := c.WaitForCacheSync(stop); !ok {
// This code is unreachable right now since WaitForCacheSync will never return an error
// Leaving it here because that could happen in the future
err := fmt.Errorf("failed to wait for %s caches to sync", c.Name)
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
return err
for _, watch := range c.watches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := syncingSource.WaitForSync(stop); err != nil {
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
// Leaving it here because that could happen in the future
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
log.Error(err, "Could not wait for Cache to sync", "controller", c.Name)
return err
}
}

if c.JitterPeriod == 0 {
Expand Down
21 changes: 12 additions & 9 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ var _ = Describe("controller", func() {
MaxConcurrentReconciles: 1,
Do: fakeReconcile,
MakeQueue: func() workqueue.RateLimitingInterface { return queue },
Cache: informers,
}
Expect(ctrl.InjectFunc(func(interface{}) error { return nil })).To(Succeed())
})
Expand All @@ -88,7 +87,10 @@ var _ = Describe("controller", func() {

Describe("Start", func() {
It("should return an error if there is an error waiting for the informers", func(done Done) {
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return false }
f := false
ctrl.watches = []watchDescription{{
src: source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}),
}}
ctrl.Name = "foo"
err := ctrl.Start(stop)
Expect(err).To(HaveOccurred())
Expand All @@ -110,8 +112,9 @@ var _ = Describe("controller", func() {
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(&appsv1.ReplicaSet{})
Expect(err).NotTo(HaveOccurred())
ctrl.Cache = c
ctrl.WaitForCacheSync = func(<-chan struct{}) bool { return true }
ctrl.watches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
}}

Expect(ctrl.Start(stopped)).NotTo(HaveOccurred())

Expand Down Expand Up @@ -161,7 +164,7 @@ var _ = Describe("controller", func() {
Describe("Watch", func() {
It("should inject dependencies into the Source", func() {
src := &source.Kind{Type: &corev1.Pod{}}
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
Expect(src.InjectCache(informers)).To(Succeed())
evthdl := &handler.EnqueueRequestForObject{}
found := false
ctrl.SetFields = func(i interface{}) error {
Expand All @@ -177,7 +180,7 @@ var _ = Describe("controller", func() {

It("should return an error if there is an error injecting into the Source", func() {
src := &source.Kind{Type: &corev1.Pod{}}
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
Expect(src.InjectCache(informers)).To(Succeed())
evthdl := &handler.EnqueueRequestForObject{}
expected := fmt.Errorf("expect fail source")
ctrl.SetFields = func(i interface{}) error {
Expand All @@ -192,7 +195,7 @@ var _ = Describe("controller", func() {

It("should inject dependencies into the EventHandler", func() {
src := &source.Kind{Type: &corev1.Pod{}}
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
Expect(src.InjectCache(informers)).To(Succeed())
evthdl := &handler.EnqueueRequestForObject{}
found := false
ctrl.SetFields = func(i interface{}) error {
Expand Down Expand Up @@ -230,7 +233,7 @@ var _ = Describe("controller", func() {

It("should inject dependencies into all of the Predicates", func() {
src := &source.Kind{Type: &corev1.Pod{}}
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
Expect(src.InjectCache(informers)).To(Succeed())
evthdl := &handler.EnqueueRequestForObject{}
pr1 := &predicate.Funcs{}
pr2 := &predicate.Funcs{}
Expand All @@ -253,7 +256,7 @@ var _ = Describe("controller", func() {

It("should return an error if there is an error injecting into any of the Predicates", func() {
src := &source.Kind{Type: &corev1.Pod{}}
Expect(src.InjectCache(ctrl.Cache)).To(Succeed())
Expect(src.InjectCache(informers)).To(Succeed())
evthdl := &handler.EnqueueRequestForObject{}
pr1 := &predicate.Funcs{}
pr2 := &predicate.Funcs{}
Expand Down
28 changes: 26 additions & 2 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package source

import (
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -55,10 +56,17 @@ type Source interface {
Start(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error
}

// SyncingSource is a source that needs syncing prior to being usable. The controller
// will call its WaitForSync prior to starting workers.
type SyncingSource interface {
Source
WaitForSync(stop <-chan struct{}) error
}

// NewKindWithCache creates a Source without InjectCache, so that it is assured that the given cache is used
// and not overwritten. It can be used to watch objects in a different cluster by passing the cache
// from that other cluster
func NewKindWithCache(object runtime.Object, cache cache.Cache) Source {
func NewKindWithCache(object runtime.Object, cache cache.Cache) SyncingSource {
return &kindWithCache{kind: Kind{Type: object, cache: cache}}
}

Expand All @@ -71,6 +79,10 @@ func (ks *kindWithCache) Start(handler handler.EventHandler, queue workqueue.Rat
return ks.kind.Start(handler, queue, prct...)
}

func (ks *kindWithCache) WaitForSync(stop <-chan struct{}) error {
return ks.kind.WaitForSync(stop)
}

// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create)
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Expand All @@ -80,7 +92,7 @@ type Kind struct {
cache cache.Cache
}

var _ Source = &Kind{}
var _ SyncingSource = &Kind{}

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
Expand Down Expand Up @@ -117,6 +129,16 @@ func (ks *Kind) String() string {
return fmt.Sprintf("kind source: unknown GVK")
}

// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(stop <-chan struct{}) error {
if !ks.cache.WaitForCacheSync(stop) {
// Would be great to return something more informative here
return errors.New("cache did not sync")
}
return nil
}

var _ inject.Cache = &Kind{}

// InjectCache is internal should be called only by the Controller. InjectCache is used to inject
Expand Down Expand Up @@ -282,6 +304,8 @@ func (is *Informer) String() string {
return fmt.Sprintf("informer source: %p", is.Informer)
}

var _ Source = Func(nil)

// Func is a function that implements Source
type Func func(handler.EventHandler, workqueue.RateLimitingInterface, ...predicate.Predicate) error

Expand Down
32 changes: 32 additions & 0 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,18 @@ var _ = Describe("Source", func() {
close(done)
})

It("should return an error if syncing fails", func(done Done) {
instance := source.Kind{}
f := false
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
err := instance.WaitForSync(nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

close(done)

})

Context("for a Kind not in the cache", func() {
It("should return an error when Start is called", func(done Done) {
ic.Error = fmt.Errorf("test error")
Expand All @@ -227,6 +239,26 @@ var _ = Describe("Source", func() {
})
})

Describe("KindWithCache", func() {
It("should not allow injecting a cache", func() {
instance := source.NewKindWithCache(nil, nil)
injected, err := inject.CacheInto(&informertest.FakeInformers{}, instance)
Expect(err).To(BeNil())
Expect(injected).To(BeFalse())
})

It("should return an error if syncing fails", func(done Done) {
f := false
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
err := instance.WaitForSync(nil)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

close(done)

})
})

Describe("Func", func() {
It("should be called from Start", func(done Done) {
run := false
Expand Down

0 comments on commit c45adcf

Please sign in to comment.