diff --git a/log.go b/log.go index daf7acef..3cad705a 100644 --- a/log.go +++ b/log.go @@ -153,7 +153,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) + ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term) } return ne.Index } @@ -388,11 +388,14 @@ func (l *raftLog) term(i uint64) (uint64, error) { return t, nil } - // The valid term range is [index of dummy entry, last index]. - dummyIndex := l.firstIndex() - 1 - if i < dummyIndex || i > l.lastIndex() { - // TODO: return an error instead? - return 0, nil + // The valid term range is [firstIndex-1, lastIndex]. Even though the entry at + // firstIndex-1 is compacted away, its term is available for matching purposes + // when doing log appends. + if i+1 < l.firstIndex() { + return 0, ErrCompacted + } + if i > l.lastIndex() { + return 0, ErrUnavailable } t, err := l.storage.Term(i) @@ -444,7 +447,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool { } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { + if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } @@ -515,11 +518,11 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { return nil } -func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 { +func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 { if err == nil { return t } - if err == ErrCompacted { + if err == ErrCompacted || err == ErrUnavailable { return 0 } l.logger.Panicf("unexpected error (%v)", err) diff --git a/log_test.go b/log_test.go index 89d5827e..355283ea 100644 --- a/log_test.go +++ b/log_test.go @@ -771,31 +771,31 @@ func TestIsOutOfBounds(t *testing.T) { } func TestTerm(t *testing.T) { - var i uint64 offset := uint64(100) num := uint64(100) storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) - for i = 1; i < num; i++ { + for i := uint64(1); i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: i}) } - tests := []struct { - index uint64 - w uint64 + for i, tt := range []struct { + idx uint64 + term uint64 + err error }{ - {offset - 1, 0}, - {offset, 1}, - {offset + num/2, num / 2}, - {offset + num - 1, num - 1}, - {offset + num, 0}, - } - - for j, tt := range tests { - t.Run(fmt.Sprint(j), func(t *testing.T) { - require.Equal(t, tt.w, mustTerm(l.term(tt.index))) + {idx: offset - 1, err: ErrCompacted}, + {idx: offset, term: 1}, + {idx: offset + num/2, term: num / 2}, + {idx: offset + num - 1, term: num - 1}, + {idx: offset + num, err: ErrUnavailable}, + } { + t.Run(fmt.Sprint(i), func(t *testing.T) { + term, err := l.term(tt.idx) + require.Equal(t, tt.term, term) + require.Equal(t, tt.err, err) }) } } @@ -809,22 +809,25 @@ func TestTermWithUnstableSnapshot(t *testing.T) { l := newLog(storage, raftLogger) l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}}) - tests := []struct { - index uint64 - w uint64 + for i, tt := range []struct { + idx uint64 + term uint64 + err error }{ // cannot get term from storage - {storagesnapi, 0}, + {idx: storagesnapi, err: ErrCompacted}, // cannot get term from the gap between storage ents and unstable snapshot - {storagesnapi + 1, 0}, - {unstablesnapi - 1, 0}, + {idx: storagesnapi + 1, err: ErrCompacted}, + {idx: unstablesnapi - 1, err: ErrCompacted}, // get term from unstable snapshot index - {unstablesnapi, 1}, - } - - for i, tt := range tests { + {idx: unstablesnapi, term: 1}, + // the log beyond the unstable snapshot is empty + {idx: unstablesnapi + 1, err: ErrUnavailable}, + } { t.Run(fmt.Sprint(i), func(t *testing.T) { - require.Equal(t, tt.w, mustTerm(l.term(tt.index))) + term, err := l.term(tt.idx) + require.Equal(t, tt.term, term) + require.Equal(t, tt.err, err) }) } } diff --git a/node_util_test.go b/node_util_test.go index 5093cba6..49a477f1 100644 --- a/node_util_test.go +++ b/node_util_test.go @@ -60,7 +60,7 @@ func (l *nodeTestHarness) Warningf(format string, v ...interface{}) { func (l *nodeTestHarness) Fatal(v ...interface{}) { l.t.Error(v...) - panic(v) + panic(fmt.Sprint(v...)) } func (l *nodeTestHarness) Fatalf(format string, v ...interface{}) { @@ -70,7 +70,7 @@ func (l *nodeTestHarness) Fatalf(format string, v ...interface{}) { func (l *nodeTestHarness) Panic(v ...interface{}) { l.t.Log(v...) - panic(v) + panic(fmt.Sprint(v...)) } func (l *nodeTestHarness) Panicf(format string, v ...interface{}) { diff --git a/raft.go b/raft.go index 0746615b..1c0845e5 100644 --- a/raft.go +++ b/raft.go @@ -571,7 +571,9 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return false } - term, errt := r.raftLog.term(pr.Next - 1) + lastIndex, nextIndex := pr.Next-1, pr.Next + lastTerm, errt := r.raftLog.term(lastIndex) + var ents []pb.Entry var erre error // In a throttled StateReplicate only send empty MsgApp, to ensure progress. @@ -581,7 +583,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // leader to send an append), allowing it to be acked or rejected, both of // which will clear out Inflights. if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize) + ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize) } if len(ents) == 0 && !sendIfEmpty { @@ -616,15 +618,15 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } // Send the actual MsgApp otherwise, and update the progress accordingly. - next := pr.Next // save Next for later, as the progress update can change it - if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), next); err != nil { + if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil { r.logger.Panicf("%x: %v", r.id, err) } + // NB: pr has been updated, but we make sure to only use its old values below. r.send(pb.Message{ To: to, Type: pb.MsgApp, - Index: next - 1, - LogTerm: term, + Index: lastIndex, + LogTerm: lastTerm, Entries: ents, Commit: r.raftLog.committed, }) @@ -1651,7 +1653,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) + r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) // Return a hint to the leader about the maximum index and term that the // two logs could be divergent at. Do this by searching through the @@ -1907,7 +1909,7 @@ func (r *raft) abortLeaderTransfer() { // committedEntryInCurrentTerm return true if the peer has committed an entry in its term. func (r *raft) committedEntryInCurrentTerm() bool { - return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term + return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term } // responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer diff --git a/raft_test.go b/raft_test.go index 6abda3cf..65a61694 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2559,7 +2559,7 @@ func TestReadOnlyForNewLeader(t *testing.T) { if sm.raftLog.committed != 4 { t.Fatalf("committed = %d, want 4", sm.raftLog.committed) } - lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed)) + lastLogTerm := sm.raftLog.zeroTermOnOutOfBounds(sm.raftLog.term(sm.raftLog.committed)) if lastLogTerm != sm.Term { t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term) } @@ -2614,7 +2614,7 @@ func TestLeaderAppResp(t *testing.T) { // thus the last log term must be 1 to be committed. sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) sm.raftLog = &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, + storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}, unstable: unstable{offset: 3}, } sm.becomeCandidate() @@ -2722,7 +2722,7 @@ func TestRecvMsgBeat(t *testing.T) { for i, tt := range tests { sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}} + sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}} sm.Term = 1 sm.state = tt.state switch tt.state { diff --git a/rafttest/interaction_env_logger.go b/rafttest/interaction_env_logger.go index 3f124673..78298f54 100644 --- a/rafttest/interaction_env_logger.go +++ b/rafttest/interaction_env_logger.go @@ -82,17 +82,23 @@ func (l *RedirectLogger) Errorf(format string, v ...interface{}) { func (l *RedirectLogger) Fatal(v ...interface{}) { l.print(4, v...) + panic(fmt.Sprint(v...)) } func (l *RedirectLogger) Fatalf(format string, v ...interface{}) { - l.printf(4, format, v...) + panic(fmt.Sprintf(format, v...)) } func (l *RedirectLogger) Panic(v ...interface{}) { l.print(4, v...) + panic(fmt.Sprint(v...)) } func (l *RedirectLogger) Panicf(format string, v ...interface{}) { l.printf(4, format, v...) + // TODO(pavelkalinnikov): catch the panic gracefully in datadriven package. + // This would allow observing all the intermediate logging while debugging, + // and testing the cases when panic is expected. + panic(fmt.Sprintf(format, v...)) } diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt new file mode 100644 index 00000000..e0d7e2fa --- /dev/null +++ b/testdata/slow_follower_after_compaction.txt @@ -0,0 +1,121 @@ +# This is a regression test for https://github.com/etcd-io/raft/pull/31. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with 3 nodes, with a limited in-flight capacity. +add-nodes 3 voters=(1,2,3) index=10 inflight=2 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok (quiet) + +# Propose 3 entries. +propose 1 prop_1_12 +---- +ok + +propose 1 prop_1_13 +---- +ok + +propose 1 prop_1_14 +---- +ok + +stabilize +---- +ok (quiet) + +# Re-enable log messages. +log-level debug +---- +ok + +# All nodes up-to-date. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=14 next=15 +3: StateReplicate match=14 next=15 + +log-level none +---- +ok + +propose 1 prop_1_15 +---- +ok + +propose 1 prop_1_16 +---- +ok + +propose 1 prop_1_17 +---- +ok + +propose 1 prop_1_18 +---- +ok + +# Commit entries on nodes 1 and 2. +stabilize 1 2 +---- +ok (quiet) + +log-level debug +---- +ok + +# Nodes 1 and 2 up-to-date, 3 is behind and MsgApp flow is throttled. +status 1 +---- +1: StateReplicate match=18 next=19 +2: StateReplicate match=18 next=19 +3: StateReplicate match=14 next=17 paused inflight=2[full] + +# Break the MsgApp flow from the leader to node 3. +deliver-msgs drop=3 +---- +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] +dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] + +# Truncate the leader's log beyond node 3 log size. +compact 1 17 +---- +1/18 EntryNormal "prop_1_18" + +# Trigger a round of empty MsgApp "probe" from leader. It will reach node 3 +# which will reply with a rejection MsgApp because it sees a gap in the log. +# Node 1 will reset the MsgApp flow and send a snapshot to catch node 3 up. +tick-heartbeat 1 +---- +ok + +log-level none +---- +ok + +stabilize +---- +ok (quiet) + +log-level debug +---- +ok + +# All nodes caught up. +status 1 +---- +1: StateReplicate match=18 next=19 +2: StateReplicate match=18 next=19 +3: StateReplicate match=18 next=19 \ No newline at end of file