diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 35cd5744ef..5bed7696ac 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -18,6 +18,7 @@ package controller import ( "context" + "errors" "fmt" "sync" "time" @@ -33,6 +34,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -104,40 +106,21 @@ var _ = Describe("controller", func() { close(done) }) - It("should wait for each informer to sync", func(done Done) { - // TODO(directxman12): this test doesn't do what it says it does - - c, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - _, err = c.GetInformer(context.TODO(), &appsv1.Deployment{}) - Expect(err).NotTo(HaveOccurred()) - _, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{}) - Expect(err).NotTo(HaveOccurred()) - ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}), - }} - - // Use a cancelled context so Start doesn't block - ctx, cancel := context.WithCancel(context.Background()) - cancel() - Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) - - close(done) - }) - It("should error when cache sync timeout occurs", func(done Done) { ctrl.CacheSyncTimeout = 10 * time.Nanosecond c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) + c = &cacheWithIndefinitelyBlockingGetInformer{c} ctrl.startWatches = []watchDescription{{ src: source.NewKindWithCache(&appsv1.Deployment{}, c), }} + ctrl.Name = "testcontroller" err = ctrl.Start(context.TODO()) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("cache did not sync")) + Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced")) close(done) }) @@ -944,3 +927,20 @@ func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error { }() return s.SyncingSource.WaitForSync(ctx) } + +var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{} + +// cacheWithIndefinitelyBlockingGetInformer has a GetInformer implementation that blocks indefinitely or until its +// context is cancelled. +// We need it as a workaround for testenvs lack of support for a secure apiserver, because the insecure port always +// implies the allow all authorizer, so we can not simulate rbac issues with it. They are the usual cause of the real +// caches GetInformer blocking showing this behavior. +// TODO: Remove this once envtest supports a secure apiserver. +type cacheWithIndefinitelyBlockingGetInformer struct { + cache.Cache +} + +func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) { + <-ctx.Done() + return nil, errors.New("GetInformer timed out") +} diff --git a/pkg/source/source.go b/pkg/source/source.go index c4a74af02a..adabbaf917 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -91,6 +91,11 @@ type Kind struct { // cache used to watch APIs cache cache.Cache + + // started may contain an error if one was encountered during startup. If its closed and does not + // contain an error, startup and syncing finished. + started chan error + startCancel func() } var _ SyncingSource = &Kind{} @@ -110,16 +115,30 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return fmt.Errorf("must call CacheInto on Kind before calling Start") } - // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, err := ks.cache.GetInformer(ctx, ks.Type) - if err != nil { - if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { - log.Error(err, "if kind is a CRD, it should be installed before calling Start", - "kind", kindMatchErr.GroupKind) + // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not + // sync that informer (most commonly due to RBAC issues). + ctx, ks.startCancel = context.WithCancel(ctx) + ks.started = make(chan error) + go func() { + // Lookup the Informer from the Cache and add an EventHandler which populates the Queue + i, err := ks.cache.GetInformer(ctx, ks.Type) + if err != nil { + kindMatchErr := &meta.NoKindMatchError{} + if errors.As(err, &kindMatchErr) { + log.Error(err, "if kind is a CRD, it should be installed before calling Start", + "kind", kindMatchErr.GroupKind) + } + ks.started <- err + return } - return err - } - i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + if !ks.cache.WaitForCacheSync(ctx) { + // Would be great to return something more informative here + ks.started <- errors.New("cache did not sync") + } + close(ks.started) + }() + return nil } @@ -133,11 +152,13 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. func (ks *Kind) WaitForSync(ctx context.Context) error { - if !ks.cache.WaitForCacheSync(ctx) { - // Would be great to return something more informative here - return errors.New("cache did not sync") + select { + case err := <-ks.started: + return err + case <-ctx.Done(): + ks.startCancel() + return errors.New("timed out waiting for cache to be synced") } - return nil } var _ inject.Cache = &Kind{} diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 2726b6cfd0..9b0a1c9744 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -90,6 +90,7 @@ var _ = Describe("Source", func() { }, }, q) Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) i, err := ic.FakeInformerFor(&corev1.Pod{}) Expect(err).NotTo(HaveOccurred()) @@ -133,6 +134,7 @@ var _ = Describe("Source", func() { }, }, q) Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) i, err := ic.FakeInformerFor(&corev1.Pod{}) Expect(err).NotTo(HaveOccurred()) @@ -178,6 +180,7 @@ var _ = Describe("Source", func() { }, }, q) Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) i, err := ic.FakeInformerFor(&corev1.Pod{}) Expect(err).NotTo(HaveOccurred()) @@ -208,10 +211,11 @@ var _ = Describe("Source", func() { }) It("should return an error if syncing fails", func(done Done) { - instance := source.Kind{} + instance := source.Kind{Type: &corev1.Pod{}} f := false Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed()) - err := instance.WaitForSync(nil) + Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) + err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync")) @@ -220,7 +224,7 @@ var _ = Describe("Source", func() { }) Context("for a Kind not in the cache", func() { - It("should return an error when Start is called", func(done Done) { + It("should return an error when WaitForSync is called", func(done Done) { ic.Error = fmt.Errorf("test error") q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") @@ -229,7 +233,8 @@ var _ = Describe("Source", func() { } Expect(instance.InjectCache(ic)).To(Succeed()) err := instance.Start(ctx, handler.Funcs{}, q) - Expect(err).To(HaveOccurred()) + Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).To(HaveOccurred()) close(done) }) @@ -246,8 +251,9 @@ var _ = Describe("Source", func() { It("should return an error if syncing fails", func(done Done) { f := false - instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f}) - err := instance.WaitForSync(nil) + instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}) + Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) + err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync"))