From bd18840fd6c5a84524bdedf91764a26f3d70bbf5 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Wed, 31 May 2017 12:25:22 +0300 Subject: [PATCH 1/4] rafthttp: configurable stream reader retry timeout rafthttp.Transport.DialRetryTimeout field alters the frequency of dial attempts --- rafthttp/peer.go | 8 ++++++++ rafthttp/stream.go | 31 ++++++++++++++++++++++--------- rafthttp/stream_test.go | 6 ++++++ rafthttp/transport.go | 6 ++++-- 4 files changed, 40 insertions(+), 11 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index a82d7beed74..394998514ec 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -24,6 +24,7 @@ import ( "github.com/coreos/etcd/raft/raftpb" "github.com/coreos/etcd/snap" "golang.org/x/net/context" + "golang.org/x/time/rate" ) const ( @@ -198,6 +199,13 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats recvc: p.recvc, propc: p.propc, } + + if transport.DialRetryTimeout != 0 { + limit := rate.Every(transport.DialRetryTimeout) + p.msgAppV2Reader.rl = rate.NewLimiter(limit, 1) + p.msgAppReader.rl = rate.NewLimiter(limit, 1) + } + p.msgAppV2Reader.start() p.msgAppReader.start() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 2a6c620f56d..2f53562d05e 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -25,6 +25,8 @@ import ( "sync" "time" + "golang.org/x/time/rate" + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/httputil" "github.com/coreos/etcd/pkg/transport" @@ -278,6 +280,8 @@ type streamReader struct { recvc chan<- raftpb.Message propc chan<- raftpb.Message + rl *rate.Limiter // alters the frequency of dial retrial attempts + errorc chan<- error mu sync.Mutex @@ -289,14 +293,21 @@ type streamReader struct { done chan struct{} } -func (r *streamReader) start() { - r.stopc = make(chan struct{}) - r.done = make(chan struct{}) - if r.errorc == nil { - r.errorc = r.tr.ErrorC +func (cr *streamReader) start() { + cr.stopc = make(chan struct{}) + cr.done = 
make(chan struct{}) + if cr.errorc == nil { + cr.errorc = cr.tr.ErrorC + } + + if cr.rl == nil { + // If client didn't provide rate limiter, use the default which will + // wait 100ms to create a new stream, so it doesn't bring too much + // overhead when retry. + cr.rl = rate.NewLimiter(rate.Every(100*time.Millisecond), 1) } - go r.run() + go cr.run() } func (cr *streamReader) run() { @@ -323,13 +334,15 @@ func (cr *streamReader) run() { } } select { - // Wait 100ms to create a new stream, so it doesn't bring too much - // overhead when retry. - case <-time.After(100 * time.Millisecond): case <-cr.stopc: plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) close(cr.done) return + default: + // wait for a while before new dial attempt + if err := cr.rl.Wait(context.TODO()); err != nil { + plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) + } } } } diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index f48714e7c57..a8985d86b85 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -25,6 +25,8 @@ import ( "testing" "time" + "golang.org/x/time/rate" + "github.com/coreos/etcd/etcdserver/stats" "github.com/coreos/etcd/pkg/testutil" "github.com/coreos/etcd/pkg/types" @@ -113,6 +115,7 @@ func TestStreamReaderDialRequest(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } sr.dial(tt) @@ -167,6 +170,7 @@ func TestStreamReaderDialResult(t *testing.T) { tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), errorc: make(chan error, 1), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } _, err := sr.dial(streamTypeMessage) @@ -192,6 +196,7 @@ func TestStreamReaderStopOnDial(t *testing.T) { errorc: make(chan error, 1), typ: 
streamTypeMessage, status: newPeerStatus(types.ID(2)), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } tr.onResp = func() { // stop() waits for the run() goroutine to exit, but that exit @@ -246,6 +251,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } _, err := sr.dial(typ) diff --git a/rafthttp/transport.go b/rafthttp/transport.go index 1f0b46836e6..a33db3c08cd 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -94,8 +94,10 @@ type Transporter interface { // User needs to call Start before calling other functions, and call // Stop when the Transport is no longer used. type Transport struct { - DialTimeout time.Duration // maximum duration before timing out dial of the request - TLSInfo transport.TLSInfo // TLS information used when creating connection + DialTimeout time.Duration // maximum duration before timing out dial of the request + DialRetryTimeout time.Duration // alters the frequency of streamReader dial retrial attempts + + TLSInfo transport.TLSInfo // TLS information used when creating connection ID types.ID // local member ID URLs types.URLs // local peer URLs From ba50819d60a72ecf028e6818f6f1adfe6f60dabe Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 1 Jun 2017 01:27:40 +0300 Subject: [PATCH 2/4] rafthttp: configurable stream reader retry timeout rafthttp.Transport.DialRetryFrequency streamReader dial retrial attempts frequency --- rafthttp/peer.go | 8 ++------ rafthttp/stream.go | 8 -------- rafthttp/stream_test.go | 1 + rafthttp/transport.go | 14 ++++++++++++-- 4 files changed, 15 insertions(+), 16 deletions(-) diff --git a/rafthttp/peer.go b/rafthttp/peer.go index 394998514ec..b8de635aa83 100644 --- a/rafthttp/peer.go +++ b/rafthttp/peer.go @@ -189,6 +189,7 @@ func startPeer(transport *Transport, urls 
types.URLs, peerID types.ID, fs *stats status: status, recvc: p.recvc, propc: p.propc, + rl: rate.NewLimiter(transport.DialRetryFrequency, 1), } p.msgAppReader = &streamReader{ peerID: peerID, @@ -198,12 +199,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats status: status, recvc: p.recvc, propc: p.propc, - } - - if transport.DialRetryTimeout != 0 { - limit := rate.Every(transport.DialRetryTimeout) - p.msgAppV2Reader.rl = rate.NewLimiter(limit, 1) - p.msgAppReader.rl = rate.NewLimiter(limit, 1) + rl: rate.NewLimiter(transport.DialRetryFrequency, 1), } p.msgAppV2Reader.start() diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 2f53562d05e..9afb4b6880f 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -299,14 +299,6 @@ func (cr *streamReader) start() { if cr.errorc == nil { cr.errorc = cr.tr.ErrorC } - - if cr.rl == nil { - // If client didn't provide rate limiter, use the default which will - // wait 100ms to create a new stream, so it doesn't bring too much - // overhead when retry. 
- cr.rl = rate.NewLimiter(rate.Every(100*time.Millisecond), 1) - } - go cr.run() } diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index a8985d86b85..e5ae1f59575 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -317,6 +317,7 @@ func TestStream(t *testing.T) { status: newPeerStatus(types.ID(2)), recvc: recvc, propc: propc, + rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), } sr.start() diff --git a/rafthttp/transport.go b/rafthttp/transport.go index a33db3c08cd..50219db71b9 100644 --- a/rafthttp/transport.go +++ b/rafthttp/transport.go @@ -29,6 +29,7 @@ import ( "github.com/coreos/pkg/capnslog" "github.com/xiang90/probing" "golang.org/x/net/context" + "golang.org/x/time/rate" ) var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp")) @@ -94,8 +95,10 @@ type Transporter interface { // User needs to call Start before calling other functions, and call // Stop when the Transport is no longer used. type Transport struct { - DialTimeout time.Duration // maximum duration before timing out dial of the request - DialRetryTimeout time.Duration // alters the frequency of streamReader dial retrial attempts + DialTimeout time.Duration // maximum duration before timing out dial of the request + // DialRetryFrequency defines the frequency of streamReader dial retry attempts; + // a distinct rate limiter is created for every peer (default value: 10 events/sec) + DialRetryFrequency rate.Limit TLSInfo transport.TLSInfo // TLS information used when creating connection @@ -137,6 +140,13 @@ func (t *Transport) Start() error { t.remotes = make(map[types.ID]*remote) t.peers = make(map[types.ID]Peer) t.prober = probing.NewProber(t.pipelineRt) + + // If client didn't provide dial retry frequency, use the default + // (100ms backoff between attempts to create a new stream), + // so it doesn't bring too much overhead when retrying. 
+ if t.DialRetryFrequency == 0 { + t.DialRetryFrequency = rate.Every(100 * time.Millisecond) + } return nil } From 30bdad8fd290f800bb35b1f5c3b09fbc4b11a57e Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 1 Jun 2017 12:24:42 +0300 Subject: [PATCH 3/4] rafthttp: configurable stream reader retry timeout use context instead of stop channel inside streamReader --- rafthttp/stream.go | 46 ++++++++++++++++++++--------------------- rafthttp/stream_test.go | 7 ++++--- 2 files changed, 27 insertions(+), 26 deletions(-) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 9afb4b6880f..9cd3bb421c9 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -245,7 +245,9 @@ func (cw *streamWriter) closeUnlocked() bool { if !cw.working { return false } - cw.closer.Close() + if err := cw.closer.Close(); err != nil { + plog.Errorf("Peer %s (writer) connection close error: %v", cw.peerID, err) + } if len(cw.msgc) > 0 { cw.r.ReportUnreachable(uint64(cw.peerID)) } @@ -286,19 +288,21 @@ type streamReader struct { mu sync.Mutex paused bool - cancel func() closer io.Closer - stopc chan struct{} - done chan struct{} + ctx context.Context + cancel context.CancelFunc + done chan struct{} } func (cr *streamReader) start() { - cr.stopc = make(chan struct{}) cr.done = make(chan struct{}) if cr.errorc == nil { cr.errorc = cr.tr.ErrorC } + if cr.ctx == nil { + cr.ctx, cr.cancel = context.WithCancel(context.Background()) + } go cr.run() } @@ -314,7 +318,7 @@ func (cr *streamReader) run() { } else { cr.status.activate() plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) - err := cr.decodeLoop(rc, t) + err = cr.decodeLoop(rc, t) plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ) switch { // all data is read out @@ -325,16 +329,15 @@ func (cr *streamReader) run() { cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error()) } } - select { - case <-cr.stopc: + // Wait for a 
while before new dial attempt + err = cr.rl.Wait(cr.ctx) + if cr.ctx.Err() != nil { plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t) close(cr.done) return - default: - // wait for a while before new dial attempt - if err := cr.rl.Wait(context.TODO()); err != nil { - plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) - } + } + if err != nil { + plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err) } } } @@ -351,7 +354,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { plog.Panicf("unhandled stream type %s", t) } select { - case <-cr.stopc: + case <-cr.ctx.Done(): cr.mu.Unlock() if err := rc.Close(); err != nil { return err @@ -406,11 +409,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { } func (cr *streamReader) stop() { - close(cr.stopc) cr.mu.Lock() - if cr.cancel != nil { - cr.cancel() - } + cr.cancel() cr.close() cr.mu.Unlock() <-cr.done @@ -434,13 +434,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { setPeerURLsHeader(req, cr.tr.URLs) - ctx, cancel := context.WithCancel(context.Background()) - req = req.WithContext(ctx) + req = req.WithContext(cr.ctx) cr.mu.Lock() - cr.cancel = cancel select { - case <-cr.stopc: + case <-cr.ctx.Done(): cr.mu.Unlock() return nil, fmt.Errorf("stream reader is stopped") default: @@ -502,7 +500,9 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { func (cr *streamReader) close() { if cr.closer != nil { - cr.closer.Close() + if err := cr.closer.Close(); err != nil { + plog.Errorf("Peer %s (reader) connection close error: %v", cr.peerID, err) + } } cr.closer = nil } diff --git a/rafthttp/stream_test.go b/rafthttp/stream_test.go index e5ae1f59575..c9cd2b3d69c 100644 --- a/rafthttp/stream_test.go +++ b/rafthttp/stream_test.go @@ -15,6 +15,7 @@ package rafthttp import ( + "context" "errors" "fmt" "io" @@ -115,7 +116,7 @@ func 
TestStreamReaderDialRequest(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), + ctx: context.Background(), } sr.dial(tt) @@ -170,7 +171,7 @@ func TestStreamReaderDialResult(t *testing.T) { tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), errorc: make(chan error, 1), - rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), + ctx: context.Background(), } _, err := sr.dial(streamTypeMessage) @@ -251,7 +252,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) { peerID: types.ID(2), tr: &Transport{streamRt: tr, ClusterID: types.ID(1)}, picker: mustNewURLPicker(t, []string{"http://localhost:2380"}), - rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1), + ctx: context.Background(), } _, err := sr.dial(typ) From 0a9092abc4474c838df94ff294e41f0411a606f6 Mon Sep 17 00:00:00 2001 From: Vitaly Isaev Date: Thu, 1 Jun 2017 20:45:20 +0300 Subject: [PATCH 4/4] rafthttp: configurable stream reader retry timeout minor changes after code review --- rafthttp/stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rafthttp/stream.go b/rafthttp/stream.go index 9cd3bb421c9..9dfe22148a3 100644 --- a/rafthttp/stream.go +++ b/rafthttp/stream.go @@ -246,7 +246,7 @@ func (cw *streamWriter) closeUnlocked() bool { return false } if err := cw.closer.Close(); err != nil { - plog.Errorf("Peer %s (writer) connection close error: %v", cw.peerID, err) + plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err) } if len(cw.msgc) > 0 { cw.r.ReportUnreachable(uint64(cw.peerID)) @@ -501,7 +501,7 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) { func (cr *streamReader) close() { if cr.closer != nil { if err := cr.closer.Close(); err != nil { - plog.Errorf("Peer %s (reader) 
connection close error: %v", cr.peerID, err) + plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err) } } cr.closer = nil