diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index c2a19232c3..d2cba56c3c 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -39,6 +39,16 @@ var ( _ Cache = &informerCache{} ) +type InformerGetOption func(*internal.GetOptions) + +// BlockUntilSynced determines whether a get request for an informer should block +// until the informer's cache has synced. +func BlockUntilSynced(shouldBlock bool) InformerGetOption { + return func(opts *internal.GetOptions) { + opts.DoNotBlockUntilSynced = !shouldBlock + } +} + // ErrCacheNotStarted is returned when trying to read from the cache that wasn't started. type ErrCacheNotStarted struct{} @@ -60,7 +70,7 @@ func (ic *informerCache) Get(ctx context.Context, key client.ObjectKey, out clie return err } - started, cache, err := ic.Informers.Get(ctx, gvk, out) + started, cache, err := ic.Informers.Get(ctx, gvk, out, &internal.GetOptions{}) if err != nil { return err } @@ -78,7 +88,7 @@ func (ic *informerCache) List(ctx context.Context, out client.ObjectList, opts . return err } - started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj) + started, cache, err := ic.Informers.Get(ctx, *gvk, cacheTypeObj, &internal.GetOptions{}) if err != nil { return err } @@ -125,14 +135,19 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem } // GetInformerForKind returns the informer for the GroupVersionKind. -func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { +func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) { // Map the gvk to an object obj, err := ic.scheme.New(gvk) if err != nil { return nil, err } - _, i, err := ic.Informers.Get(ctx, gvk, obj) + cfg := &internal.GetOptions{} + for _, opt := range opts { + opt(cfg) + } + + _, i, err := ic.Informers.Get(ctx, gvk, obj, cfg) if err != nil { return nil, err } @@ -140,35 +155,22 @@ func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou } // GetInformer returns the informer for the obj. -func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { +func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) { gvk, err := apiutil.GVKForObject(obj, ic.scheme) if err != nil { return nil, err } - _, i, err := ic.Informers.Get(ctx, gvk, obj) - if err != nil { - return nil, err + cfg := &internal.GetOptions{} + for _, opt := range opts { + opt(cfg) } - return i.Informer, err -} -// GetInformerNonBlocking returns the informer for the obj without waiting for its cache to sync. -func (ic *informerCache) GetInformerNonBlocking(ctx context.Context, obj client.Object) (Informer, error) { - gvk, err := apiutil.GVKForObject(obj, ic.scheme) + _, i, err := ic.Informers.Get(ctx, gvk, obj, cfg) if err != nil { return nil, err } - - // Use a canceled context to signal non-blocking - canceledCtx, cancel := context.WithCancel(context.Background()) - cancel() - - _, i, err := ic.Informers.Get(canceledCtx, gvk, obj) - if err != nil && !apierrors.IsTimeout(err) { - return nil, err - } - return i.Informer, nil + return i.Informer, err } // NeedLeaderElection implements the LeaderElectionRunnable interface diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 7dcb1119ff..5e42f93848 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -36,6 +36,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/internal/syncs" ) // InformersOpts configures an InformerMap. @@ -95,7 +96,7 @@ type Cache struct { // via the provided stop argument. func (c *Cache) Start(stop <-chan struct{}) { // Stop on either the whole map stopping or just this informer being removed. - internalStop, cancel := eitherChan(stop, c.stop) + internalStop, cancel := syncs.MergeChans(stop, c.stop) defer cancel() c.Informer.Run(internalStop) } @@ -106,6 +107,14 @@ type tracker struct { Metadata map[schema.GroupVersionKind]*Cache } +// GetOptions provides configuration to customize the behavior when +// getting an informer. +type GetOptions struct { + // DoNotBlockUntilSynced tells Get() to return the informer immediately, + // without waiting for its cache to sync. + DoNotBlockUntilSynced bool +} + // Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. // It uses a standard parameter codec constructed based on the given generated Scheme. type Informers struct { @@ -294,7 +303,7 @@ func (ip *Informers) get(gvk schema.GroupVersionKind, obj runtime.Object) (res * // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) { +func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) { // Return the informer if it is found i, started, ok := ip.get(gvk, obj) if !ok { @@ -304,14 +313,10 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r } } - if started && !i.Informer.HasSynced() { - // Cancel for context, informer stopping, or entire map stopping. - syncStop, cancel := mergeChan(ctx.Done(), i.stop, ip.ctx.Done()) - defer cancel() - + if started && !i.Informer.HasSynced() && !opts.DoNotBlockUntilSynced { // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. - if !cache.WaitForCacheSync(syncStop, i.Informer.HasSynced) { - return started, i, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) + if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { + return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) } } @@ -592,50 +597,3 @@ func restrictNamespaceBySelector(namespaceOpt string, s Selector) string { } return "" } - -// eitherChan returns a channel that is closed when either of the input channels are signaled. -// The caller must call the returned CancelFunc to ensure no resources are leaked. -func eitherChan(a, b <-chan struct{}) (<-chan struct{}, context.CancelFunc) { - var once sync.Once - out := make(chan struct{}) - cancel := make(chan struct{}) - cancelFunc := func() { - once.Do(func() { - close(cancel) - }) - } - go func() { - defer close(out) - select { - case <-a: - case <-b: - case <-cancel: - } - }() - - return out, cancelFunc -} - -// mergeChan returns a channel that is closed when any of the input channels are signaled. -// The caller must call the returned CancelFunc to ensure no resources are leaked. -func mergeChan(a, b, c <-chan struct{}) (<-chan struct{}, context.CancelFunc) { - var once sync.Once - out := make(chan struct{}) - cancel := make(chan struct{}) - cancelFunc := func() { - once.Do(func() { - close(cancel) - }) - } - go func() { - defer close(out) - select { - case <-a: - case <-b: - case <-c: - case <-cancel: - } - }() - - return out, cancelFunc -} diff --git a/pkg/internal/syncs/syncs.go b/pkg/internal/syncs/syncs.go new file mode 100644 index 0000000000..84beb1b265 --- /dev/null +++ b/pkg/internal/syncs/syncs.go @@ -0,0 +1,37 @@ +package syncs + +import ( + "context" + "reflect" + "sync" +) + +// MergeChans returns a channel that is closed when any of the input channels are signaled. +// The caller must call the returned CancelFunc to ensure no resources are leaked. +func MergeChans[T any](chans ...<-chan T) (<-chan T, context.CancelFunc) { + var once sync.Once + out := make(chan T) + cancel := make(chan T) + cancelFunc := func() { + once.Do(func() { + close(cancel) + }) + } + cases := make([]reflect.SelectCase, len(chans)+1) + for i := range chans { + cases[i] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(chans[i]), + } + } + cases[len(cases)-1] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(cancel), + } + go func() { + defer close(out) + _, _, _ = reflect.Select(cases) + }() + + return out, cancelFunc +} diff --git a/pkg/internal/syncs/syncs_test.go b/pkg/internal/syncs/syncs_test.go new file mode 100644 index 0000000000..091549514f --- /dev/null +++ b/pkg/internal/syncs/syncs_test.go @@ -0,0 +1,103 @@ +package syncs + +import ( + "testing" + "time" +) + +func TestMergeChans(t *testing.T) { + tests := []struct { + name string + count int + signal int + }{ + { + name: "single channel", + count: 1, + signal: 0, + }, + { + name: "double channel", + count: 2, + signal: 0, + }, + { + name: "five channel, close 0", + count: 5, + signal: 0, + }, + { + name: "five channel, close 1", + count: 5, + signal: 1, + }, + { + name: "five channel, close 2", + count: 5, + signal: 2, + }, + { + name: "five channel, close 3", + count: 5, + signal: 3, + }, + { + name: "five channel, close 4", + count: 5, + signal: 4, + }, + { + name: "single channel, cancel", + count: 1, + signal: -1, + }, + { + name: "double channel, cancel", + count: 2, + signal: -1, + }, + { + name: "five channel, cancel", + count: 5, + signal: -1, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if callAndClose(test.count, test.signal, 1) { + t.Error("timeout before merged channel closed") + } + }) + } +} + +func callAndClose(numChans, signalChan, timeoutSeconds int) bool { + chans := make([]chan struct{}, numChans) + readOnlyChans := make([]<-chan struct{}, numChans) + for i := range chans { + chans[i] = make(chan struct{}) + readOnlyChans[i] = chans[i] + } + defer func() { + for i := range chans { + close(chans[i]) + } + }() + + merged, cancel := MergeChans(readOnlyChans...) + defer cancel() + + timer := time.NewTimer(time.Duration(timeoutSeconds) * time.Second) + + if signalChan >= 0 { + chans[signalChan] <- struct{}{} + } else { + cancel() + } + select { + case <-merged: + return false + case <-timer.C: + return true + } +}