Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: fix panic on MsgApp after log truncation #31

Merged
merged 4 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term)
}
return ne.Index
}
Expand Down Expand Up @@ -388,11 +388,14 @@ func (l *raftLog) term(i uint64) (uint64, error) {
return t, nil
}

// The valid term range is [index of dummy entry, last index].
dummyIndex := l.firstIndex() - 1
if i < dummyIndex || i > l.lastIndex() {
// TODO: return an error instead?
return 0, nil
// The valid term range is [firstIndex-1, lastIndex]. Even though the entry at
// firstIndex-1 is compacted away, its term is available for matching purposes
// when doing log appends.
if i+1 < l.firstIndex() {
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
return 0, ErrCompacted
}
if i > l.lastIndex() {
return 0, ErrUnavailable
}

t, err := l.storage.Term(i)
Expand Down Expand Up @@ -444,7 +447,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
Expand Down Expand Up @@ -515,11 +518,11 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
return nil
}

func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
if err == nil {
return t
}
if err == ErrCompacted {
if err == ErrCompacted || err == ErrUnavailable {
return 0
}
l.logger.Panicf("unexpected error (%v)", err)
Expand Down
55 changes: 29 additions & 26 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,31 +771,31 @@ func TestIsOutOfBounds(t *testing.T) {
}

func TestTerm(t *testing.T) {
var i uint64
offset := uint64(100)
num := uint64(100)

storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
l := newLog(storage, raftLogger)
for i = 1; i < num; i++ {
for i := uint64(1); i < num; i++ {
l.append(pb.Entry{Index: offset + i, Term: i})
}

tests := []struct {
index uint64
w uint64
for i, tt := range []struct {
idx uint64
term uint64
err error
}{
{offset - 1, 0},
{offset, 1},
{offset + num/2, num / 2},
{offset + num - 1, num - 1},
{offset + num, 0},
}

for j, tt := range tests {
t.Run(fmt.Sprint(j), func(t *testing.T) {
require.Equal(t, tt.w, mustTerm(l.term(tt.index)))
{idx: offset - 1, err: ErrCompacted},
{idx: offset, term: 1},
{idx: offset + num/2, term: num / 2},
{idx: offset + num - 1, term: num - 1},
{idx: offset + num, err: ErrUnavailable},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
term, err := l.term(tt.idx)
require.Equal(t, tt.term, term)
require.Equal(t, tt.err, err)
})
}
}
Expand All @@ -809,22 +809,25 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
l := newLog(storage, raftLogger)
l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}})

tests := []struct {
index uint64
w uint64
for i, tt := range []struct {
idx uint64
term uint64
err error
}{
// cannot get term from storage
{storagesnapi, 0},
{idx: storagesnapi, err: ErrCompacted},
// cannot get term from the gap between storage ents and unstable snapshot
{storagesnapi + 1, 0},
{unstablesnapi - 1, 0},
{idx: storagesnapi + 1, err: ErrCompacted},
{idx: unstablesnapi - 1, err: ErrCompacted},
// get term from unstable snapshot index
{unstablesnapi, 1},
}

for i, tt := range tests {
{idx: unstablesnapi, term: 1},
// the log beyond the unstable snapshot is empty
{idx: unstablesnapi + 1, err: ErrUnavailable},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, tt.w, mustTerm(l.term(tt.index)))
term, err := l.term(tt.idx)
require.Equal(t, tt.term, term)
require.Equal(t, tt.err, err)
})
}
}
Expand Down
18 changes: 10 additions & 8 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,9 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return false
}

term, errt := r.raftLog.term(pr.Next - 1)
lastIndex, nextIndex := pr.Next-1, pr.Next
lastTerm, errt := r.raftLog.term(lastIndex)

var ents []pb.Entry
var erre error
// In a throttled StateReplicate only send empty MsgApp, to ensure progress.
Expand All @@ -581,7 +583,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
// leader to send an append), allowing it to be acked or rejected, both of
// which will clear out Inflights.
if pr.State != tracker.StateReplicate || !pr.Inflights.Full() {
ents, erre = r.raftLog.entries(pr.Next, r.maxMsgSize)
ents, erre = r.raftLog.entries(nextIndex, r.maxMsgSize)
}

if len(ents) == 0 && !sendIfEmpty {
Expand Down Expand Up @@ -616,15 +618,15 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
}

// Send the actual MsgApp otherwise, and update the progress accordingly.
next := pr.Next // save Next for later, as the progress update can change it
if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), next); err != nil {
if err := pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents)), nextIndex); err != nil {
r.logger.Panicf("%x: %v", r.id, err)
}
// NB: pr has been updated, but we make sure to only use its old values below.
r.send(pb.Message{
To: to,
Type: pb.MsgApp,
Index: next - 1,
LogTerm: term,
Index: lastIndex,
LogTerm: lastTerm,
Entries: ents,
Commit: r.raftLog.committed,
})
Expand Down Expand Up @@ -1651,7 +1653,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

