From 4a54ca189d08632226eaf85e124fdf05cae38b79 Mon Sep 17 00:00:00 2001 From: Barni S Date: Thu, 6 Jun 2024 01:32:56 -0400 Subject: [PATCH] Add the ability to set watch timeouts. Allow setting a shorter watch timeout for the first watch alone. What this PR does / why we need it: We have seen in some environments (where requests are tunnelled) that setting up the (first) watch blocks if there are no resources available to watch. This causes the apply loop to be blocked when setting up watches before apply. --- pkg/patterns/declarative/pkg/watch/dynamic.go | 52 +++++++++++++++++-- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/pkg/patterns/declarative/pkg/watch/dynamic.go b/pkg/patterns/declarative/pkg/watch/dynamic.go index c86805dc..12cfeb15 100644 --- a/pkg/patterns/declarative/pkg/watch/dynamic.go +++ b/pkg/patterns/declarative/pkg/watch/dynamic.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -35,8 +36,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -// WatchDelay is the time between a Watch being dropped and attempting to resume it -const WatchDelay = 30 * time.Second +var ( + // WatchActivityTimeout sets a timeout for a Watch activity under normal operation + WatchActivityTimeout = 300 * time.Second + // WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path + // We expect the author to set this to a lower value in environments where it makes sense. + // func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... } + WatchActivityFirstTimeout = 300 * time.Second +) + +const ( + // WatchDelay is the time between a Watch being dropped and attempting to resume it + WatchDelay = 30 * time.Second +) // NewDynamicWatch constructs a watcher for unstructured objects. // Deprecated: avoid using directly; will move to internal in future. 
@@ -138,13 +150,46 @@ type clientObject struct { // // [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276 func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) { + var sawActivity atomic.Bool + log := log.FromContext(ctx) options := w.FilterOptions // Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy. options.AllowWatchBookmarks = true - events, err := w.resource.Watch(context.TODO(), options) + activityTimeout := WatchActivityTimeout + if watchStarted != nil { + activityTimeout = WatchActivityFirstTimeout + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // Check for events periodically + ticker := time.NewTicker(activityTimeout) + defer ticker.Stop() + sawActivity.Store(false) + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !sawActivity.Load() { + log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch") + cancel() + return + } + sawActivity.Store(false) + } + } + }() + + events, err := w.resource.Watch(ctx, options) + // If the Watch() call doesn't return, this would not be set to true thereby causing the timer to cancel the watch() context + // We have seen cases where a proxy in between causes the first watch to hang if there were no matching objects to return + sawActivity.Store(true) + if watchStarted != nil { watchStarted.Done() } @@ -159,6 +204,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met defer events.Stop() for clientEvent := range events.ResultChan() { + sawActivity.Store(true) switch clientEvent.Type { case watch.Bookmark: // not an object change, we ignore it