Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: return one entry less in Entries #29579

Merged
merged 1 commit into from
Sep 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions pkg/storage/entry_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,15 +173,16 @@ func (rec *raftEntryCache) getTerm(rangeID roachpb.RangeID, index uint64) (uint6
// getEntries returns entries between [lo, hi) for specified range.
// If any entries are returned for the specified indexes, they will
// start with index lo and proceed sequentially without gaps until
// 1) all entries exclusive of hi are fetched, 2) > maxBytes of
// entries data is fetched, or 3) a cache miss occurs.
// 1) all entries exclusive of hi are fetched, 2) fetching another entry
// would add up to more than maxBytes of data, or 3) a cache miss occurs.
// The returned size reflects the size of the returned entries.
func (rec *raftEntryCache) getEntries(
ents []raftpb.Entry, rangeID roachpb.RangeID, lo, hi, maxBytes uint64,
) ([]raftpb.Entry, uint64, uint64) {
) (_ []raftpb.Entry, size uint64, nextIndex uint64, exceededMaxBytes bool) {
rec.RLock()
defer rec.RUnlock()
var bytes uint64
nextIndex := lo
nextIndex = lo

fromKey := entryCacheKey{RangeID: rangeID, Index: lo}
toKey := entryCacheKey{RangeID: rangeID, Index: hi}
Expand All @@ -191,16 +192,20 @@ func (rec *raftEntryCache) getEntries(
return true
}
ent := v.(*raftpb.Entry)
ents = append(ents, *ent)
bytes += uint64(ent.Size())
nextIndex++
if maxBytes > 0 && bytes > maxBytes {
return true
size := uint64(ent.Size())
if bytes+size > maxBytes {
exceededMaxBytes = true
if len(ents) > 0 {
return true
}
}
return false
nextIndex++
bytes += size
ents = append(ents, *ent)
return exceededMaxBytes
}, &fromKey, &toKey)

return ents, bytes, nextIndex
return ents, bytes, nextIndex, exceededMaxBytes
}

// delEntries deletes entries between [lo, hi) for specified range.
Expand Down
15 changes: 9 additions & 6 deletions pkg/storage/entry_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package storage

import (
"math"
"reflect"
"testing"

Expand All @@ -24,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)

const noLimit = math.MaxUint64

