🐛 Start the Cache if the Manager has already started (#1681)
* Start the Cache if the Manager has already started

* verify adding cluster after mgr started results in working cache

Add a test that adds a cluster to the manager after the manager has
already started. Verify that the cluster is started and its cache
is synced.

Added the startClusterAfterManager struct, which is basically just a
hook to verify that the cluster is started.

* remove unnecessary methods

Only the GetCache and Start methods are needed for the new test that adds
a cluster to the manager after the manager has already started.
jsanda authored Oct 8, 2021
1 parent e1880f5 commit 3e870eb
Showing 2 changed files with 58 additions and 0 deletions.
6 changes: 6 additions & 0 deletions pkg/manager/internal.go
@@ -211,6 +211,12 @@ func (cm *controllerManager) Add(r Runnable) error {
 		cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
 	} else if hasCache, ok := r.(hasCache); ok {
 		cm.caches = append(cm.caches, hasCache)
+		if cm.started {
+			cm.startRunnable(hasCache)
+			if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) {
+				return fmt.Errorf("could not sync cache")
+			}
+		}
 	} else {
 		shouldStart = cm.startedLeader
 		cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
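
With this change, adding a cache-backed Runnable (one that satisfies the manager's hasCache interface by exposing GetCache()) to an already-started manager starts its cache immediately and blocks until the cache syncs, instead of silently never starting it. A minimal usage sketch of the scenario this fixes, assuming controller-runtime's cluster and manager packages; the helper name addClusterLate is illustrative, not part of the commit:

import (
	"k8s.io/client-go/rest"

	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// addClusterLate adds a second cluster to a manager that is already
// running. cluster.Cluster implements both Start and GetCache, so
// Add takes the hasCache branch shown in the diff above.
func addClusterLate(mgr manager.Manager, cfg *rest.Config) error {
	c, err := cluster.New(cfg)
	if err != nil {
		return err
	}
	// Before this fix, Add on a started manager only recorded the cache
	// and never started it, so its informers never synced. Now Add
	// starts the cache and returns an error if it cannot sync.
	return mgr.Add(c)
}
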
52 changes: 52 additions & 0 deletions pkg/manager/manager_test.go
@@ -765,6 +765,46 @@ var _ = Describe("manger.Manager", func() {
 			Expect(err.Error()).To(Equal("expected error"))
 		})
 
+		It("should start caches added after Manager has started", func() {
+			fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
+			options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
+				return fakeCache, nil
+			}
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+
+			runnableWasStarted := make(chan struct{})
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				defer GinkgoRecover()
+				if !fakeCache.wasSynced {
+					return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
+				}
+				close(runnableWasStarted)
+				return nil
+			}))).To(Succeed())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			go func() {
+				defer GinkgoRecover()
+				Expect(m.Start(ctx)).ToNot(HaveOccurred())
+			}()
+
+			<-runnableWasStarted
+
+			additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
+			fakeCluster := &startClusterAfterManager{informer: additionalClusterCache}
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(m.Add(fakeCluster)).NotTo(HaveOccurred())
+
+			Expect(fakeCluster.informer.wasStarted).To(BeTrue())
+			Expect(fakeCluster.informer.wasSynced).To(BeTrue())
+		})
+
 		It("should wait for runnables to stop", func() {
 			m, err := New(cfg, options)
 			Expect(err).NotTo(HaveOccurred())
@@ -1758,3 +1798,15 @@ func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
 	}()
 	return c.Cache.WaitForCacheSync(ctx)
 }
+
+type startClusterAfterManager struct {
+	informer *startSignalingInformer
+}
+
+func (c *startClusterAfterManager) Start(ctx context.Context) error {
+	return c.informer.Start(ctx)
+}
+
+func (c *startClusterAfterManager) GetCache() cache.Cache {
+	return c.informer
+}
