diff --git a/log.go b/log.go index 3cad705a..db22740b 100644 --- a/log.go +++ b/log.go @@ -107,24 +107,25 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { - if l.matchTerm(index, logTerm) { - lastnewi = index + uint64(len(ents)) - ci := l.findConflict(ents) - switch { - case ci == 0: - case ci <= l.committed: - l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) - default: - offset := index + 1 - if ci-offset > uint64(len(ents)) { - l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) - } - l.append(ents[ci-offset:]...) + if !l.matchTerm(index, logTerm) { + return 0, false + } + + lastnewi = index + uint64(len(ents)) + ci := l.findConflict(ents) + switch { + case ci == 0: + case ci <= l.committed: + l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) + default: + offset := index + 1 + if ci-offset > uint64(len(ents)) { + l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) } - l.commitTo(min(committed, lastnewi)) - return lastnewi, true + l.append(ents[ci-offset:]...) } - return 0, false + l.commitTo(min(committed, lastnewi)) + return lastnewi, true } func (l *raftLog) append(ents ...pb.Entry) uint64 { @@ -161,34 +162,31 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { return 0 } -// findConflictByTerm takes an (index, term) pair (indicating a conflicting log -// entry on a leader/follower during an append) and finds the largest index in -// log l with a term <= `term` and an index <= `index`. If no such index exists -// in the log, the log's first index is returned. 
+// findConflictByTerm returns a best guess on where this log ends matching +// another log, given that the only information known about the other log is the +// (index, term) of its single entry. +// +// Specifically, the first returned value is the max guessIndex <= index, such +// that term(guessIndex) <= term or term(guessIndex) is not known (because this +// index is compacted or not yet stored). // -// The index provided MUST be equal to or less than l.lastIndex(). Invalid -// inputs log a warning and the input index is returned. -func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 { - if li := l.lastIndex(); index > li { - // NB: such calls should not exist, but since there is a straightfoward - // way to recover, do it. - // - // It is tempting to also check something about the first index, but - // there is odd behavior with peers that have no log, in which case - // lastIndex will return zero and firstIndex will return one, which - // leads to calls with an index of zero into this method. - l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm", - index, li) - return index - } - for { - logTerm, err := l.term(index) - if logTerm <= term || err != nil { - break +// The second returned value is the term(guessIndex), or 0 if it is unknown. +// +// This function is used by a follower and leader to resolve log conflicts after +// an unsuccessful append to a follower, and ultimately restore the steady flow +// of appends. +func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) { + for ; index > 0; index-- { + // If there is an error (likely ErrCompacted or ErrUnavailable), we don't + // know whether it's a match or not, so assume a possible match and return + // the index, with 0 term indicating an unknown term. 
+ if ourTerm, err := l.term(index); err != nil { + return index, 0 + } else if ourTerm <= term { + return index, ourTerm } - index-- } - return index + return 0, 0 } // nextUnstableEnts returns all entries that are available to be written to the @@ -447,7 +445,10 @@ func (l *raftLog) matchTerm(i, term uint64) bool { } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { + // NB: term should never be 0 on a commit because the leader campaigns at + // least at term 1. But if it is 0 for some reason, we don't want to consider + // this a term match in case zeroTermOnOutOfBounds returns 0. + if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } diff --git a/log_test.go b/log_test.go index 355283ea..14f4e9bb 100644 --- a/log_test.go +++ b/log_test.go @@ -55,6 +55,63 @@ func TestFindConflict(t *testing.T) { } } +func TestFindConflictByTerm(t *testing.T) { + ents := func(fromIndex uint64, terms []uint64) []pb.Entry { + e := make([]pb.Entry, 0, len(terms)) + for i, term := range terms { + e = append(e, pb.Entry{Term: term, Index: fromIndex + uint64(i)}) + } + return e + } + for _, tt := range []struct { + ents []pb.Entry // ents[0] contains the (index, term) of the snapshot + index uint64 + term uint64 + want uint64 + }{ + // Log starts from index 1. 
+ {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 100, term: 2, want: 100}, // ErrUnavailable + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 6, want: 5}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 5, want: 5}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 4, want: 2}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 2, want: 2}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 1, want: 0}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 2, want: 1}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 1, want: 0}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 0, term: 0, want: 0}, + // Log with compacted entries. + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 30, term: 3, want: 30}, // ErrUnavailable + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 9, want: 14}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 4, want: 14}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 3, want: 12}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 2, want: 9}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 11, term: 5, want: 11}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 5, want: 10}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 3, want: 10}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 2, want: 9}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 9, term: 2, want: 9}, // ErrCompacted + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 4, term: 2, want: 4}, // ErrCompacted + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 0, term: 0, want: 0}, // ErrCompacted + } { + t.Run("", func(t *testing.T) { + st := NewMemoryStorage() + require.NotEmpty(t, tt.ents) + st.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{ + Index: tt.ents[0].Index, + Term: tt.ents[0].Term, + }}) + l := newLog(st, raftLogger) + l.append(tt.ents[1:]...) 
+ + index, term := l.findConflictByTerm(tt.index, tt.term) + require.Equal(t, tt.want, index) + wantTerm, err := l.term(index) + wantTerm = l.zeroTermOnOutOfBounds(wantTerm, err) + require.Equal(t, wantTerm, term) + }) + } +} + func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} raftLog := newLog(NewMemoryStorage(), raftLogger) diff --git a/raft.go b/raft.go index 1c0845e5..d1048294 100644 --- a/raft.go +++ b/raft.go @@ -1394,7 +1394,7 @@ func stepLeader(r *raft, m pb.Message) error { // 7, the rejection points it at the end of the follower's log // which is at a higher log term than the actually committed // log. - nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) + nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) } if pr.MaybeDecrTo(m.Index, nextProbeIdx) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) @@ -1648,37 +1648,39 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) - } else { - r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) - - // Return a hint to the leader about the maximum index and term that the - // two logs could be divergent at. Do this by searching through the - // follower's log for the maximum (index, term) pair with a term <= the - // MsgApp's LogTerm and an index <= the MsgApp's Index. This can help - // skip all indexes in the follower's uncommitted tail with terms - // greater than the MsgApp's LogTerm. 
- // - // See the other caller for findConflictByTerm (in stepLeader) for a much - // more detailed explanation of this mechanism. - hintIndex := min(m.Index, r.raftLog.lastIndex()) - hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) - hintTerm, err := r.raftLog.term(hintIndex) - if err != nil { - panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) - } - r.send(pb.Message{ - To: m.From, - Type: pb.MsgAppResp, - Index: m.Index, - Reject: true, - RejectHint: hintIndex, - LogTerm: hintTerm, - }) + return } + r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", + r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) + + // Our log does not match the leader's at index m.Index. Return a hint to the + // leader - a guess on the maximal (index, term) at which the logs match. Do + // this by searching through the follower's log for the maximum (index, term) + // pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's + // Index. This can help skip all indexes in the follower's uncommitted tail + // with terms greater than the MsgApp's LogTerm. + // + // See the other caller for findConflictByTerm (in stepLeader) for a much more + // detailed explanation of this mechanism. + + // NB: m.Index >= raftLog.committed by now (see the early return above), and + // raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is + // also >= raftLog.committed. Hence, the findConflictByTerm argument is within + // the valid interval, which then will return a valid (index, term) pair with + // a non-zero term (unless the log is empty). However, it is safe to send a zero + // LogTerm in this response in any case, so we don't verify it here. 
+ hintIndex := min(m.Index, r.raftLog.lastIndex()) + hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) + r.send(pb.Message{ + To: m.From, + Type: pb.MsgAppResp, + Index: m.Index, + Reject: true, + RejectHint: hintIndex, + LogTerm: hintTerm, + }) } func (r *raft) handleHeartbeat(m pb.Message) { @@ -1909,6 +1911,8 @@ func (r *raft) abortLeaderTransfer() { // committedEntryInCurrentTerm return true if the peer has committed an entry in its term. func (r *raft) committedEntryInCurrentTerm() bool { + // NB: r.Term is never 0 on a leader, so if zeroTermOnOutOfBounds returns 0, + // we won't see it as a match with r.Term. return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term } diff --git a/raft_test.go b/raft_test.go index 65a61694..5637c4a4 100644 --- a/raft_test.go +++ b/raft_test.go @@ -23,6 +23,8 @@ import ( "strings" "testing" + "github.com/stretchr/testify/require" + pb "go.etcd.io/raft/v3/raftpb" "go.etcd.io/raft/v3/tracker" ) @@ -4492,6 +4494,7 @@ func TestFastLogRejection(t *testing.T) { tests := []struct { leaderLog []pb.Entry // Logs on the leader followerLog []pb.Entry // Logs on the follower + followerCompact uint64 // Index at which the follower log is compacted. rejectHintTerm uint64 // Expected term included in rejected MsgAppResp. rejectHintIndex uint64 // Expected index included in rejected MsgAppResp. nextAppendTerm uint64 // Expected term when leader appends after rejected. @@ -4696,79 +4699,90 @@ func TestFastLogRejection(t *testing.T) { {Term: 4, Index: 7}, {Term: 4, Index: 8}, }, - nextAppendTerm: 2, - nextAppendIndex: 1, rejectHintTerm: 2, rejectHintIndex: 1, + nextAppendTerm: 2, + nextAppendIndex: 1, + }, + // A case when a stale MsgApp from leader arrives after the corresponding + // log index got compacted. + // A stale (type=MsgApp,index=3,logTerm=3,entries=[(term=3,index=4)]) is + // delivered to a follower who has already compacted beyond log index 3. 
The + // MsgAppResp rejection will return the same index=3, with logTerm=0. The leader + // will roll back by one entry, and send MsgApp with index=2,logTerm=1. + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + {Term: 3, Index: 4}, + {Term: 3, Index: 5}, // <- this entry and below are compacted + }, + followerCompact: 5, + rejectHintTerm: 0, + rejectHintIndex: 3, + nextAppendTerm: 1, + nextAppendIndex: 2, }, } - for i, test := range tests { + for _, test := range tests { t.Run("", func(t *testing.T) { s1 := NewMemoryStorage() s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} s1.Append(test.leaderLog) + last := test.leaderLog[len(test.leaderLog)-1] + s1.SetHardState(pb.HardState{ + Term: last.Term - 1, + Commit: last.Index, + }) + n1 := newTestRaft(1, 10, 1, s1) + n1.becomeCandidate() // bumps Term to last.Term + n1.becomeLeader() + s2 := NewMemoryStorage() s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} s2.Append(test.followerLog) - - n1 := newTestRaft(1, 10, 1, s1) + s2.SetHardState(pb.HardState{ + Term: last.Term, + Vote: 1, + Commit: 0, + }) n2 := newTestRaft(2, 10, 1, s2) + if test.followerCompact != 0 { + s2.Compact(test.followerCompact) + // NB: the state of n2 after this compaction isn't realistic because the + // commit index is still at 0. We do this to exercise a "doesn't happen" + // edge case behaviour, in case it still does happen in some other way. 
+ } - n1.becomeCandidate() - n1.becomeLeader() - - n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat}) - + require.NoError(t, n2.Step(pb.Message{From: 1, To: 2, Type: pb.MsgHeartbeat})) msgs := n2.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 2") - } - if msgs[0].Type != pb.MsgHeartbeatResp { - t.Errorf("can't read heartbeat response from peer 2") - } - if n1.Step(msgs[0]) != nil { - t.Errorf("peer 1 step heartbeat response fail") - } + require.Len(t, msgs, 1, "can't read 1 message from peer 2") + require.Equal(t, pb.MsgHeartbeatResp, msgs[0].Type) + require.NoError(t, n1.Step(msgs[0])) msgs = n1.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 1") - } - if msgs[0].Type != pb.MsgApp { - t.Errorf("can't read append from peer 1") - } + require.Len(t, msgs, 1, "can't read 1 message from peer 1") + require.Equal(t, pb.MsgApp, msgs[0].Type) - if n2.Step(msgs[0]) != nil { - t.Errorf("peer 2 step append fail") - } + require.NoError(t, n2.Step(msgs[0]), "peer 2 step append fail") msgs = n2.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 2") - } - if msgs[0].Type != pb.MsgAppResp { - t.Errorf("can't read append response from peer 2") - } - if !msgs[0].Reject { - t.Errorf("expected rejected append response from peer 2") - } - if msgs[0].LogTerm != test.rejectHintTerm { - t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm) - } - if msgs[0].RejectHint != test.rejectHintIndex { - t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint) - } + require.Len(t, msgs, 1, "can't read 1 message from peer 2") + require.Equal(t, pb.MsgAppResp, msgs[0].Type) + require.True(t, msgs[0].Reject, "expected rejected append response from peer 2") + require.Equal(t, test.rejectHintTerm, msgs[0].LogTerm, "hint log term mismatch") + require.Equal(t, test.rejectHintIndex, msgs[0].RejectHint, "hint log index 
mismatch") - if n1.Step(msgs[0]) != nil { - t.Errorf("peer 1 step append fail") - } + require.NoError(t, n1.Step(msgs[0]), "peer 1 step append fail") msgs = n1.readMessages() - if msgs[0].LogTerm != test.nextAppendTerm { - t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm) - } - if msgs[0].Index != test.nextAppendIndex { - t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index) - } + require.Equal(t, test.nextAppendTerm, msgs[0].LogTerm) + require.Equal(t, test.nextAppendIndex, msgs[0].Index) }) } }