func newEntry(index, size uint64) raftpb.Entry {
return raftpb.Entry{
Index: index,
Expand All @@ -48,12 +51,12 @@ func verifyGet(
expEnts []raftpb.Entry,
expNextIndex uint64,
) {
ents, _, nextIndex := rec.getEntries(nil, rangeID, lo, hi, 0)
ents, _, nextIndex, _ := rec.getEntries(nil, rangeID, lo, hi, noLimit)
if !(len(expEnts) == 0 && len(ents) == 0) && !reflect.DeepEqual(expEnts, ents) {
t.Fatalf("expected entries %+v; got %+v", expEnts, ents)
}
if nextIndex != expNextIndex {
t.Fatalf("expected next index %d; got %d", nextIndex, expNextIndex)
t.Fatalf("expected next index %d; got %d", expNextIndex, nextIndex)
}
for _, e := range ents {
term, ok := rec.getTerm(rangeID, e.Index)
Expand Down Expand Up @@ -115,10 +118,10 @@ func TestEntryCacheClearTo(t *testing.T) {
rec.addEntries(rangeID, []raftpb.Entry{newEntry(2, 1)})
rec.addEntries(rangeID, []raftpb.Entry{newEntry(20, 1), newEntry(21, 1)})
rec.clearTo(rangeID, 21)
if ents, _, _ := rec.getEntries(nil, rangeID, 2, 21, 0); len(ents) != 0 {
if ents, _, _, _ := rec.getEntries(nil, rangeID, 2, 21, noLimit); len(ents) != 0 {
t.Errorf("expected no entries after clearTo")
}
if ents, _, _ := rec.getEntries(nil, rangeID, 21, 22, 0); len(ents) != 1 {
if ents, _, _, _ := rec.getEntries(nil, rangeID, 21, 22, noLimit); len(ents) != 1 {
t.Errorf("expected entry 22 to remain in the cache clearTo")
}
}
Expand All @@ -128,13 +131,13 @@ func TestEntryCacheEviction(t *testing.T) {
rangeID := roachpb.RangeID(1)
rec := newRaftEntryCache(100)
rec.addEntries(rangeID, []raftpb.Entry{newEntry(1, 40), newEntry(2, 40)})
ents, _, hi := rec.getEntries(nil, rangeID, 1, 3, 0)
ents, _, hi, _ := rec.getEntries(nil, rangeID, 1, 3, noLimit)
if len(ents) != 2 || hi != 3 {
t.Errorf("expected both entries; got %+v, %d", ents, hi)
}
// Add another entry to evict first.
rec.addEntries(rangeID, []raftpb.Entry{newEntry(3, 40)})
ents, _, hi = rec.getEntries(nil, rangeID, 2, 4, 0)
ents, _, hi, _ = rec.getEntries(nil, rangeID, 2, 4, noLimit)
if len(ents) != 2 || hi != 4 {
t.Errorf("expected only two entries; got %+v, %d", ents, hi)
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/storage/replica_raftstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState,

// Entries implements the raft.Storage interface. Note that maxBytes is advisory
// and this method will always return at least one entry even if it exceeds
// maxBytes. Passing maxBytes equal to zero disables size checking. Sideloaded
// proposals count towards maxBytes with their payloads inlined.
// maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined.
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) {
readonly := r.store.Engine().NewReadOnly()
defer readonly.Close()
Expand Down Expand Up @@ -115,10 +114,11 @@ func entries(
}
ents := make([]raftpb.Entry, 0, n)

ents, size, hitIndex := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)
ents, size, hitIndex, exceededMaxBytes := eCache.getEntries(ents, rangeID, lo, hi, maxBytes)

// Return results if the correct number of results came back or if
// we ran into the max bytes limit.
if uint64(len(ents)) == hi-lo || (maxBytes > 0 && size > maxBytes) {
if uint64(len(ents)) == hi-lo || exceededMaxBytes {
return ents, nil
}

Expand All @@ -131,7 +131,6 @@ func entries(
canCache := true

var ent raftpb.Entry
exceededMaxBytes := false
scanFunc := func(kv roachpb.KeyValue) (bool, error) {
if err := kv.Value.GetProto(&ent); err != nil {
return false, err
Expand Down Expand Up @@ -159,9 +158,13 @@ func entries(

// Note that we track the size of proposals with payloads inlined.
size += uint64(ent.Size())

if size > maxBytes {
exceededMaxBytes = true
if len(ents) > 0 {
return exceededMaxBytes, nil
}
}
ents = append(ents, ent)
exceededMaxBytes = maxBytes > 0 && size > maxBytes
return exceededMaxBytes, nil
}

Expand Down Expand Up @@ -275,7 +278,7 @@ func term(
) (uint64, error) {
// entries() accepts a `nil` sideloaded storage and will skip inlining of
// sideloaded entries. We only need the term, so this is what we do.
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, 0)
ents, err := entries(ctx, rsl, eng, rangeID, eCache, nil /* sideloaded */, i, i+1, math.MaxUint64 /* maxBytes */)
if err == raft.ErrCompacted {
ts, err := rsl.LoadTruncatedState(ctx, eng)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func maybeInlineSideloadedRaftCommand(
// We could unmarshal this yet again, but if it's committed we
// are very likely to have appended it recently, in which case
// we can save work.
cachedSingleton, _, _ := entryCache.getEntries(
cachedSingleton, _, _, _ := entryCache.getEntries(
nil, rangeID, ent.Index, ent.Index+1, 1<<20,
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_sideload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func TestRaftSSTableSideloadingSnapshot(t *testing.T) {
if len(entries) != 1 {
t.Fatalf("no or too many entries returned from cache: %+v", entries)
}
ents, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
ents, _, _, _ := tc.store.raftEntryCache.getEntries(nil, tc.repl.RangeID, sideloadedIndex, sideloadedIndex+1, 1<<20)
if withSS {
// We passed the sideload storage, so we expect to get our
// inlined index back from the cache.
Expand Down
45 changes: 33 additions & 12 deletions pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7195,8 +7195,8 @@ func TestEntries(t *testing.T) {
}},
// Case 5: Get a single entry from cache.
{lo: indexes[5], hi: indexes[6], expResultCount: 1, expCacheCount: 1, setup: nil},
// Case 6: Use MaxUint64 instead of 0 for maxBytes.
{lo: indexes[5], hi: indexes[9], maxBytes: math.MaxUint64, expResultCount: 4, expCacheCount: 4, setup: nil},
// Case 6: Get range without size limitation. (Like case 4, without truncating).
{lo: indexes[5], hi: indexes[9], expResultCount: 4, expCacheCount: 4, setup: nil},
// Case 7: maxBytes is set low so only a single value should be
// returned.
{lo: indexes[5], hi: indexes[9], maxBytes: 1, expResultCount: 1, expCacheCount: 1, setup: nil},
Expand Down Expand Up @@ -7242,7 +7242,10 @@ func TestEntries(t *testing.T) {
if tc.setup != nil {
tc.setup()
}
cacheEntries, _, _ := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
if tc.maxBytes == 0 {
tc.maxBytes = math.MaxUint64
}
cacheEntries, _, _, hitLimit := repl.store.raftEntryCache.getEntries(nil, rangeID, tc.lo, tc.hi, tc.maxBytes)
if len(cacheEntries) != tc.expCacheCount {
t.Errorf("%d: expected cache count %d, got %d", i, tc.expCacheCount, len(cacheEntries))
}
Expand All @@ -7258,12 +7261,17 @@ func TestEntries(t *testing.T) {
}
if len(ents) != tc.expResultCount {
t.Errorf("%d: expected %d entries, got %d", i, tc.expResultCount, len(ents))
} else if tc.expResultCount > 0 {
expHitLimit := ents[len(ents)-1].Index < tc.hi-1
if hitLimit != expHitLimit {
t.Errorf("%d: unexpected hit limit: %t", i, hitLimit)
}
}
}

// Case 23: Lo must be less than or equal to hi.
repl.mu.Lock()
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], 0); err == nil {
if _, err := repl.raftEntriesLocked(indexes[9], indexes[5], math.MaxUint64); err == nil {
t.Errorf("23: error expected, got none")
}
repl.mu.Unlock()
Expand All @@ -7276,21 +7284,34 @@ func TestEntries(t *testing.T) {

repl.mu.Lock()
defer repl.mu.Unlock()
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], 0); err == nil {
if _, err := repl.raftEntriesLocked(indexes[5], indexes[9], math.MaxUint64); err == nil {
t.Errorf("24: error expected, got none")
}

// Case 25: don't hit the gap due to maxBytes.
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
// Case 25a: don't hit the gap due to maxBytes, cache populated.
{
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
}
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
// Case 25b: don't hit the gap due to maxBytes, cache cleared.
{
repl.store.raftEntryCache.delEntries(rangeID, indexes[5], indexes[5]+1)
ents, err := repl.raftEntriesLocked(indexes[5], indexes[9], 1)
if err != nil {
t.Errorf("25: expected no error, got %s", err)
}
if len(ents) != 1 {
t.Errorf("25: expected 1 entry, got %d", len(ents))
}
}

// Case 26: don't hit the gap due to truncation.
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], 0); err != raft.ErrCompacted {
if _, err := repl.raftEntriesLocked(indexes[4], indexes[9], math.MaxUint64); err != raft.ErrCompacted {
t.Errorf("26: expected error %s , got %s", raft.ErrCompacted, err)
}
}
Expand Down