Skip to content

Commit

Permalink
etcdserver: add buffer to the sender queue
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Nov 14, 2014
1 parent ac5a282 commit 7c4b84a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
3 changes: 2 additions & 1 deletion etcdserver/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
const (
raftPrefix = "/raft"
connPerSender = 4
senderBufSize = connPerSender * 4
)

type sendHub struct {
Expand Down Expand Up @@ -150,7 +151,7 @@ func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerS
u: u,
cid: cid,
fs: fs,
q: make(chan []byte),
q: make(chan []byte, senderBufSize),
shouldstop: shouldstop,
}
s.wg.Add(connPerSender)
Expand Down
29 changes: 13 additions & 16 deletions etcdserver/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,6 @@ func TestSendHubShouldStop(t *testing.T) {
cl := newTestCluster(membs)
ls := stats.NewLeaderStats("")
h := newSendHub(tr, cl, nil, ls)
// wait for handle goroutines start
// TODO: wait for goroutines ready before return newSender
time.Sleep(10 * time.Millisecond)

shouldstop := h.ShouldStopNotify()
select {
Expand All @@ -123,9 +120,7 @@ func TestSenderSend(t *testing.T) {
tr := &roundTripperRecorder{}
fs := &stats.FollowerStats{}
s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
// wait for handle goroutines start
// TODO: wait for goroutines ready before return newSender
time.Sleep(10 * time.Millisecond)

if err := s.send([]byte("some data")); err != nil {
t.Fatalf("unexpect send error: %v", err)
}
Expand All @@ -145,22 +140,26 @@ func TestSenderExceedMaximalServing(t *testing.T) {
tr := newRoundTripperBlocker()
fs := &stats.FollowerStats{}
s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil)
// wait for handle goroutines start
// TODO: wait for goroutines ready before return newSender
time.Sleep(10 * time.Millisecond)
// It could handle that many requests at the same time.
for i := 0; i < connPerSender; i++ {

// keep the sender busy and make the buffer full
// nothing can go out as we block the sender
for i := 0; i < connPerSender+senderBufSize; i++ {
if err := s.send([]byte("some data")); err != nil {
t.Errorf("send err = %v, want nil", err)
}
// force the sender to grab data
testutil.ForceGosched()
}
// This one exceeds its maximal serving ability

// try to send a data when we are sure the buffer is full
if err := s.send([]byte("some data")); err == nil {
t.Errorf("unexpect send success")
}

// unblock the senders and force them to send out the data
tr.unblock()
// Make handles finish their post
testutil.ForceGosched()

// It could send new data after previous ones succeed
if err := s.send([]byte("some data")); err != nil {
t.Errorf("send err = %v, want nil", err)
Expand All @@ -173,9 +172,7 @@ func TestSenderExceedMaximalServing(t *testing.T) {
func TestSenderSendFailed(t *testing.T) {
fs := &stats.FollowerStats{}
s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil)
// wait for handle goroutines start
// TODO: wait for goroutines ready before return newSender
time.Sleep(10 * time.Millisecond)

if err := s.send([]byte("some data")); err != nil {
t.Fatalf("unexpect send error: %v", err)
}
Expand Down

0 comments on commit 7c4b84a

Please sign in to comment.