diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 4c9f14757c5..e36f9a9a801 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -514,6 +514,9 @@ func TestWatchCompactRevision(t *testing.T) { if wresp.Err() != rpctypes.ErrCompacted { t.Fatalf("wresp.Err() expected %v, but got %v", rpctypes.ErrCompacted, wresp.Err()) } + if !wresp.Canceled { + t.Fatalf("wresp.Canceled expected true, got %+v", wresp) + } // ensure the channel is closed if wresp, ok = <-wch; ok { diff --git a/clientv3/watch.go b/clientv3/watch.go index ee43b2afeba..c4ec08fc892 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -461,7 +461,7 @@ func (w *watchGrpcStream) run() { if ws := w.nextResume(); ws != nil { wc.Send(ws.initReq.toPB()) } - case pbresp.Canceled: + case pbresp.Canceled && pbresp.CompactRevision == 0: delete(cancelSet, pbresp.WatchId) if ws, ok := w.substreams[pbresp.WatchId]; ok { // signal to stream goroutine to update closingc diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 84c0a5eac8c..a456307fea5 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -321,11 +321,13 @@ func (sws *serverWatchStream) sendLoop() { } } + canceled := wresp.CompactRevision != 0 wr := &pb.WatchResponse{ Header: sws.newResponseHeader(wresp.Revision), WatchId: int64(wresp.WatchID), Events: events, CompactRevision: wresp.CompactRevision, + Canceled: canceled, } if _, hasId := ids[wresp.WatchID]; !hasId { diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index 7387caf4dbd..1a497462f99 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -111,6 +111,7 @@ func (w *watcher) send(wr clientv3.WatchResponse) { Header: &wr.Header, Created: wr.Created, CompactRevision: wr.CompactRevision, + Canceled: wr.Canceled, WatchId: w.id, Events: events, })