Skip to content

Commit

Permalink
First pass at watch bookmarks.
Browse files Browse the repository at this point in the history
  • Loading branch information
fasaxc committed Dec 12, 2024
1 parent 78451e9 commit b6e6e29
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 23 deletions.
4 changes: 3 additions & 1 deletion libcalico-go/lib/backend/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ type Client interface {
}

// WatchOptions holds the per-watch configuration passed to Client.Watch.
type WatchOptions struct {
	// Revision is the backend resource version to start the watch from.
	Revision string
	// AllowWatchBookmarks requests periodic BOOKMARK events from the
	// backend so the watcher can keep its stored revision fresh even when
	// no real updates are flowing.
	AllowWatchBookmarks bool
}

type Syncer interface {
Expand Down Expand Up @@ -200,6 +201,7 @@ const (
WatchModified WatchEventType = "MODIFIED"
WatchDeleted WatchEventType = "DELETED"
WatchError WatchEventType = "ERROR"
WatchBookmark WatchEventType = "BOOKMARK"
)

// Event represents a single event to a watched resource.
Expand Down
7 changes: 7 additions & 0 deletions libcalico-go/lib/backend/k8s/resources/profile.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,13 @@ func (pw *profileWatcher) processProfileEvents() {
switch e.Type {
case api.WatchModified, api.WatchAdded:
value = e.New.Value
case api.WatchBookmark:
if isNsEvent {
pw.k8sNSRev = e.New.Revision
} else {
pw.k8sSARev = e.New.Revision
}
e.New.Revision = pw.JoinProfileRevisions(pw.k8sNSRev, pw.k8sSARev)
case api.WatchDeleted:
value = e.Old.Value
}
Expand Down
5 changes: 3 additions & 2 deletions libcalico-go/lib/backend/k8s/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,8 @@ func ConvertK8sResourceToCalicoResource(res Resource) error {

// watchOptionsToK8sListOptions converts our backend api.WatchOptions into the
// equivalent Kubernetes metav1.ListOptions for starting a watch, propagating
// the start revision and the bookmark opt-in.
func watchOptionsToK8sListOptions(wo api.WatchOptions) metav1.ListOptions {
	return metav1.ListOptions{
		ResourceVersion:     wo.Revision,
		Watch:               true,
		AllowWatchBookmarks: wo.AllowWatchBookmarks,
	}
}
12 changes: 11 additions & 1 deletion libcalico-go/lib/backend/k8s/resources/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,17 @@ func (crw *k8sWatcherConverter) convertEvent(kevent kwatch.Event) []*api.WatchEv
}

return crw.buildEventsFromKVPs(kvps, kevent.Type)

case kwatch.Bookmark:
// For bookmarks we send an empty KVPair with the current resource
// version only.
k8sRes := kevent.Object.(Resource)
revision := k8sRes.GetObjectMeta().GetResourceVersion()
return []*api.WatchEvent{{
Type: api.WatchBookmark,
New: &model.KVPair{
Revision: revision,
},
}}
default:
return []*api.WatchEvent{{
Type: api.WatchError,
Expand Down
72 changes: 53 additions & 19 deletions libcalico-go/lib/backend/watchersyncer/watchercache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,23 @@ import (
// - An api.Update
// - A api.SyncStatus (only for the very first InSync notification)
type watcherCache struct {
	logger       *logrus.Entry
	client       api.Client
	watch        api.WatchInterface
	resources    map[string]cacheEntry
	oldResources map[string]cacheEntry
	results      chan<- interface{}
	hasSynced    bool
	resourceType ResourceType
	// currentWatchRevision is the revision we will (re)start the watch
	// from; "0" means a full relist is needed first.
	currentWatchRevision string
	// errorCountAtCurrentRev counts consecutive watch errors seen at
	// currentWatchRevision; it is reset whenever the revision advances and,
	// once it exceeds maxErrorsPerRevision, a full resync is forced.
	errorCountAtCurrentRev int
	resyncBlockedUntil     time.Time
}

const (
	// maxErrorsPerRevision caps how many consecutive watch errors we
	// tolerate at a single watch revision before forcing a full resync.
	maxErrorsPerRevision = 5
)

var (
MinResyncInterval = 500 * time.Millisecond
ListRetryInterval = 1000 * time.Millisecond
Expand Down Expand Up @@ -117,13 +122,22 @@ mainLoop:
}
kvp.Value = nil
wc.handleWatchListEvent(kvp)
case api.WatchBookmark:
wc.logger.WithField("newRevision", event.New.Revision).Debug("Watch bookmark received")
wc.currentWatchRevision = event.New.Revision
wc.errorCountAtCurrentRev = 0
case api.WatchError:
// Handle a WatchError. This error was triggered upstream; all types
// of WatchError are treated equally: log the error and trigger a full resync. We only log at info
// because errors may occur due to compaction causing revisions to no longer be valid - in this case
// we simply need to do a full resync.
wc.logger.WithError(event.Error).Infof("Watch error received from Upstream")
wc.currentWatchRevision = "0"
wc.logger.WithError(event.Error).Info("Watch error event received, restarting the watch.")
wc.errorCountAtCurrentRev++
if wc.errorCountAtCurrentRev > maxErrorsPerRevision {
// Too many errors at the current revision, trigger a full resync.
wc.logger.Warn("Too many errors at current revision, triggering full resync")
wc.resetWatchRevisionForFullResync()
}
wc.resyncAndCreateWatcher(ctx)
default:
// Unknown event type - not much we can do other than log.
Expand Down Expand Up @@ -159,7 +173,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
wc.cleanExistingWatcher()

// If we don't have a currentWatchRevision then we need to perform a full resync.
performFullResync := wc.currentWatchRevision == "0"
var performFullResync bool
for {
select {
case <-ctx.Done():
Expand All @@ -177,6 +191,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
// watch immediately ends.
wc.resyncBlockedUntil = time.Now().Add(MinResyncInterval)

performFullResync = performFullResync || wc.currentWatchRevision == "0"
if performFullResync {
wc.logger.Info("Full resync is required")

Expand All @@ -195,7 +210,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
if errors.IsResourceExpired(err) {
// Our current watch revision is too old. Start again without a revision.
wc.logger.Info("Clearing cached watch revision for next List call")
wc.currentWatchRevision = "0"
wc.resetWatchRevisionForFullResync()
}
wc.resyncBlockedUntil = time.Now().Add(ListRetryInterval)
continue
Expand All @@ -219,6 +234,7 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {

// Store the current watch revision. This gets updated on any new add/modified event.
wc.currentWatchRevision = l.Revision
wc.errorCountAtCurrentRev = 0

// Mark the resync as complete.
performFullResync = false
Expand All @@ -227,17 +243,25 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
// And now start watching from the revision returned by the List, or from a previous watch event
// (depending on whether we were performing a full resync).
w, err := wc.client.Watch(ctx, wc.resourceType.ListInterface, api.WatchOptions{
Revision: wc.currentWatchRevision,
Revision: wc.currentWatchRevision,
AllowWatchBookmarks: true,
})
if err != nil {
if errors.IsResourceExpired(err) {
// Our current watch revision is too old. Start again without a revision.
wc.logger.Info("Watch has expired, queueing full resync.")
wc.resetWatchRevisionForFullResync()
continue
}

// Failed to create the watcher - we'll need to retry.
switch err.(type) {
case cerrors.ErrorOperationNotSupported, cerrors.ErrorResourceDoesNotExist:
// Watch is not supported on this resource type, either because the type fundamentally
// doesn't support it, or because there are no resources to watch yet (and Kubernetes won't
// let us watch if there are no resources yet). Pause for the watch poll interval.
// This loop effectively becomes a poll loop for this resource type.
wc.logger.Debug("Watch operation not supported")
wc.logger.Debug("Watch operation not supported; reverting to poll.")
wc.resyncBlockedUntil = time.Now().Add(WatchPollInterval)

// Make sure we force a re-list of the resource even if the watch previously succeeded
Expand All @@ -246,9 +270,13 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
continue
}

// We hit an error creating the Watch. Trigger a full resync.
wc.logger.WithError(err).WithField("performFullResync", performFullResync).Info("Failed to create watcher")
performFullResync = true
wc.logger.WithError(err).WithField("performFullResync", performFullResync).Warn(
"Failed to create watcher; will retry.")
wc.errorCountAtCurrentRev++
if wc.errorCountAtCurrentRev > maxErrorsPerRevision {
// Hitting repeated errors, try a full resync next time.
performFullResync = true
}
continue
}

Expand All @@ -259,6 +287,11 @@ func (wc *watcherCache) resyncAndCreateWatcher(ctx context.Context) {
}
}

// resetWatchRevisionForFullResync clears the per-revision error counter and
// rewinds the stored watch revision to "0", which marks the next resync as
// requiring a full relist.
func (wc *watcherCache) resetWatchRevisionForFullResync() {
	wc.errorCountAtCurrentRev = 0
	wc.currentWatchRevision = "0"
}

// closedTimeC is a shared time channel used as an always-ready timer stand-in.
// NOTE(review): presumably closed in the init() below (cut from this view) so
// that receives return immediately — confirm before relying on it.
var closedTimeC = make(chan time.Time)

func init() {
Expand Down Expand Up @@ -324,6 +357,7 @@ func (wc *watcherCache) finishResync() {
func (wc *watcherCache) handleWatchListEvent(kvp *model.KVPair) {
// Track the resource version from this watch/list event.
wc.currentWatchRevision = kvp.Revision
wc.errorCountAtCurrentRev = 0

if wc.resourceType.UpdateProcessor == nil {
// No update processor - handle immediately.
Expand Down

0 comments on commit b6e6e29

Please sign in to comment.