From d0fb0cd65f0a4bc7d88548444bbaed0275a6997d Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 8 Mar 2023 12:29:06 +0000 Subject: [PATCH] raft: don't silently return 0 from raftLog.term() This commit makes raftLog.term() call return ErrCompacted and ErrUnavailable errors if the requested log index is out of bounds. Previously it would return 0 which was an error-prone behaviour. Signed-off-by: Pavel Kalinnikov --- log.go | 21 +++++++++++--------- log_test.go | 55 +++++++++++++++++++++++++++------------------------- raft.go | 7 ++----- raft_test.go | 2 +- 4 files changed, 44 insertions(+), 41 deletions(-) diff --git a/log.go b/log.go index daf7acef..3cad705a 100644 --- a/log.go +++ b/log.go @@ -153,7 +153,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 { if !l.matchTerm(ne.Index, ne.Term) { if ne.Index <= l.lastIndex() { l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]", - ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term) + ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term) } return ne.Index } @@ -388,11 +388,14 @@ func (l *raftLog) term(i uint64) (uint64, error) { return t, nil } - // The valid term range is [index of dummy entry, last index]. - dummyIndex := l.firstIndex() - 1 - if i < dummyIndex || i > l.lastIndex() { - // TODO: return an error instead? - return 0, nil + // The valid term range is [firstIndex-1, lastIndex]. Even though the entry at + // firstIndex-1 is compacted away, its term is available for matching purposes + // when doing log appends. + if i+1 < l.firstIndex() { + return 0, ErrCompacted + } + if i > l.lastIndex() { + return 0, ErrUnavailable } t, err := l.storage.Term(i) @@ -444,7 +447,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool { } func (l *raftLog) maybeCommit(maxIndex, term uint64) bool { - if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term { + if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term { l.commitTo(maxIndex) return true } @@ -515,11 +518,11 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error { return nil } -func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 { +func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 { if err == nil { return t } - if err == ErrCompacted { + if err == ErrCompacted || err == ErrUnavailable { return 0 } l.logger.Panicf("unexpected error (%v)", err) diff --git a/log_test.go b/log_test.go index 89d5827e..355283ea 100644 --- a/log_test.go +++ b/log_test.go @@ -771,31 +771,31 @@ func TestIsOutOfBounds(t *testing.T) { } func TestTerm(t *testing.T) { - var i uint64 offset := uint64(100) num := uint64(100) storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) l := newLog(storage, raftLogger) - for i = 1; i < num; i++ { + for i := uint64(1); i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: i}) } - tests := []struct { - index uint64 - w uint64 + for i, tt := range []struct { + idx uint64 + term uint64 + err error }{ - {offset - 1, 0}, - {offset, 1}, - {offset + num/2, num / 2}, - {offset + num - 1, num - 1}, - {offset + num, 0}, - } - - for j, tt := range tests { - t.Run(fmt.Sprint(j), func(t *testing.T) { - require.Equal(t, tt.w, mustTerm(l.term(tt.index))) + {idx: offset - 1, err: ErrCompacted}, + {idx: offset, term: 1}, + {idx: offset + num/2, term: num / 2}, + {idx: offset + num - 1, term: num - 1}, + {idx: offset + num, err: ErrUnavailable}, + } { + t.Run(fmt.Sprint(i), func(t *testing.T) { + term, err := l.term(tt.idx) + require.Equal(t, tt.term, term) + require.Equal(t, tt.err, err) }) } } @@ -809,22 +809,25 @@ func TestTermWithUnstableSnapshot(t *testing.T) { l := newLog(storage, raftLogger) l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}}) - tests := []struct { - index uint64 - w uint64 + for i, tt := range []struct { + idx uint64 + term uint64 + err error }{ // cannot get term from storage - {storagesnapi, 0}, + {idx: storagesnapi, err: ErrCompacted}, // cannot get term from the gap between storage ents and unstable snapshot - {storagesnapi + 1, 0}, - {unstablesnapi - 1, 0}, + {idx: storagesnapi + 1, err: ErrCompacted}, + {idx: unstablesnapi - 1, err: ErrCompacted}, // get term from unstable snapshot index - {unstablesnapi, 1}, - } - - for i, tt := range tests { + {idx: unstablesnapi, term: 1}, + // the log beyond the unstable snapshot is empty + {idx: unstablesnapi + 1, err: ErrUnavailable}, + } { t.Run(fmt.Sprint(i), func(t *testing.T) { - require.Equal(t, tt.w, mustTerm(l.term(tt.index))) + term, err := l.term(tt.idx) + require.Equal(t, tt.term, term) + require.Equal(t, tt.err, err) }) } } diff --git a/raft.go b/raft.go index 9fee1f96..1c0845e5 100644 --- a/raft.go +++ b/raft.go @@ -573,9 +573,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { lastIndex, nextIndex := pr.Next-1, pr.Next lastTerm, errt := r.raftLog.term(lastIndex) - if lastIndex != 0 && lastTerm == 0 && errt == nil { - errt = ErrCompacted - } var ents []pb.Entry var erre error @@ -1656,7 +1653,7 @@ func (r *raft) handleAppendEntries(m pb.Message) { r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex}) } else { r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x", - r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) + r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From) // Return a hint to the leader about the maximum index and term that the // two logs could be divergent at. Do this by searching through the @@ -1912,7 +1909,7 @@ func (r *raft) abortLeaderTransfer() { // committedEntryInCurrentTerm return true if the peer has committed an entry in its term. func (r *raft) committedEntryInCurrentTerm() bool { - return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term + return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term } // responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer diff --git a/raft_test.go b/raft_test.go index ca030262..65a61694 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2559,7 +2559,7 @@ func TestReadOnlyForNewLeader(t *testing.T) { if sm.raftLog.committed != 4 { t.Fatalf("committed = %d, want 4", sm.raftLog.committed) } - lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed)) + lastLogTerm := sm.raftLog.zeroTermOnOutOfBounds(sm.raftLog.term(sm.raftLog.committed)) if lastLogTerm != sm.Term { t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term) }