From 67e464cfdc506d1057622044fcbba9300c5a80b5 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Mon, 11 May 2020 14:40:26 +0100 Subject: [PATCH] mvcc: avoid negative watcher count metrics The watch count metrics are not robust to duplicate cancellations. These cause the count to be decremented twice, leading eventually to negative counts. We are seeing this in production. The duplicate cancellations themselves are not themselves a big problem (except performance), but they are caused by the new proactive cancellation logic (#11850). As it turns out, w.closingc seems to receive two messages for a cancellation. I have added a fix which ensures that we won't send duplicate cancel requests. --- clientv3/watch.go | 6 +++++- mvcc/watchable_store.go | 5 ++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 36bf1a71820..6a50b8631ed 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -650,7 +650,11 @@ func (w *watchGrpcStream) run() { return case ws := <-w.closingc: - if ws.id != -1 { + if ws.id == -1 { + // this stream hasn't actually started, don't attempt to cancel + } else if _, ok := w.substreams[ws.id]; !ok { + // this is a duplicate cancellation, the substream no longer exists + } else { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives cancelSet[ws.id] = struct{}{} diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 15e2c55f5c2..b74fa375f95 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -153,10 +153,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Lock() if s.unsynced.delete(wa) { slowWatcherGauge.Dec() + watcherGauge.Dec() break } else if s.synced.delete(wa) { + watcherGauge.Dec() break } else if wa.compacted { + watcherGauge.Dec() break } else if wa.ch == nil { // already canceled (e.g., cancel/close race) @@ -177,6 +180,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { } if victimBatch != nil { slowWatcherGauge.Dec() + watcherGauge.Dec() delete(victimBatch, wa) break } @@ -186,7 +190,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { time.Sleep(time.Millisecond) } - watcherGauge.Dec() wa.ch = nil s.mu.Unlock() }