Skip to content

Commit

Permalink
Merge pull request #8052 from heyitsanthony/watch-victim-test
Browse files Browse the repository at this point in the history
mvcc: test watch victim/delay path
  • Loading branch information
Anthony Romano committed Jun 8, 2017
2 parents d335821 + 83b2ea2 commit 300feea
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 1 deletion.
3 changes: 2 additions & 1 deletion mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import (
"github.com/coreos/etcd/mvcc/mvccpb"
)

const (
// non-const so modifiable by tests
var (
// chanBufLen is the length of the buffered chan
// for sending out watched events.
// TODO: find a good buf value. 1024 is just a random one that
Expand Down
82 changes: 82 additions & 0 deletions mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ package mvcc

import (
"bytes"
"fmt"
"os"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -424,3 +426,83 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
}
}
}

// TestWatchVictims tests that watchable store delivers watch events
// when the watch channel is temporarily clogged with too many events.
func TestWatchVictims(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync

b, tmpPath := backend.NewDefaultTmpBackend()
s := newWatchableStore(b, &lease.FakeLessor{}, nil)

defer func() {
s.store.Close()
os.Remove(tmpPath)
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
}()

chanBufLen, maxWatchersPerSync = 1, 2
numPuts := chanBufLen * 64
testKey, testValue := []byte("foo"), []byte("bar")

var wg sync.WaitGroup
numWatches := maxWatchersPerSync * 128
errc := make(chan error, numWatches)
wg.Add(numWatches)
for i := 0; i < numWatches; i++ {
go func() {
w := s.NewWatchStream()
w.Watch(testKey, nil, 1)
defer func() {
w.Close()
wg.Done()
}()
tc := time.After(10 * time.Second)
evs, nextRev := 0, int64(2)
for evs < numPuts {
select {
case <-tc:
errc <- fmt.Errorf("time out")
return
case wr := <-w.Chan():
evs += len(wr.Events)
for _, ev := range wr.Events {
if ev.Kv.ModRevision != nextRev {
errc <- fmt.Errorf("expected rev=%d, got %d", nextRev, ev.Kv.ModRevision)
return
}
nextRev++
}
time.Sleep(time.Millisecond)
}
}
if evs != numPuts {
errc <- fmt.Errorf("expected %d events, got %d", numPuts, evs)
return
}
select {
case <-w.Chan():
errc <- fmt.Errorf("unexpected response")
default:
}
}()
time.Sleep(time.Millisecond)
}

var wgPut sync.WaitGroup
wgPut.Add(numPuts)
for i := 0; i < numPuts; i++ {
go func() {
defer wgPut.Done()
s.Put(testKey, testValue, lease.NoLease)
}()
}
wgPut.Wait()

wg.Wait()
select {
case err := <-errc:
t.Fatal(err)
default:
}
}

0 comments on commit 300feea

Please sign in to comment.