Skip to content

Commit

Permalink
Merge pull request #36 from pavelkalinnikov/protect-from-panic
Browse files Browse the repository at this point in the history
  • Loading branch information
tbg committed Mar 15, 2023
2 parents f261aca + 0c22de0 commit 5fe1c31
Show file tree
Hide file tree
Showing 4 changed files with 199 additions and 123 deletions.
85 changes: 43 additions & 42 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,25 @@ func (l *raftLog) String() string {
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
l.append(ents[ci-offset:]...)
if !l.matchTerm(index, logTerm) {
return 0, false
}

lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
l.append(ents[ci-offset:]...)
}
return 0, false
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
Expand Down Expand Up @@ -161,34 +162,31 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
return 0
}

// findConflictByTerm takes an (index, term) pair (indicating a conflicting log
// entry on a leader/follower during an append) and finds the largest index in
// log l with a term <= `term` and an index <= `index`. If no such index exists
// in the log, the log's first index is returned.
// findConflictByTerm returns a best guess on where this log ends matching
// another log, given that the only information known about the other log is the
// (index, term) of its single entry.
//
// Specifically, the first returned value is the max guessIndex <= index, such
// that term(guessIndex) <= term or term(guessIndex) is not known (because this
// index is compacted or not yet stored).
//
// The index provided MUST be equal to or less than l.lastIndex(). Invalid
// inputs log a warning and the input index is returned.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
if li := l.lastIndex(); index > li {
// NB: such calls should not exist, but since there is a straightfoward
// way to recover, do it.
//
// It is tempting to also check something about the first index, but
// there is odd behavior with peers that have no log, in which case
// lastIndex will return zero and firstIndex will return one, which
// leads to calls with an index of zero into this method.
l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
index, li)
return index
}
for {
logTerm, err := l.term(index)
if logTerm <= term || err != nil {
break
// The second returned value is the term(guessIndex), or 0 if it is unknown.
//
// This function is used by a follower and leader to resolve log conflicts after
// an unsuccessful append to a follower, and ultimately restore the steady flow
// of appends.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) {
for ; index > 0; index-- {
// If there is an error (likely ErrCompacted or ErrUnavailable), we don't
// know whether it's a match or not, so assume a possible match and return
// the index, with 0 term indicating an unknown term.
if ourTerm, err := l.term(index); err != nil {
return index, 0
} else if ourTerm <= term {
return index, ourTerm
}
index--
}
return index
return 0, 0
}

// nextUnstableEnts returns all entries that are available to be written to the
Expand Down Expand Up @@ -447,7 +445,10 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
// NB: term should never be 0 on a commit because the leader campaigns at
// least at term 1. But if it is 0 for some reason, we don't want to consider
// this a term match in case zeroTermOnOutOfBounds returns 0.
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
Expand Down
57 changes: 57 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,63 @@ func TestFindConflict(t *testing.T) {
}
}

func TestFindConflictByTerm(t *testing.T) {
ents := func(fromIndex uint64, terms []uint64) []pb.Entry {
e := make([]pb.Entry, 0, len(terms))
for i, term := range terms {
e = append(e, pb.Entry{Term: term, Index: fromIndex + uint64(i)})
}
return e
}
for _, tt := range []struct {
ents []pb.Entry // ents[0] contains the (index, term) of the snapshot
index uint64
term uint64
want uint64
}{
// Log starts from index 1.
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 100, term: 2, want: 100}, // ErrUnavailable
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 6, want: 5},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 5, want: 5},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 4, want: 2},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 2, want: 2},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 1, want: 0},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 2, want: 1},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 1, want: 0},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 0, term: 0, want: 0},
// Log with compacted entries.
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 30, term: 3, want: 30}, // ErrUnavailable
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 9, want: 14},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 4, want: 14},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 3, want: 12},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 2, want: 9},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 11, term: 5, want: 11},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 5, want: 10},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 3, want: 10},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 2, want: 9},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 9, term: 2, want: 9}, // ErrCompacted
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 4, term: 2, want: 4}, // ErrCompacted
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 0, term: 0, want: 0}, // ErrCompacted
} {
t.Run("", func(t *testing.T) {
st := NewMemoryStorage()
require.NotEmpty(t, tt.ents)
st.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
Index: tt.ents[0].Index,
Term: tt.ents[0].Term,
}})
l := newLog(st, raftLogger)
l.append(tt.ents[1:]...)

index, term := l.findConflictByTerm(tt.index, tt.term)
require.Equal(t, tt.want, index)
wantTerm, err := l.term(index)
wantTerm = l.zeroTermOnOutOfBounds(wantTerm, err)
require.Equal(t, wantTerm, term)
})
}
}

func TestIsUpToDate(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
raftLog := newLog(NewMemoryStorage(), raftLogger)
Expand Down
62 changes: 33 additions & 29 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ func stepLeader(r *raft, m pb.Message) error {
// 7, the rejection points it at the end of the follower's log
// which is at a higher log term than the actually committed
// log.
nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
Expand Down Expand Up @@ -1648,37 +1648,39 @@ func (r *raft) handleAppendEntries(m pb.Message) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}

if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

// Return a hint to the leader about the maximum index and term that the
// two logs could be divergent at. Do this by searching through the
// follower's log for the maximum (index, term) pair with a term <= the
// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
// skip all indexes in the follower's uncommitted tail with terms
// greater than the MsgApp's LogTerm.
//
// See the other caller for findConflictByTerm (in stepLeader) for a much
// more detailed explanation of this mechanism.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
hintTerm, err := r.raftLog.term(hintIndex)
if err != nil {
panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
}
r.send(pb.Message{
To: m.From,
Type: pb.MsgAppResp,
Index: m.Index,
Reject: true,
RejectHint: hintIndex,
LogTerm: hintTerm,
})
return
}
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

// Our log does not match the leader's at index m.Index. Return a hint to the
// leader - a guess on the maximal (index, term) at which the logs match. Do
// this by searching through the follower's log for the maximum (index, term)
// pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's
// Index. This can help skip all indexes in the follower's uncommitted tail
// with terms greater than the MsgApp's LogTerm.
//
// See the other caller for findConflictByTerm (in stepLeader) for a much more
// detailed explanation of this mechanism.

// NB: m.Index >= raftLog.committed by now (see the early return above), and
// raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is
// also >= raftLog.committed. Hence, the findConflictByTerm argument is within
// the valid interval, which then will return a valid (index, term) pair with
// a non-zero term (unless the log is empty). However, it is safe to send a zero
// LogTerm in this response in any case, so we don't verify it here.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
r.send(pb.Message{
To: m.From,
Type: pb.MsgAppResp,
Index: m.Index,
Reject: true,
RejectHint: hintIndex,
LogTerm: hintTerm,
})
}

func (r *raft) handleHeartbeat(m pb.Message) {
Expand Down Expand Up @@ -1909,6 +1911,8 @@ func (r *raft) abortLeaderTransfer() {

// committedEntryInCurrentTerm return true if the peer has committed an entry in its term.
func (r *raft) committedEntryInCurrentTerm() bool {
// NB: r.Term is never 0 on a leader, so if zeroTermOnOutOfBounds returns 0,
// we won't see it as a match with r.Term.
return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term
}

Expand Down
Loading

0 comments on commit 5fe1c31

Please sign in to comment.