diff --git a/libcalico-go/lib/backend/api/api.go b/libcalico-go/lib/backend/api/api.go index d6b45441bb9..fa3fb4bbbfb 100644 --- a/libcalico-go/lib/backend/api/api.go +++ b/libcalico-go/lib/backend/api/api.go @@ -126,7 +126,8 @@ type Client interface { } type WatchOptions struct { - Revision string + Revision string + AllowWatchBookmarks bool } type Syncer interface { @@ -200,6 +201,7 @@ const ( WatchModified WatchEventType = "MODIFIED" WatchDeleted WatchEventType = "DELETED" WatchError WatchEventType = "ERROR" + WatchBookmark WatchEventType = "BOOKMARK" ) // Event represents a single event to a watched resource. diff --git a/libcalico-go/lib/backend/k8s/resources/profile.go b/libcalico-go/lib/backend/k8s/resources/profile.go index 2df16916812..0d197f1ae9f 100644 --- a/libcalico-go/lib/backend/k8s/resources/profile.go +++ b/libcalico-go/lib/backend/k8s/resources/profile.go @@ -424,6 +424,13 @@ func (pw *profileWatcher) processProfileEvents() { switch e.Type { case api.WatchModified, api.WatchAdded: value = e.New.Value + case api.WatchBookmark: + if isNsEvent { + pw.k8sNSRev = e.New.Revision + } else { + pw.k8sSARev = e.New.Revision + } + e.New.Revision = pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev) case api.WatchDeleted: value = e.Old.Value } @@ -444,7 +451,7 @@ func (pw *profileWatcher) processProfileEvents() { } oma.GetObjectMeta().SetResourceVersion(pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev)) } - } else if e.Error == nil { + } else if e.Error == nil && e.Type != api.WatchBookmark { log.WithField("event", e).Warning("Event without error or value") } diff --git a/libcalico-go/lib/backend/k8s/resources/resources.go b/libcalico-go/lib/backend/k8s/resources/resources.go index 3231bad203f..5d569f1afac 100644 --- a/libcalico-go/lib/backend/k8s/resources/resources.go +++ b/libcalico-go/lib/backend/k8s/resources/resources.go @@ -325,7 +325,8 @@ func ConvertK8sResourceToCalicoResource(res Resource) error { func watchOptionsToK8sListOptions(wo api.WatchOptions) metav1.ListOptions { return metav1.ListOptions{ - ResourceVersion: wo.Revision, - Watch: true, + ResourceVersion: wo.Revision, + Watch: true, + AllowWatchBookmarks: wo.AllowWatchBookmarks, } } diff --git a/libcalico-go/lib/backend/k8s/resources/watcher.go b/libcalico-go/lib/backend/k8s/resources/watcher.go index 63fb38086be..33219627bab 100644 --- a/libcalico-go/lib/backend/k8s/resources/watcher.go +++ b/libcalico-go/lib/backend/k8s/resources/watcher.go @@ -166,7 +166,17 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv } return crw.buildEventsFromKVPs(kvps, kevent.Type) - + case kwatch.Bookmark: + // For bookmarks we send an empty KVPair with the current resource + // version only. + k8sRes := kevent.Object.(Resource) + revision := k8sRes.GetObjectMeta().GetResourceVersion() + return []*api.WatchEvent{{ + Type: api.WatchBookmark, + New: &model.KVPair{ + Revision: revision, + }, + }} default: return []*api.WatchEvent{{ Type: api.WatchError, diff --git a/libcalico-go/lib/backend/k8s/resources/watcher_test.go b/libcalico-go/lib/backend/k8s/resources/watcher_test.go index 2552d91f142..ed148e3564c 100644 --- a/libcalico-go/lib/backend/k8s/resources/watcher_test.go +++ b/libcalico-go/lib/backend/k8s/resources/watcher_test.go @@ -48,7 +48,7 @@ var _ = Describe("Resources watcher ", func() { It("should return error WatchEvent with unexpected kwatch event type", func() { events := kwc.convertEvent(kwatch.Event{ - Type: kwatch.Bookmark, + Type: "GARBAGE", }) Expect(events).To(HaveLen(1)) Expect(events[0].Type).To(Equal(api.WatchError)) diff --git a/libcalico-go/lib/backend/watchersyncer/reflector_ports.go b/libcalico-go/lib/backend/watchersyncer/reflector_ports.go new file mode 100644 index 00000000000..85f690920e7 --- /dev/null +++ b/libcalico-go/lib/backend/watchersyncer/reflector_ports.go @@ -0,0 +1,54 @@ +// Extracted from on Kubernetes reflector.go, Copyright 2014 The Kubernetes Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package watchersyncer + +import ( + "strings" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Copied from client-go's reflector.go +func isTooLargeResourceVersionError(err error) bool { + if apierrors.HasStatusCause(err, metav1.CauseTypeResourceVersionTooLarge) { + return true + } + // In Kubernetes 1.17.0-1.18.5, the api server doesn't set the error status cause to + // metav1.CauseTypeResourceVersionTooLarge to indicate that the requested minimum resource + // version is larger than the largest currently available resource version. To ensure backward + // compatibility with these server versions we also need to detect the error based on the content + // of the error message field. + if !apierrors.IsTimeout(err) { + return false + } + apierr, ok := err.(apierrors.APIStatus) + if !ok || apierr == nil || apierr.Status().Details == nil { + return false + } + for _, cause := range apierr.Status().Details.Causes { + // Matches the message returned by api server 1.17.0-1.18.5 for this error condition + if cause.Message == "Too large resource version" { + return true + } + } + + // Matches the message returned by api server before 1.17.0 + if strings.Contains(apierr.Status().Message, "Too large resource version") { + return true + } + + return false +} diff --git a/libcalico-go/lib/backend/watchersyncer/watchercache.go b/libcalico-go/lib/backend/watchersyncer/watchercache.go index f24bf275156..61ef9c8dbce 100644 --- a/libcalico-go/lib/backend/watchersyncer/watchercache.go +++ b/libcalico-go/lib/backend/watchersyncer/watchercache.go @@ -16,10 +16,12 @@ package watchersyncer import ( "context" + "errors" "time" "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/api/errors" + kerrors "k8s.io/apimachinery/pkg/api/errors" + utilnet "k8s.io/apimachinery/pkg/util/net" "github.com/projectcalico/calico/libcalico-go/lib/backend/api" "github.com/projectcalico/calico/libcalico-go/lib/backend/model" @@ -36,18 +38,23 @@ import ( // - An api.Update // - A api.SyncStatus (only for the very first InSync notification) type watcherCache struct { - logger *logrus.Entry - client api.Client - watch api.WatchInterface - resources map[string]cacheEntry - oldResources map[string]cacheEntry - results chan<- interface{} - hasSynced bool - resourceType ResourceType - currentWatchRevision string - resyncBlockedUntil time.Time + logger *logrus.Entry + client api.Client + watch api.WatchInterface + resources map[string]cacheEntry + oldResources map[string]cacheEntry + results chan<- interface{} + hasSynced bool + resourceType ResourceType + currentWatchRevision string + errorCountAtCurrentRev int + resyncBlockedUntil time.Time } +const ( + maxErrorsPerRevision = 5 +) + var ( MinResyncInterval = 500 * time.Millisecond ListRetryInterval = 1000 * time.Millisecond @@ -117,13 +124,29 @@ mainLoop: } kvp.Value = nil wc.handleWatchListEvent(kvp) + case api.WatchBookmark: + wc.logger.WithField("newRevision", event.New.Revision).Debug("Watch bookmark received") + wc.currentWatchRevision = event.New.Revision + wc.errorCountAtCurrentRev = 0 case api.WatchError: // Handle a WatchError. This error triggered from upstream, all type // of WatchError are treated equally,log the Error and trigger a full resync. We only log at info // because errors may occur due to compaction causing revisions to no longer be valid - in this case // we simply need to do a full resync. - wc.logger.WithError(event.Error).Infof("Watch error received from Upstream") - wc.currentWatchRevision = "0" + if kerrors.IsResourceExpired(event.Error) { + // Our current watch revision is too old. We hit this path after the API server restarts + // (and presumably does an immediate compaction). + wc.logger.WithError(event.Error).Info("Watch has expired, triggering full resync.") + wc.resetWatchRevisionForFullResync() + } else { + wc.logger.WithError(event.Error).Warn("Unknown watch error event received, restarting the watch.") + wc.errorCountAtCurrentRev++ + if wc.errorCountAtCurrentRev > maxErrorsPerRevision { + // Too many errors at the current revision, trigger a full resync. + wc.logger.Warn("Too many errors at current revision, triggering full resync") + wc.resetWatchRevisionForFullResync() + } + } wc.resyncAndCreateWatcher(ctx) default: // Unknown event type - not much we can do other than log. @@ -159,7 +182,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { wc.cleanExistingWatcher() // If we don't have a currentWatchRevision then we need to perform a full resync. - performFullResync := wc.currentWatchRevision == "0" + var performFullResync bool for { select { case <-ctx.Done(): @@ -177,6 +200,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // watch immediately ends. wc.resyncBlockedUntil = time.Now().Add(MinResyncInterval) + performFullResync = performFullResync || wc.currentWatchRevision == "0" if performFullResync { wc.logger.Info("Full resync is required") @@ -192,10 +216,10 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { if err != nil { // Failed to perform the list. Pause briefly (so we don't tight loop) and retry. wc.logger.WithError(err).Info("Failed to perform list of current data during resync") - if errors.IsResourceExpired(err) { + if kerrors.IsResourceExpired(err) { // Our current watch revision is too old. Start again without a revision. wc.logger.Info("Clearing cached watch revision for next List call") - wc.currentWatchRevision = "0" + wc.resetWatchRevisionForFullResync() } wc.resyncBlockedUntil = time.Now().Add(ListRetryInterval) continue @@ -219,6 +243,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // Store the current watch revision. This gets updated on any new add/modified event. wc.currentWatchRevision = l.Revision + wc.errorCountAtCurrentRev = 0 // Mark the resync as complete. performFullResync = false @@ -227,17 +252,34 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { // And now start watching from the revision returned by the List, or from a previous watch event // (depending on whether we were performing a full resync). w, err := wc.client.Watch(ctx, wc.resourceType.ListInterface, api.WatchOptions{ - Revision: wc.currentWatchRevision, + Revision: wc.currentWatchRevision, + AllowWatchBookmarks: true, }) if err != nil { - // Failed to create the watcher - we'll need to retry. - switch err.(type) { - case cerrors.ErrorOperationNotSupported, cerrors.ErrorResourceDoesNotExist: + if kerrors.IsResourceExpired(err) || kerrors.IsGone(err) || isTooLargeResourceVersionError(err) { + // Our current watch revision is too old (or too new!). Start again + // without a revision. Condition cribbed from client-go's reflector. + wc.logger.Info("Watch has expired, queueing full resync.") + wc.resetWatchRevisionForFullResync() + continue + } + + if utilnet.IsConnectionRefused(err) || kerrors.IsTooManyRequests(err) { + // Connection-related error, we can just retry without resetting + // the watch. Condition cribbed from client-go's reflector. + wc.logger.WithError(err).Warn("API server refused connection, will retry.") + continue + } + + var errNotSupp cerrors.ErrorOperationNotSupported + var errNotExist cerrors.ErrorResourceDoesNotExist + if errors.As(err, &errNotSupp) || + errors.As(err, &errNotExist) { // Watch is not supported on this resource type, either because the type fundamentally // doesn't support it, or because there are no resources to watch yet (and Kubernetes won't // let us watch if there are no resources yet). Pause for the watch poll interval. // This loop effectively becomes a poll loop for this resource type. - wc.logger.Debug("Watch operation not supported") + wc.logger.Debug("Watch operation not supported; reverting to poll.") wc.resyncBlockedUntil = time.Now().Add(WatchPollInterval) // Make sure we force a re-list of the resource even if the watch previously succeeded @@ -246,9 +288,13 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { continue } - // We hit an error creating the Watch. Trigger a full resync. - wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher") - performFullResync = true + wc.logger.WithError(err).WithField("performFullResync", performFullResync).Warn( + "Failed to create watcher; will retry.") + wc.errorCountAtCurrentRev++ + if wc.errorCountAtCurrentRev > maxErrorsPerRevision { + // Hitting repeated errors, try a full resync next time. + performFullResync = true + } continue } @@ -259,6 +305,11 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) { } } +func (wc *watcherCache) resetWatchRevisionForFullResync() { + wc.currentWatchRevision = "0" + wc.errorCountAtCurrentRev = 0 +} + var closedTimeC = make(chan time.Time) func init() { @@ -324,6 +375,7 @@ func (wc *watcherCache) finishResync() { func (wc *watcherCache) handleWatchListEvent(kvp *model.KVPair) { // Track the resource version from this watch/list event. wc.currentWatchRevision = kvp.Revision + wc.errorCountAtCurrentRev = 0 if wc.resourceType.UpdateProcessor == nil { // No update processor - handle immediately. diff --git a/libcalico-go/lib/errors/errors.go b/libcalico-go/lib/errors/errors.go index 39072438485..e51669ec327 100644 --- a/libcalico-go/lib/errors/errors.go +++ b/libcalico-go/lib/errors/errors.go @@ -29,6 +29,10 @@ type ErrorDatastoreError struct { Identifier interface{} } +func (e ErrorDatastoreError) Unwrap() error { + return e.Err +} + func (e ErrorDatastoreError) Error() string { return e.Err.Error() } @@ -61,6 +65,10 @@ func (e ErrorResourceDoesNotExist) Error() string { return fmt.Sprintf("resource does not exist: %v with error: %v", e.Identifier, e.Err) } +func (e ErrorResourceDoesNotExist) Unwrap() error { + return e.Err +} + // Error indicating an operation is not supported. type ErrorOperationNotSupported struct { Operation string @@ -83,6 +91,10 @@ type ErrorResourceAlreadyExists struct { Identifier interface{} } +func (e ErrorResourceAlreadyExists) Unwrap() error { + return e.Err +} + func (e ErrorResourceAlreadyExists) Error() string { return fmt.Sprintf("resource already exists: %v", e.Identifier) } @@ -92,6 +104,10 @@ type ErrorConnectionUnauthorized struct { Err error } +func (e ErrorConnectionUnauthorized) Unwrap() error { + return e.Err +} + func (e ErrorConnectionUnauthorized) Error() string { return fmt.Sprintf("connection is unauthorized: %v", e.Err) } @@ -151,6 +167,10 @@ type ErrorResourceUpdateConflict struct { Identifier interface{} } +func (e ErrorResourceUpdateConflict) Unwrap() error { + return e.Err +} + func (e ErrorResourceUpdateConflict) Error() string { return fmt.Sprintf("update conflict: %v", e.Identifier) }