clientv3: process closed watcherStreams in watcherGrpcStream run loop #6487

Merged · 2 commits · Sep 21, 2016

Changes from all commits
60 changes: 60 additions & 0 deletions clientv3/integration/watch_test.go
@@ -759,3 +759,63 @@ func TestWatchCancelOnServer(t *testing.T) {
		t.Fatalf("expected 0 watchers, got %q", watchers)
	}
}
+
+// TestWatchOverlapContextCancel stresses the watcher stream teardown path by
+// creating/canceling watchers to ensure that new watchers are not taken down
+// by a torn down watch stream. The sort of race that's being detected:
+// 1. create w1 using a cancelable ctx with %v as "ctx"
+// 2. cancel ctx
+// 3. watcher client begins tearing down watcher grpc stream since no more watchers
+// 4. start creating watcher w2 using a new "ctx" (not canceled), attaches to old grpc stream
+// 5. watcher client finishes tearing down stream on "ctx"
+// 6. w2 comes back canceled
+func TestWatchOverlapContextCancel(t *testing.T) {
+	defer testutil.AfterTest(t)
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.RandClient()
+	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
+		t.Fatal(err)
+	}
+
+	// each unique context "%v" has a unique grpc stream
+	n := 100
+	ctxs, ctxc := make([]context.Context, 5), make([]chan struct{}, 5)
+	for i := range ctxs {
+		// make "%v" unique
+		ctxs[i] = context.WithValue(context.TODO(), "key", i)
+		// limits the maximum number of outstanding watchers per stream
+		ctxc[i] = make(chan struct{}, 2)
+	}
+	ch := make(chan struct{}, n)
+	// issue concurrent watches with cancel
+	for i := 0; i < n; i++ {
+		go func() {
+			defer func() { ch <- struct{}{} }()
+			idx := rand.Intn(len(ctxs))
+			ctx, cancel := context.WithCancel(ctxs[idx])
+			ctxc[idx] <- struct{}{}
+			wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
Contributor: can we make key "abc" a const in this test?

Contributor Author: when did we start writing tests like that?

Contributor: The bad thing about this "abc" key is that the Put request is not close to the Watch request, so the reader might lose context. Changing it to putKey or something similar is easier for the reader.
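A minimal sketch of the reviewer's suggestion, expressed as a diff against the test above (the name putKey is illustrative, not from this PR):

-	if _, err := cli.Put(context.TODO(), "abc", "def"); err != nil {
+	const putKey = "abc" // hypothetical name; ties the Put to the Watch below
+	if _, err := cli.Put(context.TODO(), putKey, "def"); err != nil {
 	...
-			wch := cli.Watch(ctx, "abc", clientv3.WithRev(1))
+			wch := cli.Watch(ctx, putKey, clientv3.WithRev(1))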

Contributor: Change "ch" to "wch" to differentiate it from the existing variable.

Contributor Author: ok

+			if _, ok := <-wch; !ok {
+				t.Fatalf("unexpected closed channel")
+			}
+			// randomize how cancel overlaps with watch creation
Contributor (@hongchaodeng, Sep 21, 2016): I believe this could happen without overlapping cancel calls? If so, can we get rid of the "ctxc" thing?

Contributor Author: It could happen, but this synchronization is important to stress the teardown path. Otherwise, watchers will pile up on the streams instead of oscillating between 0, 1, and 2 watchers. I'll add a comment.
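The pattern the author describes is a counting semaphore: ctxc[idx] has capacity 2, so at most two watchers are ever outstanding on a stream, and each cancel drains the count back toward zero. A standalone illustration of that pattern (toy code, not from the PR):

package main

import "fmt"

func main() {
	// A buffered channel used as a counting semaphore, bounding how many
	// "watchers" are live at once, as ctxc[idx] does in the test above.
	sem := make(chan struct{}, 2) // capacity = max outstanding watchers per stream
	done := make(chan struct{})
	for i := 0; i < 5; i++ {
		go func(id int) {
			sem <- struct{}{} // acquire: blocks while two watchers are live
			fmt.Println("watcher live:", id)
			// ... create watcher, receive an event, cancel ...
			<-sem // release: the live count falls back toward zero
			done <- struct{}{}
		}(i)
	}
	for i := 0; i < 5; i++ {
		<-done
	}
}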

+			if rand.Intn(2) == 0 {
+				<-ctxc[idx]
+				cancel()
+			} else {
+				cancel()
+				<-ctxc[idx]
+			}
+		}()
+	}
+	// join on watches
+	for i := 0; i < n; i++ {
+		select {
+		case <-ch:
+		case <-time.After(time.Second):
+			t.Fatalf("timed out waiting for completed watch")
+		}
+	}
+}
46 changes: 27 additions & 19 deletions clientv3/watch.go
@@ -131,6 +131,8 @@ type watchGrpcStream struct {
	donec chan struct{}
	// errc transmits errors from grpc Recv to the watch stream reconn logic
	errc chan error
+	// closingc gets the watcherStream of closing watchers
+	closingc chan *watcherStream

	// the error that closed the watch stream
	closeErr error
@@ -203,11 +205,12 @@ func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
		cancel:  cancel,
		streams: make(map[int64]*watcherStream),

-		respc: make(chan *pb.WatchResponse),
-		reqc:  make(chan *watchRequest),
-		stopc: make(chan struct{}),
-		donec: make(chan struct{}),
-		errc:  make(chan error, 1),
+		respc:    make(chan *pb.WatchResponse),
+		reqc:     make(chan *watchRequest),
+		stopc:    make(chan struct{}),
+		donec:    make(chan struct{}),
+		errc:     make(chan error, 1),
+		closingc: make(chan *watcherStream),
	}
	go wgs.run()
	return wgs
@@ -268,7 +271,6 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
		case reqc <- wr:
			ok = true
		case <-wr.ctx.Done():
-			wgs.stopIfEmpty()
		case <-donec:
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{closeErr: wgs.closeErr}
@@ -378,15 +380,19 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchRequest) {
	go w.serveStream(ws)
}

// closeStream closes the watcher resources and removes it
-func (w *watchGrpcStream) closeStream(ws *watcherStream) {
+func (w *watchGrpcStream) closeStream(ws *watcherStream) bool {
	w.mu.Lock()
	// cancels request stream; subscriber receives nil channel
	close(ws.initReq.retc)
	// close subscriber's channel
	close(ws.outc)
	delete(w.streams, ws.id)
+	empty := len(w.streams) == 0
+	if empty && w.stopc != nil {
+		w.stopc = nil
+	}
	w.mu.Unlock()
+	return empty
}

// run is the root of the goroutines for managing a watcher client
@@ -491,6 +497,10 @@ func (w *watchGrpcStream) run() {
			cancelSet = make(map[int64]struct{})
		case <-stopc:
			return
+		case ws := <-w.closingc:
+			if w.closeStream(ws) {
+				return
+			}
		}

		// send failed; queue for retry
@@ -553,6 +563,15 @@ func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {

// serveStream forwards watch responses from run() to the subscriber
func (w *watchGrpcStream) serveStream(ws *watcherStream) {
+	defer func() {
+		// signal that this watcherStream is finished
+		select {
+		case w.closingc <- ws:
+		case <-w.donec:
+			w.closeStream(ws)
+		}
+	}()
+
	var closeErr error
	emptyWr := &WatchResponse{}
	wrs := []*WatchResponse{}
@@ -641,20 +660,9 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) {
		}
	}

-	w.closeStream(ws)
-	w.stopIfEmpty()
	// lazily send cancel message if events on missing id
}

-func (wgs *watchGrpcStream) stopIfEmpty() {
-	wgs.mu.Lock()
-	if len(wgs.streams) == 0 && wgs.stopc != nil {
-		close(wgs.stopc)
-		wgs.stopc = nil
-	}
-	wgs.mu.Unlock()
-}
-
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	ws, rerr := w.resume()
	if rerr != nil {
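Condensed, the change's teardown flow: serveStream no longer closes its own watcherStream; it hands the stream back to run() over closingc, and run() owns all teardown, exiting once the last stream is gone (or cleaning up locally if run() has already returned). The toy model below illustrates that handoff with stand-in types; the names mirror the diff, but this is a sketch, not clientv3 code.

package main

import (
	"fmt"
	"sync"
)

type stream struct{ id int }

// owner stands in for watchGrpcStream: one run() loop owns the streams map.
type owner struct {
	mu       sync.Mutex
	streams  map[int]*stream
	closingc chan *stream  // serveStream hands finished streams back here
	donec    chan struct{} // closed when run() exits
}

// closeStream removes ws and reports whether it was the last stream,
// mirroring the bool return added to watchGrpcStream.closeStream above.
func (o *owner) closeStream(ws *stream) bool {
	o.mu.Lock()
	defer o.mu.Unlock()
	delete(o.streams, ws.id)
	return len(o.streams) == 0
}

// serveStream does per-stream work, then signals the owner on exit.
func (o *owner) serveStream(ws *stream) {
	// ... forward events to the subscriber here ...
	select {
	case o.closingc <- ws: // normal path: run() performs the cleanup
	case <-o.donec: // run() already returned: clean up locally
		o.closeStream(ws)
	}
}

// run owns all teardown, like the new closingc case in watchGrpcStream.run.
func (o *owner) run() {
	defer close(o.donec)
	for ws := range o.closingc {
		if o.closeStream(ws) {
			return // last stream closed; tear the whole thing down
		}
	}
}

func main() {
	o := &owner{
		streams:  map[int]*stream{1: {id: 1}, 2: {id: 2}},
		closingc: make(chan *stream),
		donec:    make(chan struct{}),
	}
	s1, s2 := o.streams[1], o.streams[2]
	go o.serveStream(s1)
	go o.serveStream(s2)
	o.run()
	fmt.Println("all streams closed; run() exited")
}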