From ebcfdd1a3d1f368abc46d94aed208ed8d93ccf4f Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 19 Jul 2017 12:53:39 -0700 Subject: [PATCH 1/4] integration: check Canceled is true in compacted watch response --- clientv3/integration/watch_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 4c9f14757c5..e36f9a9a801 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -514,6 +514,9 @@ func TestWatchCompactRevision(t *testing.T) { if wresp.Err() != rpctypes.ErrCompacted { t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err()) } + if !wresp.Canceled { + t.Fatalf("wresp.Canceled expected true, got %+v", wresp) + } // ensure the channel is closed if wresp, ok = <-wch; ok { From 6fb08672d82565a9b471c67cb1f95c2c433b07de Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 19 Jul 2017 12:50:17 -0700 Subject: [PATCH 2/4] v3rpc: set canceled=true when stream is compacted Fixes #8231 --- etcdserver/api/v3rpc/watch.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 84c0a5eac8c..a456307fea5 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -321,11 +321,13 @@ func (sws *serverWatchStream) sendLoop() { } } + canceled := wresp.CompactRevision != 0 wr := &pb.WatchResponse{ Header: sws.newResponseHeader(wresp.Revision), WatchId: int64(wresp.WatchID), Events: events, CompactRevision: wresp.CompactRevision, + Canceled: canceled, } if _, hasId := ids[wresp.WatchID]; !hasId { From 318caeee7e00fd795983961fe3d4fc41d81e3c04 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Wed, 19 Jul 2017 13:04:19 -0700 Subject: [PATCH 3/4] clientv3: return CompactRevision wresp when set with Canceled --- clientv3/watch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index ee43b2afeba..c4ec08fc892 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -461,7 +461,7 @@ func (w *watchGrpcStream) run() { if ws := w.nextResume(); ws != nil { wc.Send(ws.initReq.toPB()) } - case pbresp.Canceled: + case pbresp.Canceled && pbresp.CompactRevision == 0: delete(cancelSet, pbresp.WatchId) if ws, ok := w.substreams[pbresp.WatchId]; ok { // signal to stream goroutine to update closingc From 9581f7676c9d32dd3ed78398abc9966aba4cbfa8 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 25 Jul 2017 12:27:53 -0700 Subject: [PATCH 4/4] grpcproxy: forward Canceled field when broadcasting watch responses --- proxy/grpcproxy/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index 7387caf4dbd..1a497462f99 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -111,6 +111,7 @@ func (w *watcher) send(wr clientv3.WatchResponse) { Header: &wr.Header, Created: wr.Created, CompactRevision: wr.CompactRevision, + Canceled: wr.Canceled, WatchId: w.id, Events: events, })