Skip to content

Commit

Permalink
rafthttp: add "ActivePeers" to "Transport"
Browse files Browse the repository at this point in the history
Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
  • Loading branch information
gyuho committed Mar 11, 2018
1 parent 9e84f2d commit 0a527ef
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 0 deletions.
1 change: 1 addition & 0 deletions rafthttp/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,7 @@ func (pr *fakePeer) sendSnap(m raftsnap.Message) {
func (pr *fakePeer) update(urls types.URLs) { pr.peerURLs = urls }
func (pr *fakePeer) attachOutgoingConn(conn *outgoingConn) { pr.connc <- conn }
func (pr *fakePeer) activeSince() time.Time { return time.Time{} }
func (pr *fakePeer) isActive() bool { return true }
func (pr *fakePeer) stop() {}
func (pr *fakePeer) Pause() { pr.paused = true }
func (pr *fakePeer) Resume() { pr.paused = false }
5 changes: 5 additions & 0 deletions rafthttp/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ type Peer interface {
// activeSince returns the time that the connection with the
// peer becomes active.
activeSince() time.Time
// isActive returns true if the connection to this peer
// has been established
isActive() bool
// stop performs any necessary finalization and terminates the peer
// elegantly.
stop()
Expand Down Expand Up @@ -258,6 +261,8 @@ func (p *peer) attachOutgoingConn(conn *outgoingConn) {

func (p *peer) activeSince() time.Time { return p.status.activeSince() }

func (p *peer) isActive() bool { return p.status.isActive() }

// Pause pauses the peer. The peer will simply drops all incoming
// messages without returning an error.
func (p *peer) Pause() {
Expand Down
12 changes: 12 additions & 0 deletions rafthttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,18 @@ func (t *Transport) Resume() {
}
}

// ActivePeers returns a channel that closes when an initial
// peer connection has been established. Use this to wait until the
// first peer connection becomes active.
func (t *Transport) ActivePeers() (cnt int) {
for _, p := range t.peers {
if p.isActive() {
cnt++
}
}
return cnt
}

type nopTransporter struct{}

func NewNopTransporter() Transporter {
Expand Down

0 comments on commit 0a527ef

Please sign in to comment.