Skip to content

Commit

Permalink
raft: add non-voter
Browse files Browse the repository at this point in the history
  • Loading branch information
lishuai87 committed Sep 25, 2017
1 parent 4830ca7 commit a91a51d
Show file tree
Hide file tree
Showing 7 changed files with 526 additions and 89 deletions.
2 changes: 2 additions & 0 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ func (n *node) run(r *raft) {
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeAddNonvoter:
r.addNonvoter(cc.NodeID)
case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is
// removed
Expand Down
8 changes: 7 additions & 1 deletion raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package raft

import "fmt"
import (
"fmt"

pb "github.com/coreos/etcd/raft/raftpb"
)

const (
ProgressStateProbe ProgressStateType = iota
Expand Down Expand Up @@ -48,6 +52,8 @@ type Progress struct {
// When in ProgressStateSnapshot, leader should have sent out snapshot
// before and stops sending any replication message.
State ProgressStateType

Suffrage pb.SuffrageState
// Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication message to this peer.
Paused bool
Expand Down
107 changes: 90 additions & 17 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type Config struct {
// should only be set when starting a new raft cluster. Restarting raft from
// previous configuration will panic if peers is set. peers is private and only
// used for testing right now.
peers []uint64
peers []pb.Server

// ElectionTick is the number of Node.Tick invocations that must pass between
// elections. That is, if a follower does not receive any message from the
Expand Down Expand Up @@ -238,6 +238,8 @@ type raft struct {

state StateType

suffrage pb.SuffrageState

votes map[uint64]bool

msgs []pb.Message
Expand Down Expand Up @@ -289,18 +291,32 @@ func newRaft(c *Config) *raft {
panic(err) // TODO(bdarnell)
}
peers := c.peers
if len(cs.Nodes) > 0 && len(cs.Servers) > 0 {
panic("cannot specify both ConfState.Nodes and ConfState.Servers")
}
if len(cs.Nodes) > 0 {
if len(peers) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
}
peers = cs.Nodes
for _, n := range cs.Nodes {
peers = append(peers, pb.Server{Node: n, Suffrage: pb.Voter})
}
}
if len(cs.Servers) > 0 {
if len(peers) > 0 {
panic("cannot specify both newRaft(peers) and ConfState.Servers)")
}
for _, s := range cs.Servers {
peers = append(peers, *s)
}
}
r := &raft{
id: c.ID,
lead: None,
suffrage: pb.Voter,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
Expand All @@ -314,7 +330,10 @@ func newRaft(c *Config) *raft {
disableProposalForwarding: c.DisableProposalForwarding,
}
for _, p := range peers {
r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
r.prs[p.Node] = &Progress{Next: 1, ins: newInflights(r.maxInflight), Suffrage: p.Suffrage}
if r.id == p.Node {
r.suffrage = p.Suffrage
}
}
if !isHardStateEqual(hs, emptyState) {
r.loadState(hs)
Expand Down Expand Up @@ -346,7 +365,18 @@ func (r *raft) hardState() pb.HardState {
}
}

func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
// voterCount reports how many entries in r.prs have Suffrage == pb.Voter.
// It walks the whole progress map on every call.
//
// TODO(shuaili): optimize, maybe record voterCount in raft
func (r *raft) voterCount() int {
	var voters int
	for id := range r.prs {
		// Skip every peer that is not a voter.
		if r.prs[id].Suffrage != pb.Voter {
			continue
		}
		voters++
	}
	return voters
}

// quorum returns the majority size computed over voters only:
// half of voterCount() (integer division) plus one.
func (r *raft) quorum() int {
	voters := r.voterCount()
	return voters/2 + 1
}

func (r *raft) nodes() []uint64 {
nodes := make([]uint64, 0, len(r.prs))
Expand Down Expand Up @@ -504,9 +534,11 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
// r.bcastAppend).
func (r *raft) maybeCommit() bool {
// TODO(bmizerany): optimize.. Currently naive
mis := make(uint64Slice, 0, len(r.prs))
for id := range r.prs {
mis = append(mis, r.prs[id].Match)
mis := make(uint64Slice, 0, r.voterCount())
for _, p := range r.prs {
if p.Suffrage == pb.Voter {
mis = append(mis, p.Match)
}
}
sort.Sort(sort.Reverse(mis))
mci := mis[r.quorum()-1]
Expand All @@ -528,7 +560,7 @@ func (r *raft) reset(term uint64) {

r.votes = make(map[uint64]bool)
for id := range r.prs {
r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight)}
r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), Suffrage: r.prs[id].Suffrage}
if id == r.id {
r.prs[id].Match = r.raftLog.lastIndex()
}
Expand All @@ -555,7 +587,9 @@ func (r *raft) tickElection() {

if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
if r.suffrage == pb.Voter {
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
}

Expand Down Expand Up @@ -676,6 +710,9 @@ func (r *raft) campaign(t CampaignType) {
if id == r.id {
continue
}
if r.prs[id].Suffrage != pb.Voter {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)

Expand Down Expand Up @@ -990,6 +1027,10 @@ func stepLeader(r *raft, m pb.Message) {
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
if pr.Suffrage != pb.Voter {
r.logger.Debugf("%x is not Voter. Ignored transferring leadership", r.id)
return
}
leadTransferee := m.From
lastLeadTransferee := r.leadTransferee
if lastLeadTransferee != None {
Expand Down Expand Up @@ -1171,14 +1212,27 @@ func (r *raft) restore(s pb.Snapshot) bool {

r.raftLog.restore(s)
r.prs = make(map[uint64]*Progress)
if len(s.Metadata.ConfState.Nodes) > 0 && len(s.Metadata.ConfState.Servers) > 0 {
panic("cannot specify both ConfState.Nodes and ConfState.Servers")
}
for _, n := range s.Metadata.ConfState.Nodes {
match, next := uint64(0), r.raftLog.lastIndex()+1
if n == r.id {
match = next - 1
r.suffrage = pb.Voter
}
r.setProgress(n, match, next)
r.setProgress(n, match, next, pb.Voter)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
}
for _, server := range s.Metadata.ConfState.Servers {
match, next := uint64(0), r.raftLog.lastIndex()+1
if server.Node == r.id {
match = next - 1
r.suffrage = server.Suffrage
}
r.setProgress(server.Node, match, next, server.Suffrage)
r.logger.Infof("%x restored progress of %x [%s]", r.id, server.Node, r.prs[server.Node])
}
return true
}

Expand All @@ -1190,14 +1244,33 @@ func (r *raft) promotable() bool {
}

// addNode adds id with pb.Voter suffrage by delegating to
// addNodeWithSuffrage. If id is already tracked with a different suffrage,
// addNodeWithSuffrage switches it to pb.Voter.
func (r *raft) addNode(id uint64) {
r.addNodeWithSuffrage(id, pb.Voter)
}

// addNonvoter adds id with pb.Nonvoter suffrage by delegating to
// addNodeWithSuffrage. An id that is already tracked keeps its current
// suffrage: addNodeWithSuffrage only changes an existing peer's suffrage
// when the requested suffrage is pb.Voter.
func (r *raft) addNonvoter(id uint64) {
r.addNodeWithSuffrage(id, pb.Nonvoter)
}

func (r *raft) addNodeWithSuffrage(id uint64, suffrage pb.SuffrageState) {
r.pendingConf = false
if _, ok := r.prs[id]; ok {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
if r.prs[id].Suffrage == suffrage {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
if suffrage != pb.Voter {
// addNode can only change suffrage from Nonvoter to Voter
return
}
r.prs[id].Suffrage = suffrage
} else {
r.setProgress(id, 0, r.raftLog.lastIndex()+1, suffrage)
}

r.setProgress(id, 0, r.raftLog.lastIndex()+1)
if r.id == id {
r.suffrage = suffrage
}
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
// before the added node has a chance to communicate with us.
Expand Down Expand Up @@ -1226,8 +1299,8 @@ func (r *raft) removeNode(id uint64) {

// resetPendingConf clears r.pendingConf by setting it to false.
func (r *raft) resetPendingConf() { r.pendingConf = false }

func (r *raft) setProgress(id, match, next uint64) {
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
// setProgress stores a brand-new Progress for peer id in r.prs, replacing
// any existing entry. The new Progress carries the given match and next
// indexes, the given suffrage, and a fresh inflights window sized by
// r.maxInflight.
func (r *raft) setProgress(id, match, next uint64, suffrage pb.SuffrageState) {
	pr := &Progress{
		Match:    match,
		Next:     next,
		Suffrage: suffrage,
		ins:      newInflights(r.maxInflight),
	}
	r.prs[id] = pr
}

func (r *raft) delProgress(id uint64) {
Expand Down Expand Up @@ -1267,7 +1340,7 @@ func (r *raft) checkQuorumActive() bool {
continue
}

if r.prs[id].RecentActive {
if r.prs[id].RecentActive && r.prs[id].Suffrage == pb.Voter {
act++
}

Expand Down
Loading

0 comments on commit a91a51d

Please sign in to comment.