-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
✨ Allow configuring cache sync timeouts #1247
✨ Allow configuring cache sync timeouts #1247
Conversation
Hi @varshaprasad96. Thanks for your PR. I'm waiting for a kubernetes-sigs member to verify that this patch is reasonable to test. If it is, they should reply with Once the patch is verified, the new status will be reflected by the I understand the commands that are listed here. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
/assign @alvaroaleman |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
a couple questions and a suggestion to add another comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/ok-to-test
pkg/source/source_test.go
Outdated
@@ -247,13 +248,24 @@ var _ = Describe("Source", func() { | |||
It("should return an error if syncing fails", func(done Done) { | |||
f := false | |||
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f}) | |||
err := instance.WaitForSync(nil) | |||
err := instance.WaitForSync(context.Background(), time.Duration(10)*time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add an integration test where we give this an impossible low timeout (1 nanosecond or such) and verify it fails, then verify it succeeds with a reasonable timeout?
Also, please add a simple unittest that verifies the defaulting of the timeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have added a test case - where the caches do not sync and because of the timeout which is set there is an error (this verifies that we don't go into an infinite loop waiting for caches to sync). However, verifying the case with a reasonable timeout requires us to pass a context to controller.Start(..)
and wait for the channel to close with the caches synced. Currently, the fakeinformer
does not use the context and instead only mocks whether caches are synced or not with a boolean value to ensure that Start()
is not blocked. I suppose that's why in all the other test cases we are passing a cancelled context. Modifying the fake informer to accept the context would break the other test cases. Is there a workaround for this, or is the current test sufficient?
cc: @joelanford
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my other comment, writing an integration test that uses a real cache and not the fakeinformer should be doable here
3886222
to
3382fcd
Compare
Expect(err).NotTo(HaveOccurred()) | ||
sync := false | ||
ctrl.startWatches = []watchDescription{{ | ||
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using the FakeInformer
, please use the cache you construct in line 127 (The testcase you copied this from seems to be bugged, it constructs a cache and then never uses it).
To test the positive case (we correctly sync when there is a reasonable timeout), you could do the following:
- Start off similiar like this test, construct a cache and set it up in
startWatches
- Build a wrapper struct for the
Source
that closes a channel afterWaitForSync
was called (below) - Start the cache in a distinct goroutine (as that is blocking)
- Start the controller in a distinct goroutine (as that is blocking, too)
- Wait for the channel in the source or a timeout via
select
- Close the context used to start the cache and controller
wrapper would be something like this:
type signalingSourceWrapper struct {
cacheSyncDone chan struct{}
source.Source
}
func (s *singalingSourceWrapper) WaitForSync(ctx context.Context) error {
defer func(){close(s.cacheSyncDone} ()
return s.Source.WaitForSync(ctx)
}
Let me know if you have further questions on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ops, sorry, I had missed this. Does the current implementation look fine or shall I modify it ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alvaroaleman, can you please help here:
I tried to do the following:
- Created a wrapper around
SyncingSource
as that in turn wraps Source:
type SingnallingSourceWrapper struct {
cacheSyncDone chan struct{}
source.SyncingSource
}
func NewSignallingSource(cacheChannel chan struct{}, src source.SyncingSource) source.SyncingSource {
return &SingnallingSourceWrapper{cacheSyncDone: cacheChannel, SyncingSource: src}
}
func (s *SingnallingSourceWrapper) WaitForSync(ctx context.Context) error {
defer func() {
close(s.cacheSyncDone)
}()
return s.SyncingSource.WaitForSync(ctx)
}
In the test, I tried the following as you have suggested:
// Configure a new cache:
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
ctx := context.TODO()
// Set it up with start watches. Create a new `channel` for SignallingSource:
src := source.NewKindWithCache(&appsv1.Deployment{}, c)
ctrl.startWatches = []watchDescription{{
src: NewSignallingSource(channel, src),
}}
// Start cache and controller in a separate go routine:
go c.Start(ctx)
go ctrl.Start(ctx)
// Wait for channel to close:
select {
case <-channel:
// channel has closed, cache has synced.
default:
// there is a timeout. cache did not sync
}
ctx.Done()
When I check for the condition where cache is synced, there is always a timeout. And I am also facing the error:
watch of *v1.Deployment ended with: very short watch: pkg/mod/k8s.io/client-go@v0.19.2/tools/cache/reflector.go:156: Unexpected watch close - watch lasted less than a second and no items received
I'm not able to figure out the reason for timing out always since the context which is being sent for starting caches and controller is closed in the end. Also the channel
seems to still be open. Is there something which Im missing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you check the error return of the cache and controller?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, the default
case needs to not be default
but something that actually times out, i.E.:
timeoutCtx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer cancel()
select {
case <-channel:
// channel has closed, cache has synced.
case <-timeoutCtx.Done():
// there is a timeout. cache did not sync
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah. Well, that simplifies the whole thing, we can then simply start the controller synchronously and verify we get that error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, the same error does occur when controller is started synchronously without any timeout set. But isn't informer not getting synced also mean that caches are not synced? if so, how can we verify the case that everything works fine and caches are synced when no timeout is set.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So the reason this happens is that the cache isn't started properly by the time we start the controller. Since cache.Start()
is blocking, it can not be used well to figure out when it finished starting. You could probably use Eventually
with cache.List()
for something that is not a deployment to verify that it finished starting before starting the controller (its important that its a different object than whatever the controller has, as the List
creates and syncs an informer for that object type)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I start the cache in a separate go routine, wait for it to start, then start a controller in a separate go routine - I get the error pkg/mod/k8s.io/client-go@v0.19.2/tools/cache/reflector.go:156: watch of *v1.ReplicaSet ended with: very short watch: pkg/mod/k8s.io/client-go@v0.19.2/tools/cache/reflector.go:156: Unexpected watch close - watch lasted less than a second and no items received
Why is it required that when we set a watch, there should be a change in the state of the resource ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure why you are seeing that error, it is not required to have any change in the state of the resources for this to work. What works for me is this:
It("should error when cache sync timeout occurs", func(done Done) {
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
ctrl.CacheSyncTimeout = 10 * time.Millisecond
ctrl.startWatches = []watchDescription{{src: source.NewKindWithCache(&appsv1.Deployment{}, c)}}
err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
close(done)
})
It("should not error when cache sync time out is of reasonable value", func(done Done) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ctrl.CacheSyncTimeout = 1 * time.Second
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
sourceSynced := make(chan struct{})
ctrl.startWatches = []watchDescription{{
src: &syncingSourceWrapper{
SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c),
synced: sourceSynced,
},
}}
// Start the cache and controller asynchronously and wait for the source to sync
go func() {
defer GinkgoRecover()
Expect(c.Start(ctx)).To(Succeed())
}()
go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).To(Succeed())
}()
<-sourceSynced
close(done)
}, 10.0)
Using this wrapper:
type syncingSourceWrapper struct {
source.SyncingSource
synced chan struct{}
}
func (ssw *syncingSourceWrapper) WaitForSync(ctx context.Context) error {
if err := ssw.SyncingSource.WaitForSync(ctx); err != nil {
return err
}
close(ssw.synced)
return nil
}
pkg/source/source_test.go
Outdated
@@ -247,13 +248,24 @@ var _ = Describe("Source", func() { | |||
It("should return an error if syncing fails", func(done Done) { | |||
f := false | |||
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f}) | |||
err := instance.WaitForSync(nil) | |||
err := instance.WaitForSync(context.Background(), time.Duration(10)*time.Second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See my other comment, writing an integration test that uses a real cache and not the fakeinformer should be doable here
a255c3c
to
d190c63
Compare
@@ -156,7 +164,7 @@ func (c *Controller) Start(ctx context.Context) error { | |||
// caches. | |||
for _, watch := range c.startWatches { | |||
c.Log.Info("Starting EventSource", "source", watch.src) | |||
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { | |||
if err := watch.src.Start(sourceStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This context should be set up for each Start
call. Otherwise there could be an erroneous timeout if you have a bunch of watches.
if err := watch.src.Start(sourceStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil { | |
// use a context with timeout for launching sources and syncing caches. | |
watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) | |
defer cancel() | |
if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil { |
// use a context with timeout for launching sources and syncing caches. | ||
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) | ||
defer cancel() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// use a context with timeout for launching sources and syncing caches. | |
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) | |
defer cancel() |
@@ -169,7 +177,7 @@ func (c *Controller) Start(ctx context.Context) error { | |||
if !ok { | |||
continue | |||
} | |||
if err := syncingSource.WaitForSync(ctx); err != nil { | |||
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { | |
// Use a context with timeout for syncing sources. | |
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) | |
defer cancel() | |
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { |
This PR allows users to configure timeout for cache syncs while starting the controller.
Co-authored-by: Alvaro Aleman <alvaroaleman@users.noreply.github.com>
d190c63
to
bbfc18c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/lgtm
/approve
Thank you for your work on this :)
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: alvaroaleman, varshaprasad96 The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
This PR allows users to configure timeout for cache syncs
while starting the controller.
Closes: #1219