-
Notifications
You must be signed in to change notification settings - Fork 9.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
clientv3: process closed watcherStreams in watcherGrpcStream run loop #6487
Conversation
@@ -131,6 +131,8 @@ type watchGrpcStream struct { | |||
donec chan struct{} | |||
// errc transmits errors from grpc Recv to the watch stream reconn logic | |||
errc chan error | |||
// servec gets the watcherStream of a closed watcher | |||
servec chan *watcherStream |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it better to call this closedWatcherc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
closingc
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
LGTM. One nit: can you write about when the race can happen (an example sequence)? |
t.Fatal(err) | ||
} | ||
|
||
// several unique contexts for several unique streams |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
5 instead of several?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't want to bake the constant into the comment
idx := rand.Intn(len(ctxs)) | ||
ctx, cancel := context.WithCancel(ctxs[idx]) | ||
ctxc[idx] <- struct{}{} | ||
ch := cli.Watch(ctx, "abc", clientv3.WithRev(1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we make key "abc" a const in this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
when did we start writing tests like that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the bad thing about this "abc" key is that the put req is not close to the watch req. reader might lose context. changing it to putkey or something is easier for the reader.
The CI failure might be related? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @heyitsanthony for putting up the change.
idx := rand.Intn(len(ctxs)) | ||
ctx, cancel := context.WithCancel(ctxs[idx]) | ||
ctxc[idx] <- struct{}{} | ||
ch := cli.Watch(ctx, "abc", clientv3.WithRev(1)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change "ch" to "wch". Differentiate from existing var.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -759,3 +759,52 @@ func TestWatchCancelOnServer(t *testing.T) { | |||
t.Fatalf("expected 0 watchers, got %q", watchers) | |||
} | |||
} | |||
|
|||
// TestWatchOverlapContextCancel checks watcher stream per-context accounting. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// ... checks watch context created in the same way, e.g. context.WithCancel(parentCtx),
// should be managed and accounted separately and correctly during cancelling each other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok reworked the comment.
if _, ok := <-ch; !ok { | ||
t.Fatalf("unexpected closed channel") | ||
} | ||
// randomize how cancel overlaps with watch creation |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this could happen without overlapping cancel calls?
If so, can we get rid of the "ctxc" thing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It could happen, but this synchronization is important to stress the teardown path. Otherwise, watchers will pile up on the streams instead of oscillating between 0, 1, and 2 watchers. I'll add a comment.
@@ -491,6 +497,11 @@ func (w *watchGrpcStream) run() { | |||
cancelSet = make(map[int64]struct{}) | |||
case <-stopc: | |||
return | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove empty line
0e8159e
to
02dd835
Compare
Was racing with Watch() when closing the grpc stream on no watchers. Fixes etcd-io#6476
02dd835
to
a325180
Compare
Should we cherry-pick this one? |
@timothysc yes, I think so. Tagged for backporting. |
Fixes #6476
/cc @hongchaodeng