Skip to content

Commit

Permalink
raft: use learner instead of suffrage
Browse files Browse the repository at this point in the history
  • Loading branch information
lishuai87 committed Sep 30, 2017
1 parent c098dbb commit b159733
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 398 deletions.
4 changes: 2 additions & 2 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,8 @@ func (n *node) run(r *raft) {
switch cc.Type {
case pb.ConfChangeAddNode:
r.addNode(cc.NodeID)
case pb.ConfChangeAddNonvoter:
r.addNonvoter(cc.NodeID)
case pb.ConfChangeAddLearner:
r.addLearner(cc.NodeID)
case pb.ConfChangeRemoveNode:
// block incoming proposal when local node is
// removed
Expand Down
9 changes: 3 additions & 6 deletions raft/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,7 @@

package raft

import (
"fmt"

pb "github.com/coreos/etcd/raft/raftpb"
)
import "fmt"

const (
ProgressStateProbe ProgressStateType = iota
Expand Down Expand Up @@ -53,7 +49,6 @@ type Progress struct {
// before and stops sending any replication message.
State ProgressStateType

Suffrage pb.SuffrageState
// Paused is used in ProgressStateProbe.
// When Paused is true, raft should pause sending replication message to this peer.
Paused bool
Expand Down Expand Up @@ -82,6 +77,8 @@ type Progress struct {
// be freed by calling inflights.freeTo with the index of the last
// received entry.
ins *inflights

isLearner bool
}

func (pr *Progress) resetState(state ProgressStateType) {
Expand Down
114 changes: 52 additions & 62 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,9 @@ type Config struct {
// should only be set when starting a new raft cluster. Restarting raft from
// previous configuration will panic if peers is set. peer is private and only
// used for testing right now.
peers []pb.Server
peers []uint64

learners []uint64

// ElectionTick is the number of Node.Tick invocations that must pass between
// elections. That is, if a follower does not receive any message from the
Expand Down Expand Up @@ -238,7 +240,7 @@ type raft struct {

state StateType

suffrage pb.SuffrageState
isLearner bool

votes map[uint64]bool

Expand Down Expand Up @@ -290,33 +292,22 @@ func newRaft(c *Config) *raft {
if err != nil {
panic(err) // TODO(bdarnell)
}
peers := c.peers
if len(cs.Nodes) > 0 && len(cs.Servers) > 0 {
panic("cannot specify both ConfState.Nodes and ConfState.Servers")
}
if len(cs.Nodes) > 0 {
if len(peers) > 0 {
voters := c.peers
learners := c.learners
if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
if len(voters) > 0 || len(learners) > 0 {
// TODO(bdarnell): the peers argument is always nil except in
// tests; the argument should be removed and these tests should be
// updated to specify their nodes through a snapshot.
panic("cannot specify both newRaft(peers) and ConfState.Nodes)")
}
for _, n := range cs.Nodes {
peers = append(peers, pb.Server{Node: n, Suffrage: pb.Voter})
}
}
if len(cs.Servers) > 0 {
if len(peers) > 0 {
panic("cannot specify both newRaft(peers) and ConfState.Servers)")
}
for _, s := range cs.Servers {
peers = append(peers, *s)
panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
}
voters = cs.Nodes
learners = cs.Learners
}
r := &raft{
id: c.ID,
lead: None,
suffrage: pb.Voter,
isLearner: false,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
Expand All @@ -329,10 +320,16 @@ func newRaft(c *Config) *raft {
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
for _, p := range peers {
r.prs[p.Node] = &Progress{Next: 1, ins: newInflights(r.maxInflight), Suffrage: p.Suffrage}
if r.id == p.Node {
r.suffrage = p.Suffrage
for _, n := range voters {
r.prs[n] = &Progress{Next: 1, ins: newInflights(r.maxInflight), isLearner: false}
}
for _, n := range learners {
if _, has := r.prs[n]; has {
panic(fmt.Sprintf("cannot specify both Voter and Learner for node: %x", n))
}
r.prs[n] = &Progress{Next: 1, ins: newInflights(r.maxInflight), isLearner: true}
if r.id == n {
r.isLearner = true
}
}
if !isHardStateEqual(hs, emptyState) {
Expand Down Expand Up @@ -369,7 +366,7 @@ func (r *raft) hardState() pb.HardState {
func (r *raft) voterCount() int {
count := 0
for _, p := range r.prs {
if p.Suffrage == pb.Voter {
if !p.isLearner {
count++
}
}
Expand Down Expand Up @@ -536,7 +533,7 @@ func (r *raft) maybeCommit() bool {
// TODO(bmizerany): optimize.. Currently naive
mis := make(uint64Slice, 0, r.voterCount())
for _, p := range r.prs {
if p.Suffrage == pb.Voter {
if !p.isLearner {
mis = append(mis, p.Match)
}
}
Expand All @@ -560,7 +557,7 @@ func (r *raft) reset(term uint64) {

r.votes = make(map[uint64]bool)
for id := range r.prs {
r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), Suffrage: r.prs[id].Suffrage}
r.prs[id] = &Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), isLearner: r.prs[id].isLearner}
if id == r.id {
r.prs[id].Match = r.raftLog.lastIndex()
}
Expand Down Expand Up @@ -708,7 +705,7 @@ func (r *raft) campaign(t CampaignType) {
if id == r.id {
continue
}
if r.prs[id].Suffrage != pb.Voter {
if r.prs[id].isLearner {
continue
}
r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
Expand Down Expand Up @@ -1025,8 +1022,8 @@ func stepLeader(r *raft, m pb.Message) {
}
r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
case pb.MsgTransferLeader:
if pr.Suffrage != pb.Voter {
r.logger.Debugf("%x is not Voter. Ignored transferring leadership", r.id)
if pr.isLearner {
r.logger.Debugf("%x is Learner. Ignored transferring leadership", r.id)
return
}
leadTransferee := m.From
Expand Down Expand Up @@ -1210,64 +1207,57 @@ func (r *raft) restore(s pb.Snapshot) bool {

r.raftLog.restore(s)
r.prs = make(map[uint64]*Progress)
if len(s.Metadata.ConfState.Nodes) > 0 && len(s.Metadata.ConfState.Servers) > 0 {
panic("cannot specify both ConfState.Nodes and ConfState.Servers")
}
for _, n := range s.Metadata.ConfState.Nodes {
r.restoreNode(s.Metadata.ConfState.Nodes, false)
r.restoreNode(s.Metadata.ConfState.Learners, true)
return true
}

func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
for _, n := range nodes {
match, next := uint64(0), r.raftLog.lastIndex()+1
if n == r.id {
match = next - 1
r.suffrage = pb.Voter
r.isLearner = isLearner
}
r.setProgress(n, match, next, pb.Voter)
r.setProgress(n, match, next, isLearner)
r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs[n])
}
for _, server := range s.Metadata.ConfState.Servers {
match, next := uint64(0), r.raftLog.lastIndex()+1
if server.Node == r.id {
match = next - 1
r.suffrage = server.Suffrage
}
r.setProgress(server.Node, match, next, server.Suffrage)
r.logger.Infof("%x restored progress of %x [%s]", r.id, server.Node, r.prs[server.Node])
}
return true
}

// promotable indicates whether state machine can be promoted to leader,
// which is true when its own id is in progress list and suffrage is Voter
// which is true when its own id is in progress list and its not learner.
func (r *raft) promotable() bool {
_, ok := r.prs[r.id]
return ok && r.suffrage == pb.Voter
return ok && !r.isLearner
}

func (r *raft) addNode(id uint64) {
r.addNodeWithSuffrage(id, pb.Voter)
r.addLearnerNode(id, false)
}

func (r *raft) addNonvoter(id uint64) {
r.addNodeWithSuffrage(id, pb.Nonvoter)
func (r *raft) addLearner(id uint64) {
r.addLearnerNode(id, true)
}

func (r *raft) addNodeWithSuffrage(id uint64, suffrage pb.SuffrageState) {
func (r *raft) addLearnerNode(id uint64, isLearner bool) {
r.pendingConf = false
if _, ok := r.prs[id]; ok {
if r.prs[id].Suffrage == suffrage {
if r.prs[id].isLearner == isLearner {
// Ignore any redundant addNode calls (which can happen because the
// initial bootstrapping entries are applied twice).
return
}
if suffrage != pb.Voter {
// addNode can only change suffrage from Nonvoter to Voter
if isLearner {
// can only change Learner to Voter
return
}
r.prs[id].Suffrage = suffrage
r.prs[id].isLearner = isLearner
} else {
r.setProgress(id, 0, r.raftLog.lastIndex()+1, suffrage)
r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
}

if r.id == id {
r.suffrage = suffrage
r.isLearner = isLearner
}
// When a node is first added, we should mark it as recently active.
// Otherwise, CheckQuorum may cause us to step down if it is invoked
Expand Down Expand Up @@ -1297,8 +1287,8 @@ func (r *raft) removeNode(id uint64) {

func (r *raft) resetPendingConf() { r.pendingConf = false }

func (r *raft) setProgress(id, match, next uint64, suffrage pb.SuffrageState) {
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), Suffrage: suffrage}
func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), isLearner: isLearner}
}

func (r *raft) delProgress(id uint64) {
Expand Down Expand Up @@ -1338,7 +1328,7 @@ func (r *raft) checkQuorumActive() bool {
continue
}

if r.prs[id].RecentActive && r.prs[id].Suffrage == pb.Voter {
if r.prs[id].RecentActive && !r.prs[id].isLearner {
act++
}

Expand Down
56 changes: 30 additions & 26 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,14 +350,14 @@ func testLeaderElection(t *testing.T, preVote bool) {

func TestNonvoterElectionTimeout(t *testing.T) {
n1 := newTestRaft(1, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
n1.prs[3].Suffrage = pb.Nonvoter
n1.prs[3].isLearner = true

n2 := newTestRaft(2, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
n2.prs[3].Suffrage = pb.Nonvoter
n2.prs[3].isLearner = true

n3 := newTestRaft(3, []uint64{1, 2, 3}, 10, 1, NewMemoryStorage())
n3.suffrage = pb.Nonvoter
n3.prs[3].Suffrage = pb.Nonvoter
n3.isLearner = true
n3.prs[3].isLearner = true

n1.becomeFollower(1, None)
n2.becomeFollower(1, None)
Expand Down Expand Up @@ -1106,7 +1106,7 @@ func TestCommit(t *testing.T) {

sm := newTestRaft(1, []uint64{1}, 5, 1, storage)
for j := 0; j < len(tt.matches); j++ {
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, pb.Voter)
sm.setProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
}
sm.maybeCommit()
if g := sm.raftLog.committed; g != tt.w {
Expand Down Expand Up @@ -2377,13 +2377,9 @@ func TestRestore(t *testing.T) {
func TestRestoreWithNonvoter(t *testing.T) {
s := pb.Snapshot{
Metadata: pb.SnapshotMetadata{
Index: 11, // magic number
Term: 11, // magic number
ConfState: pb.ConfState{Servers: []*pb.Server{
{Node: 1, Suffrage: pb.Voter},
{Node: 2, Suffrage: pb.Voter},
{Node: 3, Suffrage: pb.Nonvoter},
}},
Index: 11, // magic number
Term: 11, // magic number
ConfState: pb.ConfState{Nodes: []uint64{1, 2}, Learners: []uint64{3}},
},
}

Expand All @@ -2399,9 +2395,18 @@ func TestRestoreWithNonvoter(t *testing.T) {
if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
}
for _, server := range s.Metadata.ConfState.Servers {
if sm.prs[server.Node].Suffrage != server.Suffrage {
t.Errorf("sm.Node %x suffrage = %s, want %s", server.Node, sm.prs[server.Node], server.Suffrage)
sg := sm.nodes()
if len(sg) != len(s.Metadata.ConfState.Nodes)+len(s.Metadata.ConfState.Learners) {
t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState)
}
for _, n := range s.Metadata.ConfState.Nodes {
if sm.prs[n].isLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], true)
}
}
for _, n := range s.Metadata.ConfState.Learners {
if !sm.prs[n].isLearner {
t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs[n], false)
}
}

Expand Down Expand Up @@ -2656,7 +2661,7 @@ func TestAddNode(t *testing.T) {
func TestAddNonvoter(t *testing.T) {
r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
r.pendingConf = true
r.addNonvoter(2)
r.addLearner(2)
if r.pendingConf {
t.Errorf("pendingConf = %v, want false", r.pendingConf)
}
Expand All @@ -2665,8 +2670,8 @@ func TestAddNonvoter(t *testing.T) {
if !reflect.DeepEqual(nodes, wnodes) {
t.Errorf("nodes = %v, want %v", nodes, wnodes)
}
if r.prs[2].Suffrage != pb.Nonvoter {
t.Fatalf("node 2 has suffrage %s, want %s", r.prs[2].Suffrage, pb.Nonvoter)
if !r.prs[2].isLearner {
t.Fatalf("node 2 has suffrage %t, want %t", r.prs[2].isLearner, true)
}
}

Expand Down Expand Up @@ -3439,14 +3444,17 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
sm := newRaft(cfg)
npeers[id] = sm
case *raft:
suffrages := make(map[uint64]pb.SuffrageState, size)
learners := make(map[uint64]bool, 0)
for i, pr := range p.(*raft).prs {
suffrages[i] = pr.Suffrage
if pr.isLearner {
learners[i] = true
}
}
v.id = id
v.prs = make(map[uint64]*Progress)
for i := 0; i < size; i++ {
v.prs[peerAddrs[i]] = &Progress{Suffrage: suffrages[peerAddrs[i]]}
_, isLearner := learners[peerAddrs[i]]
v.prs[peerAddrs[i]] = &Progress{isLearner: isLearner}
}
v.reset(v.Term)
npeers[id] = v
Expand Down Expand Up @@ -3553,13 +3561,9 @@ func setRandomizedElectionTimeout(r *raft, v int) {
}

func newTestConfig(id uint64, peers []uint64, election, heartbeat int, storage Storage) *Config {
new_peers := make([]pb.Server, 0)
for _, p := range peers {
new_peers = append(new_peers, pb.Server{Node: p, Suffrage: pb.Voter})
}
return &Config{
ID: id,
peers: new_peers,
peers: peers,
ElectionTick: election,
HeartbeatTick: heartbeat,
Storage: storage,
Expand Down
Loading

0 comments on commit b159733

Please sign in to comment.