From 76f1249811314330929f91ff8f8c2cfafb488d22 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 7 Mar 2023 12:34:39 +0000 Subject: [PATCH 1/4] raft: fix panic on MsgApp after log truncation This commit fixes a panic in the following scenario: 1. The flow of MsgApp from the leader to a follower is throttled (i.e. Inflights is full). 2. The leader doesn't fetch entries from storage and only periodically sends MsgApps with empty Entries. 3. A log compaction/truncation happens in the background, and cuts out a portion of the log beyond what's still in-flight towards the slow follower (i.e. Progress.Match < log cutoff). 4. Some messages to the slow follower get dropped, and as a result it replies with a rejection MsgAppResp. 5. The leader resets Progress.Next = Progress.Match+1, and is about to retry sending entries from this point. 6. In raft.maybeSendAppend it calls raftLog.term and gets 0 for the missing entry (instead of some indication/error that the log was truncated at this index). It also skips fetching entries (as in step 2), and goes ahead sending an empty MsgApp (with LogTerm = 0 and a fresh Commit index). 7. When the follower gets this MsgApp, in raftLog.maybeAppend it a) wrongly passes the matchTerm check because the 0 term matches the 0 corresponding to a missing entry in the local log, b) tries to bump the Commit index and panics because this index is beyond its local log's lastIndex(). This bug was introduced in 42419da. Specifically, the steps (2) and (6) previously used to unconditionally fetch raftLog.entries(), which would return ErrCompacted in the above scenario, and prevent sending the problematic MsgApp. The commit above introduced a condition under which the ErrCompacted would go unnoticed. This commit makes maybeSendAppend more aware of this compaction scenario, and prevents sending the problematic MsgApp. 
Signed-off-by: Pavel Kalinnikov --- raft.go | 17 +++++++++++------ raft_test.go | 4 ++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/raft.go b/raft.go index 0746615b..9fee1f96 100644 --- a/raft.go +++ b/raft.go @@ -571,7 +571,12 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return false } - term, errt := r.raftLog.term(pr.Next - 1) + lastIndex, nextIndex := pr.Next-1, pr.Next + lastTerm, errt := r.raftLog.term(lastIndex) + if lastIndex != 0 && lastTerm == 0 && errt == nil { + errt = ErrCompacted + } + var ents []pb.Entry var erre error // In a throttled StateReplicate only send empty MsgApp, to ensure progress. @@ -581,7 +586,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { // leader to send an append), allowing it to be acked or rejected, both of // which will clear out Inflights. if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize) + ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize) } if len(ents) == 0 && !sendIfEmpty { @@ -616,15 +621,15 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { } // Send the actual MsgApp otherwise, and update the progress accordingly. - next := pr.Next // save Next for later, as the progress update can change it - if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), next); err != nil { + if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil { r.logger.Panicf("%x: %v", r.id, err) } + // NB: pr has been updated, but we make sure to only use its old values below. 
r.send(pb.Message{ To: to, Type: pb.MsgApp, - Index: next - 1, - LogTerm: term, + Index: lastIndex, + LogTerm: lastTerm, Entries: ents, Commit: r.raftLog.committed, }) diff --git a/raft_test.go b/raft_test.go index 6abda3cf..ca030262 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2614,7 +2614,7 @@ func TestLeaderAppResp(t *testing.T) { // thus the last log term must be 1 to be committed. sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) sm.raftLog = &raftLog{ - storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}, + storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}, unstable: unstable{offset: 3}, } sm.becomeCandidate() @@ -2722,7 +2722,7 @@ func TestRecvMsgBeat(t *testing.T) { for i, tt := range tests { sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3))) - sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}} + sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}} sm.Term = 1 sm.state = tt.state switch tt.state { From 6df333bdae70f53136305c74da9aa05efb3bcff1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Tue, 7 Mar 2023 12:34:17 +0000 Subject: [PATCH 2/4] testdata: add test with log compaction and slow follower Signed-off-by: Pavel Kalinnikov --- rafttest/interaction_env_logger.go | 8 +- testdata/slow_follower_after_compaction.txt | 121 ++++++++++++++++++++ 2 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 testdata/slow_follower_after_compaction.txt diff --git a/rafttest/interaction_env_logger.go b/rafttest/interaction_env_logger.go index 3f124673..1dab9e99 100644 --- a/rafttest/interaction_env_logger.go +++ b/rafttest/interaction_env_logger.go @@ -82,17 +82,23 @@ func (l *RedirectLogger) Errorf(format string, v ...interface{}) { func (l *RedirectLogger) Fatal(v ...interface{}) { l.print(4, v...) 
+ panic(v) } func (l *RedirectLogger) Fatalf(format string, v ...interface{}) { - l.printf(4, format, v...) + panic(fmt.Sprintf(format, v...)) } func (l *RedirectLogger) Panic(v ...interface{}) { l.print(4, v...) + panic(v) } func (l *RedirectLogger) Panicf(format string, v ...interface{}) { l.printf(4, format, v...) + // TODO(pavelkalinnikov): catch the panic gracefully in datadriven package. + // This would allow observing all the intermediate logging while debugging, + // and testing the cases when panic is expected. + panic(fmt.Sprintf(format, v...)) } diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt new file mode 100644 index 00000000..e0d7e2fa --- /dev/null +++ b/testdata/slow_follower_after_compaction.txt @@ -0,0 +1,121 @@ +# This is a regression test for https://github.com/etcd-io/raft/pull/31. + +# Turn off output during the setup of the test. +log-level none +---- +ok + +# Start with 3 nodes, with a limited in-flight capacity. +add-nodes 3 voters=(1,2,3) index=10 inflight=2 +---- +ok + +campaign 1 +---- +ok + +stabilize +---- +ok (quiet) + +# Propose 3 entries. +propose 1 prop_1_12 +---- +ok + +propose 1 prop_1_13 +---- +ok + +propose 1 prop_1_14 +---- +ok + +stabilize +---- +ok (quiet) + +# Re-enable log messages. +log-level debug +---- +ok + +# All nodes up-to-date. +status 1 +---- +1: StateReplicate match=14 next=15 +2: StateReplicate match=14 next=15 +3: StateReplicate match=14 next=15 + +log-level none +---- +ok + +propose 1 prop_1_15 +---- +ok + +propose 1 prop_1_16 +---- +ok + +propose 1 prop_1_17 +---- +ok + +propose 1 prop_1_18 +---- +ok + +# Commit entries on nodes 1 and 2. +stabilize 1 2 +---- +ok (quiet) + +log-level debug +---- +ok + +# Nodes 1 and 2 up-to-date, 3 is behind and MsgApp flow is throttled. 
+status 1 +---- +1: StateReplicate match=18 next=19 +2: StateReplicate match=18 next=19 +3: StateReplicate match=14 next=17 paused inflight=2[full] + +# Break the MsgApp flow from the leader to node 3. +deliver-msgs drop=3 +---- +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] +dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] + +# Truncate the leader's log beyond node 3 log size. +compact 1 17 +---- +1/18 EntryNormal "prop_1_18" + +# Trigger a round of empty MsgApp "probe" from leader. It will reach node 3 +# which will reply with a rejection MsgApp because it sees a gap in the log. +# Node 1 will reset the MsgApp flow and send a snapshot to catch node 3 up. +tick-heartbeat 1 +---- +ok + +log-level none +---- +ok + +stabilize +---- +ok (quiet) + +log-level debug +---- +ok + +# All nodes caught up. +status 1 +---- +1: StateReplicate match=18 next=19 +2: StateReplicate match=18 next=19 +3: StateReplicate match=18 next=19 \ No newline at end of file From d0fb0cd65f0a4bc7d88548444bbaed0275a6997d Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 8 Mar 2023 12:29:06 +0000 Subject: [PATCH 3/4] raft: don't silently return 0 from raftLog.term() This commit makes raftLog.term() call return ErrCompacted and ErrUnavailable errors if the requested log index is out of bounds. Previously it would return 0 which was an error-prone behaviour. 
Signed-off-by: Pavel Kalinnikov --- log.go | 21 +++++++++++--------- log_test.go | 55 +++++++++++++++++++++++++++------------------------- raft.go | 7 ++----- raft_test.go | 2 +- 4 files changed, 44 insertions(+), 41 deletions(-) diff --git a/log.go b/log.go index daf7acef..3cad705a 100644 --- a/log.go +++ b/log.go @@ -153,7 +153,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) + ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term) } return ne.Index } @@ -388,11 +388,14 @@ func (l *raftLog) term(i uint64) (uint64, error) { return t, nil } - // The valid term range is [index of dummy entry, last index]. - dummyIndex := l.firstIndex() - 1 - if i < dummyIndex || i > l.lastIndex() { - // TODO: return an error instead? - return 0, nil + // The valid term range is [firstIndex-1, lastIndex]. Even though the entry at + // firstIndex-1 is compacted away, its term is available for matching purposes + // when doing log appends. 
+ if i+1 < l.firstIndex() { + return 0, ErrCompacted + } + if i > l.lastIndex() { + return 0, ErrUnavailable } t, err := l.storage.Term(i) @@ -444,7 +447,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool { } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { + if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } @@ -515,11 +518,11 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { return nil } -func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 { +func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 { if err == nil { return t } - if err == ErrCompacted { + if err == ErrCompacted || err == ErrUnavailable { return 0 } l.logger.Panicf("unexpected error (%v)", err) diff --git a/log_test.go b/log_test.go index 89d5827e..355283ea 100644 --- a/log_test.go +++ b/log_test.go @@ -771,31 +771,31 @@ func TestIsOutOfBounds(t *testing.T) { } func TestTerm(t *testing.T) { - var i uint64 offset := uint64(100) num := uint64(100) storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) - for i = 1; i < num; i++ { + for i := uint64(1); i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: i}) } - tests := []struct { - index uint64 - w uint64 + for i, tt := range []struct { + idx uint64 + term uint64 + err error }{ - {offset - 1, 0}, - {offset, 1}, - {offset + num/2, num / 2}, - {offset + num - 1, num - 1}, - {offset + num, 0}, - } - - for j, tt := range tests { - t.Run(fmt.Sprint(j), func(t *testing.T) { - require.Equal(t, tt.w, mustTerm(l.term(tt.index))) + {idx: offset - 1, err: ErrCompacted}, + {idx: offset, term: 1}, + {idx: offset + num/2, term: num / 2}, + {idx: offset + num - 1, term: num - 1}, + {idx: offset + num, err: ErrUnavailable}, + } { + t.Run(fmt.Sprint(i), func(t 
*testing.T) { + term, err := l.term(tt.idx) + require.Equal(t, tt.term, term) + require.Equal(t, tt.err, err) }) } } @@ -809,22 +809,25 @@ func TestTermWithUnstableSnapshot(t *testing.T) { l := newLog(storage, raftLogger) l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}}) - tests := []struct { - index uint64 - w uint64 + for i, tt := range []struct { + idx uint64 + term uint64 + err error }{ // cannot get term from storage - {storagesnapi, 0}, + {idx: storagesnapi, err: ErrCompacted}, // cannot get term from the gap between storage ents and unstable snapshot - {storagesnapi + 1, 0}, - {unstablesnapi - 1, 0}, + {idx: storagesnapi + 1, err: ErrCompacted}, + {idx: unstablesnapi - 1, err: ErrCompacted}, // get term from unstable snapshot index - {unstablesnapi, 1}, - } - - for i, tt := range tests { + {idx: unstablesnapi, term: 1}, + // the log beyond the unstable snapshot is empty + {idx: unstablesnapi + 1, err: ErrUnavailable}, + } { t.Run(fmt.Sprint(i), func(t *testing.T) { - require.Equal(t, tt.w, mustTerm(l.term(tt.index))) + term, err := l.term(tt.idx) + require.Equal(t, tt.term, term) + require.Equal(t, tt.err, err) }) } } diff --git a/raft.go b/raft.go index 9fee1f96..1c0845e5 100644 --- a/raft.go +++ b/raft.go @@ -573,9 +573,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { lastIndex, nextIndex := pr.Next-1, pr.Next lastTerm, errt := r.raftLog.term(lastIndex) - if lastIndex != 0 && lastTerm == 0 && errt == nil { - errt = ErrCompacted - } var ents []pb.Entry var erre error @@ -1656,7 +1653,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) + r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, 
m.Index, m.From) // Return a hint to the leader about the maximum index and term that the // two logs could be divergent at. Do this by searching through the @@ -1912,7 +1909,7 @@ func (r *raft) abortLeaderTransfer() { // committedEntryInCurrentTerm return true if the peer has committed an entry in its term. func (r *raft) committedEntryInCurrentTerm() bool { - return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term + return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term } // responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer diff --git a/raft_test.go b/raft_test.go index ca030262..65a61694 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2559,7 +2559,7 @@ func TestReadOnlyForNewLeader(t *testing.T) { if sm.raftLog.committed != 4 { t.Fatalf("committed = %d, want 4", sm.raftLog.committed) } - lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed)) + lastLogTerm := sm.raftLog.zeroTermOnOutOfBounds(sm.raftLog.term(sm.raftLog.committed)) if lastLogTerm != sm.Term { t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term) } From 574d2f114d79cded52dc5c1714dac2786263a431 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 9 Mar 2023 16:56:34 +0000 Subject: [PATCH 4/4] raft,rafttest: print panic arguments correctly This commit makes the testing loggers print Panic and Fatal arguments before redirecting them to the panic() call. 
Previously they would be displayed in a non-human-readable way, as something like: panic: ([]interface {}) 0x1400000eb88 Signed-off-by: Pavel Kalinnikov --- node_util_test.go | 4 ++-- rafttest/interaction_env_logger.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node_util_test.go b/node_util_test.go index 5093cba6..49a477f1 100644 --- a/node_util_test.go +++ b/node_util_test.go @@ -60,7 +60,7 @@ func (l *nodeTestHarness) Warningf(format string, v ...interface{}) { func (l *nodeTestHarness) Fatal(v ...interface{}) { l.t.Error(v...) - panic(v) + panic(fmt.Sprint(v...)) } func (l *nodeTestHarness) Fatalf(format string, v ...interface{}) { @@ -70,7 +70,7 @@ func (l *nodeTestHarness) Fatalf(format string, v ...interface{}) { func (l *nodeTestHarness) Panic(v ...interface{}) { l.t.Log(v...) - panic(v) + panic(fmt.Sprint(v...)) } func (l *nodeTestHarness) Panicf(format string, v ...interface{}) { diff --git a/rafttest/interaction_env_logger.go b/rafttest/interaction_env_logger.go index 1dab9e99..78298f54 100644 --- a/rafttest/interaction_env_logger.go +++ b/rafttest/interaction_env_logger.go @@ -82,7 +82,7 @@ func (l *RedirectLogger) Errorf(format string, v ...interface{}) { func (l *RedirectLogger) Fatal(v ...interface{}) { l.print(4, v...) - panic(v) + panic(fmt.Sprint(v...)) } func (l *RedirectLogger) Fatalf(format string, v ...interface{}) { @@ -92,7 +92,7 @@ func (l *RedirectLogger) Fatalf(format string, v ...interface{}) { func (l *RedirectLogger) Panic(v ...interface{}) { l.print(4, v...) - panic(v) + panic(fmt.Sprint(v...)) } func (l *RedirectLogger) Panicf(format string, v ...interface{}) {