From b7148d6ab533d0c34b1b46d63eafa4a0dcc8c3b3 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Wed, 27 Mar 2024 10:57:00 -0500 Subject: [PATCH 1/5] allow setting watchTimeoutPeriod when creating informers --- pkg/cache/cache.go | 7 +++++++ pkg/cache/internal/informers.go | 9 +++++++++ 2 files changed, 16 insertions(+) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 33804d4e7f..5522f27dfc 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -170,6 +170,12 @@ type Options struct { // instead of `reconcile.Result{}`. SyncPeriod *time.Duration + // WatchTimeoutPeriod is the timeout for the watch request. If the watch request + // times out, the cache will close the watch and reconnect. + // + // Defaults to a random duration between 5 and 10 minutes if unset. + WatchTimeoutPeriod *time.Duration + // ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user // requests, using Get() and List(), a resource the cache does not already have an informer for. 
// @@ -383,6 +389,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, + WatchTimeoutPeriod: opts.WatchTimeoutPeriod, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index c270e809ca..c24755079a 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/internal/syncs" ) @@ -51,6 +52,7 @@ type InformersOpts struct { Transform cache.TransformFunc UnsafeDisableDeepCopy bool WatchErrorHandler cache.WatchErrorHandler + WatchTimeoutPeriod *time.Duration } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -79,6 +81,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, watchErrorHandler: options.WatchErrorHandler, + watchTimeoutPeriod: options.WatchTimeoutPeriod, } } @@ -147,6 +150,9 @@ type Informers struct { // so that all informers will not send list requests simultaneously. 
resync time.Duration + // watchTimeoutPeriod is the timeout period for watch requests + watchTimeoutPeriod *time.Duration + // mu guards access to the map mu sync.RWMutex @@ -354,6 +360,9 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { ip.selector.ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately + if ip.watchTimeoutPeriod != nil { + opts.TimeoutSeconds = ptr.To(int64(ip.watchTimeoutPeriod.Seconds())) + } return listWatcher.WatchFunc(opts) }, }, obj, calculateResyncPeriod(ip.resync), cache.Indexers{ From f7269ead7edc9811834c06746819de16cd3ea7ad Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 28 Mar 2024 11:38:10 -0500 Subject: [PATCH 2/5] RewatchPeriod --- pkg/cache/cache.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 5522f27dfc..57743581af 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -170,11 +170,11 @@ type Options struct { // instead of `reconcile.Result{}`. SyncPeriod *time.Duration - // WatchTimeoutPeriod is the timeout for the watch request. If the watch request + // RewatchPeriod is the timeout for the watch request. If the watch request // times out, the cache will close the watch and reconnect. // // Defaults to a random duration between 5 and 10 minutes if unset. - WatchTimeoutPeriod *time.Duration + RewatchPeriod *time.Duration // ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user // requests, using Get() and List(), a resource the cache does not already have an informer for. 
@@ -389,7 +389,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, - WatchTimeoutPeriod: opts.WatchTimeoutPeriod, + WatchTimeoutPeriod: opts.RewatchPeriod, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } From b64a62600bf12bae707f1b84f8f641fa2565457b Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 28 Mar 2024 21:51:22 -0500 Subject: [PATCH 3/5] randomize --- pkg/cache/cache.go | 11 +++++++---- pkg/cache/internal/informers.go | 14 +++++++------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 57743581af..c4b620a900 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -170,11 +170,14 @@ type Options struct { // instead of `reconcile.Result{}`. SyncPeriod *time.Duration - // RewatchPeriod is the timeout for the watch request. If the watch request + // MinRewatchPeriod is the minimum rewatch period. If the watch request // times out, the cache will close the watch and reconnect. // - // Defaults to a random duration between 5 and 10 minutes if unset. - RewatchPeriod *time.Duration + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [MinRewatchPeriod, 2*MinRewatchPeriod]. + // + // Defaults to 5 minutes if unset. + MinRewatchPeriod *time.Duration // ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user // requests, using Get() and List(), a resource the cache does not already have an informer for. 
@@ -389,7 +392,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, - WatchTimeoutPeriod: opts.RewatchPeriod, + MinWatchTimeoutPeriod: opts.MinRewatchPeriod, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index c24755079a..6e0dcc04f4 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -35,7 +35,6 @@ import ( "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/internal/syncs" ) @@ -52,7 +51,7 @@ type InformersOpts struct { Transform cache.TransformFunc UnsafeDisableDeepCopy bool WatchErrorHandler cache.WatchErrorHandler - WatchTimeoutPeriod *time.Duration + MinWatchTimeoutPeriod *time.Duration } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -81,7 +80,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, watchErrorHandler: options.WatchErrorHandler, - watchTimeoutPeriod: options.WatchTimeoutPeriod, + minWatchTimeoutPeriod: options.MinWatchTimeoutPeriod, } } @@ -150,8 +149,8 @@ type Informers struct { // so that all informers will not send list requests simultaneously. 
resync time.Duration - // watchTimeoutPeriod is the timeout period for watch requests - watchTimeoutPeriod *time.Duration + // minWatchTimeoutPeriod is the timeout period for watch requests + minWatchTimeoutPeriod *time.Duration // mu guards access to the map mu sync.RWMutex @@ -360,8 +359,9 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { ip.selector.ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately - if ip.watchTimeoutPeriod != nil { - opts.TimeoutSeconds = ptr.To(int64(ip.watchTimeoutPeriod.Seconds())) + if ip.minWatchTimeoutPeriod != nil { + watchTimeoutSeconds := int64(ip.minWatchTimeoutPeriod.Seconds() * (rand.Float64() + 1.0)) + opts.TimeoutSeconds = &watchTimeoutSeconds } return listWatcher.WatchFunc(opts) }, From 176c5b2dde7d07fa2eaf5a53f4b11bac8a67399f Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 28 Mar 2024 21:53:12 -0500 Subject: [PATCH 4/5] rename and fix comment --- pkg/cache/cache.go | 2 +- pkg/cache/internal/informers.go | 14 ++++++++------ 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index c4b620a900..6347cb7394 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -392,7 +392,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), NewInformer: opts.newInformer, - MinWatchTimeoutPeriod: opts.MinRewatchPeriod, + MinWatchTimeout: opts.MinRewatchPeriod, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 6e0dcc04f4..0ff4af3095 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -51,7 +51,7 @@ type InformersOpts struct { Transform cache.TransformFunc UnsafeDisableDeepCopy bool 
WatchErrorHandler cache.WatchErrorHandler - MinWatchTimeoutPeriod *time.Duration + MinWatchTimeout *time.Duration } // NewInformers creates a new InformersMap that can create informers under the hood. @@ -80,7 +80,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, newInformer: newInformer, watchErrorHandler: options.WatchErrorHandler, - minWatchTimeoutPeriod: options.MinWatchTimeoutPeriod, + minWatchTimeout: options.MinWatchTimeout, } } @@ -149,8 +149,10 @@ type Informers struct { // so that all informers will not send list requests simultaneously. resync time.Duration - // minWatchTimeoutPeriod is the timeout period for watch requests - minWatchTimeoutPeriod *time.Duration + // minWatchTimeout is the minimum timeout period for watch requests + // We try to spread the load on apiserver by setting timeouts for + // watch requests - it is random in [minWatchTimeout, 2*minWatchTimeout]. + minWatchTimeout *time.Duration // mu guards access to the map mu sync.RWMutex @@ -359,8 +361,8 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { ip.selector.ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately - if ip.minWatchTimeoutPeriod != nil { - watchTimeoutSeconds := int64(ip.minWatchTimeoutPeriod.Seconds() * (rand.Float64() + 1.0)) + if ip.minWatchTimeout != nil { + watchTimeoutSeconds := int64(ip.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) opts.TimeoutSeconds = &watchTimeoutSeconds } return listWatcher.WatchFunc(opts) From d74c43fccd0b1d3f94dec4b2f67a6185f3ad5641 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 28 Mar 2024 22:16:53 -0500 Subject: [PATCH 5/5] ignore lint --- pkg/cache/internal/informers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 
0ff4af3095..1501723777 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -362,7 +362,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O ip.selector.ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately if ip.minWatchTimeout != nil { - watchTimeoutSeconds := int64(ip.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) + watchTimeoutSeconds := int64(ip.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) //nolint:gosec opts.TimeoutSeconds = &watchTimeoutSeconds } return listWatcher.WatchFunc(opts)