Skip to content

Commit

Permalink
raft: don't silently return 0 from raftLog.term()
Browse files Browse the repository at this point in the history
This commit makes raftLog.term() call return ErrCompacted and ErrUnavailable
errors if the requested log index is out of bounds. Previously it would return
0 which was an error-prone behaviour.

Signed-off-by: Pavel Kalinnikov <pavel@cockroachlabs.com>
  • Loading branch information
pav-kv committed Mar 8, 2023
1 parent 6df333b commit d0fb0cd
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 41 deletions.
21 changes: 12 additions & 9 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
if !l.matchTerm(ne.Index, ne.Term) {
if ne.Index <= l.lastIndex() {
l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
ne.Index, l.zeroTermOnOutOfBounds(l.term(ne.Index)), ne.Term)
}
return ne.Index
}
Expand Down Expand Up @@ -388,11 +388,14 @@ func (l *raftLog) term(i uint64) (uint64, error) {
return t, nil
}

// The valid term range is [index of dummy entry, last index].
dummyIndex := l.firstIndex() - 1
if i < dummyIndex || i > l.lastIndex() {
// TODO: return an error instead?
return 0, nil
// The valid term range is [firstIndex-1, lastIndex]. Even though the entry at
// firstIndex-1 is compacted away, its term is available for matching purposes
// when doing log appends.
if i+1 < l.firstIndex() {
return 0, ErrCompacted
}
if i > l.lastIndex() {
return 0, ErrUnavailable
}

t, err := l.storage.Term(i)
Expand Down Expand Up @@ -444,7 +447,7 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
Expand Down Expand Up @@ -515,11 +518,11 @@ func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
return nil
}

func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
func (l *raftLog) zeroTermOnOutOfBounds(t uint64, err error) uint64 {
if err == nil {
return t
}
if err == ErrCompacted {
if err == ErrCompacted || err == ErrUnavailable {
return 0
}
l.logger.Panicf("unexpected error (%v)", err)
Expand Down
55 changes: 29 additions & 26 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,31 +771,31 @@ func TestIsOutOfBounds(t *testing.T) {
}

func TestTerm(t *testing.T) {
var i uint64
offset := uint64(100)
num := uint64(100)

storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
l := newLog(storage, raftLogger)
for i = 1; i < num; i++ {
for i := uint64(1); i < num; i++ {
l.append(pb.Entry{Index: offset + i, Term: i})
}

tests := []struct {
index uint64
w uint64
for i, tt := range []struct {
idx uint64
term uint64
err error
}{
{offset - 1, 0},
{offset, 1},
{offset + num/2, num / 2},
{offset + num - 1, num - 1},
{offset + num, 0},
}

for j, tt := range tests {
t.Run(fmt.Sprint(j), func(t *testing.T) {
require.Equal(t, tt.w, mustTerm(l.term(tt.index)))
{idx: offset - 1, err: ErrCompacted},
{idx: offset, term: 1},
{idx: offset + num/2, term: num / 2},
{idx: offset + num - 1, term: num - 1},
{idx: offset + num, err: ErrUnavailable},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
term, err := l.term(tt.idx)
require.Equal(t, tt.term, term)
require.Equal(t, tt.err, err)
})
}
}
Expand All @@ -809,22 +809,25 @@ func TestTermWithUnstableSnapshot(t *testing.T) {
l := newLog(storage, raftLogger)
l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}})

tests := []struct {
index uint64
w uint64
for i, tt := range []struct {
idx uint64
term uint64
err error
}{
// cannot get term from storage
{storagesnapi, 0},
{idx: storagesnapi, err: ErrCompacted},
// cannot get term from the gap between storage ents and unstable snapshot
{storagesnapi + 1, 0},
{unstablesnapi - 1, 0},
{idx: storagesnapi + 1, err: ErrCompacted},
{idx: unstablesnapi - 1, err: ErrCompacted},
// get term from unstable snapshot index
{unstablesnapi, 1},
}

for i, tt := range tests {
{idx: unstablesnapi, term: 1},
// the log beyond the unstable snapshot is empty
{idx: unstablesnapi + 1, err: ErrUnavailable},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
require.Equal(t, tt.w, mustTerm(l.term(tt.index)))
term, err := l.term(tt.idx)
require.Equal(t, tt.term, term)
require.Equal(t, tt.err, err)
})
}
}
Expand Down
7 changes: 2 additions & 5 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,6 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {

lastIndex, nextIndex := pr.Next-1, pr.Next
lastTerm, errt := r.raftLog.term(lastIndex)
if lastIndex != 0 && lastTerm == 0 && errt == nil {
errt = ErrCompacted
}

var ents []pb.Entry
var erre error
Expand Down Expand Up @@ -1656,7 +1653,7 @@ func (r *raft) handleAppendEntries(m pb.Message) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

// Return a hint to the leader about the maximum index and term that the
// two logs could be divergent at. Do this by searching through the
Expand Down Expand Up @@ -1912,7 +1909,7 @@ func (r *raft) abortLeaderTransfer() {

// committedEntryInCurrentTerm return true if the peer has committed an entry in its term.
func (r *raft) committedEntryInCurrentTerm() bool {
return r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) == r.Term
return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term
}

// responseToReadIndexReq constructs a response for `req`. If `req` comes from the peer
Expand Down
2 changes: 1 addition & 1 deletion raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2559,7 +2559,7 @@ func TestReadOnlyForNewLeader(t *testing.T) {
if sm.raftLog.committed != 4 {
t.Fatalf("committed = %d, want 4", sm.raftLog.committed)
}
lastLogTerm := sm.raftLog.zeroTermOnErrCompacted(sm.raftLog.term(sm.raftLog.committed))
lastLogTerm := sm.raftLog.zeroTermOnOutOfBounds(sm.raftLog.term(sm.raftLog.committed))
if lastLogTerm != sm.Term {
t.Fatalf("last log term = %d, want %d", lastLogTerm, sm.Term)
}
Expand Down

0 comments on commit d0fb0cd

Please sign in to comment.