Skip to content

Commit

Permalink
storage: return one entry less in Entries
Browse files Browse the repository at this point in the history
This works around the bug outlined in:

etcd-io/etcd#10063

by matching Raft's internal implementation of commit pagination.
Once the above PR lands, we can revert this commit (but I assume
that it will take a little bit), and I think we should do that
because the code hasn't gotten any nicer to look at.

Fixes #28918.

Release note: None
  • Loading branch information
tbg committed Sep 5, 2018
1 parent 5692414 commit a051ed9
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 30 deletions.
27 changes: 16 additions & 11 deletions pkg/storage/entry_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,16 @@ func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint6
// getEntries returns entries between [lo, hi) for specified range.
// If any entries are returned for the specified indexes, they will
// start with index lo and proceed sequentially without gaps until
// 1) all entries exclusive of hi are fetched, 2) > maxBytes of
// entries data is fetched, or 3) a cache miss occurs.
// 1) all entries exclusive of hi are fetched, 2) fetching another entry
// would add up to more than maxBytes of data, or 3) a cache miss occurs.
// The returned size reflects the size of the returned entries.
func (rec *raftEntryCache) getEntries(
ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64,
) ([]raftpb.Entry, uint64, uint64) {
) (_ []raftpb.Entry, size uint64, nextIndex uint64, hitMaxSize bool) {
rec.RLock()
defer rec.RUnlock()
var bytes uint64
nextIndex := lo
nextIndex = lo

fromKey := entryCacheKey{RangeID: rangeID, Index: lo}
toKey := entryCacheKey{RangeID: rangeID, Index: hi}
Expand All @@ -191,16 +192,20 @@ func (rec *raftEntryCache) getEntries(
return true
}
ent := v.(*raftpb.Entry)
ents = append(ents, *ent)
bytes += uint64(ent.Size())
nextIndex++
if maxBytes > 0 && bytes > maxBytes {
return true
size := uint64(ent.Size())
if maxBytes > 0 && bytes+size > maxBytes {
hitMaxSize = true
if len(ents) > 0 {
return true
}
}
return false
nextIndex++
bytes += size
ents = append(ents, *ent)
return hitMaxSize
}, &fromKey, &toKey)

return ents, bytes, nextIndex
return ents, bytes, nextIndex, hitMaxSize
}

// delEntries deletes entries between [lo, hi) for specified range.
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/entry_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func verifyGet(
expEnts []raftpb.Entry,
expNextIndex uint64,
) {
ents, _, nextIndex := rec.getEntries(nil, rangeID, lo, hi, 0)
ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, 0)
if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) {
t.Fatalf("expected entries %+v; got %+v", expEnts, ents)
}
Expand Down Expand Up @@ -115,10 +115,10 @@ func TestEntryCacheClearTo(t *testing.T) {
rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)})
rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)})
rec.clearTo(rangeID, 21)
if ents, _, _ := rec.getEntries(nil, rangeID, 2, 21, 0); len(ents) != 0 {
if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, 0); len(ents) != 0 {
t.Errorf("expected no entries after clearTo")
}
if ents, _, _ := rec.getEntries(nil, rangeID, 21, 22, 0); len(ents) != 1 {
if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, 0); len(ents) != 1 {
t.Errorf("expected entry 22 to remain in the cache clearTo")
}
}
Expand All @@ -128,13 +128,13 @@ func TestEntryCacheEviction(t *testing.T) {
rangeID := roachpb.RangeID(1)
rec := newRaftEntryCache(100)
rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)})
ents, _, hi := rec.getEntries(nil, rangeID, 1, 3, 0)
ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, 0)
if len(ents) != 2 || hi != 3 {
t.Errorf("expected both entries; got %+v, %d", ents, hi)
}
// Add another entry to evict first.
rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)})
ents, _, hi = rec.getEntries(nil, rangeID, 2, 4, 0)
ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, 0)
if len(ents) != 2 || hi != 4 {
t.Errorf("expected only two entries; got %+v, %d", ents, hi)
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,11 @@ func entries(
}
ents := make([]raftpb.Entry, 0, n)

ents, size, hitIndex := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)
ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)

// Return results if the correct number of results came back or if
// we ran into the max bytes limit.
if uint64(len(ents)) == hi-lo || (maxBytes > 0 && size > maxBytes) {
if uint64(len(ents)) == hi-lo || exceededMaxBytes {
return ents, nil
}

Expand All @@ -131,7 +132,6 @@ func entries(
canCache := true

var ent raftpb.Entry
exceededMaxBytes := false
scanFunc := func(kv roachpb.KeyValue) (bool, error) {
if err := kv.Value.GetProto(&ent); err != nil {
return false, err
Expand Down Expand Up @@ -159,9 +159,13 @@ func entries(

// Note that we track the size of proposals with payloads inlined.
size += uint64(ent.Size())

if maxBytes > 0 && size > maxBytes {
exceededMaxBytes = true
if len(ents) > 0 {
return exceededMaxBytes, nil
}
}
ents = append(ents, ent)
exceededMaxBytes = maxBytes > 0 && size > maxBytes
return exceededMaxBytes, nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func maybeInlineSideloadedRaftCommand(
// We could unmarshal this yet again, but if it's committed we
// are very likely to have appended it recently, in which case
// we can save work.
cachedSingleton, _, _ := entryCache.getEntries(
cachedSingleton, _, _, _ := entryCache.getEntries(
nil, rangeID, ent.Index, ent.Index+1, 1<<20,
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
if len(entries) != 1 {
t.Fatalf("no or too many entries returned from cache: %+v", entries)
}
ents, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
if withSS {
// We passed the sideload storage, so we expect to get our
// inlined index back from the cache.
Expand Down
32 changes: 25 additions & 7 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7242,7 +7242,7 @@ func TestEntries(t *testing.T) {
if tc.setup != nil {
tc.setup()
}
cacheEntries, _, _ := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
if len(cacheEntries) != tc.expCacheCount {
t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries))
}
Expand All @@ -7258,6 +7258,11 @@ func TestEntries(t *testing.T) {
}
if len(ents) != tc.expResultCount {
t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents))
} else if tc.expResultCount > 0 {
expHitLimit := ents[len(ents)-1].Index < tc.hi-1
if hitLimit != expHitLimit {
t.Errorf("%d: unexpected hit limit: %t", i, hitLimit)
}
}
}

Expand All @@ -7280,13 +7285,26 @@ func TestEntries(t *testing.T) {
t.Errorf("24: error expected, got none")
}

// Case 25: don't hit the gap due to maxBytes.
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
// Case 25a: don't hit the gap due to maxBytes, cache populated.
{
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
}
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
// Case 25b: don't hit the gap due to maxBytes, cache cleared.
{
repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1)
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
}
}

// Case 26: don't hit the gap due to truncation.
Expand Down

0 comments on commit a051ed9

Please sign in to comment.