Skip to content

Commit

Permalink
Index/Term's in raft state should be locked/get/set as a single thing…
Browse files Browse the repository at this point in the history
…. fixes Issue hashicorp#84 item 1
  • Loading branch information
superfell committed Feb 26, 2016
1 parent 057b893 commit ac7d930
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 59 deletions.
37 changes: 17 additions & 20 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna

// Restore the current term and the last log
r.setCurrentTerm(currentTerm)
r.setLastLogIndex(lastLog.Index)
r.setLastLogTerm(lastLog.Term)
r.setLastLog(lastLog.Index, lastLog.Term)

// Attempt to restore a snapshot if there are any
if err := r.restoreSnapshot(); err != nil {
Expand Down Expand Up @@ -463,16 +462,18 @@ func (r *Raft) Stats() map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
lastLogIndex, lastLogTerm := r.getLastLog()
lastSnapIndex, lastSnapTerm := r.getLastSnapshot()
s := map[string]string{
"state": r.getState().String(),
"term": toString(r.getCurrentTerm()),
"last_log_index": toString(r.getLastLogIndex()),
"last_log_term": toString(r.getLastLogTerm()),
"last_log_index": toString(lastLogIndex),
"last_log_term": toString(lastLogTerm),
"commit_index": toString(r.getCommitIndex()),
"applied_index": toString(r.getLastApplied()),
"fsm_pending": toString(uint64(len(r.fsmCommitCh))),
"last_snapshot_index": toString(r.getLastSnapshotIndex()),
"last_snapshot_term": toString(r.getLastSnapshotTerm()),
"last_snapshot_index": toString(lastSnapIndex),
"last_snapshot_term": toString(lastSnapTerm),
"num_peers": toString(uint64(len(r.peers))),
}
last := r.LastContact()
Expand Down Expand Up @@ -1125,8 +1126,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
r.leaderState.inflight.StartAll(applyLogs)

// Update the last log since it's on disk now
r.setLastLogIndex(lastIndex + uint64(len(applyLogs)))
r.setLastLogTerm(term)
r.setLastLog(lastIndex+uint64(len(applyLogs)), term)

// Notify the replicators of the new log
for _, f := range r.leaderState.replState {
Expand Down Expand Up @@ -1367,7 +1367,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
last := a.Entries[n-1]

// Delete any conflicting entries
lastLogIdx := r.getLastLogIndex()
lastLogIdx, _ := r.getLastLog()
if first.Index <= lastLogIdx {
r.logger.Printf("[WARN] raft: Clearing log suffix from %d to %d", first.Index, lastLogIdx)
if err := r.logs.DeleteRange(first.Index, lastLogIdx); err != nil {
Expand All @@ -1383,8 +1383,7 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
}

// Update the lastLog
r.setLastLogIndex(last.Index)
r.setLastLogTerm(last.Term)
r.setLastLog(last.Index, last.Term)
metrics.MeasureSince([]string{"raft", "rpc", "appendEntries", "storeLogs"}, start)
}

Expand Down Expand Up @@ -1569,8 +1568,7 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
r.setLastApplied(req.LastLogIndex)

// Update the last stable snapshot info
r.setLastSnapshotIndex(req.LastLogIndex)
r.setLastSnapshotTerm(req.LastLogTerm)
r.setLastSnapshot(req.LastLogIndex, req.LastLogTerm)

// Restore the peer set
peers := decodePeers(req.Peers, r.trans)
Expand Down Expand Up @@ -1732,7 +1730,7 @@ func (r *Raft) runSnapshots() {
// a new snapshot.
func (r *Raft) shouldSnapshot() bool {
// Check the last snapshot index
lastSnap := r.getLastSnapshotIndex()
lastSnap, _ := r.getLastSnapshot()

// Check the last log index
lastIdx, err := r.logs.LastIndex()
Expand Down Expand Up @@ -1797,8 +1795,7 @@ func (r *Raft) takeSnapshot() error {
}

// Update the last stable snapshot info
r.setLastSnapshotIndex(req.index)
r.setLastSnapshotTerm(req.term)
r.setLastSnapshot(req.index, req.term)

// Compact the logs
if err := r.compactLogs(req.index); err != nil {
Expand All @@ -1821,15 +1818,16 @@ func (r *Raft) compactLogs(snapIdx uint64) error {
}

// Check if we have enough logs to truncate
if r.getLastLogIndex() <= r.conf.TrailingLogs {
lastLogIdx, _ := r.getLastLog()
if lastLogIdx <= r.conf.TrailingLogs {
return nil
}

// Truncate up to the end of the snapshot, or `TrailingLogs`
// back from the head, which ever is further back. This ensures
// at least `TrailingLogs` entries, but does not allow logs
// after the snapshot to be removed.
maxLog := min(snapIdx, r.getLastLogIndex()-r.conf.TrailingLogs)
maxLog := min(snapIdx, lastLogIdx-r.conf.TrailingLogs)

// Log this
r.logger.Printf("[INFO] raft: Compacting logs from %d to %d", minLog, maxLog)
Expand Down Expand Up @@ -1872,8 +1870,7 @@ func (r *Raft) restoreSnapshot() error {
r.setLastApplied(snapshot.Index)

// Update the last stable snapshot info
r.setLastSnapshotIndex(snapshot.Index)
r.setLastSnapshotTerm(snapshot.Term)
r.setLastSnapshot(snapshot.Index, snapshot.Term)

// Success!
return nil
Expand Down
15 changes: 8 additions & 7 deletions replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,9 @@ RPC:
}
return
case <-s.triggerCh:
shouldStop = r.replicateTo(s, r.getLastLogIndex())
shouldStop = r.replicateTo(s, r.getLastLogIndexOnly())
case <-randomTimeout(r.conf.CommitTimeout):
shouldStop = r.replicateTo(s, r.getLastLogIndex())
shouldStop = r.replicateTo(s, r.getLastLogIndexOnly())
}

// If things looks healthy, switch to pipeline mode
Expand Down Expand Up @@ -358,9 +358,9 @@ SEND:
}
break SEND
case <-s.triggerCh:
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex())
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndexOnly())
case <-randomTimeout(r.conf.CommitTimeout):
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndex())
shouldStop = r.pipelineSend(s, pipeline, &nextIndex, r.getLastLogIndexOnly())
}
}

