Skip to content

Commit

Permalink
clientv3: process closed watcherStreams in watcherGrpcStream run loop
Browse files Browse the repository at this point in the history
Was racing with Watch() when closing the grpc stream on no watchers.

Fixes #6476
  • Loading branch information
Anthony Romano committed Sep 20, 2016
1 parent 6295666 commit 0e8159e
Showing 1 changed file with 27 additions and 19 deletions.
46 changes: 27 additions & 19 deletions clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ type watchGrpcStream struct {
donec chan struct{}
// errc transmits errors from grpc Recv to the watch stream reconn logic
errc chan error
// servec gets the watcherStream of a closed watcher
servec chan *watcherStream

// the error that closed the watch stream
closeErr error
Expand Down Expand Up @@ -203,11 +205,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
cancel: cancel,
streams: make(map[int64]*watcherStream),

respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
respc: make(chan *pb.WatchResponse),
reqc: make(chan *watchRequest),
stopc: make(chan struct{}),
donec: make(chan struct{}),
errc: make(chan error, 1),
servec: make(chan *watcherStream),
}
go wgs.run()
return wgs
Expand Down Expand Up @@ -268,7 +271,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
wgs.stopIfEmpty()
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{closeErr: wgs.closeErr}
Expand Down Expand Up @@ -378,15 +380,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq
go w.serveStream(ws)
}

// closeStream closes the watcher resources and removes it
func (w *watchGrpcStream) closeStream(ws *watcherStream) {
func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
w.mu.Lock()
// cancels request stream; subscriber receives nil channel
close(ws.initReq.retc)
// close subscriber's channel
close(ws.outc)
delete(w.streams, ws.id)
empty := len(w.streams) == 0
if empty && w.stopc != nil {
w.stopc = nil
}
w.mu.Unlock()
return empty
}

// run is the root of the goroutines for managing a watcher client
Expand Down Expand Up @@ -491,6 +497,11 @@ func (w *watchGrpcStream) run() {
cancelSet = make(map[int64]struct{})
case <-stopc:
return

case ws := <-w.servec:
if w.closeStream(ws) {
return
}
}

// send failed; queue for retry
Expand Down Expand Up @@ -553,6 +564,14 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {

// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
defer func() {
// signal that this watcherStream is finished
select {
case w.servec <- ws:
case <-w.donec:
}
}()

var closeErr error
emptyWr := &WatchResponse{}
wrs := []*WatchResponse{}
Expand Down Expand Up @@ -641,20 +660,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
}
}

w.closeStream(ws)
w.stopIfEmpty()
// lazily send cancel message if events on missing id
}

func (wgs *watchGrpcStream) stopIfEmpty() {
wgs.mu.Lock()
if len(wgs.streams) == 0 && wgs.stopc != nil {
close(wgs.stopc)
wgs.stopc = nil
}
wgs.mu.Unlock()
}

func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
ws, rerr := w.resume()
if rerr != nil {
Expand Down

0 comments on commit 0e8159e

Please sign in to comment.