diff --git a/src/cluster/etcd/watchmanager/manager.go b/src/cluster/etcd/watchmanager/manager.go index 0ebba605e4..dc4a4ac6e8 100644 --- a/src/cluster/etcd/watchmanager/manager.go +++ b/src/cluster/etcd/watchmanager/manager.go @@ -96,26 +96,30 @@ func (w *manager) watchChanWithTimeout(key string, rev int64) (clientv3.WatchCha } func (w *manager) Watch(key string) { - ticker := time.NewTicker(w.opts.WatchChanCheckInterval()) - defer ticker.Stop() - var ( + ticker = time.NewTicker(w.opts.WatchChanCheckInterval()) + logger = w.logger.With(zap.String("watch_key", key)) + revOverride int64 watchChan clientv3.WatchChan cancelFn context.CancelFunc err error ) + + defer ticker.Stop() + for { if watchChan == nil { w.m.etcdWatchCreate.Inc(1) + logger.Info("creating etcd watch at revision", zap.Int64("revision", revOverride)) watchChan, cancelFn, err = w.watchChanWithTimeout(key, revOverride) if err != nil { - w.logger.Error("could not create etcd watch", zap.Error(err)) + logger.Error("could not create etcd watch", zap.Error(err)) // NB(cw) when we failed to create a etcd watch channel // we do a get for now and will try to recreate the watch chan later if err = w.updateFn(key, nil); err != nil { - w.logger.Error("failed to get value for key", zap.String("key", key), zap.Error(err)) + logger.Error("failed to get value for key", zap.Error(err)) } // avoid recreating watch channel too frequently time.Sleep(w.opts.WatchChanResetInterval()) @@ -130,8 +134,7 @@ func (w *manager) Watch(key string) { // this is unlikely to happen but just to be defensive cancelFn() watchChan = nil - w.logger.Warn("etcd watch channel closed on key, recreating a watch channel", - zap.String("key", key)) + logger.Warn("etcd watch channel closed on key, recreating a watch channel") // avoid recreating watch channel too frequently time.Sleep(w.opts.WatchChanResetInterval()) @@ -142,7 +145,7 @@ func (w *manager) Watch(key string) { // handle the update if err = r.Err(); err != nil { - w.logger.Error("received error on watch channel", zap.Error(err)) + logger.Error("received error on watch channel", zap.Error(err)) w.m.etcdWatchError.Inc(1) // do not stop here, even though the update contains an error // we still take this chance to attempt a Get() for the latest value @@ -150,7 +153,7 @@ func (w *manager) Watch(key string) { // If the current revision has been compacted, set watchChan to // nil so the watch is recreated with a valid start revision if err == rpctypes.ErrCompacted { - w.logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision)) + logger.Warn("recreating watch at revision", zap.Int64("revision", r.CompactRevision)) revOverride = r.CompactRevision watchChan = nil } @@ -161,12 +164,11 @@ func (w *manager) Watch(key string) { continue } if err = w.updateFn(key, r.Events); err != nil { - w.logger.Error("received notification for key, but failed to get value", - zap.String("key", key), zap.Error(err)) + logger.Error("received notification for key, but failed to get value", zap.Error(err)) } case <-ticker.C: if w.tickAndStopFn(key) { - w.logger.Info("watch on key ended", zap.String("key", key)) + logger.Info("watch on key ended") return } }