Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the ability to set watch timeouts. #392

Merged
merged 1 commit into from
Jun 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 49 additions & 3 deletions pkg/patterns/declarative/pkg/watch/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -35,8 +36,19 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
)

// WatchDelay is the time between a Watch being dropped and attempting to resume it
const WatchDelay = 30 * time.Second
var (
// WatchActivityTimeout sets a timeout for a Watch activity under normal operation
WatchActivityTimeout = 300 * time.Second
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have a method to enable the watch activity timeout, but I actually think it's a really good defensive feature so I think we should enable it by default anyway. (If anyone disagrees, please comment!)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

// WatchActivityFirstTimeout sets a timeout for Watch activity in an Apply path
// We expect the author to set this to a lower value in environments where it makes sense.
// func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error { ... watch.WatchActivityFirstTimeout = 10 * time.Second ... }
WatchActivityFirstTimeout = 300 * time.Second
)

const (
// WatchDelay is the time between a Watch being dropped and attempting to resume it
WatchDelay = 30 * time.Second
)

// NewDynamicWatch constructs a watcher for unstructured objects.
// Deprecated: avoid using directly; will move to internal in future.
Expand Down Expand Up @@ -138,13 +150,46 @@ type clientObject struct {
//
// [1] https://github.com/kubernetes/kubernetes/issues/54878#issuecomment-357575276
func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget metav1.ObjectMeta, watchStarted *sync.WaitGroup) {
var sawActivity atomic.Bool

log := log.FromContext(ctx)

options := w.FilterOptions
// Though we don't use the resource version, we allow bookmarks to help keep TCP connections healthy.
options.AllowWatchBookmarks = true

events, err := w.resource.Watch(context.TODO(), options)
activityTimeout := WatchActivityTimeout
if watchStarted != nil {
activityTimeout = WatchActivityFirstTimeout
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Check for events periodically
ticker := time.NewTicker(activityTimeout)
defer ticker.Stop()
sawActivity.Store(false)

go func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if !sawActivity.Load() {
log.WithValues("kind", w.GVK.String()).WithValues("namespace", w.FilterNamespace).WithValues("labels", options.LabelSelector).Info("no activity seen for a while, cancelling watch")
cancel()
return
}
sawActivity.Store(false)
}
}
}()

events, err := w.resource.Watch(ctx, options)
// If the Watch() call doesnt return, this would not be set to true thereby causing the timer to cancle the watch() context
// We have seen cases where a proxy in between causes the first watch to hang if there were no matching objects to return
sawActivity.Store(true)
barney-s marked this conversation as resolved.
Show resolved Hide resolved

if watchStarted != nil {
watchStarted.Done()
}
Expand All @@ -159,6 +204,7 @@ func (w *dynamicKindWatch) watchUntilClosed(ctx context.Context, eventTarget met
defer events.Stop()

for clientEvent := range events.ResultChan() {
sawActivity.Store(true)
switch clientEvent.Type {
case watch.Bookmark:
// not an object change, we ignore it
Expand Down
Loading