Skip to content

Commit

Permalink
Merge pull request #7795 from heyitsanthony/dont-force-initrev
Browse files Browse the repository at this point in the history
clientv3: only update initReq.rev == 0 with watch revision
  • Loading branch information
Anthony Romano committed Apr 22, 2017
2 parents 91039be + 4ab818a commit 7da4516
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 10 deletions.
52 changes: 51 additions & 1 deletion clientv3/integration/watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,57 @@ func putAndWatch(t *testing.T, wctx *watchctx, key, val string) {
}
}

// TestWatchResumeComapcted checks that the watcher gracefully closes in case
func TestWatchResumeInitRev(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

cli := clus.Client(0)
if _, err := cli.Put(context.TODO(), "b", "2"); err != nil {
t.Fatal(err)
}
if _, err := cli.Put(context.TODO(), "a", "3"); err != nil {
t.Fatal(err)
}
// if resume is broken, it'll pick up this key first instead of a=3
if _, err := cli.Put(context.TODO(), "a", "4"); err != nil {
t.Fatal(err)
}

wch := clus.Client(0).Watch(context.Background(), "a", clientv3.WithRev(1), clientv3.WithCreatedNotify())
if resp, ok := <-wch; !ok || resp.Header.Revision != 4 {
t.Fatalf("got (%v, %v), expected create notification rev=4", resp, ok)
}
// pause wch
clus.Members[0].DropConnections()
clus.Members[0].PauseConnections()

select {
case resp, ok := <-wch:
t.Skipf("wch should block, got (%+v, %v); drop not fast enough", resp, ok)
case <-time.After(100 * time.Millisecond):
}

// resume wch
clus.Members[0].UnpauseConnections()

select {
case resp, ok := <-wch:
if !ok {
t.Fatal("unexpected watch close")
}
if len(resp.Events) == 0 {
t.Fatal("expected event on watch")
}
if string(resp.Events[0].Kv.Value) != "3" {
t.Fatalf("expected value=3, got event %+v", resp.Events[0])
}
case <-time.After(5 * time.Second):
t.Fatal("watch timed out")
}
}

// TestWatchResumeCompacted checks that the watcher gracefully closes in case
// that it tries to resume to a revision that's been compacted out of the store.
// Since the watcher's server restarts with stale data, the watcher will receive
// either a compaction error or all keys by staying in sync before the compaction
Expand Down
11 changes: 10 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,20 @@ func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{
// send first creation event only if requested
if ws.initReq.createdNotify {
ws.outc <- *wr
if ws.initReq.rev == 0 {
// current revision of store; returning the
// create response binds the current revision to
// this revision, so restart with it if there's a
// disconnect before receiving any events.
nextRev = wr.Header.Revision
}
}
}
} else {
// current progress of watch; <= store revision
nextRev = wr.Header.Revision
}

nextRev = wr.Header.Revision
if len(wr.Events) > 0 {
nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
}
Expand Down
49 changes: 42 additions & 7 deletions integration/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ type bridge struct {
l net.Listener
conns map[*bridgeConn]struct{}

stopc chan struct{}
wg sync.WaitGroup
stopc chan struct{}
pausec chan struct{}
wg sync.WaitGroup

mu sync.Mutex
}
Expand All @@ -43,8 +44,11 @@ func newBridge(addr string) (*bridge, error) {
inaddr: addr + "0",
outaddr: addr,
conns: make(map[*bridgeConn]struct{}),
stopc: make(chan struct{}, 1),
stopc: make(chan struct{}),
pausec: make(chan struct{}),
}
close(b.pausec)

l, err := transport.NewUnixListener(b.inaddr)
if err != nil {
return nil, fmt.Errorf("listen failed on socket %s (%v)", addr, err)
Expand All @@ -59,10 +63,13 @@ func (b *bridge) URL() string { return "unix://" + b.inaddr }

func (b *bridge) Close() {
b.l.Close()
b.mu.Lock()
select {
case b.stopc <- struct{}{}:
case <-b.stopc:
default:
close(b.stopc)
}
b.mu.Unlock()
b.wg.Wait()
}

Expand All @@ -75,6 +82,22 @@ func (b *bridge) Reset() {
b.conns = make(map[*bridgeConn]struct{})
}

func (b *bridge) Pause() {
b.mu.Lock()
b.pausec = make(chan struct{})
b.mu.Unlock()
}

func (b *bridge) Unpause() {
b.mu.Lock()
select {
case <-b.pausec:
default:
close(b.pausec)
}
b.mu.Unlock()
}

func (b *bridge) serveListen() {
defer func() {
b.l.Close()
Expand All @@ -91,13 +114,22 @@ func (b *bridge) serveListen() {
if ierr != nil {
return
}
b.mu.Lock()
pausec := b.pausec
b.mu.Unlock()
select {
case <-b.stopc:
return
case <-pausec:
}

outc, oerr := net.Dial("unix", b.outaddr)
if oerr != nil {
inc.Close()
return
}

bc := &bridgeConn{inc, outc}
bc := &bridgeConn{inc, outc, make(chan struct{})}
b.wg.Add(1)
b.mu.Lock()
b.conns[bc] = struct{}{}
Expand All @@ -108,6 +140,7 @@ func (b *bridge) serveListen() {

func (b *bridge) serveConn(bc *bridgeConn) {
defer func() {
close(bc.donec)
bc.Close()
b.mu.Lock()
delete(b.conns, bc)
Expand All @@ -129,11 +162,13 @@ func (b *bridge) serveConn(bc *bridgeConn) {
}

type bridgeConn struct {
in net.Conn
out net.Conn
in net.Conn
out net.Conn
donec chan struct{}
}

func (bc *bridgeConn) Close() {
bc.in.Close()
bc.out.Close()
<-bc.donec
}
4 changes: 3 additions & 1 deletion integration/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,9 @@ func (m *member) electionTimeout() time.Duration {
return time.Duration(m.s.Cfg.ElectionTicks) * time.Millisecond
}

func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) DropConnections() { m.grpcBridge.Reset() }
func (m *member) PauseConnections() { m.grpcBridge.Pause() }
func (m *member) UnpauseConnections() { m.grpcBridge.Unpause() }

// NewClientV3 creates a new grpc client connection to the member
func NewClientV3(m *member) (*clientv3.Client, error) {
Expand Down

0 comments on commit 7da4516

Please sign in to comment.