Skip to content

Commit

Permalink
feat(blooms): Separate page buffer pools for series pages and bloom pages (#12992)
Browse files Browse the repository at this point in the history

Series pages are much smaller than bloom pages and therefore can make use of a separate buffer pool with different buckets.

The second commit fixes a possible panic.


Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
  • Loading branch information
chaudum authored May 20, 2024
1 parent 94d610e commit 75ccf21
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 13 deletions.
14 changes: 7 additions & 7 deletions pkg/storage/bloom/v1/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Bloom struct {

func (b *Bloom) Encode(enc *encoding.Encbuf) error {
// divide by 8 b/c bloom capacity is measured in bits, but we want bytes
buf := bytes.NewBuffer(BlockPool.Get(int(b.Capacity() / 8)))
buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8)))

// TODO(owen-d): have encoder implement writer directly so we don't need
// to indirect via a buffer
Expand All @@ -36,7 +36,7 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error {
data := buf.Bytes()
enc.PutUvarint(len(data)) // length of bloom filter
enc.PutBytes(data)
BlockPool.Put(data[:0]) // release to pool
BloomPagePool.Put(data[:0]) // release to pool
return nil
}

Expand Down Expand Up @@ -65,8 +65,8 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error {
}

func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) {
data := BlockPool.Get(page.Len)[:page.Len]
defer BlockPool.Put(data)
data := BloomPagePool.Get(page.Len)[:page.Len]
defer BloomPagePool.Put(data)

_, err := io.ReadFull(r, data)
if err != nil {
Expand All @@ -84,7 +84,7 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe
}
defer pool.PutReader(decompressor)

b := BlockPool.Get(page.DecompressedLen)[:page.DecompressedLen]
b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen]

if _, err = io.ReadFull(decompressor, b); err != nil {
return nil, errors.Wrap(err, "decompressing bloom page")
Expand All @@ -101,7 +101,7 @@ func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*Bloom
if page.Len != page.DecompressedLen+4 {
return nil, errors.New("the Len and DecompressedLen of the page do not match")
}
data := BlockPool.Get(page.Len)[:page.Len]
data := BloomPagePool.Get(page.Len)[:page.Len]

_, err := io.ReadFull(r, data)
if err != nil {
Expand Down Expand Up @@ -163,7 +163,7 @@ func (d *BloomPageDecoder) Relinquish() {
d.data = nil

if cap(data) > 0 {
BlockPool.Put(data)
BloomPagePool.Put(data)
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/storage/bloom/v1/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
defer func() {
if err != nil {
metrics.pagesSkipped.WithLabelValues(pageTypeSeries, skipReasonErr).Inc()
metrics.bytesSkipped.WithLabelValues(pageTypeSeries).Add(float64(header.DecompressedLen))
metrics.bytesSkipped.WithLabelValues(pageTypeSeries, skipReasonErr).Add(float64(header.DecompressedLen))
} else {
metrics.pagesRead.WithLabelValues(pageTypeSeries).Inc()
metrics.bytesRead.WithLabelValues(pageTypeSeries).Add(float64(header.DecompressedLen))
Expand All @@ -166,8 +166,8 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead
return nil, errors.Wrap(err, "seeking to series page")
}

data := BlockPool.Get(header.Len)[:header.Len]
defer BlockPool.Put(data)
data := SeriesPagePool.Get(header.Len)[:header.Len]
defer SeriesPagePool.Put(data)
_, err = io.ReadFull(r, data)
if err != nil {
return nil, errors.Wrap(err, "reading series page")
Expand Down
17 changes: 14 additions & 3 deletions pkg/storage/bloom/v1/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,21 @@ var (
},
}

// 4KB -> 128MB
BlockPool = BytePool{
// buffer pool for series pages
// 1KB 2KB 4KB 8KB 16KB 32KB 64KB 128KB
SeriesPagePool = BytePool{
pool: pool.New(
4<<10, 128<<20, 2,
1<<10, 128<<10, 2,
func(size int) interface{} {
return make([]byte, size)
}),
}

// buffer pool for bloom pages
// 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB
BloomPagePool = BytePool{
pool: pool.New(
128<<10, 128<<20, 2,
func(size int) interface{} {
return make([]byte, size)
}),
Expand Down

0 comments on commit 75ccf21

Please sign in to comment.