diff --git a/libcalico-go/lib/backend/api/api.go b/libcalico-go/lib/backend/api/api.go
index d6b45441bb9..fa3fb4bbbfb 100644
--- a/libcalico-go/lib/backend/api/api.go
+++ b/libcalico-go/lib/backend/api/api.go
@@ -126,7 +126,8 @@ type Client interface {
 }

 type WatchOptions struct {
-	Revision string
+	Revision            string
+	AllowWatchBookmarks bool
 }

 type Syncer interface {
@@ -200,6 +201,7 @@ const (
 	WatchModified WatchEventType = "MODIFIED"
 	WatchDeleted  WatchEventType = "DELETED"
 	WatchError    WatchEventType = "ERROR"
+	WatchBookmark WatchEventType = "BOOKMARK"
 )

 // Event represents a single event to a watched resource.
diff --git a/libcalico-go/lib/backend/k8s/resources/profile.go b/libcalico-go/lib/backend/k8s/resources/profile.go
index 2df16916812..4e4b75adb13 100644
--- a/libcalico-go/lib/backend/k8s/resources/profile.go
+++ b/libcalico-go/lib/backend/k8s/resources/profile.go
@@ -424,6 +424,13 @@ func (pw *profileWatcher) processProfileEvents() {
 		switch e.Type {
 		case api.WatchModified, api.WatchAdded:
 			value = e.New.Value
+		case api.WatchBookmark:
+			if isNsEvent {
+				pw.k8sNSRev = e.New.Revision
+			} else {
+				pw.k8sSARev = e.New.Revision
+			}
+			e.New.Revision = pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev)
 		case api.WatchDeleted:
 			value = e.Old.Value
 		}
diff --git a/libcalico-go/lib/backend/k8s/resources/resources.go b/libcalico-go/lib/backend/k8s/resources/resources.go
index 3231bad203f..5d569f1afac 100644
--- a/libcalico-go/lib/backend/k8s/resources/resources.go
+++ b/libcalico-go/lib/backend/k8s/resources/resources.go
@@ -325,7 +325,8 @@ func ConvertK8sResourceToCalicoResource(res Resource) error {

 func watchOptionsToK8sListOptions(wo api.WatchOptions) metav1.ListOptions {
 	return metav1.ListOptions{
-		ResourceVersion: wo.Revision,
-		Watch:           true,
+		ResourceVersion:     wo.Revision,
+		Watch:               true,
+		AllowWatchBookmarks: wo.AllowWatchBookmarks,
 	}
 }
diff --git a/libcalico-go/lib/backend/k8s/resources/watcher.go b/libcalico-go/lib/backend/k8s/resources/watcher.go
index 63fb38086be..33219627bab 100644
--- a/libcalico-go/lib/backend/k8s/resources/watcher.go
+++ b/libcalico-go/lib/backend/k8s/resources/watcher.go
@@ -166,7 +166,17 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv
 		}

 		return crw.buildEventsFromKVPs(kvps, kevent.Type)
-
+	case kwatch.Bookmark:
+		// For bookmarks we send an empty KVPair with the current resource
+		// version only.
+		k8sRes := kevent.Object.(Resource)
+		revision := k8sRes.GetObjectMeta().GetResourceVersion()
+		return []*api.WatchEvent{{
+			Type: api.WatchBookmark,
+			New: &model.KVPair{
+				Revision: revision,
+			},
+		}}
 	default:
 		return []*api.WatchEvent{{
 			Type: api.WatchError,
diff --git a/libcalico-go/lib/backend/k8s/resources/watcher_test.go b/libcalico-go/lib/backend/k8s/resources/watcher_test.go
index 2552d91f142..ed148e3564c 100644
--- a/libcalico-go/lib/backend/k8s/resources/watcher_test.go
+++ b/libcalico-go/lib/backend/k8s/resources/watcher_test.go
@@ -48,7 +48,7 @@ var _ = Describe("Resources watcher ", func() {

 	It("should return error WatchEvent with unexpected kwatch event type", func() {
 		events := kwc.convertEvent(kwatch.Event{
-			Type: kwatch.Bookmark,
+			Type: "GARBAGE",
 		})
 		Expect(events).To(HaveLen(1))
 		Expect(events[0].Type).To(Equal(api.WatchError))
diff --git a/libcalico-go/lib/backend/watchersyncer/watchercache.go b/libcalico-go/lib/backend/watchersyncer/watchercache.go
index f24bf275156..78df06734ca 100644
--- a/libcalico-go/lib/backend/watchersyncer/watchercache.go
+++ b/libcalico-go/lib/backend/watchersyncer/watchercache.go
@@ -36,18 +36,23 @@ import (
 // - An api.Update
 // - A api.SyncStatus (only for the very first InSync notification)
 type watcherCache struct {
-	logger               *logrus.Entry
-	client               api.Client
-	watch                api.WatchInterface
-	resources            map[string]cacheEntry
-	oldResources         map[string]cacheEntry
-	results              chan<- interface{}
-	hasSynced            bool
-	resourceType         ResourceType
-	currentWatchRevision string
-	resyncBlockedUntil   time.Time
+	logger                 *logrus.Entry
+	client                 api.Client
+	watch                  api.WatchInterface
+	resources              map[string]cacheEntry
+	oldResources           map[string]cacheEntry
+	results                chan<- interface{}
+	hasSynced              bool
+	resourceType           ResourceType
+	currentWatchRevision   string
+	errorCountAtCurrentRev int
+	resyncBlockedUntil     time.Time
 }

+const (
+	maxErrorsPerRevision = 5
+)
+
 var (
 	MinResyncInterval = 500 * time.Millisecond
 	ListRetryInterval = 1000 * time.Millisecond
@@ -117,13 +122,22 @@ mainLoop:
 				}
 				kvp.Value = nil
 				wc.handleWatchListEvent(kvp)
+			case api.WatchBookmark:
+				wc.logger.WithField("newRevision", event.New.Revision).Debug("Watch bookmark received")
+				wc.currentWatchRevision = event.New.Revision
+				wc.errorCountAtCurrentRev = 0
 			case api.WatchError:
 				// Handle a WatchError. This error triggered from upstream, all type
 				// of WatchError are treated equally,log the Error and trigger a full resync. We only log at info
 				// because errors may occur due to compaction causing revisions to no longer be valid - in this case
 				// we simply need to do a full resync.
-				wc.logger.WithError(event.Error).Infof("Watch error received from Upstream")
-				wc.currentWatchRevision = "0"
+				wc.logger.WithError(event.Error).Info("Watch error event received, restarting the watch.")
+				wc.errorCountAtCurrentRev++
+				if wc.errorCountAtCurrentRev > maxErrorsPerRevision {
+					// Too many errors at the current revision, trigger a full resync.
+					wc.logger.Warn("Too many errors at current revision, triggering full resync")
+					wc.resetWatchRevisionForFullResync()
+				}
 				wc.resyncAndCreateWatcher(ctx)
 			default:
 				// Unknown event type - not much we can do other than log.
@@ -159,7 +173,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 	wc.cleanExistingWatcher()

 	// If we don't have a currentWatchRevision then we need to perform a full resync.
-	performFullResync := wc.currentWatchRevision == "0"
+	var performFullResync bool
 	for {
 		select {
 		case <-ctx.Done():
@@ -177,6 +191,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 		// watch immediately ends.
 		wc.resyncBlockedUntil = time.Now().Add(MinResyncInterval)

+		performFullResync = performFullResync || wc.currentWatchRevision == "0"
 		if performFullResync {
 			wc.logger.Info("Full resync is required")

@@ -195,7 +210,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 				if errors.IsResourceExpired(err) {
 					// Our current watch revision is too old. Start again without a revision.
 					wc.logger.Info("Clearing cached watch revision for next List call")
-					wc.currentWatchRevision = "0"
+					wc.resetWatchRevisionForFullResync()
 				}
 				wc.resyncBlockedUntil = time.Now().Add(ListRetryInterval)
 				continue
@@ -219,6 +234,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {

 			// Store the current watch revision. This gets updated on any new add/modified event.
 			wc.currentWatchRevision = l.Revision
+			wc.errorCountAtCurrentRev = 0

 			// Mark the resync as complete.
 			performFullResync = false
@@ -227,9 +243,17 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 		// And now start watching from the revision returned by the List, or from a previous watch event
 		// (depending on whether we were performing a full resync).
 		w, err := wc.client.Watch(ctx, wc.resourceType.ListInterface, api.WatchOptions{
-			Revision: wc.currentWatchRevision,
+			Revision:            wc.currentWatchRevision,
+			AllowWatchBookmarks: true,
 		})
 		if err != nil {
+			if errors.IsResourceExpired(err) {
+				// Our current watch revision is too old. Start again without a revision.
+				wc.logger.Info("Watch has expired, queueing full resync.")
+				wc.resetWatchRevisionForFullResync()
+				continue
+			}
+
 			// Failed to create the watcher - we'll need to retry.
 			switch err.(type) {
 			case cerrors.ErrorOperationNotSupported, cerrors.ErrorResourceDoesNotExist:
@@ -237,7 +261,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 				// doesn't support it, or because there are no resources to watch yet (and Kubernetes won't
 				// let us watch if there are no resources yet). Pause for the watch poll interval.
 				// This loop effectively becomes a poll loop for this resource type.
-				wc.logger.Debug("Watch operation not supported")
+				wc.logger.Debug("Watch operation not supported; reverting to poll.")
 				wc.resyncBlockedUntil = time.Now().Add(WatchPollInterval)

 				// Make sure we force a re-list of the resource even if the watch previously succeeded
@@ -246,9 +270,13 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 				continue
 			}

-			// We hit an error creating the Watch. Trigger a full resync.
-			wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher")
-			performFullResync = true
+			wc.logger.WithError(err).WithField("performFullResync", performFullResync).Warn(
+				"Failed to create watcher; will retry.")
+			wc.errorCountAtCurrentRev++
+			if wc.errorCountAtCurrentRev > maxErrorsPerRevision {
+				// Hitting repeated errors, try a full resync next time.
+				performFullResync = true
+			}
 			continue
 		}

@@ -259,6 +287,11 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
 	}
 }

+func (wc *watcherCache) resetWatchRevisionForFullResync() {
+	wc.currentWatchRevision = "0"
+	wc.errorCountAtCurrentRev = 0
+}
+
 var closedTimeC = make(chan time.Time)

 func init() {
@@ -324,6 +357,7 @@ func (wc *watcherCache) finishResync() {
 func (wc *watcherCache) handleWatchListEvent(kvp *model.KVPair) {
 	// Track the resource version from this watch/list event.
 	wc.currentWatchRevision = kvp.Revision
+	wc.errorCountAtCurrentRev = 0

 	if wc.resourceType.UpdateProcessor == nil {
 		// No update processor - handle immediately.