Skip to content

Commit

Permalink
[etcd] Clean up logging in WatchManager (#2973)
Browse files Browse the repository at this point in the history
  • Loading branch information
vdarulis authored Dec 3, 2020
1 parent 9f0944a commit c595495
Showing 1 changed file with 14 additions and 12 deletions.
26 changes: 14 additions & 12 deletions src/cluster/etcd/watchmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,26 +96,30 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha
}

func (w *manager) Watch(key string) {
ticker := time.NewTicker(w.opts.WatchChanCheckInterval())
defer ticker.Stop()

var (
ticker = time.NewTicker(w.opts.WatchChanCheckInterval())
logger = w.logger.With(zap.String("watch_key", key))

revOverride int64
watchChan clientv3.WatchChan
cancelFn context.CancelFunc
err error
)

defer ticker.Stop()

for {
if watchChan == nil {
w.m.etcdWatchCreate.Inc(1)
logger.Info("creating etcd watch at revision", zap.Int64("revision", revOverride))
watchChan, cancelFn, err = w.watchChanWithTimeout(key, revOverride)
if err != nil {
w.logger.Error("could not create etcd watch", zap.Error(err))
logger.Error("could not create etcd watch", zap.Error(err))

// NB(cw) when we failed to create a etcd watch channel
// we do a get for now and will try to recreate the watch chan later
if err = w.updateFn(key, nil); err != nil {
w.logger.Error("failed to get value for key", zap.String("key", key), zap.Error(err))
logger.Error("failed to get value for key", zap.Error(err))
}
// avoid recreating watch channel too frequently
time.Sleep(w.opts.WatchChanResetInterval())
Expand All @@ -130,8 +134,7 @@ func (w *manager) Watch(key string) {
// this is unlikely to happen but just to be defensive
cancelFn()
watchChan = nil
w.logger.Warn("etcd watch channel closed on key, recreating a watch channel",
zap.String("key", key))
logger.Warn("etcd watch channel closed on key, recreating a watch channel")

// avoid recreating watch channel too frequently
time.Sleep(w.opts.WatchChanResetInterval())
Expand All @@ -142,15 +145,15 @@ func (w *manager) Watch(key string) {

// handle the update
if err = r.Err(); err != nil {
w.logger.Error("received error on watch channel", zap.Error(err))
logger.Error("received error on watch channel", zap.Error(err))
w.m.etcdWatchError.Inc(1)
// do not stop here, even though the update contains an error
// we still take this chance to attempt a Get() for the latest value

// If the current revision has been compacted, set watchChan to
// nil so the watch is recreated with a valid start revision
if err == rpctypes.ErrCompacted {
w.logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision))
logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision))
revOverride = r.CompactRevision
watchChan = nil
}
Expand All @@ -161,12 +164,11 @@ func (w *manager) Watch(key string) {
continue
}
if err = w.updateFn(key, r.Events); err != nil {
w.logger.Error("received notification for key, but failed to get value",
zap.String("key", key), zap.Error(err))
logger.Error("received notification for key, but failed to get value", zap.Error(err))
}
case <-ticker.C:
if w.tickAndStopFn(key) {
w.logger.Info("watch on key ended", zap.String("key", key))
logger.Info("watch on key ended")
return
}
}
Expand Down

0 comments on commit c595495

Please sign in to comment.