diff --git a/pkg/patterns/declarative/pkg/watch/dynamic.go b/pkg/patterns/declarative/pkg/watch/dynamic.go index c86805dc..12cfeb15 100644 --- a/pkg/patterns/declarative/pkg/watch/dynamic.go +++ b/pkg/patterns/declarative/pkg/watch/dynamic.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "k8s.io/apimachinery/pkg/api/meta" @@ -35,8 +36,19 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) -// WatchDelay is the time between a Watch being dropped and attempting to resume it -const WatchDelay = 30 * time.Second +var ( + // WatchActivityTimeout sets a timeout for a Watch activity under normal operation + WatchActivityTimeout = 300 * time.Second + // WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path + // We expect the author to set this to a lower value in environments where it makes sense. + // func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... } + WatchActivityFirstTimeout = 300 * time.Second +) + +const ( + // WatchDelay is the time between a Watch being dropped and attempting to resume it + WatchDelay = 30 * time.Second +) // NewDynamicWatch constructs a watcher for unstructured objects. // Deprecated: avoid using directly; will move to internal in future. @@ -138,13 +150,46 @@ type clientObject struct { // // [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276 func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) { + var sawActivity atomic.Bool + log := log.FromContext(ctx) options := w.FilterOptions // Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy. options.AllowWatchBookmarks = true - events, err := w.resource.Watch(context.TODO(), options) + activityTimeout := WatchActivityTimeout + if watchStarted != nil { + activityTimeout = WatchActivityFirstTimeout + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + // Check for events periodically + ticker := time.NewTicker(activityTimeout) + defer ticker.Stop() + sawActivity.Store(false) + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if !sawActivity.Load() { + log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch") + cancel() + return + } + sawActivity.Store(false) + } + } + }() + + events, err := w.resource.Watch(ctx, options) + // If the Watch() call doesnt return, this would not be set to true thereby causing the timer to cancle the watch() context + // We have seen cases where a proxy in between causes the first watch to hang if there were no matching objects to return + sawActivity.Store(true) + if watchStarted != nil { watchStarted.Done() } @@ -159,6 +204,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met defer events.Stop() for clientEvent := range events.ResultChan() { + sawActivity.Store(true) switch clientEvent.Type { case watch.Bookmark: // not an object change, we ignore it