diff --git a/clientv3/watch.go b/clientv3/watch.go index 36bf1a71820..6a50b8631ed 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -650,7 +650,11 @@ func (w *watchGrpcStream) run() { return case ws := <-w.closingc: - if ws.id != -1 { + if ws.id == -1 { + // this stream hasn't actually started, don't attempt to cancel + } else if _, ok := w.substreams[ws.id]; !ok { + // this is a duplicate cancellation, the substream no longer exists + } else { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives cancelSet[ws.id] = struct{}{} diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 15e2c55f5c2..b74fa375f95 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -153,10 +153,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Lock() if s.unsynced.delete(wa) { slowWatcherGauge.Dec() + watcherGauge.Dec() break } else if s.synced.delete(wa) { + watcherGauge.Dec() break } else if wa.compacted { + watcherGauge.Dec() break } else if wa.ch == nil { // already canceled (e.g., cancel/close race) @@ -177,6 +180,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { } if victimBatch != nil { slowWatcherGauge.Dec() + watcherGauge.Dec() delete(victimBatch, wa) break } @@ -186,7 +190,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { time.Sleep(time.Millisecond) } - watcherGauge.Dec() wa.ch = nil s.mu.Unlock() }