From 7c4b84a6cd4e3301fd85e2f3f69886a5be03c6f5 Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Fri, 14 Nov 2014 15:18:16 -0800 Subject: [PATCH] etcdserver: add buffer to the sender queue --- etcdserver/sender.go | 3 ++- etcdserver/sender_test.go | 29 +++++++++++++---------------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/etcdserver/sender.go b/etcdserver/sender.go index 527e8f9c3d8..875c48bdea3 100644 --- a/etcdserver/sender.go +++ b/etcdserver/sender.go @@ -34,6 +34,7 @@ import ( const ( raftPrefix = "/raft" connPerSender = 4 + senderBufSize = connPerSender * 4 ) type sendHub struct { @@ -150,7 +151,7 @@ func newSender(tr http.RoundTripper, u string, cid types.ID, fs *stats.FollowerS u: u, cid: cid, fs: fs, - q: make(chan []byte), + q: make(chan []byte, senderBufSize), shouldstop: shouldstop, } s.wg.Add(connPerSender) diff --git a/etcdserver/sender_test.go b/etcdserver/sender_test.go index 7c070f723f2..e24637093f9 100644 --- a/etcdserver/sender_test.go +++ b/etcdserver/sender_test.go @@ -97,9 +97,6 @@ func TestSendHubShouldStop(t *testing.T) { cl := newTestCluster(membs) ls := stats.NewLeaderStats("") h := newSendHub(tr, cl, nil, ls) - // wait for handle goroutines start - // TODO: wait for goroutines ready before return newSender - time.Sleep(10 * time.Millisecond) shouldstop := h.ShouldStopNotify() select { @@ -123,9 +120,7 @@ func TestSenderSend(t *testing.T) { tr := &roundTripperRecorder{} fs := &stats.FollowerStats{} s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - // wait for handle goroutines start - // TODO: wait for goroutines ready before return newSender - time.Sleep(10 * time.Millisecond) + if err := s.send([]byte("some data")); err != nil { t.Fatalf("unexpect send error: %v", err) } @@ -145,22 +140,26 @@ func TestSenderExceedMaximalServing(t *testing.T) { tr := newRoundTripperBlocker() fs := &stats.FollowerStats{} s := newSender(tr, "http://10.0.0.1", types.ID(1), fs, nil) - // wait for handle goroutines start - // TODO: wait for goroutines ready before return newSender - time.Sleep(10 * time.Millisecond) - // It could handle that many requests at the same time. - for i := 0; i < connPerSender; i++ { + + // keep the sender busy and make the buffer full + // nothing can go out as we block the sender + for i := 0; i < connPerSender+senderBufSize; i++ { if err := s.send([]byte("some data")); err != nil { t.Errorf("send err = %v, want nil", err) } + // force the sender to grab data + testutil.ForceGosched() } - // This one exceeds its maximal serving ability + + // try to send a data when we are sure the buffer is full if err := s.send([]byte("some data")); err == nil { t.Errorf("unexpect send success") } + + // unblock the senders and force them to send out the data tr.unblock() - // Make handles finish their post testutil.ForceGosched() + // It could send new data after previous ones succeed if err := s.send([]byte("some data")); err != nil { t.Errorf("send err = %v, want nil", err) @@ -173,9 +172,7 @@ func TestSenderExceedMaximalServing(t *testing.T) { func TestSenderSendFailed(t *testing.T) { fs := &stats.FollowerStats{} s := newSender(newRespRoundTripper(0, errors.New("blah")), "http://10.0.0.1", types.ID(1), fs, nil) - // wait for handle goroutines start - // TODO: wait for goroutines ready before return newSender - time.Sleep(10 * time.Millisecond) + if err := s.send([]byte("some data")); err != nil { t.Fatalf("unexpect send error: %v", err) }