From 34898541127cda9560309e046acc534d28bdba44 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sun, 14 Mar 2021 16:43:42 -0400 Subject: [PATCH] :bug: Fix wait for cache sync functionality So far, waiting for cache sync in most realistic scenarios didn't work, because source.Kind gets an already started cache from the Manager. A cache that is already started will block forever on GetInformer, which we called in source.Kind's Start and not its WaitForSync. The context passed to Start however defines the entire lifecycle of the Source, not the Start timeout. This change makes source.Kind call GetInformer in Start but in a new goroutine and WaitForSync just wait for that to finish or for its context to be cancelled. This preserves the existing semantics of starting in Start but waiting for ready in WaitForSync. --- pkg/internal/controller/controller_test.go | 44 ++++++++++----------- pkg/source/source.go | 46 ++++++++++++++++------ pkg/source/source_test.go | 18 ++++++--- 3 files changed, 67 insertions(+), 41 deletions(-) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 35cd5744ef..5bed7696ac 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -18,6 +18,7 @@ package controller import ( "context" + "errors" "fmt" "sync" "time" @@ -33,6 +34,7 @@ import ( "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -104,40 +106,21 @@ var _ = Describe("controller", func() { close(done) }) - It("should wait for each informer to sync", func(done Done) { - // TODO(directxman12): this test doesn't do what it says it does - - c, err := cache.New(cfg, cache.Options{}) - 
Expect(err).NotTo(HaveOccurred()) - _, err = c.GetInformer(context.TODO(), &appsv1.Deployment{}) - Expect(err).NotTo(HaveOccurred()) - _, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{}) - Expect(err).NotTo(HaveOccurred()) - ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}), - }} - - // Use a cancelled context so Start doesn't block - ctx, cancel := context.WithCancel(context.Background()) - cancel() - Expect(ctrl.Start(ctx)).NotTo(HaveOccurred()) - - close(done) - }) - It("should error when cache sync timeout occurs", func(done Done) { ctrl.CacheSyncTimeout = 10 * time.Nanosecond c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) + c = &cacheWithIndefinitelyBlockingGetInformer{c} ctrl.startWatches = []watchDescription{{ src: source.NewKindWithCache(&appsv1.Deployment{}, c), }} + ctrl.Name = "testcontroller" err = ctrl.Start(context.TODO()) Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("cache did not sync")) + Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced")) close(done) }) @@ -944,3 +927,20 @@ func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error { }() return s.SyncingSource.WaitForSync(ctx) } + +var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{} + +// cacheWithIndefinitelyBlockingGetInformer has a GetInformer implementation that blocks indefinitely or until its +// context is cancelled. +// We need it as a workaround for testenvs lack of support for a secure apiserver, because the insecure port always +// implies the allow all authorizer, so we can not simulate rbac issues with it. They are the usual cause of the real +// caches GetInformer blocking showing this behavior. +// TODO: Remove this once envtest supports a secure apiserver. 
+type cacheWithIndefinitelyBlockingGetInformer struct { + cache.Cache +} + +func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) { + <-ctx.Done() + return nil, errors.New("GetInformer timed out") +} diff --git a/pkg/source/source.go b/pkg/source/source.go index c4a74af02a..f5da856343 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -91,6 +91,11 @@ type Kind struct { // cache used to watch APIs cache cache.Cache + + // started may contain an error if one was encountered during startup. If its closed and does not + // contain an error, startup and syncing finished. + started chan error + startCancel func() } var _ SyncingSource = &Kind{} @@ -110,16 +115,29 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return fmt.Errorf("must call CacheInto on Kind before calling Start") } - // Lookup the Informer from the Cache and add an EventHandler which populates the Queue - i, err := ks.cache.GetInformer(ctx, ks.Type) - if err != nil { - if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { - log.Error(err, "if kind is a CRD, it should be installed before calling Start", - "kind", kindMatchErr.GroupKind) + // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not + // sync that informer (most commonly due to RBAC issues). 
+ ctx, ks.startCancel = context.WithCancel(ctx) + ks.started = make(chan error) + go func() { + // Lookup the Informer from the Cache and add an EventHandler which populates the Queue + i, err := ks.cache.GetInformer(ctx, ks.Type) + if err != nil { + if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok { + log.Error(err, "if kind is a CRD, it should be installed before calling Start", + "kind", kindMatchErr.GroupKind) + } + ks.started <- err + return } - return err - } - i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + if !ks.cache.WaitForCacheSync(ctx) { + // Would be great to return something more informative here + ks.started <- errors.New("cache did not sync") + } + close(ks.started) + }() + return nil } @@ -133,11 +151,13 @@ func (ks *Kind) String() string { // WaitForSync implements SyncingSource to allow controllers to wait with starting // workers until the cache is synced. 
func (ks *Kind) WaitForSync(ctx context.Context) error { - if !ks.cache.WaitForCacheSync(ctx) { - // Would be great to return something more informative here - return errors.New("cache did not sync") + select { + case err := <-ks.started: + return err + case <-ctx.Done(): + ks.startCancel() + return errors.New("timed out waiting for cache to be synced") } - return nil } var _ inject.Cache = &Kind{} diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 2726b6cfd0..9b0a1c9744 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -90,6 +90,7 @@ var _ = Describe("Source", func() { }, }, q) Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) i, err := ic.FakeInformerFor(&corev1.Pod{}) Expect(err).NotTo(HaveOccurred()) @@ -133,6 +134,7 @@ var _ = Describe("Source", func() { }, }, q) Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) i, err := ic.FakeInformerFor(&corev1.Pod{}) Expect(err).NotTo(HaveOccurred()) @@ -178,6 +180,7 @@ var _ = Describe("Source", func() { }, }, q) Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred()) i, err := ic.FakeInformerFor(&corev1.Pod{}) Expect(err).NotTo(HaveOccurred()) @@ -208,10 +211,11 @@ var _ = Describe("Source", func() { }) It("should return an error if syncing fails", func(done Done) { - instance := source.Kind{} + instance := source.Kind{Type: &corev1.Pod{}} f := false Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed()) - err := instance.WaitForSync(nil) + Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) + err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync")) @@ -220,7 +224,7 @@ var _ = Describe("Source", func() { }) Context("for a Kind not in the cache", func() { - It("should return an 
error when Start is called", func(done Done) { + It("should return an error when WaitForSync is called", func(done Done) { ic.Error = fmt.Errorf("test error") q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") @@ -229,7 +233,8 @@ var _ = Describe("Source", func() { } Expect(instance.InjectCache(ic)).To(Succeed()) err := instance.Start(ctx, handler.Funcs{}, q) - Expect(err).To(HaveOccurred()) + Expect(err).NotTo(HaveOccurred()) + Expect(instance.WaitForSync(context.Background())).To(HaveOccurred()) close(done) }) @@ -246,8 +251,9 @@ var _ = Describe("Source", func() { It("should return an error if syncing fails", func(done Done) { f := false - instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f}) - err := instance.WaitForSync(nil) + instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f}) + Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred()) + err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync"))