Expand Down Expand Up @@ -446,13 +446,14 @@ func (r *Raft) setupAppendEntries(s *followerReplication, req *AppendEntriesRequ
func (r *Raft) setPreviousLog(req *AppendEntriesRequest, nextIndex uint64) error {
// Guard for the first index, since there is no 0 log entry
// Guard against the previous index being a snapshot as well
lastSnapIdx, lastSnapTerm := r.getLastSnapshot()
if nextIndex == 1 {
req.PrevLogEntry = 0
req.PrevLogTerm = 0

} else if (nextIndex - 1) == r.getLastSnapshotIndex() {
req.PrevLogEntry = r.getLastSnapshotIndex()
req.PrevLogTerm = r.getLastSnapshotTerm()
} else if (nextIndex - 1) == lastSnapIdx {
req.PrevLogEntry = lastSnapIdx
req.PrevLogTerm = lastSnapTerm

} else {
var l Log
Expand Down
75 changes: 43 additions & 32 deletions state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"sync"
"sync/atomic"
)

Expand Down Expand Up @@ -44,20 +45,23 @@ type raftState struct {
// The current term, cache of StableStore
currentTerm uint64

// Cache the latest log from LogStore
LastLogIndex uint64
LastLogTerm uint64

// Highest committed log entry
commitIndex uint64

// Last applied log to the FSM
lastApplied uint64

// protects 4 next fields
lastLock sync.Mutex

// Cache the latest snapshot index/term
lastSnapshotIndex uint64
lastSnapshotTerm uint64

// Cache the latest log from LogStore
lastLogIndex uint64
lastLogTerm uint64

// Tracks the number of live routines
runningRoutines int32

Expand All @@ -83,20 +87,39 @@ func (r *raftState) setCurrentTerm(term uint64) {
atomic.StoreUint64(&r.currentTerm, term)
}

func (r *raftState) getLastLogIndex() uint64 {
return atomic.LoadUint64(&r.LastLogIndex)
func (r *raftState) getLastLog() (index, term uint64) {
r.lastLock.Lock()
index = r.lastLogIndex
term = r.lastLogTerm
r.lastLock.Unlock()
return
}

func (r *raftState) getLastLogIndexOnly() uint64 {
i, _ := r.getLastLog()
return i
}

func (r *raftState) setLastLogIndex(term uint64) {
atomic.StoreUint64(&r.LastLogIndex, term)
func (r *raftState) setLastLog(index, term uint64) {
r.lastLock.Lock()
r.lastLogIndex = index
r.lastLogTerm = term
r.lastLock.Unlock()
}

func (r *raftState) getLastLogTerm() uint64 {
return atomic.LoadUint64(&r.LastLogTerm)
func (r *raftState) getLastSnapshot() (index, term uint64) {
r.lastLock.Lock()
index = r.lastSnapshotIndex
term = r.lastSnapshotTerm
r.lastLock.Unlock()
return
}

func (r *raftState) setLastLogTerm(term uint64) {
atomic.StoreUint64(&r.LastLogTerm, term)
func (r *raftState) setLastSnapshot(index, term uint64) {
r.lastLock.Lock()
r.lastSnapshotIndex = index
r.lastSnapshotTerm = term
r.lastLock.Unlock()
}

func (r *raftState) getCommitIndex() uint64 {
Expand All @@ -115,22 +138,6 @@ func (r *raftState) setLastApplied(term uint64) {
atomic.StoreUint64(&r.lastApplied, term)
}

func (r *raftState) getLastSnapshotIndex() uint64 {
return atomic.LoadUint64(&r.lastSnapshotIndex)
}

func (r *raftState) setLastSnapshotIndex(term uint64) {
atomic.StoreUint64(&r.lastSnapshotIndex, term)
}

func (r *raftState) getLastSnapshotTerm() uint64 {
return atomic.LoadUint64(&r.lastSnapshotTerm)
}

func (r *raftState) setLastSnapshotTerm(term uint64) {
atomic.StoreUint64(&r.lastSnapshotTerm, term)
}

func (r *raftState) incrRoutines() {
atomic.AddInt32(&r.runningRoutines, 1)
}
Expand All @@ -156,14 +163,18 @@ func (r *raftState) goFunc(f func()) {
// getLastIndex returns the last index in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastIndex() uint64 {
return max(r.getLastLogIndex(), r.getLastSnapshotIndex())
r.lastLock.Lock()
defer r.lastLock.Unlock()
return max(r.lastLogIndex, r.lastSnapshotIndex)
}

// getLastEntry returns the last index and term in stable storage.
// Either from the last log or from the last snapshot.
func (r *raftState) getLastEntry() (uint64, uint64) {
if r.getLastLogIndex() >= r.getLastSnapshotIndex() {
return r.getLastLogIndex(), r.getLastLogTerm()
r.lastLock.Lock()
defer r.lastLock.Unlock()
if r.lastLogIndex >= r.lastSnapshotIndex {
return r.lastLogIndex, r.lastLogTerm
}
return r.getLastSnapshotIndex(), r.getLastSnapshotTerm()
return r.lastSnapshotIndex, r.lastSnapshotTerm
}

0 comments on commit ac7d930

Please sign in to comment.