Skip to content

Commit

Permalink
Merge pull request #1428 from alvaroaleman/fix-cachesync
Browse files Browse the repository at this point in the history
🐛 Fix cache sync timeout functionality
  • Loading branch information
k8s-ci-robot committed Mar 16, 2021
2 parents 82c68b9 + 4d059e8 commit 2a448a7
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 41 deletions.
44 changes: 22 additions & 22 deletions pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -33,6 +34,7 @@ import (
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -104,40 +106,21 @@ var _ = Describe("controller", func() {
close(done)
})

It("should wait for each informer to sync", func(done Done) {
// TODO(directxman12): this test doesn't do what it says it does

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
Expect(err).NotTo(HaveOccurred())
ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
}}

// Use a cancelled context so Start doesn't block
ctx, cancel := context.WithCancel(context.Background())
cancel()
Expect(ctrl.Start(ctx)).NotTo(HaveOccurred())

close(done)
})

It("should error when cache sync timeout occurs", func(done Done) {
ctrl.CacheSyncTimeout = 10 * time.Nanosecond

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
c = &cacheWithIndefinitelyBlockingGetInformer{c}

ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
}}
ctrl.Name = "testcontroller"

err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
Expect(err.Error()).To(ContainSubstring("failed to wait for testcontroller caches to sync: timed out waiting for cache to be synced"))

close(done)
})
Expand Down Expand Up @@ -944,3 +927,20 @@ func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
}()
return s.SyncingSource.WaitForSync(ctx)
}

var _ cache.Cache = &cacheWithIndefinitelyBlockingGetInformer{}

// cacheWithIndefinitelyBlockingGetInformer has a GetInformer implementation that blocks indefinitely or until its
// context is cancelled.
// We need it as a workaround for testenvs lack of support for a secure apiserver, because the insecure port always
// implies the allow all authorizer, so we can not simulate rbac issues with it. They are the usual cause of the real
// caches GetInformer blocking showing this behavior.
// TODO: Remove this once envtest supports a secure apiserver.
type cacheWithIndefinitelyBlockingGetInformer struct {
cache.Cache
}

func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) {
<-ctx.Done()
return nil, errors.New("GetInformer timed out")
}
47 changes: 34 additions & 13 deletions pkg/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type Kind struct {

// cache used to watch APIs
cache cache.Cache

// started may contain an error if one was encountered during startup. If its closed and does not
// contain an error, startup and syncing finished.
started chan error
startCancel func()
}

var _ SyncingSource = &Kind{}
Expand All @@ -110,16 +115,30 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return fmt.Errorf("must call CacheInto on Kind before calling Start")
}

// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
// cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
// sync that informer (most commonly due to RBAC issues).
ctx, ks.startCancel = context.WithCancel(ctx)
ks.started = make(chan error)
go func() {
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
i, err := ks.cache.GetInformer(ctx, ks.Type)
if err != nil {
kindMatchErr := &meta.NoKindMatchError{}
if errors.As(err, &kindMatchErr) {
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
"kind", kindMatchErr.GroupKind)
}
ks.started <- err
return
}
return err
}
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
}
close(ks.started)
}()

return nil
}

Expand All @@ -133,11 +152,13 @@ func (ks *Kind) String() string {
// WaitForSync implements SyncingSource to allow controllers to wait with starting
// workers until the cache is synced.
func (ks *Kind) WaitForSync(ctx context.Context) error {
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
return errors.New("cache did not sync")
select {
case err := <-ks.started:
return err
case <-ctx.Done():
ks.startCancel()
return errors.New("timed out waiting for cache to be synced")
}
return nil
}

var _ inject.Cache = &Kind{}
Expand Down
18 changes: 12 additions & 6 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -133,6 +134,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -178,6 +180,7 @@ var _ = Describe("Source", func() {
},
}, q)
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).NotTo(HaveOccurred())

i, err := ic.FakeInformerFor(&corev1.Pod{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -208,10 +211,11 @@ var _ = Describe("Source", func() {
})

It("should return an error if syncing fails", func(done Done) {
instance := source.Kind{}
instance := source.Kind{Type: &corev1.Pod{}}
f := false
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
err := instance.WaitForSync(nil)
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand All @@ -220,7 +224,7 @@ var _ = Describe("Source", func() {
})

Context("for a Kind not in the cache", func() {
It("should return an error when Start is called", func(done Done) {
It("should return an error when WaitForSync is called", func(done Done) {
ic.Error = fmt.Errorf("test error")
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")

Expand All @@ -229,7 +233,8 @@ var _ = Describe("Source", func() {
}
Expect(instance.InjectCache(ic)).To(Succeed())
err := instance.Start(ctx, handler.Funcs{}, q)
Expect(err).To(HaveOccurred())
Expect(err).NotTo(HaveOccurred())
Expect(instance.WaitForSync(context.Background())).To(HaveOccurred())

close(done)
})
Expand All @@ -246,8 +251,9 @@ var _ = Describe("Source", func() {

It("should return an error if syncing fails", func(done Done) {
f := false
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
err := instance.WaitForSync(nil)
instance := source.NewKindWithCache(&corev1.Pod{}, &informertest.FakeInformers{Synced: &f})
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand Down

0 comments on commit 2a448a7

Please sign in to comment.