🐛 Start the Cache if the Manager has already started #1681
Changes from 2 commits
@@ -211,6 +211,12 @@ func (cm *controllerManager) Add(r Runnable) error {
 		cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
 	} else if hasCache, ok := r.(hasCache); ok {
 		cm.caches = append(cm.caches, hasCache)
+		if cm.started {
+			cm.startRunnable(hasCache)
+			if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) {
+				return fmt.Errorf("could not sync cache")
+			}
+		}
 	} else {
 		shouldStart = cm.startedLeader
 		cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)

Review discussion on the WaitForCacheSync call:

Comment: This will never timeout, right? Does that match the standard path, where we start the caches before the […]

Comment: /ok-to-test

Reply: It could definitely time out. The remote cluster might not be accessible. Should the cache be started and synced with a different context? If the sync times out, I would then cancel the context. I suppose it also makes more sense to append the cache after the sync completes successfully. I believe this matches the standard path. I can't simply call […]

Reply: After looking through the code some more, and the […]

Reply: What I meant was: if the normal route of starting caches has a context that times out. That doesn't seem to be the case, so this is fine.
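For orientation, the sketch below shows the kind of usage this change unblocks: adding a cluster.Cluster to a Manager that has already been started, so Add now starts and syncs the new cache instead of leaving it unstarted. This is an illustrative sketch, not code from this PR; the config loading and options are placeholder assumptions.

```go
package main

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
)

func main() {
	// Manager for the local cluster.
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		panic(err)
	}

	ctx := ctrl.SetupSignalHandler()
	go func() {
		// Starting the Manager starts its runnables and caches.
		if err := mgr.Start(ctx); err != nil {
			panic(err)
		}
	}()

	// Later, while the Manager is already running, add another Cluster.
	// With this change, Add starts the cluster's cache and waits for it to
	// sync; previously a cache added at this point was never started.
	remoteCfg := ctrl.GetConfigOrDie() // placeholder for loading the remote cluster's rest.Config
	remote, err := cluster.New(remoteCfg)
	if err != nil {
		panic(err)
	}
	if err := mgr.Add(remote); err != nil {
		panic(err)
	}

	<-ctx.Done()
}
```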
@@ -28,6 +28,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"k8s.io/client-go/tools/record"
+
 	"github.com/go-logr/logr"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
@@ -765,6 +767,46 @@ var _ = Describe("manger.Manager", func() {
 			Expect(err.Error()).To(Equal("expected error"))
 		})
 
+		It("should start caches added after Manager has started", func() {
+			fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
+			options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
+				return fakeCache, nil
+			}
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+
+			runnableWasStarted := make(chan struct{})
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				defer GinkgoRecover()
+				if !fakeCache.wasSynced {
+					return errors.New("WaitForCacheSync wasn't called before Runnable got started")
+				}
+				close(runnableWasStarted)
+				return nil
+			}))).To(Succeed())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			go func() {
+				defer GinkgoRecover()
+				Expect(m.Start(ctx)).ToNot(HaveOccurred())
+			}()
+
+			<-runnableWasStarted
+
+			additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
+			fakeCluster := &startClusterAfterManager{informer: additionalClusterCache}
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(m.Add(fakeCluster)).NotTo(HaveOccurred())
+
+			Expect(fakeCluster.informer.wasStarted).To(BeTrue())
+			Expect(fakeCluster.informer.wasSynced).To(BeTrue())
+		})
+
 		It("should wait for runnables to stop", func() {
 			m, err := New(cfg, options)
 			Expect(err).NotTo(HaveOccurred())
@@ -1758,3 +1800,47 @@ func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
 	}()
 	return c.Cache.WaitForCacheSync(ctx)
 }
+
+type startClusterAfterManager struct {
+	informer *startSignalingInformer
+}
+
+func (c *startClusterAfterManager) Start(ctx context.Context) error {
+	return c.informer.Start(ctx)
+}
+
+func (c *startClusterAfterManager) GetCache() cache.Cache {
+	return c.informer
+}
+
+func (c *startClusterAfterManager) SetFields(interface{}) error {
+	return nil
+}
+
+func (c *startClusterAfterManager) GetConfig() *rest.Config {
+	return nil
+}
+
+func (c *startClusterAfterManager) GetScheme() *runtime.Scheme {
+	return nil
+}
+
+func (c *startClusterAfterManager) GetClient() client.Client {
+	return nil
+}
+
+func (c *startClusterAfterManager) GetFieldIndexer() client.FieldIndexer {
+	return nil
+}
+
+func (c *startClusterAfterManager) GetEventRecorderFor(name string) record.EventRecorder {
+	return nil
+}
+
+func (c *startClusterAfterManager) GetRESTMapper() meta.RESTMapper {
+	return nil
+}
+
+func (c *startClusterAfterManager) GetAPIReader() client.Reader {
+	return nil
+}

Review comment on GetCache:

Comment: You don't need to implement anything other than […]
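The review comment above is truncated; assuming it means that only Start and GetCache are exercised by the hasCache branch in Add, a slimmer test double could stub just those two methods. This is a hypothetical alternative (reusing the startSignalingInformer helper defined earlier in this test file), not what the PR merged.

```go
// Hypothetical slimmer fake: Add only needs a Runnable that also exposes
// GetCache, so the remaining cluster-style getters can be dropped entirely.
type minimalClusterFake struct {
	informer *startSignalingInformer
}

func (c *minimalClusterFake) Start(ctx context.Context) error {
	return c.informer.Start(ctx)
}

func (c *minimalClusterFake) GetCache() cache.Cache {
	return c.informer
}
```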
Review discussion on the cm.started check:

Comment: This variable may only be accessed after acquiring cm.mu. Also warning, #1689 changes some of the lock handling, so that might end up conflicting.

Follow-up: Nevermind, we acquire that lock in the beginning of Add, so this is fine.
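To make the locking point concrete, here is a small self-contained sketch (all names are illustrative, not controller-runtime code) of the invariant discussed: a started flag that is only read or written while holding the mutex, so runnables added after start are launched immediately and earlier ones are queued.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type runnable func(ctx context.Context) error

type toyManager struct {
	mu      sync.Mutex
	started bool
	pending []runnable
}

// Add queues the runnable, or launches it right away if Start already ran.
// The started flag is only touched while holding mu, mirroring the invariant
// called out in the review.
func (m *toyManager) Add(ctx context.Context, r runnable) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.started {
		go r(ctx)
		return
	}
	m.pending = append(m.pending, r)
}

// Start launches everything queued so far and flips started, all under mu.
func (m *toyManager) Start(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()
	for _, r := range m.pending {
		go r(ctx)
	}
	m.pending = nil
	m.started = true
}

func main() {
	ctx := context.Background()
	var wg sync.WaitGroup
	m := &toyManager{}

	add := func(msg string) {
		wg.Add(1)
		m.Add(ctx, func(context.Context) error {
			defer wg.Done()
			fmt.Println(msg)
			return nil
		})
	}

	add("queued before Start")
	m.Start(ctx)
	add("added after Start, launched immediately")
	wg.Wait()
}
```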