Skip to content

Commit

Permalink
wait for specific registration to Sync instead of whole cache Sync
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <42113979+inteon@users.noreply.github.com>
  • Loading branch information
inteon committed Dec 27, 2023
1 parent 360fedb commit 7a34661
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 23 deletions.
8 changes: 3 additions & 5 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -89,10 +90,7 @@ func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) e

// WaitForCacheSync implements Informers.
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
return true
}
return *c.Synced
return ptr.Deref(c.Synced, true)
}

// FakeInformerFor implements Informers.
Expand All @@ -116,7 +114,7 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
return informer, nil
}

c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: ptr.Deref(c.Synced, true)}
return c.InformersByGVK[gvk], nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (f *FakeInformer) HasSynced() bool {
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.mu.Lock()
defer f.mu.Unlock()
eh := &eventHandlerWrapper{handler, true}
eh := &eventHandlerWrapper{handler, f.Synced}
f.handlers = append(f.handlers, eh)
return eh, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/internal/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ var _ = Describe("controller", func() {

err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("timed out trying to get an informer from cache and waiting for cache to be synced for Kind *v1.Deployment"))
Expect(err.Error()).To(ContainSubstring("timed out trying to get an informer from cache for Kind *v1.Deployment"))
})

It("should not error when context cancelled", func() {
Expand Down
21 changes: 10 additions & 11 deletions pkg/internal/source/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// syncedPollPeriod controls how often you look at the status of your sync funcs
const syncedPollPeriod = 100 * time.Millisecond

// Kind is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Kind struct {
// Type is the type of object to watch. e.g. &v1.Pod{}
Expand Down Expand Up @@ -187,15 +190,6 @@ func (ks *Kind) registerEventHandler(ctx context.Context, eventHandler toolscach
return err
}

// Wait for the cache to sync.
if !ks.Cache.WaitForCacheSync(ctx) {
if ctx.Err() != nil {
return fmt.Errorf("cache did not sync: %w", ctx.Err())
}

return fmt.Errorf("cache did not sync")
}

return nil
}

Expand Down Expand Up @@ -224,6 +218,12 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {
// Check if we have any startup errors. We ignore the errors if the context was canceled, because it means that the
// source was stopped by the user.
if startErr, _ := ks.startupErr.Load().(startupErr); !startErr.isCanceled && startErr.err == nil {
if err := wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(_ context.Context) (bool, error) {
return ks.registration.HasSynced(), nil
}); err != nil {
return fmt.Errorf("informer did not sync in time: %w", err)
}

// The cache was synced and the startup goroutine returned without an error.
return nil
}
Expand All @@ -236,7 +236,7 @@ func (ks *Kind) WaitForSync(ctx context.Context) error {

// Return an additional timeout error if the context was not cancelled.
if !errors.Is(ctx.Err(), context.Canceled) {
returnErrors = append(returnErrors, fmt.Errorf("timed out trying to get an informer from cache and waiting for cache to be synced for Kind %T", ks.Type))
returnErrors = append(returnErrors, fmt.Errorf("timed out trying to get an informer from cache for Kind %T", ks.Type))
}
}

Expand Down Expand Up @@ -292,7 +292,6 @@ func (ks *Kind) Shutdown() error {
if err := ks.informer.RemoveEventHandler(ks.registration); err != nil {
errs = append(errs, fmt.Errorf("failed to stop source: %w", err))
}
ks.registration = nil
}

return kerrors.NewAggregate(errs)
Expand Down
12 changes: 11 additions & 1 deletion pkg/source/source_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ import (
"errors"
"fmt"
"sync"
"time"

kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/cache"
Expand All @@ -31,6 +33,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// syncedPollPeriod controls how often you look at the status of your sync funcs
const syncedPollPeriod = 100 * time.Millisecond

// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create).
type Informer struct {
// Informer is the controller-runtime Informer
Expand Down Expand Up @@ -116,6 +121,12 @@ func (is *Informer) WaitForSync(ctx context.Context) error {
// Check if we have any startup errors. We ignore the errors if the context was canceled, because it means that the
// source was stopped by the user.
if !is.startupErr.isCanceled && is.startupErr.err == nil {
if err := wait.PollUntilContextCancel(ctx, syncedPollPeriod, true, func(_ context.Context) (bool, error) {
return is.registration.HasSynced(), nil
}); err != nil {
return fmt.Errorf("informer did not sync in time: %w", err)
}

// The cache was synced and the startup goroutine returned without an error.
return nil
}
Expand Down Expand Up @@ -167,7 +178,6 @@ func (is *Informer) Shutdown() error {
if err := is.Informer.RemoveEventHandler(is.registration); err != nil {
errs = append(errs, fmt.Errorf("failed to stop source: %w", err))
}
is.registration = nil
}

return kerrors.NewAggregate(errs)
Expand Down
12 changes: 8 additions & 4 deletions pkg/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,11 @@ var _ = Describe("Source", func() {
f := false
instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{})
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := instance.WaitForSync(timeoutCtx)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("failed to start source: cache did not sync"))
Expect(err.Error()).To(Equal("informer did not sync in time: context deadline exceeded"))

})

Expand All @@ -264,9 +266,11 @@ var _ = Describe("Source", func() {
f := false
instance := source.Kind(&informertest.FakeInformers{Synced: &f}, &corev1.Pod{})
Expect(instance.Start(context.Background(), nil, nil)).NotTo(HaveOccurred())
err := instance.WaitForSync(context.Background())
timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
err := instance.WaitForSync(timeoutCtx)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("failed to start source: cache did not sync"))
Expect(err.Error()).To(Equal("informer did not sync in time: context deadline exceeded"))

})

Expand Down

0 comments on commit 7a34661

Please sign in to comment.