Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: do not cache compaction block reads #2699

Merged
merged 3 commits on Jul 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ type compaction struct {
// resulting version has been installed (if successful), but the compaction
// goroutine is still cleaning up (eg, deleting obsolete files).
versionEditApplied bool
bufferPool sstable.BufferPool

score float64

Expand Down Expand Up @@ -1342,7 +1343,10 @@ func (c *compaction) newInputIter(
f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64,
) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
&IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated})
&IterOptions{level: l}, internalIterOpts{
bytesIterated: &c.bytesIterated,
bufferPool: &c.bufferPool,
})
if err == nil {
// TODO(peter): It is mildly wasteful to open the point iterator only to
// immediately close it. One way to solve this would be to add new
Expand Down Expand Up @@ -1427,7 +1431,10 @@ func (c *compaction) newInputIter(
// to configure the levelIter at these levels to hide the obsolete points.
addItersForLevel := func(level *compactionLevel, l manifest.Level) error {
iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters,
level.files.Iter(), l, &c.bytesIterated))
level.files.Iter(), l, internalIterOpts{
bytesIterated: &c.bytesIterated,
bufferPool: &c.bufferPool,
}))
// TODO(jackson): Use keyspan.LevelIter to avoid loading all the range
// deletions into memory upfront. (See #2015, which reverted this.)
// There will be no user keys that are split between sstables
Expand Down Expand Up @@ -2745,6 +2752,32 @@ func (d *DB) runCompaction(
d.mu.Unlock()
defer d.mu.Lock()

// Compactions use a pool of buffers to read blocks, avoiding polluting the
// block cache with blocks that will not be read again. We initialize the
// buffer pool with a size of 12. This initial size does not need to be
// accurate, because the pool will grow to accommodate the maximum number of
// blocks allocated at a given time over the course of the compaction. But
// choosing a size larger than that working set avoids any additional
// allocations to grow the size of the pool over the course of iteration.
//
// Justification for initial size 12: In a two-level compaction, at any
// given moment we'll have 2 index blocks in-use and 2 data blocks in-use.
// Additionally, when decoding a compressed block, we'll temporarily
// allocate 1 additional block to hold the compressed buffer. In the worst
// case that all input sstables have two-level index blocks (+2), value
// blocks (+2), range deletion blocks (+n) and range key blocks (+n), we'll
// additionally require 2n+4 blocks where n is the number of input sstables.
// Range deletion and range key blocks are relatively rare, and the cost of
// an additional allocation or two over the course of the compaction is
// considered to be okay. A larger initial size would cause the pool to hold
// on to more memory, even when it's not in-use because the pool will
// recycle buffers up to the current capacity of the pool. The memory use of
// a 12-buffer pool is expected to be within reason, even if all the buffers
// grow to the typical size of an index block (256 KiB) which would
// translate to 3 MiB per compaction.
c.bufferPool.Init(12)
defer c.bufferPool.Release()

iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
if err != nil {
return nil, pendingOutputs, stats, err
Expand Down
9 changes: 4 additions & 5 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
"github.com/cockroachdb/pebble/sstable"
Expand Down Expand Up @@ -787,14 +788,12 @@ func TestIterLeakSharedCache(t *testing.T) {
}

func TestMemTableReservation(t *testing.T) {
cache := NewCache(128 << 10 /* 128 KB */)
defer cache.Unref()

opts := &Options{
Cache: cache,
Cache: NewCache(128 << 10 /* 128 KB */),
MemTableSize: initialMemTableSize,
FS: vfs.NewMem(),
}
defer opts.Cache.Unref()
opts.testingRandomized(t)
opts.EnsureDefaults()
// We're going to be looking at and asserting the global memtable reservation
Expand All @@ -805,7 +804,7 @@ func TestMemTableReservation(t *testing.T) {
// cache size, so opening the DB should cause this block to be evicted.
tmpID := opts.Cache.NewID()
helloWorld := []byte("hello world")
value := opts.Cache.Alloc(len(helloWorld))
value := cache.Alloc(len(helloWorld))
copy(value.Buf(), helloWorld)
opts.Cache.Set(tmpID, base.FileNum(0).DiskFileNum(), 0, value).Release()

Expand Down
2 changes: 1 addition & 1 deletion flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator {
// aren't truly levels in the lsm. Right now, the encoding only supports
// L0 sublevels, and the rest of the levels in the lsm.
return newLevelIter(
opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), nil,
opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), internalIterOpts{},
)
}

Expand Down
4 changes: 2 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func ingestTargetLevel(
// Check for overlap over the keys of L0 by iterating over the sublevels.
for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
iter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), nil)
v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{})

