diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 33804d4e7f..6347cb7394 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -170,6 +170,15 @@ type Options struct { // instead of `reconcile.Result{}`. SyncPeriod *time.Duration + // MinRewatchPeriod is the mininum rewatch period. If the watch request + // times out, the cache will close the watch and reconnect. + // + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [MinRewatchPeriod, 2*MinRewatchPeriod]. + // + // Defaults to 5 minutes if unset. + MinRewatchPeriod *time.Duration + // ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user // requests, using Get() and List(), a resource the cache does not already have an informer for. // @@ -383,6 +392,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, + MinWatchTimeout: opts.MinRewatchPeriod, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index c270e809ca..1501723777 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -51,6 +51,7 @@ type InformersOpts struct { Transform cache.TransformFunc UnsafeDisableDeepCopy bool WatchErrorHandler cache.WatchErrorHandler + MinWatchTimeout *time.Duration } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -79,6 +80,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, watchErrorHandler: options.WatchErrorHandler, + minWatchTimeout: options.MinWatchTimeout, } } @@ -147,6 +149,11 @@ type Informers struct { // so that all informers will not send list requests simultaneously. resync time.Duration + // minWatchTimeout is the minimum timeout period for watch requests + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. + minWatchTimeout *time.Duration + // mu guards access to the map mu sync.RWMutex @@ -354,6 +361,10 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { ip.selector.ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately + if ip.minWatchTimeout != nil { + watchTimeoutSeconds := int64(ip.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) //nolint:gosec + opts.TimeoutSeconds = &watchTimeoutSeconds + } return listWatcher.WatchFunc(opts) }, }, obj, calculateResyncPeriod(ip.resync), cache.Indexers{