From 0be90300e66c2d1d697fb69436ef8fdb702c2fa6 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 15 Mar 2023 11:19:05 +0000 Subject: [PATCH 1/6] raft: add unit test for raftLog.findConflictByTerm Signed-off-by: Pavel Kalinnikov --- log_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/log_test.go b/log_test.go index 355283ea..47ed4a45 100644 --- a/log_test.go +++ b/log_test.go @@ -55,6 +55,58 @@ func TestFindConflict(t *testing.T) { } } +func TestFindConflictByTerm(t *testing.T) { + ents := func(fromIndex uint64, terms []uint64) []pb.Entry { + e := make([]pb.Entry, 0, len(terms)) + for i, term := range terms { + e = append(e, pb.Entry{Term: term, Index: fromIndex + uint64(i)}) + } + return e + } + for _, tt := range []struct { + ents []pb.Entry // ents[0] contains the (index, term) of the snapshot + index uint64 + term uint64 + want uint64 + }{ + // Log starts from index 1. + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 100, term: 2, want: 100}, // ErrUnavailable + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 6, want: 5}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 5, want: 5}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 4, want: 2}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 2, want: 2}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 1, want: 0}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 2, want: 1}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 1, want: 0}, + {ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 0, term: 0, want: 0}, + // Log with compacted entries. + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 30, term: 3, want: 30}, // ErrUnavailable + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 9, want: 14}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 4, want: 14}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 3, want: 12}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 2, want: 9}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 11, term: 5, want: 11}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 5, want: 10}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 3, want: 10}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 2, want: 9}, + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 9, term: 2, want: 9}, // ErrCompacted + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 4, term: 2, want: 4}, // ErrCompacted + {ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 0, term: 0, want: 0}, // ErrCompacted + } { + t.Run("", func(t *testing.T) { + st := NewMemoryStorage() + require.NotEmpty(t, tt.ents) + st.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{ + Index: tt.ents[0].Index, + Term: tt.ents[0].Term, + }}) + l := newLog(st, raftLogger) + l.append(tt.ents[1:]...) + require.Equal(t, tt.want, l.findConflictByTerm(tt.index, tt.term)) + }) + } +} + func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} raftLog := newLog(NewMemoryStorage(), raftLogger) From 552234b55ee5393ad4725390bb06630685aae049 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 13 Mar 2023 14:50:14 +0000 Subject: [PATCH 2/6] raft: check term != 0 and document why Signed-off-by: Pavel Kalinnikov --- log.go | 5 ++++- raft.go | 2 ++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/log.go b/log.go index 3cad705a..7b9888b1 100644 --- a/log.go +++ b/log.go @@ -447,7 +447,10 @@ func (l *raftLog) matchTerm(i, term uint64) bool { } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { + // NB: term should never be 0 on a commit because the leader campaigns at + // least at term 1. But if it is 0 for some reason, we don't want to consider + // this a term match in case zeroTermOnOutOfBounds returns 0. + if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } diff --git a/raft.go b/raft.go index 1c0845e5..15e43576 100644 --- a/raft.go +++ b/raft.go @@ -1909,6 +1909,8 @@ func (r *raft) abortLeaderTransfer() { // committedEntryInCurrentTerm return true if the peer has committed an entry in its term. func (r *raft) committedEntryInCurrentTerm() bool { + // NB: r.Term is never 0 on a leader, so if zeroTermOnOutOfBounds returns 0, + // we won't see it as a match with r.Term. return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term } From e20b4772b17fb0e7a4a19dc852e2f4401052c548 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 13 Mar 2023 18:50:50 +0000 Subject: [PATCH 3/6] raft: cleanup TestFastLogRejection Signed-off-by: Pavel Kalinnikov --- raft_test.go | 82 +++++++++++------------------ testdata/probe_after_compaction.txt | 0 2 files changed, 32 insertions(+), 50 deletions(-) create mode 100644 testdata/probe_after_compaction.txt diff --git a/raft_test.go b/raft_test.go index 65a61694..1c39c5db 100644 --- a/raft_test.go +++ b/raft_test.go @@ -23,6 +23,8 @@ import ( "strings" "testing" + "github.com/stretchr/testify/require" + pb "go.etcd.io/raft/v3/raftpb" "go.etcd.io/raft/v3/tracker" ) @@ -4703,72 +4705,52 @@ func TestFastLogRejection(t *testing.T) { }, } - for i, test := range tests { + for _, test := range tests { t.Run("", func(t *testing.T) { s1 := NewMemoryStorage() s1.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} s1.Append(test.leaderLog) + last := test.leaderLog[len(test.leaderLog)-1] + s1.SetHardState(pb.HardState{ + Term: last.Term - 1, + Commit: last.Index, + }) + n1 := newTestRaft(1, 10, 1, s1) + n1.becomeCandidate() // bumps Term to last.Term + n1.becomeLeader() + s2 := NewMemoryStorage() s2.snapshot.Metadata.ConfState = pb.ConfState{Voters: []uint64{1, 2, 3}} s2.Append(test.followerLog) - - n1 := newTestRaft(1, 10, 1, s1) + s2.SetHardState(pb.HardState{ + Term: last.Term, + Vote: 1, + Commit: 0, + }) n2 := newTestRaft(2, 10, 1, s2) - n1.becomeCandidate() - n1.becomeLeader() - - n2.Step(pb.Message{From: 1, To: 1, Type: pb.MsgHeartbeat}) - + require.NoError(t, n2.Step(pb.Message{From: 1, To: 2, Type: pb.MsgHeartbeat})) msgs := n2.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 2") - } - if msgs[0].Type != pb.MsgHeartbeatResp { - t.Errorf("can't read heartbeat response from peer 2") - } - if n1.Step(msgs[0]) != nil { - t.Errorf("peer 1 step heartbeat response fail") - } + require.Len(t, msgs, 1, "can't read 1 message from peer 2") + require.Equal(t, pb.MsgHeartbeatResp, msgs[0].Type) + require.NoError(t, n1.Step(msgs[0])) msgs = n1.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 1") - } - if msgs[0].Type != pb.MsgApp { - t.Errorf("can't read append from peer 1") - } + require.Len(t, msgs, 1, "can't read 1 message from peer 1") + require.Equal(t, pb.MsgApp, msgs[0].Type) - if n2.Step(msgs[0]) != nil { - t.Errorf("peer 2 step append fail") - } + require.NoError(t, n2.Step(msgs[0]), "peer 2 step append fail") msgs = n2.readMessages() - if len(msgs) != 1 { - t.Errorf("can't read 1 message from peer 2") - } - if msgs[0].Type != pb.MsgAppResp { - t.Errorf("can't read append response from peer 2") - } - if !msgs[0].Reject { - t.Errorf("expected rejected append response from peer 2") - } - if msgs[0].LogTerm != test.rejectHintTerm { - t.Fatalf("#%d expected hint log term = %d, but got %d", i, test.rejectHintTerm, msgs[0].LogTerm) - } - if msgs[0].RejectHint != test.rejectHintIndex { - t.Fatalf("#%d expected hint index = %d, but got %d", i, test.rejectHintIndex, msgs[0].RejectHint) - } + require.Len(t, msgs, 1, "can't read 1 message from peer 2") + require.Equal(t, pb.MsgAppResp, msgs[0].Type) + require.True(t, msgs[0].Reject, "expected rejected append response from peer 2") + require.Equal(t, test.rejectHintTerm, msgs[0].LogTerm, "hint log term mismatch") + require.Equal(t, test.rejectHintIndex, msgs[0].RejectHint, "hint log index mismatch") - if n1.Step(msgs[0]) != nil { - t.Errorf("peer 1 step append fail") - } + require.NoError(t, n1.Step(msgs[0]), "peer 1 step append fail") msgs = n1.readMessages() - if msgs[0].LogTerm != test.nextAppendTerm { - t.Fatalf("#%d expected log term = %d, but got %d", i, test.nextAppendTerm, msgs[0].LogTerm) - } - if msgs[0].Index != test.nextAppendIndex { - t.Fatalf("#%d expected index = %d, but got %d", i, test.nextAppendIndex, msgs[0].Index) - } + require.Equal(t, test.nextAppendTerm, msgs[0].LogTerm) + require.Equal(t, test.nextAppendIndex, msgs[0].Index) }) } } diff --git a/testdata/probe_after_compaction.txt b/testdata/probe_after_compaction.txt new file mode 100644 index 00000000..e69de29b From af62363acd398759bd367885700c91b9a51f3c8b Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 13 Mar 2023 19:41:32 +0000 Subject: [PATCH 4/6] raft: unindent maybeAppend Signed-off-by: Pavel Kalinnikov --- log.go | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/log.go b/log.go index 7b9888b1..5e7cefc3 100644 --- a/log.go +++ b/log.go @@ -107,24 +107,25 @@ func (l *raftLog) String() string { // maybeAppend returns (0, false) if the entries cannot be appended. Otherwise, // it returns (last index of new entries, true). func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) { - if l.matchTerm(index, logTerm) { - lastnewi = index + uint64(len(ents)) - ci := l.findConflict(ents) - switch { - case ci == 0: - case ci <= l.committed: - l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) - default: - offset := index + 1 - if ci-offset > uint64(len(ents)) { - l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) - } - l.append(ents[ci-offset:]...) + if !l.matchTerm(index, logTerm) { + return 0, false + } + + lastnewi = index + uint64(len(ents)) + ci := l.findConflict(ents) + switch { + case ci == 0: + case ci <= l.committed: + l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed) + default: + offset := index + 1 + if ci-offset > uint64(len(ents)) { + l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents)) } - l.commitTo(min(committed, lastnewi)) - return lastnewi, true + l.append(ents[ci-offset:]...) } - return 0, false + l.commitTo(min(committed, lastnewi)) + return lastnewi, true } func (l *raftLog) append(ents ...pb.Entry) uint64 { From c5dabf86f435160b89580659760ccefd89b54441 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 15 Mar 2023 13:19:06 +0000 Subject: [PATCH 5/6] raft: unindent handleAppendEntries Signed-off-by: Pavel Kalinnikov --- raft.go | 55 +++++++++++++++++++++++++++---------------------------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/raft.go b/raft.go index 15e43576..ccdc0588 100644 --- a/raft.go +++ b/raft.go @@ -1648,37 +1648,36 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed}) return } - if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) - } else { - r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) - - // Return a hint to the leader about the maximum index and term that the - // two logs could be divergent at. Do this by searching through the - // follower's log for the maximum (index, term) pair with a term <= the - // MsgApp's LogTerm and an index <= the MsgApp's Index. This can help - // skip all indexes in the follower's uncommitted tail with terms - // greater than the MsgApp's LogTerm. - // - // See the other caller for findConflictByTerm (in stepLeader) for a much - // more detailed explanation of this mechanism. - hintIndex := min(m.Index, r.raftLog.lastIndex()) - hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) - hintTerm, err := r.raftLog.term(hintIndex) - if err != nil { - panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) - } - r.send(pb.Message{ - To: m.From, - Type: pb.MsgAppResp, - Index: m.Index, - Reject: true, - RejectHint: hintIndex, - LogTerm: hintTerm, - }) + return } + + r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", + r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) + + // Return a hint to the leader about the maximum index and term that the two + // logs could be divergent at. Do this by searching through the follower's log + // for the maximum (index, term) pair with a term <= the MsgApp's LogTerm and + // an index <= the MsgApp's Index. This can help skip all indexes in the + // follower's uncommitted tail with terms greater than the MsgApp's LogTerm. + // + // See the other caller for findConflictByTerm (in stepLeader) for a much more + // detailed explanation of this mechanism. + hintIndex := min(m.Index, r.raftLog.lastIndex()) + hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) + hintTerm, err := r.raftLog.term(hintIndex) + if err != nil { + panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) + } + r.send(pb.Message{ + To: m.From, + Type: pb.MsgAppResp, + Index: m.Index, + Reject: true, + RejectHint: hintIndex, + LogTerm: hintTerm, + }) } func (r *raft) handleHeartbeat(m pb.Message) { From 0c22de09081964a3d24c171279d3c8d8be721cf0 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 14 Mar 2023 12:56:36 +0000 Subject: [PATCH 6/6] raft: don't panic when looking for term conflicts This commit fixes a hypothetical panic that may occur when a stale MsgApp message arrives to a follower. The conflict searching algorithm in findConflictByTerm may return a log index which is not present in the log, and thus the raftLog.term() method may return an error. It is safe to ignore this error and send MsgAppResp with the found index and a zero LogTerm. This commit also restores the behaviour that existed before commit d0fb0cd6. Back then, the term() function would silently return 0 instead of an error, and a zero LogTerm would be sent with the rejecting MsgAppResp. After that commit, there is a new possible panic. We remove the possibility of this panic here. Signed-off-by: Pavel Kalinnikov --- log.go | 47 ++++++++++++++--------------- log_test.go | 7 ++++- raft.go | 27 +++++++++-------- raft_test.go | 36 ++++++++++++++++++++-- testdata/probe_after_compaction.txt | 0 5 files changed, 77 insertions(+), 40 deletions(-) delete mode 100644 testdata/probe_after_compaction.txt diff --git a/log.go b/log.go index 5e7cefc3..db22740b 100644 --- a/log.go +++ b/log.go @@ -162,34 +162,31 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { return 0 } -// findConflictByTerm takes an (index, term) pair (indicating a conflicting log -// entry on a leader/follower during an append) and finds the largest index in -// log l with a term <= `term` and an index <= `index`. If no such index exists -// in the log, the log's first index is returned. +// findConflictByTerm returns a best guess on where this log ends matching +// another log, given that the only information known about the other log is the +// (index, term) of its single entry. // -// The index provided MUST be equal to or less than l.lastIndex(). Invalid -// inputs log a warning and the input index is returned. -func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 { - if li := l.lastIndex(); index > li { - // NB: such calls should not exist, but since there is a straightfoward - // way to recover, do it. - // - // It is tempting to also check something about the first index, but - // there is odd behavior with peers that have no log, in which case - // lastIndex will return zero and firstIndex will return one, which - // leads to calls with an index of zero into this method. - l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm", - index, li) - return index - } - for { - logTerm, err := l.term(index) - if logTerm <= term || err != nil { - break +// Specifically, the first returned value is the max guessIndex <= index, such +// that term(guessIndex) <= term or term(guessIndex) is not known (because this +// index is compacted or not yet stored). +// +// The second returned value is the term(guessIndex), or 0 if it is unknown. +// +// This function is used by a follower and leader to resolve log conflicts after +// an unsuccessful append to a follower, and ultimately restore the steady flow +// of appends. +func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) { + for ; index > 0; index-- { + // If there is an error (likely ErrCompacted or ErrUnavailable), we don't + // know whether it's a match or not, so assume a possible match and return + // the index, with 0 term indicating an unknown term. + if ourTerm, err := l.term(index); err != nil { + return index, 0 + } else if ourTerm <= term { + return index, ourTerm } - index-- } - return index + return 0, 0 } // nextUnstableEnts returns all entries that are available to be written to the diff --git a/log_test.go b/log_test.go index 47ed4a45..14f4e9bb 100644 --- a/log_test.go +++ b/log_test.go @@ -102,7 +102,12 @@ func TestFindConflictByTerm(t *testing.T) { }}) l := newLog(st, raftLogger) l.append(tt.ents[1:]...) - require.Equal(t, tt.want, l.findConflictByTerm(tt.index, tt.term)) + + index, term := l.findConflictByTerm(tt.index, tt.term) + require.Equal(t, tt.want, index) + wantTerm, err := l.term(index) + wantTerm = l.zeroTermOnOutOfBounds(wantTerm, err) + require.Equal(t, wantTerm, term) }) } } diff --git a/raft.go b/raft.go index ccdc0588..d1048294 100644 --- a/raft.go +++ b/raft.go @@ -1394,7 +1394,7 @@ func stepLeader(r *raft, m pb.Message) error { // 7, the rejection points it at the end of the follower's log // which is at a higher log term than the actually committed // log. - nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) + nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm) } if pr.MaybeDecrTo(m.Index, nextProbeIdx) { r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr) @@ -1652,24 +1652,27 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) return } - r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) - // Return a hint to the leader about the maximum index and term that the two - // logs could be divergent at. Do this by searching through the follower's log - // for the maximum (index, term) pair with a term <= the MsgApp's LogTerm and - // an index <= the MsgApp's Index. This can help skip all indexes in the - // follower's uncommitted tail with terms greater than the MsgApp's LogTerm. + // Our log does not match the leader's at index m.Index. Return a hint to the + // leader - a guess on the maximal (index, term) at which the logs match. Do + // this by searching through the follower's log for the maximum (index, term) + // pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's + // Index. This can help skip all indexes in the follower's uncommitted tail + // with terms greater than the MsgApp's LogTerm. // // See the other caller for findConflictByTerm (in stepLeader) for a much more // detailed explanation of this mechanism. + + // NB: m.Index >= raftLog.committed by now (see the early return above), and + // raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is + // also >= raftLog.committed. Hence, the findConflictByTerm argument is within + // the valid interval, which then will return a valid (index, term) pair with + // a non-zero term (unless the log is empty). However, it is safe to send a zero + // LogTerm in this response in any case, so we don't verify it here. hintIndex := min(m.Index, r.raftLog.lastIndex()) - hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) - hintTerm, err := r.raftLog.term(hintIndex) - if err != nil { - panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err)) - } + hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm) r.send(pb.Message{ To: m.From, Type: pb.MsgAppResp, diff --git a/raft_test.go b/raft_test.go index 1c39c5db..5637c4a4 100644 --- a/raft_test.go +++ b/raft_test.go @@ -4494,6 +4494,7 @@ func TestFastLogRejection(t *testing.T) { tests := []struct { leaderLog []pb.Entry // Logs on the leader followerLog []pb.Entry // Logs on the follower + followerCompact uint64 // Index at which the follower log is compacted. rejectHintTerm uint64 // Expected term included in rejected MsgAppResp. rejectHintIndex uint64 // Expected index included in rejected MsgAppResp. nextAppendTerm uint64 // Expected term when leader appends after rejected. @@ -4698,10 +4699,35 @@ func TestFastLogRejection(t *testing.T) { {Term: 4, Index: 7}, {Term: 4, Index: 8}, }, - nextAppendTerm: 2, - nextAppendIndex: 1, rejectHintTerm: 2, rejectHintIndex: 1, + nextAppendTerm: 2, + nextAppendIndex: 1, + }, + // A case when a stale MsgApp from leader arrives after the corresponding + // log index got compacted. + // A stale (type=MsgApp,index=3,logTerm=3,entries=[(term=3,index=4)]) is + // delivered to a follower who has already compacted beyond log index 3. The + // MsgAppResp rejection will return same index=3, with logTerm=0. The leader + // will rollback by one entry, and send MsgApp with index=2,logTerm=1. + { + leaderLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + }, + followerLog: []pb.Entry{ + {Term: 1, Index: 1}, + {Term: 1, Index: 2}, + {Term: 3, Index: 3}, + {Term: 3, Index: 4}, + {Term: 3, Index: 5}, // <- this entry and below are compacted + }, + followerCompact: 5, + rejectHintTerm: 0, + rejectHintIndex: 3, + nextAppendTerm: 1, + nextAppendIndex: 2, }, } @@ -4728,6 +4754,12 @@ func TestFastLogRejection(t *testing.T) { Commit: 0, }) n2 := newTestRaft(2, 10, 1, s2) + if test.followerCompact != 0 { + s2.Compact(test.followerCompact) + // NB: the state of n2 after this compaction isn't realistic because the + // commit index is still at 0. We do this to exercise a "doesn't happen" + // edge case behaviour, in case it still does happen in some other way. + } require.NoError(t, n2.Step(pb.Message{From: 1, To: 2, Type: pb.MsgHeartbeat})) msgs := n2.readMessages() diff --git a/testdata/probe_after_compaction.txt b/testdata/probe_after_compaction.txt deleted file mode 100644 index e69de29b..00000000