var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
Expand Down Expand Up @@ -733,7 +733,7 @@ func ingestTargetLevel(
level := baseLevel
for ; level < numLevels; level++ {
levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.Levels[level].Iter(), manifest.Level(level), nil)
v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{})
var rangeDelIter keyspan.FragmentIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE
// sets it up for the target file.
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/clockpro.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,14 +785,14 @@ func (c *Cache) Size() int64 {
// manually managed. The caller MUST either add the value to the cache (via
// Cache.Set), or release the value (via Cache.Free). Failure to do so will
// result in a memory leak.
func (c *Cache) Alloc(n int) *Value {
func Alloc(n int) *Value {
return newValue(n)
}

// Free frees the specified value. The buffer associated with the value will
// possibly be reused, making it invalid to use the buffer after calling
// Free. Do not call Free on a value that has been added to the cache.
func (c *Cache) Free(v *Value) {
func Free(v *Value) {
if n := v.refs(); n > 1 {
panic(fmt.Sprintf("pebble: Value has been added to the cache: refs=%d", n))
}
Expand Down
4 changes: 2 additions & 2 deletions internal/cache/clockpro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestCache(t *testing.T) {
var hit bool
h := cache.Get(1, base.FileNum(uint64(key)).DiskFileNum(), 0)
if v := h.Get(); v == nil {
value := cache.Alloc(1)
value := Alloc(1)
value.Buf()[0] = fields[0][0]
cache.Set(1, base.FileNum(uint64(key)).DiskFileNum(), 0, value).Release()
} else {
Expand All @@ -60,7 +60,7 @@ func TestCache(t *testing.T) {

func testValue(cache *Cache, s string, repeat int) *Value {
b := bytes.Repeat([]byte(s), repeat)
v := cache.Alloc(len(b))
v := Alloc(len(b))
copy(v.Buf(), b)
return v
}
Expand Down
5 changes: 3 additions & 2 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.T

type internalIterOpts struct {
bytesIterated *uint64
bufferPool *sstable.BufferPool
stats *base.InternalIteratorStats
boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter
}
Expand Down Expand Up @@ -246,11 +247,11 @@ func newLevelIter(
newIters tableNewIters,
files manifest.LevelIterator,
level manifest.Level,
bytesIterated *uint64,
internalOpts internalIterOpts,
) *levelIter {
l := &levelIter{}
l.init(context.Background(), opts, cmp, split, newIters, files, level,
internalIterOpts{bytesIterated: bytesIterated})
internalOpts)
return l
}

Expand Down
16 changes: 8 additions & 8 deletions level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestLevelIter(t *testing.T) {

iter := newLevelIter(opts, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, files.Iter(), manifest.Level(level),
nil)
internalIterOpts{})
defer iter.Close()
// Fake up the range deletion initialization.
iter.initRangeDel(new(keyspan.FragmentIterator))
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestLevelIter(t *testing.T) {

iter := newLevelIter(opts, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters2, files.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
iter.SeekGE([]byte(key), base.SeekGEFlagsNone)
lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound()
return fmt.Sprintf("[%s,%s]\n", lower, upper)
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestLevelIterBoundaries(t *testing.T) {
slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas)
iter = newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization.
iter.initRangeDel(new(keyspan.FragmentIterator))
}
Expand Down Expand Up @@ -536,7 +536,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

b.ResetTimer()
Expand Down Expand Up @@ -578,7 +578,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) {
opts.LowerBound, opts.UpperBound)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(keyspan.FragmentIterator))
Expand Down Expand Up @@ -627,7 +627,7 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) {
func(b *testing.B) {
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, metas.Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(keyspan.FragmentIterator))
Expand Down Expand Up @@ -672,7 +672,7 @@ func BenchmarkLevelIterNext(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -706,7 +706,7 @@ func BenchmarkLevelIterPrev(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{})

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
2 changes: 1 addition & 1 deletion merging_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, newIters, levelSlices[i].Iter(),
manifest.Level(level), nil)
manifest.Level(level), internalIterOpts{})
l.initRangeDel(&mils[level].rangeDelIter)
l.initBoundaryContext(&mils[level].levelIterBoundaryContext)
mils[level].iter = l
Expand Down
18 changes: 8 additions & 10 deletions sstable/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/cache"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manual"
Expand Down Expand Up @@ -400,10 +399,9 @@ type blockIter struct {
// For a block encoded with a restart interval of 1, cached and cachedBuf
// will not be used as there are no prefix compressed entries between the
// restart points.
cached []blockEntry
cachedBuf []byte
cacheHandle cache.Handle
cached    []blockEntry
cachedBuf []byte
handle    bufferHandle
// The first user key in the block. This is used by the caller to set bounds
// for block iteration for already loaded blocks.
firstUserKey []byte
lazyValueHandling struct {
Expand Down Expand Up @@ -458,10 +456,10 @@ func (i *blockIter) init(
// ingested.
// - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle(
cmp Compare, block cache.Handle, globalSeqNum uint64, hideObsoletePoints bool,
cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool,
) error {
i.cacheHandle.Release()
i.cacheHandle = block
i.handle.Release()
i.handle = block
return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints)
}

Expand Down Expand Up @@ -1515,8 +1513,8 @@ func (i *blockIter) Error() error {
// Close implements internalIterator.Close, as documented in the pebble
// package.
func (i *blockIter) Close() error {
i.cacheHandle.Release()
i.cacheHandle = cache.Handle{}
i.handle.Release()
i.handle = bufferHandle{}
i.val = nil
i.lazyValue = base.LazyValue{}
i.lazyValueHandling.vbr = nil
Expand Down
2 changes: 1 addition & 1 deletion sstable/block_property_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1316,7 +1316,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string {
// block that bhp points to, along with its block properties.
if twoLevelIndex {
subiter := &blockIter{}
subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil)
subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil, nil)
if err != nil {
return err.Error()
}
Expand Down
Loading