diff --git a/compaction.go b/compaction.go index 81eab36846..c59c09d49b 100644 --- a/compaction.go +++ b/compaction.go @@ -566,6 +566,7 @@ type compaction struct { // resulting version has been installed (if successful), but the compaction // goroutine is still cleaning up (eg, deleting obsolete files). versionEditApplied bool + bufferPool sstable.BufferPool score float64 @@ -1342,7 +1343,10 @@ func (c *compaction) newInputIter( f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64, ) (keyspan.FragmentIterator, error) { iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata, - &IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated}) + &IterOptions{level: l}, internalIterOpts{ + bytesIterated: &c.bytesIterated, + bufferPool: &c.bufferPool, + }) if err == nil { // TODO(peter): It is mildly wasteful to open the point iterator only to // immediately close it. One way to solve this would be to add new @@ -1427,7 +1431,10 @@ func (c *compaction) newInputIter( // to configure the levelIter at these levels to hide the obsolete points. addItersForLevel := func(level *compactionLevel, l manifest.Level) error { iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters, - level.files.Iter(), l, &c.bytesIterated)) + level.files.Iter(), l, internalIterOpts{ + bytesIterated: &c.bytesIterated, + bufferPool: &c.bufferPool, + })) // TODO(jackson): Use keyspan.LevelIter to avoid loading all the range // deletions into memory upfront. (See #2015, which reverted this.) // There will be no user keys that are split between sstables @@ -2745,6 +2752,11 @@ func (d *DB) runCompaction( d.mu.Unlock() defer d.mu.Lock() + // Compactions use a pool of buffers to read blocks, avoiding polluting the + // block cache with blocks that will not be read again. + c.bufferPool.Init(d.opts.Cache, 5) + defer c.bufferPool.Release() + iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots) if err != nil { return nil, pendingOutputs, stats, err diff --git a/flushable.go b/flushable.go index cbd486eb79..474d410c52 100644 --- a/flushable.go +++ b/flushable.go @@ -172,7 +172,7 @@ func (s *ingestedFlushable) newIter(o *IterOptions) internalIterator { // aren't truly levels in the lsm. Right now, the encoding only supports // L0 sublevels, and the rest of the levels in the lsm. return newLevelIter( - opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), nil, + opts, s.cmp, s.split, s.newIters, s.slice.Iter(), manifest.Level(0), internalIterOpts{}, ) } diff --git a/ingest.go b/ingest.go index 44c8b655bb..db1374076f 100644 --- a/ingest.go +++ b/ingest.go @@ -702,7 +702,7 @@ func ingestTargetLevel( // Check for overlap over the keys of L0 by iterating over the sublevels. 
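To make the ownership story in the compaction.go hunk concrete: the pool is scoped to a single compaction, initialized from the shared block cache's allocator, and torn down once the compaction goroutine is finished with its input iterators. Below is a minimal, self-contained sketch of that lifecycle; the bufferPool type here is a toy stand-in for sstable.BufferPool, and the initial size of 5 simply mirrors the constant in the hunk above.

package main

import "fmt"

// bufferPool is a stand-in for sstable.BufferPool; only the lifecycle
// matters for this sketch.
type bufferPool struct{ buffers [][]byte }

func (p *bufferPool) Init(initialSize int) { p.buffers = make([][]byte, 0, initialSize) }
func (p *bufferPool) Release()             { p.buffers = nil }

func runCompaction() error {
	var pool bufferPool
	// Mirrors c.bufferPool.Init(d.opts.Cache, 5): the pool lives exactly as
	// long as this compaction, so its blocks never displace entries in the
	// shared block cache.
	pool.Init(5)
	defer pool.Release()

	// ... build the input iterators here, threading &pool through
	// internalIterOpts{bufferPool: &pool} as newInputIter now does ...
	fmt.Println("compaction block reads are served from the pool")
	return nil
}

func main() { _ = runCompaction() }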
for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ { iter := newLevelIter(iterOps, cmp, nil /* split */, newIters, - v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), nil) + v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), internalIterOpts{}) var rangeDelIter keyspan.FragmentIterator // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE @@ -733,7 +733,7 @@ func ingestTargetLevel( level := baseLevel for ; level < numLevels; level++ { levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters, - v.Levels[level].Iter(), manifest.Level(level), nil) + v.Levels[level].Iter(), manifest.Level(level), internalIterOpts{}) var rangeDelIter keyspan.FragmentIterator // Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE // sets it up for the target file. diff --git a/level_iter.go b/level_iter.go index 3cf90274a0..d10ec39488 100644 --- a/level_iter.go +++ b/level_iter.go @@ -51,6 +51,7 @@ func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.T type internalIterOpts struct { bytesIterated *uint64 + bufferPool *sstable.BufferPool stats *base.InternalIteratorStats boundLimitedFilter sstable.BoundLimitedBlockPropertyFilter } @@ -246,11 +247,11 @@ func newLevelIter( newIters tableNewIters, files manifest.LevelIterator, level manifest.Level, - bytesIterated *uint64, + internalOpts internalIterOpts, ) *levelIter { l := &levelIter{} l.init(context.Background(), opts, cmp, split, newIters, files, level, - internalIterOpts{bytesIterated: bytesIterated}) + internalOpts) return l } diff --git a/level_iter_test.go b/level_iter_test.go index d2ec77b80b..233249ba70 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -88,7 +88,7 @@ func TestLevelIter(t *testing.T) { iter := newLevelIter(opts, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters, files.Iter(), manifest.Level(level), - nil) + internalIterOpts{}) defer iter.Close() // Fake up the range deletion initialization. iter.initRangeDel(new(keyspan.FragmentIterator)) @@ -131,7 +131,7 @@ func TestLevelIter(t *testing.T) { iter := newLevelIter(opts, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters2, files.Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) iter.SeekGE([]byte(key), base.SeekGEFlagsNone) lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound() return fmt.Sprintf("[%s,%s]\n", lower, upper) @@ -326,7 +326,7 @@ func TestLevelIterBoundaries(t *testing.T) { slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas) iter = newLevelIter(IterOptions{}, DefaultComparer.Compare, func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) // Fake up the range deletion initialization. 
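The newLevelIter change above is the enabling refactor: a lone *uint64 parameter becomes the internalIterOpts struct, so regular reads pass a zero value while compactions populate bytesIterated and bufferPool together, and the next per-iterator knob won't require touching every call site again. A hedged miniature of the same pattern (types simplified down to the two fields this diff threads through):

package main

import "fmt"

type BufferPool struct{}

// internalIterOpts echoes the struct in level_iter.go.
type internalIterOpts struct {
	bytesIterated *uint64
	bufferPool    *BufferPool
}

func newLevelIter(opts internalIterOpts) {
	if opts.bytesIterated != nil {
		fmt.Println("compaction-style iteration: counting bytes")
	}
	if opts.bufferPool != nil {
		fmt.Println("block reads bypass the block cache")
	}
}

func main() {
	// Regular reads: the zero value, matching the internalIterOpts{} call
	// sites in ingest.go and the tests.
	newLevelIter(internalIterOpts{})

	// Compactions: both fields set, matching compaction.go.
	var n uint64
	newLevelIter(internalIterOpts{bytesIterated: &n, bufferPool: &BufferPool{}})
}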
iter.initRangeDel(new(keyspan.FragmentIterator)) } @@ -536,7 +536,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano()))) b.ResetTimer() @@ -578,7 +578,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) { opts.LowerBound, opts.UpperBound) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) // Fake up the range deletion initialization, to resemble the usage // in a mergingIter. l.initRangeDel(new(keyspan.FragmentIterator)) @@ -627,7 +627,7 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) { func(b *testing.B) { l := newLevelIter(IterOptions{}, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters, metas.Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) // Fake up the range deletion initialization, to resemble the usage // in a mergingIter. l.initRangeDel(new(keyspan.FragmentIterator)) @@ -672,7 +672,7 @@ func BenchmarkLevelIterNext(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) b.ResetTimer() for i := 0; i < b.N; i++ { @@ -706,7 +706,7 @@ func BenchmarkLevelIterPrev(b *testing.B) { iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */) return iter, nil, err } - l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil) + l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), internalIterOpts{}) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/merging_iter_test.go b/merging_iter_test.go index 8d5c8d6088..24c3b2209d 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -629,7 +629,7 @@ func buildMergingIter(readers [][]*sstable.Reader, levelSlices []manifest.LevelS } l := newLevelIter(IterOptions{}, DefaultComparer.Compare, func(a []byte) int { return len(a) }, newIters, levelSlices[i].Iter(), - manifest.Level(level), nil) + manifest.Level(level), internalIterOpts{}) l.initRangeDel(&mils[level].rangeDelIter) l.initBoundaryContext(&mils[level].levelIterBoundaryContext) mils[level].iter = l diff --git a/sstable/block.go b/sstable/block.go index 53fe7991fb..a261551c6f 100644 --- a/sstable/block.go +++ b/sstable/block.go @@ -403,7 +403,7 @@ type blockIter struct { cached []blockEntry cachedBuf []byte - cacheHandle cache.Handle + handle bufferHandle // The first user key in the block. This is used by the caller to set bounds // for block iteration for already loaded blocks. firstUserKey []byte lazyValueHandling struct { @@ -458,10 +458,10 @@ func (i *blockIter) init( // ingested. // - Foreign sstable iteration: globalSeqNum is always set.
func (i *blockIter) initHandle( - cmp Compare, block cache.Handle, globalSeqNum uint64, hideObsoletePoints bool, + cmp Compare, block bufferHandle, globalSeqNum uint64, hideObsoletePoints bool, ) error { - i.cacheHandle.Release() - i.cacheHandle = block + i.handle.Release() + i.handle = block return i.init(cmp, block.Get(), globalSeqNum, hideObsoletePoints) } @@ -1515,8 +1515,8 @@ func (i *blockIter) Error() error { // Close implements internalIterator.Close, as documented in the pebble // package. func (i *blockIter) Close() error { - i.cacheHandle.Release() - i.cacheHandle = cache.Handle{} + i.handle.Release() + i.handle = bufferHandle{} i.val = nil i.lazyValue = base.LazyValue{} i.lazyValueHandling.vbr = nil diff --git a/sstable/block_property_test.go b/sstable/block_property_test.go index e7298eccb7..76ad99ea2e 100644 --- a/sstable/block_property_test.go +++ b/sstable/block_property_test.go @@ -1316,7 +1316,7 @@ func runBlockPropsCmd(r *Reader, td *datadriven.TestData) string { // block that bhp points to, along with its block properties. if twoLevelIndex { subiter := &blockIter{} - subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil) + subIndex, err := r.readBlock(context.Background(), bhp.BlockHandle, nil, nil, nil, nil) if err != nil { return err.Error() } diff --git a/sstable/buffer_pool.go b/sstable/buffer_pool.go new file mode 100644 index 0000000000..9c5edd2207 --- /dev/null +++ b/sstable/buffer_pool.go @@ -0,0 +1,140 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. + +package sstable + +import ( + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble/internal/cache" +) + +// A bufferHandle is a handle to manually-managed memory. The handle may point +// to a block in the block cache (h.Get() != nil), or a buffer that exists +// outside the block cache allocated from a BufferPool (b.Valid()). +type bufferHandle struct { + h cache.Handle + b Buf +} + +// Get retrieves the underlying buffer referenced by the handle. +func (bh bufferHandle) Get() []byte { + if v := bh.h.Get(); v != nil { + return v + } else if bh.b.p != nil { + return bh.b.p.pool[bh.b.i].b + } + return nil +} + +// Release releases the buffer, either back to the block cache or BufferPool. +func (bh bufferHandle) Release() { + bh.h.Release() + bh.b.Release() +} + +// A BufferPool holds a pool of buffers for holding sstable blocks. An initial +// size of the pool is provided on Init, but a BufferPool will grow to meet the +// largest working set size. It'll never shrink. When a buffer is released, the +// BufferPool recycles the buffer for future allocations. +// +// A BufferPool should only be used for short-lived allocations with +// well-understood working set sizes to avoid excessive memory consumption. +// +// BufferPool is not thread-safe. +type BufferPool struct { + cache *cache.Cache + // pool contains all the buffers held by the pool, including buffers that + // are in-use. + pool []allocedBuffer +} + +type allocedBuffer struct { + v *cache.Value + // b holds the current byte slice. It's backed by v, but may be a subslice + // of v's memory while the buffer is in-use [ len(b) ≤ len(v.Buf()) ]. + // + // If the buffer is not currently in-use, b is nil. When being recycled, the + // BufferPool.Alloc will reset b to be a subslice of v.Buf(). 
+ b []byte +} + +// Init initializes the pool to use the provided cache for allocations, and +// with an initial working set buffer size of `initialSize`. +func (p *BufferPool) Init(cache *cache.Cache, initialSize int) { + *p = BufferPool{ + cache: cache, + pool: make([]allocedBuffer, 0, initialSize), + } +} + +// Release releases all buffers held by the pool and resets the pool to an +// uninitialized state. +func (p *BufferPool) Release() { + for i := range p.pool { + if p.pool[i].b != nil { + panic(errors.AssertionFailedf("Release called on a BufferPool with in-use buffers")) + } + p.cache.Free(p.pool[i].v) + } + *p = BufferPool{} +} + +// Alloc allocates a new buffer of size n. If the pool already holds a buffer at +// least as large as n, the pooled buffer is used instead. +// +// Alloc is O(MAX(N,M)) where N is the largest number of concurrently in-use +// buffers allocated and M is the initialSize passed to Init. +func (p *BufferPool) Alloc(n int) Buf { + unusableBufferIdx := -1 + for i := 0; i < len(p.pool); i++ { + if p.pool[i].b == nil { + if len(p.pool[i].v.Buf()) >= n { + p.pool[i].b = p.pool[i].v.Buf()[:n] + return Buf{p: p, i: i} + } + unusableBufferIdx = i + } + } + + // If we would need to grow the size of the pool to allocate another buffer, + // but there was a slot available occupied by a buffer that's just too + // small, replace the too-small buffer. + if len(p.pool) == cap(p.pool) && unusableBufferIdx >= 0 { + i := unusableBufferIdx + p.cache.Free(p.pool[i].v) + p.pool[i].v = p.cache.Alloc(n) + p.pool[i].b = p.pool[i].v.Buf() + return Buf{p: p, i: i} + } + + // Allocate a new buffer. + v := p.cache.Alloc(n) + p.pool = append(p.pool, allocedBuffer{v: v, b: v.Buf()[:n]}) + return Buf{p: p, i: len(p.pool) - 1} +} + +// A Buf holds a reference to a manually-managed, pooled byte buffer. +type Buf struct { + p *BufferPool + // i holds the index into p.pool where the buffer may be found. This scheme + // avoids needing to allocate the handle to the buffer on the heap at the + // cost of copying two words instead of one. + i int +} + +// Valid returns true if the buf holds a valid buffer. +func (b Buf) Valid() bool { + return b.p != nil +} + +// Release releases the buffer back to the pool. +func (b Buf) Release() { + if b.p == nil { + return + } + // Clear the allocedBuffer's byte slice. This signals the allocated buffer + // is no longer in use and a future call to BufferPool.Alloc may reuse this + // buffer. + b.p.pool[b.i].b = nil +} diff --git a/sstable/buffer_pool_test.go b/sstable/buffer_pool_test.go new file mode 100644 index 0000000000..8596ba10cf --- /dev/null +++ b/sstable/buffer_pool_test.go @@ -0,0 +1,81 @@ +// Copyright 2023 The LevelDB-Go and Pebble Authors. All rights reserved. Use +// of this source code is governed by a BSD-style license that can be found in +// the LICENSE file. 
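As a cross-check of the Alloc contract documented above, here is a standalone analog of the pool with plain byte slices standing in for cache.Value allocations; the recycle and replace-too-small logic mirrors the diff, everything else is simplification.

package main

import "fmt"

// allocedBuffer mirrors the real struct: v is the full allocation, b is the
// handed-out subslice and doubles as the in-use flag (nil means idle).
type allocedBuffer struct {
	v []byte
	b []byte
}

type pool struct{ bufs []allocedBuffer }

func (p *pool) Init(initialSize int) { p.bufs = make([]allocedBuffer, 0, initialSize) }

func (p *pool) Alloc(n int) buf {
	unusable := -1
	for i := range p.bufs {
		if p.bufs[i].b == nil {
			if len(p.bufs[i].v) >= n {
				// Recycle: hand out a prefix of an idle buffer.
				p.bufs[i].b = p.bufs[i].v[:n]
				return buf{p: p, i: i}
			}
			unusable = i
		}
	}
	// Before growing past the initial capacity, replace an idle buffer that
	// was too small to reuse.
	if len(p.bufs) == cap(p.bufs) && unusable >= 0 {
		p.bufs[unusable].v = make([]byte, n)
		p.bufs[unusable].b = p.bufs[unusable].v
		return buf{p: p, i: unusable}
	}
	v := make([]byte, n)
	p.bufs = append(p.bufs, allocedBuffer{v: v, b: v})
	return buf{p: p, i: len(p.bufs) - 1}
}

// buf stores (pool, index) rather than a pointer, the design the Buf comment
// explains: no heap-allocated handle, and the index stays valid even when
// append relocates the slice's backing array.
type buf struct {
	p *pool
	i int
}

func (b buf) Release() { b.p.bufs[b.i].b = nil }

func main() {
	var p pool
	p.Init(2)
	a := p.Alloc(512)
	a.Release()
	b := p.Alloc(512) // recycles slot 0's 512-byte buffer
	c := p.Alloc(256) // fills slot 1
	fmt.Println(len(p.bufs), cap(p.bufs)) // 2 2
	b.Release()
	c.Release()
}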
+ +package sstable + +import ( + "bytes" + "fmt" + "io" + "testing" + + "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/pebble/internal/cache" +) + +func writeBufferPool(w io.Writer, bp *BufferPool) { + for i := 0; i < cap(bp.pool); i++ { + if i > 0 { + fmt.Fprint(w, " ") + } + if i >= len(bp.pool) { + fmt.Fprint(w, "[ ]") + continue + } + sz := len(bp.pool[i].v.Buf()) + if bp.pool[i].b == nil { + fmt.Fprintf(w, "[%4d]", sz) + } else { + fmt.Fprintf(w, "<%4d>", sz) + } + } +} + +func TestBufferPool(t *testing.T) { + c := cache.New(256 << 10) + defer c.Unref() + + var bp BufferPool + var buf bytes.Buffer + handles := map[string]Buf{} + drainPool := func() { + for h, b := range handles { + b.Release() + delete(handles, h) + } + bp.Release() + } + defer drainPool() + datadriven.RunTest(t, "testdata/buffer_pool", func(t *testing.T, td *datadriven.TestData) string { + buf.Reset() + switch td.Cmd { + case "init": + if cap(bp.pool) > 0 { + drainPool() + } + var initialSize int + td.ScanArgs(t, "size", &initialSize) + bp.Init(c, initialSize) + writeBufferPool(&buf, &bp) + return buf.String() + case "alloc": + var n int + var handle string + td.ScanArgs(t, "n", &n) + td.ScanArgs(t, "handle", &handle) + handles[handle] = bp.Alloc(n) + writeBufferPool(&buf, &bp) + return buf.String() + case "release": + var handle string + td.ScanArgs(t, "handle", &handle) + handles[handle].Release() + delete(handles, handle) + writeBufferPool(&buf, &bp) + return buf.String() + default: + return fmt.Sprintf("unrecognized command %q", td.Cmd) + } + }) +} diff --git a/sstable/reader.go b/sstable/reader.go index 8b8afd1cea..69b2bb3f25 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -224,6 +224,7 @@ type singleLevelIterator struct { err error closeHook func(i Iterator) error stats *base.InternalIteratorStats + bufferPool *BufferPool // boundsCmp and positionedUsingLatestBounds are for optimizing iteration // that uses multiple adjacent bounds. 
The seek after setting a new bound @@ -362,32 +363,32 @@ var rangeKeyFragmentBlockIterPool = sync.Pool{ func checkSingleLevelIterator(obj interface{}) { i := obj.(*singleLevelIterator) - if p := i.data.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + if p := i.data.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.handle is not nil: %p\n", p) os.Exit(1) } - if p := i.index.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + if p := i.index.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.handle is not nil: %p\n", p) os.Exit(1) } } func checkTwoLevelIterator(obj interface{}) { i := obj.(*twoLevelIterator) - if p := i.data.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.data.cacheHandle is not nil: %p\n", p) + if p := i.data.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.data.handle is not nil: %p\n", p) os.Exit(1) } - if p := i.index.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "singleLevelIterator.index.cacheHandle is not nil: %p\n", p) + if p := i.index.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "singleLevelIterator.index.handle is not nil: %p\n", p) os.Exit(1) } } func checkRangeKeyFragmentBlockIterator(obj interface{}) { i := obj.(*rangeKeyFragmentBlockIter) - if p := i.blockIter.cacheHandle.Get(); p != nil { - fmt.Fprintf(os.Stderr, "fragmentBlockIter.blockIter.cacheHandle is not nil: %p\n", p) + if p := i.blockIter.handle.Get(); p != nil { + fmt.Fprintf(os.Stderr, "fragmentBlockIter.blockIter.handle is not nil: %p\n", p) os.Exit(1) } } @@ -408,6 +409,7 @@ func (i *singleLevelIterator) init( useFilter, hideObsoletePoints bool, stats *base.InternalIteratorStats, rp ReaderProvider, + bufferPool *BufferPool, ) error { if r.err != nil { return r.err @@ -430,6 +432,7 @@ func (i *singleLevelIterator) init( i.cmp = r.Compare i.stats = stats i.hideObsoletePoints = hideObsoletePoints + i.bufferPool = bufferPool err = i.index.initHandle(i.cmp, indexH, r.Properties.GlobalSeqNum, false) if err != nil { // blockIter.Close releases indexH and always returns a nil error @@ -581,7 +584,7 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { // blockIntersects } ctx := objiotracing.WithBlockType(i.ctx, objiotracing.DataBlock) - block, err := i.reader.readBlock(ctx, i.dataBH, nil /* transform */, i.dataRH, i.stats) + block, err := i.reader.readBlock(ctx, i.dataBH, nil /* transform */, i.dataRH, i.stats, i.bufferPool) if err != nil { i.err = err return loadBlockFailed @@ -600,9 +603,9 @@ func (i *singleLevelIterator) loadBlock(dir int8) loadBlockResult { // the valueBlockReader. func (i *singleLevelIterator) readBlockForVBR( ctx context.Context, h BlockHandle, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock) - return i.reader.readBlock(ctx, h, nil, i.vbRH, stats) + return i.reader.readBlock(ctx, h, nil, i.vbRH, stats, i.bufferPool) } // resolveMaybeExcluded is invoked when the block-property filterer has found @@ -967,7 +970,7 @@ func (i *singleLevelIterator) seekPrefixGE( } i.lastBloomFilterMatched = false // Check prefix bloom filter. 
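A detail that's easy to miss in the reader changes: the pool is wired only into reads that are private to an iterator, while blocks that benefit from cross-iterator sharing keep flowing through the block cache. Summarized as a small, illustrative table of the call sites this diff touches (the string enum is a device of this sketch, not pebble's):

package main

import "fmt"

// usesBufferPool reports which block reads pass the iterator's BufferPool in
// this diff: data blocks (loadBlock), value blocks (readBlockForVBR), and
// second-level index blocks (twoLevelIterator.loadIndex). Filter, top-level
// and single-level index, range-del, range-key, and metaindex reads all pass
// a nil pool and continue to populate the block cache.
func usesBufferPool(kind string) bool {
	switch kind {
	case "data", "value", "second-level index":
		return true
	default:
		return false
	}
}

func main() {
	for _, k := range []string{
		"data", "value", "second-level index",
		"filter", "top-level index", "range-del", "range-key", "metaindex",
	} {
		fmt.Printf("%-18s pool=%v\n", k, usesBufferPool(k))
	}
}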
- var dataH cache.Handle + var dataH bufferHandle dataH, i.err = i.reader.readFilter(i.ctx, i.stats) if i.err != nil { i.data.invalidate() @@ -1780,7 +1783,7 @@ func (i *twoLevelIterator) loadIndex(dir int8) loadBlockResult { // blockIntersects } ctx := objiotracing.WithBlockType(i.ctx, objiotracing.MetadataBlock) - indexBlock, err := i.reader.readBlock(ctx, bhp.BlockHandle, nil /* transform */, nil /* readHandle */, i.stats) + indexBlock, err := i.reader.readBlock(ctx, bhp.BlockHandle, nil /* transform */, nil /* readHandle */, i.stats, i.bufferPool) if err != nil { i.err = err return loadBlockFailed @@ -1866,6 +1869,7 @@ func (i *twoLevelIterator) init( useFilter, hideObsoletePoints bool, stats *base.InternalIteratorStats, rp ReaderProvider, + bufferPool *BufferPool, ) error { if r.err != nil { return r.err @@ -1889,6 +1893,7 @@ func (i *twoLevelIterator) init( i.cmp = r.Compare i.stats = stats i.hideObsoletePoints = hideObsoletePoints + i.bufferPool = bufferPool err = i.topLevelIndex.initHandle(i.cmp, topLevelIndexH, r.Properties.GlobalSeqNum, false) if err != nil { // blockIter.Close releases topLevelIndexH and always returns a nil error @@ -2130,7 +2135,7 @@ func (i *twoLevelIterator) SeekPrefixGE( flags = flags.DisableTrySeekUsingNext() } i.lastBloomFilterMatched = false - var dataH cache.Handle + var dataH bufferHandle dataH, i.err = i.reader.readFilter(i.ctx, i.stats) if i.err != nil { i.data.invalidate() @@ -2924,9 +2929,9 @@ func MakeVirtualReader(reader *Reader, meta manifest.VirtualFileMeta) VirtualRea // NewCompactionIter is the compaction iterator function for virtual readers. func (v *VirtualReader) NewCompactionIter( - bytesIterated *uint64, rp ReaderProvider, + bytesIterated *uint64, rp ReaderProvider, bufferPool *BufferPool, ) (Iterator, error) { - return v.reader.newCompactionIter(bytesIterated, rp, &v.vState) + return v.reader.newCompactionIter(bytesIterated, rp, &v.vState, bufferPool) } // NewIterWithBlockPropertyFiltersAndContextEtc wraps @@ -3136,7 +3141,7 @@ func (r *Reader) newIterWithBlockPropertyFiltersAndContext( // until the final iterator closes. if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) - err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp) + err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */) if err != nil { return nil, err } @@ -3144,7 +3149,7 @@ func (r *Reader) newIterWithBlockPropertyFiltersAndContext( } i := singleLevelIterPool.Get().(*singleLevelIterator) - err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp) + err := i.init(ctx, r, v, lower, upper, filterer, useFilterBlock, hideObsoletePoints, stats, rp, nil /* bufferPool */) if err != nil { return nil, err } @@ -3164,12 +3169,14 @@ func (r *Reader) NewIter(lower, upper []byte) (Iterator, error) { // NewCompactionIter returns an iterator similar to NewIter but it also increments // the number of bytes iterated. If an error occurs, NewCompactionIter cleans up // after itself and returns a nil iterator. 
-func (r *Reader) NewCompactionIter(bytesIterated *uint64, rp ReaderProvider) (Iterator, error) { - return r.newCompactionIter(bytesIterated, rp, nil) +func (r *Reader) NewCompactionIter( + bytesIterated *uint64, rp ReaderProvider, bufferPool *BufferPool, +) (Iterator, error) { + return r.newCompactionIter(bytesIterated, rp, nil, bufferPool) } func (r *Reader) newCompactionIter( - bytesIterated *uint64, rp ReaderProvider, v *virtualState, + bytesIterated *uint64, rp ReaderProvider, v *virtualState, bufferPool *BufferPool, ) (Iterator, error) { if r.Properties.IndexType == twoLevelIndex { i := twoLevelIterPool.Get().(*twoLevelIterator) @@ -3177,7 +3184,7 @@ func (r *Reader) newCompactionIter( context.Background(), r, v, nil /* lower */, nil /* upper */, nil, false /* useFilter */, false, /* hideObsoletePoints */ - nil /* stats */, rp, + nil /* stats */, rp, bufferPool, ) if err != nil { return nil, err @@ -3192,7 +3199,7 @@ func (r *Reader) newCompactionIter( err := i.init( context.Background(), r, v, nil /* lower */, nil, /* upper */ nil, false /* useFilter */, false, /* hideObsoletePoints */ - nil /* stats */, rp, + nil /* stats */, rp, bufferPool, ) if err != nil { return nil, err @@ -3259,26 +3266,26 @@ func (i *rangeKeyFragmentBlockIter) Close() error { func (r *Reader) readIndex( ctx context.Context, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.MetadataBlock) - return r.readBlock(ctx, r.indexBH, nil, nil, stats) + return r.readBlock(ctx, r.indexBH, nil, nil, stats, nil /* buffer pool */) } func (r *Reader) readFilter( ctx context.Context, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.FilterBlock) - return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats) + return r.readBlock(ctx, r.filterBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */) } -func (r *Reader) readRangeDel(stats *base.InternalIteratorStats) (cache.Handle, error) { +func (r *Reader) readRangeDel(stats *base.InternalIteratorStats) (bufferHandle, error) { ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock) - return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats) + return r.readBlock(ctx, r.rangeDelBH, r.rangeDelTransform, nil /* readHandle */, stats, nil /* buffer pool */) } -func (r *Reader) readRangeKey(stats *base.InternalIteratorStats) (cache.Handle, error) { +func (r *Reader) readRangeKey(stats *base.InternalIteratorStats) (bufferHandle, error) { ctx := objiotracing.WithBlockType(context.Background(), objiotracing.MetadataBlock) - return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats) + return r.readBlock(ctx, r.rangeKeyBH, nil /* transform */, nil /* readHandle */, stats, nil /* buffer pool */) } func checkChecksum( @@ -3303,15 +3310,48 @@ func checkChecksum( return nil } -// readBlock reads and decompresses a block from disk into memory. +type cacheValueOrBuf struct { + // buf.Valid() returns true if backed by a BufferPool. + buf Buf + + // cache and v are non-nil if backed by the block cache. 
+ cache *cache.Cache + v *cache.Value +} + +func (b cacheValueOrBuf) get() []byte { + if b.buf.Valid() { + return b.buf.p.pool[b.buf.i].b + } + return b.v.Buf() +} + +func (b cacheValueOrBuf) release() { + if b.buf.Valid() { + b.buf.Release() + } else { + b.cache.Free(b.v) + } +} + +func (b cacheValueOrBuf) truncate(n int) { + if b.buf.Valid() { + b.buf.p.pool[b.buf.i].b = b.buf.p.pool[b.buf.i].b[:n] + } else { + b.v.Truncate(n) + } +} + func (r *Reader) readBlock( ctx context.Context, bh BlockHandle, transform blockTransform, readHandle objstorage.ReadHandle, stats *base.InternalIteratorStats, -) (handle cache.Handle, _ error) { + bufferPool *BufferPool, +) (handle bufferHandle, _ error) { if h := r.opts.Cache.Get(r.cacheID, r.fileNum, bh.Offset); h.Get() != nil { + // Cache hit. if readHandle != nil { readHandle.RecordCacheHit(ctx, int64(bh.Offset), int64(bh.Length+blockTrailerLen)) } @@ -3319,17 +3359,30 @@ func (r *Reader) readBlock( stats.BlockBytes += bh.Length stats.BlockBytesInCache += bh.Length } - return h, nil + // This block is already in the cache; return a handle to the existing + // value in the cache. + return bufferHandle{h: h}, nil + } + + // Cache miss. + var compressed cacheValueOrBuf + if bufferPool != nil { + compressed = cacheValueOrBuf{ + buf: bufferPool.Alloc(int(bh.Length + blockTrailerLen)), + } + } else { + compressed = cacheValueOrBuf{ + cache: r.opts.Cache, + v: r.opts.Cache.Alloc(int(bh.Length + blockTrailerLen)), + } } - v := r.opts.Cache.Alloc(int(bh.Length + blockTrailerLen)) - b := v.Buf() readStartTime := time.Now() var err error if readHandle != nil { - err = readHandle.ReadAt(ctx, b, int64(bh.Offset)) + err = readHandle.ReadAt(ctx, compressed.get(), int64(bh.Offset)) } else { - err = r.readable.ReadAt(ctx, b, int64(bh.Offset)) + err = r.readable.ReadAt(ctx, compressed.get(), int64(bh.Offset)) } readDuration := time.Since(readStartTime) // TODO(sumeer): should the threshold be configurable. @@ -3342,56 +3395,80 @@ func (r *Reader) readBlock( // interface{}, unless necessary. if readDuration >= slowReadTracingThreshold && r.opts.LoggerAndTracer.IsTracingEnabled(ctx) { r.opts.LoggerAndTracer.Eventf(ctx, "reading %d bytes took %s", - bh.Length+blockTrailerLen, readDuration.String()) + int(bh.Length+blockTrailerLen), readDuration.String()) if stats != nil { stats.BlockReadDuration += readDuration } if err != nil { - r.opts.Cache.Free(v) - return cache.Handle{}, err + compressed.release() + return bufferHandle{}, err } - - if err := checkChecksum(r.checksumType, b, bh, r.fileNum.FileNum()); err != nil { - r.opts.Cache.Free(v) - return cache.Handle{}, err + if err := checkChecksum(r.checksumType, compressed.get(), bh, r.fileNum.FileNum()); err != nil { + compressed.release() + return bufferHandle{}, err } - typ := blockType(b[bh.Length]) - b = b[:bh.Length] - v.Truncate(len(b)) + typ := blockType(compressed.get()[bh.Length]) + compressed.truncate(int(bh.Length)) - decoded, err := decompressBlock(r.opts.Cache, typ, b) - if decoded != nil { - r.opts.Cache.Free(v) - v = decoded - b = v.Buf() - } else if err != nil { - r.opts.Cache.Free(v) - return cache.Handle{}, err + var decompressed cacheValueOrBuf + if typ == noCompressionBlockType { + decompressed = compressed + } else { + // Decode the length of the decompressed value.
+ decodedLen, prefixLen, err := decompressedLen(typ, compressed.get()) + if err != nil { + compressed.release() + return bufferHandle{}, err + } + + if bufferPool != nil { + decompressed = cacheValueOrBuf{buf: bufferPool.Alloc(decodedLen)} + } else { + decompressed = cacheValueOrBuf{ + cache: r.opts.Cache, + v: r.opts.Cache.Alloc(decodedLen), + } + } + if _, err := decompressInto(typ, compressed.get()[prefixLen:], decompressed.get()); err != nil { + compressed.release() + return bufferHandle{}, err + } + compressed.release() } if transform != nil { - // Transforming blocks is rare, so the extra copy of the transformed data - // is not problematic. - var err error - b, err = transform(b) + // Transforming blocks is very rare, so the extra copy of the + // transformed data is not problematic. + tmpTransformed, err := transform(decompressed.get()) if err != nil { - r.opts.Cache.Free(v) - return cache.Handle{}, err + decompressed.release() + return bufferHandle{}, err + } + + var transformed cacheValueOrBuf + if bufferPool != nil { + transformed = cacheValueOrBuf{buf: bufferPool.Alloc(len(tmpTransformed))} + } else { + transformed = cacheValueOrBuf{ + cache: r.opts.Cache, + v: r.opts.Cache.Alloc(len(tmpTransformed)), + } } - newV := r.opts.Cache.Alloc(len(b)) - copy(newV.Buf(), b) - r.opts.Cache.Free(v) - v = newV + copy(transformed.get(), tmpTransformed) + decompressed.release() + decompressed = transformed } if stats != nil { stats.BlockBytes += bh.Length } - - h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, v) - return h, nil + if decompressed.buf.Valid() { + return bufferHandle{b: decompressed.buf}, nil + } + h := r.opts.Cache.Set(r.cacheID, r.fileNum, bh.Offset, decompressed.v) + return bufferHandle{h: h}, nil } func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { @@ -3439,7 +3516,7 @@ func (r *Reader) transformRangeDelV1(b []byte) ([]byte, error) { func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { b, err := r.readBlock( - context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil /* stats */) + context.Background(), metaindexBH, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return err } @@ -3482,7 +3559,7 @@ func (r *Reader) readMetaindex(metaindexBH BlockHandle) error { if bh, ok := meta[metaPropertiesName]; ok { b, err = r.readBlock( - context.Background(), bh, nil /* transform */, nil /* readHandle */, nil /* stats */) + context.Background(), bh, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return err } @@ -3587,8 +3664,8 @@ func (r *Reader) Layout() (*Layout, error) { } l.Index = append(l.Index, indexBH.BlockHandle) - subIndex, err := r.readBlock(context.Background(), - indexBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + subIndex, err := r.readBlock(context.Background(), indexBH.BlockHandle, + nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return nil, err } @@ -3611,7 +3688,7 @@ func (r *Reader) Layout() (*Layout, error) { } } if r.valueBIH.h.Length != 0 { - vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil) + vbiH, err := r.readBlock(context.Background(), r.valueBIH.h, nil, nil, nil, nil /* buffer pool */) if err != nil { return nil, err } @@ -3682,7 +3759,7 @@ func (r *Reader) ValidateBlockChecksums() error { } // Read the block, which validates the checksum. 
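The rewritten readBlock now makes one up-front choice, pool versus cache allocator, and carries it through each intermediate buffer (compressed, decompressed, transformed) via cacheValueOrBuf; crucially, a pool-backed block is returned directly and never inserted into the cache. A schematic, self-contained rendering of that control flow (plain byte slices in place of cache.Value and bufferHandle):

package main

import "fmt"

// readBlock sketches the decision structure of Reader.readBlock after this
// change. The returned bool stands in for whether the resulting handle is
// cache-backed.
func readBlock(cached []byte, usePool bool, raw []byte, decompress func([]byte) []byte) (block []byte, inCache bool) {
	if cached != nil {
		// Cache hit: unchanged behavior, return the cached value.
		return cached, true
	}
	// Cache miss: the compressed buffer comes from the pool if one was
	// supplied, otherwise from the cache allocator.
	compressed := make([]byte, len(raw)) // bufferPool.Alloc or Cache.Alloc
	copy(compressed, raw)                // readHandle/readable.ReadAt
	// Checksum, then decompress into a second buffer from the same source.
	decompressed := decompress(compressed)
	if usePool {
		// Pool-backed blocks bypass Cache.Set entirely; this is what keeps
		// compaction reads from polluting the block cache.
		return decompressed, false
	}
	return decompressed, true // r.opts.Cache.Set(...)
}

func main() {
	id := func(b []byte) []byte { return b }
	_, c := readBlock(nil, true, []byte("raw"), id)
	fmt.Println("pooled read cached?", c) // false
	_, c = readBlock(nil, false, []byte("raw"), id)
	fmt.Println("unpooled read cached?", c) // true
}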
- h, err := r.readBlock(context.Background(), bh, nil, rh, nil) + h, err := r.readBlock(context.Background(), bh, nil, rh, nil, nil /* buffer pool */) if err != nil { return err } @@ -3744,8 +3821,8 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { if err != nil { return 0, errCorruptIndexEntry } - startIdxBlock, err := r.readBlock(context.Background(), - startIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + startIdxBlock, err := r.readBlock(context.Background(), startIdxBH.BlockHandle, + nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return 0, err } @@ -3766,7 +3843,7 @@ func (r *Reader) EstimateDiskUsage(start, end []byte) (uint64, error) { return 0, errCorruptIndexEntry } endIdxBlock, err := r.readBlock(context.Background(), - endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + endIdxBH.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { return 0, err } @@ -4043,7 +4120,7 @@ func (l *Layout) Describe( } h, err := r.readBlock( - context.Background(), b.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */) + context.Background(), b.BlockHandle, nil /* transform */, nil /* readHandle */, nil /* stats */, nil /* buffer pool */) if err != nil { fmt.Fprintf(w, " [err: %s]\n", err) continue diff --git a/sstable/reader_test.go b/sstable/reader_test.go index 9509ef42ec..b432b72949 100644 --- a/sstable/reader_test.go +++ b/sstable/reader_test.go @@ -183,6 +183,7 @@ func TestVirtualReader(t *testing.T) { // Set during the latest build command. var r *Reader var meta manifest.PhysicalFileMeta + var bp BufferPool // Set during the latest virtualize command. var vMeta1 manifest.VirtualFileMeta @@ -191,6 +192,7 @@ func TestVirtualReader(t *testing.T) { defer func() { if r != nil { require.NoError(t, r.Close()) + bp.Release() } }() @@ -251,6 +253,7 @@ func TestVirtualReader(t *testing.T) { switch td.Cmd { case "build": if r != nil { + bp.Release() _ = r.Close() r = nil meta.FileMetadata = nil @@ -275,6 +278,7 @@ func TestVirtualReader(t *testing.T) { if err != nil { return err.Error() } + bp.Init(r.opts.Cache, 5) // Create a fake filemetada using the writer meta. 
meta, err = createPhysicalMeta(wMeta, r) @@ -330,7 +334,7 @@ func TestVirtualReader(t *testing.T) { var rp ReaderProvider var bytesIterated uint64 - iter, err := v.NewCompactionIter(&bytesIterated, rp) + iter, err := v.NewCompactionIter(&bytesIterated, rp, &bp) if err != nil { return err.Error() } @@ -680,7 +684,7 @@ func indexLayoutString(t *testing.T, r *Reader) string { fmt.Fprintf(&buf, " %s: size %d\n", string(key.UserKey), bh.Length) if twoLevelIndex { b, err := r.readBlock( - context.Background(), bh.BlockHandle, nil, nil, nil) + context.Background(), bh.BlockHandle, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() iter2, err := newBlockIter(r.Compare, b.Get()) @@ -911,7 +915,9 @@ func testBytesIteratedWithCompression( for _, numEntries := range []uint64{0, 1, maxNumEntries[i]} { r := buildTestTable(t, numEntries, blockSize, indexBlockSize, compression) var bytesIterated, prevIterated uint64 - citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}) + var pool BufferPool + pool.Init(r.opts.Cache, 5) + citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}, &pool) require.NoError(t, err) for key, _ := citer.First(); key != nil; key, _ = citer.Next() { @@ -930,6 +936,7 @@ func testBytesIteratedWithCompression( require.NoError(t, citer.Close()) require.NoError(t, r.Close()) + pool.Release() } } } @@ -965,7 +972,9 @@ func TestCompactionIteratorSetupForCompaction(t *testing.T) { for _, numEntries := range []uint64{0, 1, 1e5} { r := buildTestTableWithProvider(t, provider, numEntries, blockSize, indexBlockSize, DefaultCompression) var bytesIterated uint64 - citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}) + var pool BufferPool + pool.Init(r.opts.Cache, 5) + citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}, &pool) require.NoError(t, err) switch i := citer.(type) { case *compactionIterator: @@ -983,6 +992,7 @@ func TestCompactionIteratorSetupForCompaction(t *testing.T) { } require.NoError(t, citer.Close()) require.NoError(t, r.Close()) + pool.Release() } } } @@ -1017,7 +1027,9 @@ func TestReadaheadSetupForV3TablesWithMultipleVersions(t *testing.T) { require.NoError(t, err) defer r.Close() { - citer, err := r.NewCompactionIter(nil, TrivialReaderProvider{Reader: r}) + var pool BufferPool + pool.Init(r.opts.Cache, 5) + citer, err := r.NewCompactionIter(nil, TrivialReaderProvider{Reader: r}, &pool) require.NoError(t, err) defer citer.Close() i := citer.(*compactionIterator) diff --git a/sstable/table_test.go b/sstable/table_test.go index b3c5341b6a..2bf6b3cd2e 100644 --- a/sstable/table_test.go +++ b/sstable/table_test.go @@ -694,7 +694,7 @@ func TestMetaIndexEntriesSorted(t *testing.T) { r, err := newReader(f, ReaderOptions{}) require.NoError(t, err) - b, err := r.readBlock(context.Background(), r.metaIndexBH, nil, nil, nil) + b, err := r.readBlock(context.Background(), r.metaIndexBH, nil, nil, nil, nil) require.NoError(t, err) defer b.Release() diff --git a/sstable/testdata/buffer_pool b/sstable/testdata/buffer_pool new file mode 100644 index 0000000000..70161624d4 --- /dev/null +++ b/sstable/testdata/buffer_pool @@ -0,0 +1,84 @@ +# Each command prints the current state of the buffer pool. +# +# [ ] - Indicates a cell within BufferPool.pool's underlying array that's +# unused and does not hold a buffer. +# [ n] - Indicates a cell within BufferPool.pool that is not currently in use, +# but does hold a buffer of size n. 
+# < n> - Indicates a cell within BufferPool.pool that holds a buffer of size +# n, and that buffer is presently in-use and ineligible for reuse. + +init size=5 +---- +[ ] [ ] [ ] [ ] [ ] + +alloc n=512 handle=foo +---- +< 512> [ ] [ ] [ ] [ ] + +release handle=foo +---- +[ 512] [ ] [ ] [ ] [ ] + +# Allocating again should use the existing buffer. + +alloc n=512 handle=bar +---- +< 512> [ ] [ ] [ ] [ ] + +# Allocating again should allocate a new buffer for the next slot. + +alloc n=512 handle=bax +---- +< 512> < 512> [ ] [ ] [ ] + +release handle=bar +---- +[ 512] < 512> [ ] [ ] [ ] + +release handle=bax +---- +[ 512] [ 512] [ ] [ ] [ ] + +# Fill up the entire preallocated pool slice. + +alloc n=128 handle=bar +---- +< 512> [ 512] [ ] [ ] [ ] + +alloc n=1 handle=bax +---- +< 512> < 512> [ ] [ ] [ ] + +alloc n=1 handle=bux +---- +< 512> < 512> < 1> [ ] [ ] + +alloc n=1024 handle=foo +---- +< 512> < 512> < 1> <1024> [ ] + +alloc n=1024 handle=fax +---- +< 512> < 512> < 1> <1024> <1024> + +# Allocating one more should grow the underlying slice, and allocate a +# new appropriately sized buffer. + +alloc n=2048 handle=zed +---- +< 512> < 512> < 1> <1024> <1024> <2048> [ ] [ ] [ ] [ ] + +release handle=bux +---- +< 512> < 512> [ 1] <1024> <1024> <2048> [ ] [ ] [ ] [ ] + +alloc n=2 handle=bux +---- +< 512> < 512> [ 1] <1024> <1024> <2048> < 2> [ ] [ ] [ ] + +init size=0 +---- + +alloc n=1 handle=foo +---- +< 1> diff --git a/sstable/value_block.go b/sstable/value_block.go index fc0e7004fc..7139364eaf 100644 --- a/sstable/value_block.go +++ b/sstable/value_block.go @@ -13,7 +13,6 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" - "github.com/cockroachdb/pebble/internal/cache" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/objstorage/objstorageprovider/objiotracing" "golang.org/x/exp/rand" @@ -728,7 +727,7 @@ func (ukb *UserKeyPrefixBound) IsEmpty() bool { type blockProviderWhenOpen interface { readBlockForVBR( ctx context.Context, h BlockHandle, stats *base.InternalIteratorStats, - ) (cache.Handle, error) + ) (bufferHandle, error) } type blockProviderWhenClosed struct { @@ -749,9 +748,9 @@ func (bpwc *blockProviderWhenClosed) close() { func (bpwc blockProviderWhenClosed) readBlockForVBR( ctx context.Context, h BlockHandle, stats *base.InternalIteratorStats, -) (cache.Handle, error) { +) (bufferHandle, error) { ctx = objiotracing.WithBlockType(ctx, objiotracing.ValueBlock) - return bpwc.r.readBlock(ctx, h, nil, nil, stats) + return bpwc.r.readBlock(ctx, h, nil, nil, stats, nil /* buffer pool */) } // ReaderProvider supports the implementation of blockProviderWhenClosed. @@ -790,16 +789,16 @@ type valueBlockReader struct { // The value blocks index is lazily retrieved the first time the reader // needs to read a value that resides in a value block. vbiBlock []byte - vbiCache cache.Handle + vbiCache bufferHandle // When sequentially iterating through all key-value pairs, the cost of // repeatedly getting a block that is already in the cache and releasing the - // cache.Handle can be ~40% of the cpu overhead. So the reader remembers the + // bufferHandle can be ~40% of the cpu overhead. So the reader remembers the // last value block it retrieved, in case there is locality of access, and // this value block can be used for the next value retrieval. 
valueBlockNum uint32 valueBlock []byte valueBlockPtr unsafe.Pointer - valueCache cache.Handle + valueCache bufferHandle lazyFetcher base.LazyFetcher closed bool bufToMangle []byte @@ -833,12 +832,12 @@ func (r *valueBlockReader) close() { // we were to reopen this valueBlockReader and retrieve the same // Handle.value from the cache, we don't want to accidentally unref it when // attempting to unref the old handle. - r.vbiCache = cache.Handle{} + r.vbiCache = bufferHandle{} r.valueBlock = nil r.valueBlockPtr = nil r.valueCache.Release() // See comment above. - r.valueCache = cache.Handle{} + r.valueCache = bufferHandle{} r.closed = true // rp, vbih, stats remain valid, so that LazyFetcher.ValueFetcher can be // implemented. diff --git a/table_cache.go b/table_cache.go index 90acb40621..fb3dc715ce 100644 --- a/table_cache.go +++ b/table_cache.go @@ -439,6 +439,7 @@ func (c *tableCacheShard) newIters( NewCompactionIter( bytesIterated *uint64, rp sstable.ReaderProvider, + bufferPool *sstable.BufferPool, ) (sstable.Iterator, error) } @@ -520,7 +521,7 @@ func (c *tableCacheShard) newIters( } if internalOpts.bytesIterated != nil { - iter, err = ic.NewCompactionIter(internalOpts.bytesIterated, rp) + iter, err = ic.NewCompactionIter(internalOpts.bytesIterated, rp, internalOpts.bufferPool) } else { iter, err = ic.NewIterWithBlockPropertyFiltersAndContextEtc( ctx, opts.GetLowerBound(), opts.GetUpperBound(), filterer, hideObsoletePoints, useFilter, diff --git a/testdata/metrics b/testdata/metrics index 4a362f27d3..a48140a1eb 100644 --- a/testdata/metrics +++ b/testdata/metrics @@ -82,7 +82,7 @@ compact 1 0 B 0 B 0 (size == esti memtbl 1 256 K zmemtbl 2 512 K ztbl 2 1.2 K - bcache 8 1.1 K 42.9% (score == hit-rate) + bcache 7 1.1 K 42.9% (score == hit-rate) tcache 2 1.3 K 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 2 @@ -116,7 +116,7 @@ compact 1 0 B 0 B 0 (size == esti memtbl 1 256 K zmemtbl 1 256 K ztbl 2 1.2 K - bcache 8 1.1 K 42.9% (score == hit-rate) + bcache 7 1.1 K 42.9% (score == hit-rate) tcache 2 1.3 K 66.7% (score == hit-rate) snaps 0 - 0 (score == earliest seq num) titers 2
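Putting the reader-side API together, the calling pattern the updated tests settle on looks like this. The sketch below is modeled on TestVirtualReader and testBytesIteratedWithCompression, assumes an in-module (package sstable) context where r is a *Reader and internal/cache is reachable, and elides reader construction. The ordering matters: the iterator must be closed before the pool is released, since BufferPool.Release panics if any buffer is still in use.

func compactionScan(r *Reader) error {
	var pool BufferPool
	pool.Init(r.opts.Cache, 5)
	defer pool.Release() // runs after the deferred Close below

	var bytesIterated uint64
	citer, err := r.NewCompactionIter(&bytesIterated, TrivialReaderProvider{Reader: r}, &pool)
	if err != nil {
		return err
	}
	defer citer.Close()

	for key, _ := citer.First(); key != nil; key, _ = citer.Next() {
		// Consume the sstable in order; bytesIterated advances as we go.
	}
	return nil
}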