// Return a hint to the leader about the maximum index and term that the
// two logs could be divergent at. Do this by searching through the
Expand Down Expand Up @@ -1907,7 +1909,7 @@ func (r *raft) abortLeaderTransfer() {

// committedEntryInCurrentTerm return true if the peer has committed an entry in its term.
func (r *raft) committedEntryInCurrentTerm() bool {
return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term
}

// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
Expand Down
6 changes: 3 additions & 3 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2559,7 +2559,7 @@ func TestReadOnlyForNewLeader(t *testing.T) {
if sm.raftLog.committed != 4 {
t.Fatalf("committed = %d, want 4", sm.raftLog.committed)
}
lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed))
lastLogTerm := sm.raftLog.zeroTermOnOutOfBounds(sm.raftLog.term(sm.raftLog.committed))
if lastLogTerm != sm.Term {
t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term)
}
Expand Down Expand Up @@ -2614,7 +2614,7 @@ func TestLeaderAppResp(t *testing.T) {
// thus the last log term must be 1 to be committed.
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}},
storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}},
unstable: unstable{offset: 3},
}
sm.becomeCandidate()
Expand Down Expand Up @@ -2722,7 +2722,7 @@ func TestRecvMsgBeat(t *testing.T) {

for i, tt := range tests {
sm := newTestRaft(1, 10, 1, newTestMemoryStorage(withPeers(1, 2, 3)))
sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 0}, {Index: 2, Term: 1}}}}
sm.raftLog = &raftLog{storage: &MemoryStorage{ents: []pb.Entry{{}, {Index: 1, Term: 1}, {Index: 2, Term: 1}}}}
sm.Term = 1
sm.state = tt.state
switch tt.state {
Expand Down
8 changes: 7 additions & 1 deletion rafttest/interaction_env_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,23 @@ func (l *RedirectLogger) Errorf(format string, v ...interface{}) {

func (l *RedirectLogger) Fatal(v ...interface{}) {
l.print(4, v...)
panic(v)
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
}

func (l *RedirectLogger) Fatalf(format string, v ...interface{}) {

l.printf(4, format, v...)
panic(fmt.Sprintf(format, v...))
}

func (l *RedirectLogger) Panic(v ...interface{}) {
l.print(4, v...)
panic(v)
}

func (l *RedirectLogger) Panicf(format string, v ...interface{}) {
l.printf(4, format, v...)
// TODO(pavelkalinnikov): catch the panic gracefully in datadriven package.
// This would allow observing all the intermediate logging while debugging,
// and testing the cases when panic is expected.
panic(fmt.Sprintf(format, v...))
}
121 changes: 121 additions & 0 deletions testdata/slow_follower_after_compaction.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# This is a regression test for https://github.com/etcd-io/raft/pull/31.

# Turn off output during the setup of the test.
log-level none
----
ok

# Start with 3 nodes, with a limited in-flight capacity.
add-nodes 3 voters=(1,2,3) index=10 inflight=2
----
ok

campaign 1
----
ok

stabilize
----
ok (quiet)

# Propose 3 entries.
propose 1 prop_1_12
----
ok

propose 1 prop_1_13
----
ok

propose 1 prop_1_14
----
ok

stabilize
----
ok (quiet)

# Re-enable log messages.
log-level debug
----
ok

# All nodes up-to-date.
status 1
----
1: StateReplicate match=14 next=15
2: StateReplicate match=14 next=15
3: StateReplicate match=14 next=15

log-level none
----
ok

propose 1 prop_1_15
----
ok

propose 1 prop_1_16
----
ok

propose 1 prop_1_17
----
ok

propose 1 prop_1_18
----
ok

# Commit entries on nodes 1 and 2.
stabilize 1 2
----
ok (quiet)

log-level debug
----
ok

# Nodes 1 and 2 up-to-date, 3 is behind and MsgApp flow is throttled.
status 1
----
1: StateReplicate match=18 next=19
2: StateReplicate match=18 next=19
3: StateReplicate match=14 next=17 paused inflight=2[full]

# Break the MsgApp flow from the leader to node 3.
deliver-msgs drop=3
----
dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"]
dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"]

# Truncate the leader's log beyond node 3 log size.
compact 1 17
----
1/18 EntryNormal "prop_1_18"

# Trigger a round of empty MsgApp "probe" from leader. It will reach node 3
# which will reply with a rejection MsgApp because it sees a gap in the log.
# Node 1 will reset the MsgApp flow and send a snapshot to catch node 3 up.
tick-heartbeat 1
----
ok

log-level none
----
ok

stabilize
----
ok (quiet)

log-level debug
----
ok

# All nodes caught up.
status 1
----
1: StateReplicate match=18 next=19
2: StateReplicate match=18 next=19
3: StateReplicate match=18 next=19