diff --git a/pkg/bloombuild/builder/spec_test.go b/pkg/bloombuild/builder/spec_test.go index 40225dc45865b..3397a2f60bfe4 100644 --- a/pkg/bloombuild/builder/spec_test.go +++ b/pkg/bloombuild/builder/spec_test.go @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), }) } @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBloom, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize) + bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomcompactor/spec_test.go b/pkg/bloomcompactor/spec_test.go index 7e39b8dec57f0..373b99de68ad3 100644 --- a/pkg/bloomcompactor/spec_test.go +++ b/pkg/bloomcompactor/spec_test.go @@ -74,7 +74,7 @@ func dummyBloomGen(t *testing.T, opts v1.BlockOptions, store v1.Iterator[*v1.Ser for i, b := range blocks { bqs = append(bqs, &bloomshipper.CloseableBlockQuerier{ BlockRef: refs[i], - BlockQuerier: v1.NewBlockQuerier(b, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), }) } @@ -152,7 +152,7 @@ func TestSimpleBloomGenerator(t *testing.T) { expectedRefs := v1.PointerSlice(data) outputRefs := make([]*v1.SeriesWithBloom, 0, len(data)) for _, block := range outputBlocks { - bq := v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize) + bq := v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) for bq.Next() { outputRefs = append(outputRefs, bq.At()) } diff --git a/pkg/bloomgateway/bloomgateway_test.go b/pkg/bloomgateway/bloomgateway_test.go index 15c9ca2be2d85..8663bcf079590 100644 --- a/pkg/bloomgateway/bloomgateway_test.go +++ b/pkg/bloomgateway/bloomgateway_test.go @@ -215,50 +215,6 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) { } }) - t.Run("request cancellation does not result in channel locking", func(t *testing.T) { - now := mktime("2024-01-25 10:00") - - // replace store implementation and re-initialize workers and sub-services - refs, metas, queriers, data := createBlocks(t, tenantID, 10, now.Add(-1*time.Hour), now, 0x0000, 0x0fff) - mockStore := newMockBloomStore(queriers, metas) - mockStore.delay = 2000 * time.Millisecond - - reg := prometheus.NewRegistry() - gw, err := New(cfg, mockStore, logger, reg) - require.NoError(t, err) - - err = services.StartAndAwaitRunning(context.Background(), gw) - require.NoError(t, err) - t.Cleanup(func() { - err = services.StopAndAwaitTerminated(context.Background(), gw) - require.NoError(t, err) - }) - - chunkRefs := createQueryInputFromBlockData(t, tenantID, data, 100) - - // saturate workers - // then send additional request - for i := 0; i < gw.cfg.WorkerConcurrency+1; i++ { - expr, err := syntax.ParseExpr(`{foo="bar"} |= "does not match"`) - require.NoError(t, err) - - req := &logproto.FilterChunkRefRequest{ - From: now.Add(-24 * time.Hour), - Through: now, - Refs: groupRefs(t, chunkRefs), - Plan: plan.QueryPlan{AST: expr}, - Blocks: stringSlice(refs), - } - - ctx, cancelFn := context.WithTimeout(context.Background(), 500*time.Millisecond) - ctx = user.InjectOrgID(ctx, tenantID) - t.Cleanup(cancelFn) - - res, err := gw.FilterChunkRefs(ctx, req) - require.ErrorContainsf(t, err, context.DeadlineExceeded.Error(), "%+v", res) - } - }) - t.Run("returns unfiltered chunk refs if no filters provided", func(t *testing.T) { now := mktime("2023-10-03 10:00") diff --git a/pkg/bloomgateway/util_test.go b/pkg/bloomgateway/util_test.go index a3f219c326efd..4bd9d9609d647 100644 --- a/pkg/bloomgateway/util_test.go +++ b/pkg/bloomgateway/util_test.go @@ -399,7 +399,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time, // } // } querier := &bloomshipper.CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(block, false, v1.DefaultMaxPageSize), + BlockQuerier: v1.NewBlockQuerier(block, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize), BlockRef: blockRef, } queriers = append(queriers, querier) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 22cd46743ea27..8f91e0d754427 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -35,6 +35,7 @@ import ( "github.com/grafana/loki/v3/pkg/bloomcompactor" "github.com/grafana/loki/v3/pkg/logqlmodel/stats" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/types" "github.com/grafana/loki/v3/pkg/analytics" @@ -79,6 +80,7 @@ import ( "github.com/grafana/loki/v3/pkg/util/httpreq" "github.com/grafana/loki/v3/pkg/util/limiter" util_log "github.com/grafana/loki/v3/pkg/util/log" + "github.com/grafana/loki/v3/pkg/util/mempool" "github.com/grafana/loki/v3/pkg/util/querylimits" lokiring "github.com/grafana/loki/v3/pkg/util/ring" serverutil "github.com/grafana/loki/v3/pkg/util/server" @@ -730,6 +732,19 @@ func (t *Loki) initBloomStore() (services.Service, error) { reg := prometheus.DefaultRegisterer bsCfg := t.Cfg.StorageConfig.BloomShipperConfig + // Set global BloomPageAllocator variable + switch bsCfg.MemoryManagement.BloomPageAllocationType { + case "simple": + bloomshipper.BloomPageAllocator = &v1.SimpleHeapAllocator{} + case "dynamic": + bloomshipper.BloomPageAllocator = v1.BloomPagePool + case "fixed": + bloomshipper.BloomPageAllocator = mempool.New("bloom-page-pool", bsCfg.MemoryManagement.BloomPageMemPoolBuckets, reg) + default: + // do nothing + bloomshipper.BloomPageAllocator = nil + } + var metasCache cache.Cache if t.Cfg.isTarget(IndexGateway) && cache.IsCacheConfigured(bsCfg.MetasCache) { metasCache, err = cache.New(bsCfg.MetasCache, reg, logger, stats.BloomMetasCache, constants.Loki) diff --git a/pkg/storage/bloom/v1/block.go b/pkg/storage/bloom/v1/block.go index ba661de79c498..042c55a7a0666 100644 --- a/pkg/storage/bloom/v1/block.go +++ b/pkg/storage/bloom/v1/block.go @@ -117,11 +117,11 @@ type BlockQuerier struct { // will be returned to the pool for efficiency. This can only safely be used // when the underlying bloom bytes don't escape the decoder, i.e. // when loading blooms for querying (bloom-gw) but not for writing (bloom-compactor). -func NewBlockQuerier(b *Block, noCapture bool, maxPageSize int) *BlockQuerier { +func NewBlockQuerier(b *Block, alloc Allocator, maxPageSize int) *BlockQuerier { return &BlockQuerier{ block: b, series: NewLazySeriesIter(b), - blooms: NewLazyBloomIter(b, noCapture, maxPageSize), + blooms: NewLazyBloomIter(b, alloc, maxPageSize), } } @@ -173,3 +173,7 @@ func (bq *BlockQuerier) Err() error { return bq.blooms.Err() } + +func (bq *BlockQuerier) Close() { + bq.blooms.Close() +} diff --git a/pkg/storage/bloom/v1/bloom.go b/pkg/storage/bloom/v1/bloom.go index aa51762d4e4ec..fc39133e81b05 100644 --- a/pkg/storage/bloom/v1/bloom.go +++ b/pkg/storage/bloom/v1/bloom.go @@ -24,7 +24,7 @@ type Bloom struct { func (b *Bloom) Encode(enc *encoding.Encbuf) error { // divide by 8 b/c bloom capacity is measured in bits, but we want bytes - buf := bytes.NewBuffer(BloomPagePool.Get(int(b.Capacity() / 8))) + buf := bytes.NewBuffer(make([]byte, 0, int(b.Capacity()/8))) // TODO(owen-d): have encoder implement writer directly so we don't need // to indirect via a buffer @@ -36,7 +36,6 @@ func (b *Bloom) Encode(enc *encoding.Encbuf) error { data := buf.Bytes() enc.PutUvarint(len(data)) // length of bloom filter enc.PutBytes(data) - BloomPagePool.Put(data[:0]) // release to pool return nil } @@ -64,11 +63,14 @@ func (b *Bloom) Decode(dec *encoding.Decbuf) error { return nil } -func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { - data := BloomPagePool.Get(page.Len)[:page.Len] - defer BloomPagePool.Put(data) +func LazyDecodeBloomPage(r io.Reader, alloc Allocator, pool chunkenc.ReaderPool, page BloomPageHeader) (*BloomPageDecoder, error) { + data, err := alloc.Get(page.Len) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } + defer alloc.Put(data) - _, err := io.ReadFull(r, data) + _, err = io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -84,7 +86,10 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe } defer pool.PutReader(decompressor) - b := BloomPagePool.Get(page.DecompressedLen)[:page.DecompressedLen] + b, err := alloc.Get(page.DecompressedLen) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } if _, err = io.ReadFull(decompressor, b); err != nil { return nil, errors.Wrap(err, "decompressing bloom page") @@ -96,14 +101,18 @@ func LazyDecodeBloomPage(r io.Reader, pool chunkenc.ReaderPool, page BloomPageHe } // shortcut to skip allocations when we know the page is not compressed -func LazyDecodeBloomPageNoCompression(r io.Reader, page BloomPageHeader) (*BloomPageDecoder, error) { +func LazyDecodeBloomPageNoCompression(r io.Reader, alloc Allocator, page BloomPageHeader) (*BloomPageDecoder, error) { // data + checksum if page.Len != page.DecompressedLen+4 { return nil, errors.New("the Len and DecompressedLen of the page do not match") } - data := BloomPagePool.Get(page.Len)[:page.Len] - _, err := io.ReadFull(r, data) + data, err := alloc.Get(page.Len) + if err != nil { + return nil, errors.Wrap(err, "allocating buffer") + } + + _, err = io.ReadFull(r, data) if err != nil { return nil, errors.Wrap(err, "reading bloom page") } @@ -158,12 +167,16 @@ type BloomPageDecoder struct { // This can only safely be used when the underlying bloom // bytes don't escape the decoder: // on reads in the bloom-gw but not in the bloom-compactor -func (d *BloomPageDecoder) Relinquish() { +func (d *BloomPageDecoder) Relinquish(alloc Allocator) { + if d == nil { + return + } + data := d.data d.data = nil if cap(data) > 0 { - BloomPagePool.Put(data) + _ = alloc.Put(data) } } @@ -277,7 +290,7 @@ func (b *BloomBlock) DecodeHeaders(r io.ReadSeeker) (uint32, error) { // BloomPageDecoder returns a decoder for the given page index. // It may skip the page if it's too large. // NB(owen-d): if `skip` is true, err _must_ be nil. -func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { +func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, alloc Allocator, pageIdx int, maxPageSize int, metrics *Metrics) (res *BloomPageDecoder, skip bool, err error) { if pageIdx < 0 || pageIdx >= len(b.pageHeaders) { metrics.pagesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Inc() metrics.bytesSkipped.WithLabelValues(pageTypeBloom, skipReasonOOB).Add(float64(b.pageHeaders[pageIdx].DecompressedLen)) @@ -300,9 +313,9 @@ func (b *BloomBlock) BloomPageDecoder(r io.ReadSeeker, pageIdx int, maxPageSize } if b.schema.encoding == chunkenc.EncNone { - res, err = LazyDecodeBloomPageNoCompression(r, page) + res, err = LazyDecodeBloomPageNoCompression(r, alloc, page) } else { - res, err = LazyDecodeBloomPage(r, b.schema.DecompressorPool(), page) + res, err = LazyDecodeBloomPage(r, alloc, b.schema.DecompressorPool(), page) } if err != nil { diff --git a/pkg/storage/bloom/v1/bloom_querier.go b/pkg/storage/bloom/v1/bloom_querier.go index 8de9a33e713f0..b90bae8a046bb 100644 --- a/pkg/storage/bloom/v1/bloom_querier.go +++ b/pkg/storage/bloom/v1/bloom_querier.go @@ -7,11 +7,11 @@ type BloomQuerier interface { } type LazyBloomIter struct { - usePool bool - b *Block m int // max page size in bytes + alloc Allocator + // state initialized bool err error @@ -24,11 +24,11 @@ type LazyBloomIter struct { // will be returned to the pool for efficiency. // This can only safely be used when the underlying bloom // bytes don't escape the decoder. -func NewLazyBloomIter(b *Block, pool bool, maxSize int) *LazyBloomIter { +func NewLazyBloomIter(b *Block, alloc Allocator, maxSize int) *LazyBloomIter { return &LazyBloomIter{ - usePool: pool, - b: b, - m: maxSize, + b: b, + m: maxSize, + alloc: alloc, } } @@ -53,16 +53,14 @@ func (it *LazyBloomIter) LoadOffset(offset BloomOffset) (skip bool) { // drop the current page if it exists and // we're using the pool - if it.curPage != nil && it.usePool { - it.curPage.Relinquish() - } + it.curPage.Relinquish(it.alloc) r, err := it.b.reader.Blooms() if err != nil { it.err = errors.Wrap(err, "getting blooms reader") return false } - decoder, skip, err := it.b.blooms.BloomPageDecoder(r, offset.Page, it.m, it.b.metrics) + decoder, skip, err := it.b.blooms.BloomPageDecoder(r, it.alloc, offset.Page, it.m, it.b.metrics) if err != nil { it.err = errors.Wrap(err, "loading bloom page") return false @@ -106,6 +104,7 @@ func (it *LazyBloomIter) next() bool { var skip bool it.curPage, skip, err = it.b.blooms.BloomPageDecoder( r, + it.alloc, it.curPageIndex, it.m, it.b.metrics, @@ -130,11 +129,8 @@ func (it *LazyBloomIter) next() bool { // we've exhausted the current page, progress to next it.curPageIndex++ - // drop the current page if it exists and - // we're using the pool - if it.usePool { - it.curPage.Relinquish() - } + // drop the current page if it exists + it.curPage.Relinquish(it.alloc) it.curPage = nil continue } @@ -161,3 +157,7 @@ func (it *LazyBloomIter) Err() error { return nil } } + +func (it *LazyBloomIter) Close() { + it.curPage.Relinquish(it.alloc) +} diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index 56d03cbd7c930..45461824970ab 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -117,7 +117,7 @@ func TestBlockBuilder_RoundTrip(t *testing.T) { } block := NewBlock(tc.reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) err = block.LoadHeaders() require.Nil(t, err) @@ -218,7 +218,7 @@ func TestMergeBuilder(t *testing.T) { itr := NewSliceIter[SeriesWithBloom](data[min:max]) _, err = builder.BuildFrom(itr) require.Nil(t, err) - blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize))) + blocks = append(blocks, NewPeekingIter[*SeriesWithBloom](NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize))) } // We're not testing the ability to extend a bloom in this test @@ -252,7 +252,7 @@ func TestMergeBuilder(t *testing.T) { require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) EqualIterators[*SeriesWithBloom]( t, @@ -296,7 +296,7 @@ func TestBlockReset(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) rounds := make([][]model.Fingerprint, 2) @@ -362,7 +362,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { _, err = builder.BuildFrom(itr) require.Nil(t, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, false, DefaultMaxPageSize) + querier := NewBlockQuerier(block, &SimpleHeapAllocator{}, DefaultMaxPageSize) // rather than use the block querier directly, collect it's data // so we can use it in a few places later @@ -423,7 +423,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { // ensure the new block contains one copy of all the data // by comparing it against an iterator over the source data - mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), false, DefaultMaxPageSize) + mergedBlockQuerier := NewBlockQuerier(NewBlock(reader, NewMetrics(nil)), &SimpleHeapAllocator{}, DefaultMaxPageSize) sourceItr := NewSliceIter[*SeriesWithBloom](PointerSlice[SeriesWithBloom](xs)) EqualIterators[*SeriesWithBloom]( diff --git a/pkg/storage/bloom/v1/fuse_test.go b/pkg/storage/bloom/v1/fuse_test.go index a0dc23001e939..3df65a8da27c4 100644 --- a/pkg/storage/bloom/v1/fuse_test.go +++ b/pkg/storage/bloom/v1/fuse_test.go @@ -76,7 +76,7 @@ func TestFusedQuerier(t *testing.T) { require.NoError(t, err) require.False(t, itr.Next()) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true, DefaultMaxPageSize) + querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize) n := 2 nReqs := numSeries / n @@ -215,7 +215,7 @@ func TestLazyBloomIter_Seek_ResetError(t *testing.T) { require.False(t, itr.Next()) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true, 1000) + querier := NewBlockQuerier(block, BloomPagePool, 1000) for fp := model.Fingerprint(0); fp < model.Fingerprint(numSeries); fp++ { err := querier.Seek(fp) @@ -264,7 +264,7 @@ func setupBlockForBenchmark(b *testing.B) (*BlockQuerier, [][]Request, []chan Ou _, err = builder.BuildFrom(itr) require.Nil(b, err) block := NewBlock(reader, NewMetrics(nil)) - querier := NewBlockQuerier(block, true, DefaultMaxPageSize) + querier := NewBlockQuerier(block, BloomPagePool, DefaultMaxPageSize) numRequestChains := 100 seriesPerRequest := 100 diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index caadfa26ddf74..eed6d21ce5c9c 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -166,7 +166,7 @@ func (b *BlockIndex) NewSeriesPageDecoder(r io.ReadSeeker, header SeriesPageHead return nil, errors.Wrap(err, "seeking to series page") } - data := SeriesPagePool.Get(header.Len)[:header.Len] + data, _ := SeriesPagePool.Get(header.Len) defer SeriesPagePool.Put(data) _, err = io.ReadFull(r, data) if err != nil { diff --git a/pkg/storage/bloom/v1/util.go b/pkg/storage/bloom/v1/util.go index 22fb47e43e799..92e64048fa5bd 100644 --- a/pkg/storage/bloom/v1/util.go +++ b/pkg/storage/bloom/v1/util.go @@ -44,7 +44,7 @@ var ( // buffer pool for bloom pages // 128KB 256KB 512KB 1MB 2MB 4MB 8MB 16MB 32MB 64MB 128MB - BloomPagePool = BytePool{ + BloomPagePool = &BytePool{ pool: pool.New( 128<<10, 128<<20, 2, func(size int) interface{} { @@ -53,15 +53,38 @@ var ( } ) +// Allocator handles byte slices for bloom queriers. +// It exists to reduce the cost of allocations and allows to re-use already allocated memory. +type Allocator interface { + Get(size int) ([]byte, error) + Put([]byte) bool +} + +// SimpleHeapAllocator allocates a new byte slice every time and does not re-cycle buffers. +type SimpleHeapAllocator struct{} + +func (a *SimpleHeapAllocator) Get(size int) ([]byte, error) { + return make([]byte, size), nil +} + +func (a *SimpleHeapAllocator) Put([]byte) bool { + return true +} + +// BytePool uses a sync.Pool to re-cycle already allocated buffers. type BytePool struct { pool *pool.Pool } -func (p *BytePool) Get(size int) []byte { - return p.pool.Get(size).([]byte)[:0] +// Get implement Allocator +func (p *BytePool) Get(size int) ([]byte, error) { + return p.pool.Get(size).([]byte)[:size], nil } -func (p *BytePool) Put(b []byte) { + +// Put implement Allocator +func (p *BytePool) Put(b []byte) bool { p.pool.Put(b) + return true } func newCRC32() hash.Hash32 { diff --git a/pkg/storage/stores/shipper/bloomshipper/cache.go b/pkg/storage/stores/shipper/bloomshipper/cache.go index 6ff6ef64948e3..c1e3964fe8fc0 100644 --- a/pkg/storage/stores/shipper/bloomshipper/cache.go +++ b/pkg/storage/stores/shipper/bloomshipper/cache.go @@ -15,6 +15,8 @@ import ( "github.com/grafana/loki/v3/pkg/util" ) +var BloomPageAllocator v1.Allocator + type CloseableBlockQuerier struct { BlockRef *v1.BlockQuerier @@ -22,6 +24,7 @@ type CloseableBlockQuerier struct { } func (c *CloseableBlockQuerier) Close() error { + c.BlockQuerier.Close() if c.close != nil { return c.close() } @@ -157,15 +160,24 @@ func (b *BlockDirectory) resolveSize() error { // BlockQuerier returns a new block querier from the directory. // The passed function `close` is called when the the returned querier is closed. - func (b BlockDirectory) BlockQuerier( usePool bool, close func() error, maxPageSize int, metrics *v1.Metrics, ) *CloseableBlockQuerier { + + var alloc v1.Allocator + if usePool && BloomPageAllocator != nil { + alloc = BloomPageAllocator + } else { + alloc = &v1.SimpleHeapAllocator{} + } + + bq := v1.NewBlockQuerier(b.Block(metrics), alloc, maxPageSize) + return &CloseableBlockQuerier{ - BlockQuerier: v1.NewBlockQuerier(b.Block(metrics), usePool, maxPageSize), + BlockQuerier: bq, BlockRef: b.BlockRef, close: close, } diff --git a/pkg/storage/stores/shipper/bloomshipper/config/config.go b/pkg/storage/stores/shipper/bloomshipper/config/config.go index 72d8f8557b095..6de144a3f84bf 100644 --- a/pkg/storage/stores/shipper/bloomshipper/config/config.go +++ b/pkg/storage/stores/shipper/bloomshipper/config/config.go @@ -4,11 +4,16 @@ package config import ( "errors" "flag" + "fmt" + "slices" + "strings" "time" "github.com/grafana/dskit/flagext" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" + lokiflagext "github.com/grafana/loki/v3/pkg/util/flagext" + "github.com/grafana/loki/v3/pkg/util/mempool" ) type Config struct { @@ -18,6 +23,7 @@ type Config struct { BlocksCache BlocksCacheConfig `yaml:"blocks_cache"` MetasCache cache.Config `yaml:"metas_cache"` MetasLRUCache cache.EmbeddedCacheConfig `yaml:"metas_lru_cache"` + MemoryManagement MemoryManagementConfig `yaml:"memory_management" doc:"hidden"` // This will always be set to true when flags are registered. // In tests, where config is created as literal, it can be set manually. @@ -34,6 +40,7 @@ func (c *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { c.BlocksCache.RegisterFlagsWithPrefixAndDefaults(prefix+"blocks-cache.", "Cache for bloom blocks. ", f, 24*time.Hour) c.MetasCache.RegisterFlagsWithPrefix(prefix+"metas-cache.", "Cache for bloom metas. ", f) c.MetasLRUCache.RegisterFlagsWithPrefix(prefix+"metas-lru-cache.", "In-memory LRU cache for bloom metas. ", f) + c.MemoryManagement.RegisterFlagsWithPrefix(prefix+"memory-management.", f) // always cache LIST operations c.CacheListOps = true @@ -43,6 +50,9 @@ func (c *Config) Validate() error { if len(c.WorkingDirectory) == 0 { return errors.New("at least one working directory must be specified") } + if err := c.MemoryManagement.Validate(); err != nil { + return err + } return nil } @@ -81,3 +91,60 @@ func (cfg *BlocksCacheConfig) Validate() error { } return nil } + +var ( + // the default that describes a 4GiB memory pool + defaultMemPoolBuckets = mempool.Buckets{ + {Size: 128, Capacity: 64 << 10}, // 8MiB -- for tests + {Size: 512, Capacity: 2 << 20}, // 1024MiB + {Size: 128, Capacity: 8 << 20}, // 1024MiB + {Size: 32, Capacity: 32 << 20}, // 1024MiB + {Size: 8, Capacity: 128 << 20}, // 1024MiB + } + types = supportedAllocationTypes{ + "simple", "simple heap allocations using Go's make([]byte, n) and no re-cycling of buffers", + "dynamic", "a buffer pool with variable sized buckets and best effort re-cycling of buffers using Go's sync.Pool", + "fixed", "a fixed size memory pool with configurable slab sizes, see mem-pool-buckets", + } +) + +type MemoryManagementConfig struct { + BloomPageAllocationType string `yaml:"bloom_page_alloc_type"` + BloomPageMemPoolBuckets lokiflagext.CSV[mempool.Bucket] `yaml:"bloom_page_mem_pool_buckets"` +} + +func (cfg *MemoryManagementConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + f.StringVar(&cfg.BloomPageAllocationType, prefix+"alloc-type", "dynamic", fmt.Sprintf("One of: %s", strings.Join(types.descriptions(), ", "))) + + _ = cfg.BloomPageMemPoolBuckets.Set(defaultMemPoolBuckets.String()) + f.Var(&cfg.BloomPageMemPoolBuckets, prefix+"mem-pool-buckets", "Comma separated list of buckets in the format {size}x{bytes}") +} + +func (cfg *MemoryManagementConfig) Validate() error { + if !slices.Contains(types.names(), cfg.BloomPageAllocationType) { + msg := fmt.Sprintf("bloom_page_alloc_type must be one of: %s", strings.Join(types.descriptions(), ", ")) + return errors.New(msg) + } + if cfg.BloomPageAllocationType == "fixed" && len(cfg.BloomPageMemPoolBuckets) == 0 { + return errors.New("fixed memory pool requires at least one bucket") + } + return nil +} + +type supportedAllocationTypes []string + +func (t supportedAllocationTypes) names() []string { + names := make([]string, 0, len(t)/2) + for i := 0; i < len(t); i += 2 { + names = append(names, t[i]) + } + return names +} + +func (t supportedAllocationTypes) descriptions() []string { + names := make([]string, 0, len(t)/2) + for i := 0; i < len(t); i += 2 { + names = append(names, fmt.Sprintf("%s (%s)", t[i], t[i+1])) + } + return names +} diff --git a/pkg/util/flagext/csv.go b/pkg/util/flagext/csv.go new file mode 100644 index 0000000000000..6ed5f9bad11a0 --- /dev/null +++ b/pkg/util/flagext/csv.go @@ -0,0 +1,62 @@ +package flagext + +import ( + "strings" +) + +type ListValue interface { + String() string + Parse(s string) (any, error) +} + +// StringSliceCSV is a slice of strings that is parsed from a comma-separated string +// It implements flag.Value and yaml Marshalers +type CSV[T ListValue] []T + +// String implements flag.Value +func (v CSV[T]) String() string { + s := make([]string, 0, len(v)) + for i := range v { + s = append(s, v[i].String()) + } + return strings.Join(s, ",") +} + +// Set implements flag.Value +func (v *CSV[T]) Set(s string) error { + if len(s) == 0 { + *v = nil + return nil + } + var zero T + values := strings.Split(s, ",") + *v = make(CSV[T], 0, len(values)) + for _, val := range values { + el, err := zero.Parse(val) + if err != nil { + return err + } + *v = append(*v, el.(T)) + } + return nil +} + +// String implements flag.Getter +func (v CSV[T]) Get() []T { + return v +} + +// UnmarshalYAML implements yaml.Unmarshaler. +func (v *CSV[T]) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + + return v.Set(s) +} + +// MarshalYAML implements yaml.Marshaler. +func (v CSV[T]) MarshalYAML() (interface{}, error) { + return v.String(), nil +} diff --git a/pkg/util/flagext/csv_test.go b/pkg/util/flagext/csv_test.go new file mode 100644 index 0000000000000..aca4ea8a77eef --- /dev/null +++ b/pkg/util/flagext/csv_test.go @@ -0,0 +1,79 @@ +package flagext + +import ( + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +type customType int + +// Parse implements ListValue. +func (l customType) Parse(s string) (any, error) { + v, err := strconv.Atoi(s) + if err != nil { + return customType(0), err + } + return customType(v), nil +} + +// String implements ListValue. +func (l customType) String() string { + return strconv.Itoa(int(l)) +} + +var _ ListValue = customType(0) + +func Test_CSV(t *testing.T) { + for _, tc := range []struct { + in string + err bool + out []customType + }{ + { + in: "", + err: false, + out: nil, + }, + { + in: ",", + err: true, + out: []customType{}, + }, + { + in: "1", + err: false, + out: []customType{1}, + }, + { + in: "1,2", + err: false, + out: []customType{1, 2}, + }, + { + in: "1,", + err: true, + out: []customType{}, + }, + { + in: ",1", + err: true, + out: []customType{}, + }, + } { + t.Run(tc.in, func(t *testing.T) { + var v CSV[customType] + + err := v.Set(tc.in) + if tc.err { + require.NotNil(t, err) + } else { + require.Nil(t, err) + require.Equal(t, tc.out, v.Get()) + } + + }) + } + +} diff --git a/pkg/util/mempool/bucket.go b/pkg/util/mempool/bucket.go new file mode 100644 index 0000000000000..a041eb49e3f8f --- /dev/null +++ b/pkg/util/mempool/bucket.go @@ -0,0 +1,51 @@ +package mempool + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/c2h5oh/datasize" +) + +type Bucket struct { + Size int + Capacity uint64 +} + +func (b Bucket) Parse(s string) (any, error) { + parts := strings.Split(s, "x") + if len(parts) != 2 { + return nil, errors.New("bucket must be in format {count}x{bytes}") + } + + size, err := strconv.Atoi(parts[0]) + if err != nil { + return nil, err + } + + capacity, err := datasize.ParseString(parts[1]) + if err != nil { + panic(err.Error()) + } + + return Bucket{ + Size: size, + Capacity: uint64(capacity), + }, nil +} + +func (b Bucket) String() string { + return fmt.Sprintf("%dx%s", b.Size, datasize.ByteSize(b.Capacity).String()) +} + +type Buckets []Bucket + +func (b Buckets) String() string { + s := make([]string, 0, len(b)) + for i := range b { + s = append(s, b[i].String()) + } + return strings.Join(s, ",") +} diff --git a/pkg/util/mempool/metrics.go b/pkg/util/mempool/metrics.go new file mode 100644 index 0000000000000..f7d5a52eb0d91 --- /dev/null +++ b/pkg/util/mempool/metrics.go @@ -0,0 +1,32 @@ +package mempool + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + + "github.com/grafana/loki/v3/pkg/util/constants" +) + +type metrics struct { + availableBuffersPerSlab *prometheus.CounterVec + errorsCounter *prometheus.CounterVec +} + +func newMetrics(r prometheus.Registerer, name string) *metrics { + return &metrics{ + availableBuffersPerSlab: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "mempool", + Name: "available_buffers_per_slab", + Help: "The amount of available buffers per slab.", + ConstLabels: prometheus.Labels{"pool": name}, + }, []string{"slab"}), + errorsCounter: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: constants.Loki, + Subsystem: "mempool", + Name: "errors_total", + Help: "The total amount of errors returned from the pool.", + ConstLabels: prometheus.Labels{"pool": name}, + }, []string{"slab", "reason"}), + } +} diff --git a/pkg/util/mempool/pool.go b/pkg/util/mempool/pool.go new file mode 100644 index 0000000000000..b42d8d9237677 --- /dev/null +++ b/pkg/util/mempool/pool.go @@ -0,0 +1,135 @@ +package mempool + +import ( + "errors" + "fmt" + "sync" + "unsafe" + + "github.com/dustin/go-humanize" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + errSlabExhausted = errors.New("slab exhausted") + + reasonSizeExceeded = "size-exceeded" + reasonSlabExhausted = "slab-exhausted" +) + +type slab struct { + buffer chan unsafe.Pointer + size, count int + mtx sync.Mutex + metrics *metrics + name string +} + +func newSlab(bufferSize, bufferCount int, m *metrics) *slab { + name := humanize.Bytes(uint64(bufferSize)) + m.availableBuffersPerSlab.WithLabelValues(name).Add(0) // initialize metric with value 0 + + return &slab{ + size: bufferSize, + count: bufferCount, + metrics: m, + name: name, + } +} + +func (s *slab) init() { + s.buffer = make(chan unsafe.Pointer, s.count) + for i := 0; i < s.count; i++ { + buf := make([]byte, 0, s.size) + ptr := unsafe.Pointer(unsafe.SliceData(buf)) + s.buffer <- ptr + } + s.metrics.availableBuffersPerSlab.WithLabelValues(s.name).Add(float64(s.count)) +} + +func (s *slab) get(size int) ([]byte, error) { + s.mtx.Lock() + if s.buffer == nil { + s.init() + } + defer s.mtx.Unlock() + + // wait for available buffer on channel + var buf []byte + select { + case ptr := <-s.buffer: + buf = unsafe.Slice((*byte)(ptr), s.size) + default: + s.metrics.errorsCounter.WithLabelValues(s.name, reasonSlabExhausted).Inc() + return nil, errSlabExhausted + } + + // Taken from https://github.com/ortuman/nuke/blob/main/monotonic_arena.go#L37-L48 + // This piece of code will be translated into a runtime.memclrNoHeapPointers + // invocation by the compiler, which is an assembler optimized implementation. + // Architecture specific code can be found at src/runtime/memclr_$GOARCH.s + // in Go source (since https://codereview.appspot.com/137880043). + for i := range buf { + buf[i] = 0 + } + + return buf[:size], nil +} + +func (s *slab) put(buf []byte) { + if s.buffer == nil { + panic("slab is not initialized") + } + + ptr := unsafe.Pointer(unsafe.SliceData(buf)) + s.buffer <- ptr +} + +// MemPool is an Allocator implementation that uses a fixed size memory pool +// that is split into multiple slabs of different buffer sizes. +// Buffers are re-cycled and need to be returned back to the pool, otherwise +// the pool runs out of available buffers. +type MemPool struct { + slabs []*slab + metrics *metrics +} + +func New(name string, buckets []Bucket, r prometheus.Registerer) *MemPool { + a := &MemPool{ + slabs: make([]*slab, 0, len(buckets)), + metrics: newMetrics(r, name), + } + for _, b := range buckets { + a.slabs = append(a.slabs, newSlab(int(b.Capacity), b.Size, a.metrics)) + } + return a +} + +// Get satisfies Allocator interface +// Allocating a buffer from an exhausted pool/slab, or allocating a buffer that +// exceeds the largest slab size will return an error. +func (a *MemPool) Get(size int) ([]byte, error) { + for i := 0; i < len(a.slabs); i++ { + if a.slabs[i].size < size { + continue + } + return a.slabs[i].get(size) + } + a.metrics.errorsCounter.WithLabelValues("pool", reasonSizeExceeded).Inc() + return nil, fmt.Errorf("no slab found for size: %d", size) +} + +// Put satisfies Allocator interface +// Every buffer allocated with Get(size int) needs to be returned to the pool +// using Put(buffer []byte) so it can be re-cycled. +func (a *MemPool) Put(buffer []byte) bool { + size := cap(buffer) + for i := 0; i < len(a.slabs); i++ { + if a.slabs[i].size < size { + continue + } + a.slabs[i].put(buffer) + return true + } + return false +} diff --git a/pkg/util/mempool/pool_test.go b/pkg/util/mempool/pool_test.go new file mode 100644 index 0000000000000..da0fc361dd4a4 --- /dev/null +++ b/pkg/util/mempool/pool_test.go @@ -0,0 +1,133 @@ +package mempool + +import ( + "math/rand" + "sync" + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/require" +) + +func TestMemPool(t *testing.T) { + + t.Run("empty pool", func(t *testing.T) { + pool := New("test", []Bucket{}, nil) + _, err := pool.Get(256) + require.Error(t, err) + }) + + t.Run("requested size too big", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 128}, + }, nil) + _, err := pool.Get(256) + require.Error(t, err) + }) + + t.Run("requested size within bucket", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 128}, + {Size: 1, Capacity: 256}, + {Size: 1, Capacity: 512}, + }, nil) + res, err := pool.Get(200) + require.NoError(t, err) + require.Equal(t, 200, len(res)) + require.Equal(t, 256, cap(res)) + + res, err = pool.Get(300) + require.NoError(t, err) + require.Equal(t, 300, len(res)) + require.Equal(t, 512, cap(res)) + }) + + t.Run("buffer is cleared when returned", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 64}, + }, nil) + res, err := pool.Get(8) + require.NoError(t, err) + require.Equal(t, 8, len(res)) + source := []byte{0, 1, 2, 3, 4, 5, 6, 7} + copy(res, source) + + pool.Put(res) + + res, err = pool.Get(8) + require.NoError(t, err) + require.Equal(t, 8, len(res)) + require.Equal(t, make([]byte, 8), res) + }) + + t.Run("pool returns error when no buffer is available", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 1, Capacity: 64}, + }, nil) + buf1, _ := pool.Get(32) + require.Equal(t, 32, len(buf1)) + + _, err := pool.Get(16) + require.ErrorContains(t, err, errSlabExhausted.Error()) + }) + + t.Run("test ring buffer returns same backing array", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 2, Capacity: 128}, + }, nil) + res1, _ := pool.Get(32) + ptr1 := unsafe.Pointer(unsafe.SliceData(res1)) + + res2, _ := pool.Get(64) + ptr2 := unsafe.Pointer(unsafe.SliceData(res2)) + + pool.Put(res2) + pool.Put(res1) + + res3, _ := pool.Get(48) + ptr3 := unsafe.Pointer(unsafe.SliceData(res3)) + + res4, _ := pool.Get(96) + ptr4 := unsafe.Pointer(unsafe.SliceData(res4)) + + require.Equal(t, ptr1, ptr4) + require.Equal(t, ptr2, ptr3) + }) + + t.Run("concurrent access", func(t *testing.T) { + pool := New("test", []Bucket{ + {Size: 32, Capacity: 2 << 10}, + {Size: 16, Capacity: 4 << 10}, + {Size: 8, Capacity: 8 << 10}, + {Size: 4, Capacity: 16 << 10}, + {Size: 2, Capacity: 32 << 10}, + }, nil) + + var wg sync.WaitGroup + numWorkers := 256 + n := 10 + + for i := 0; i < numWorkers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < n; i++ { + s := 2 << rand.Intn(5) + buf1, err1 := pool.Get(s) + buf2, err2 := pool.Get(s) + if err2 == nil { + pool.Put(buf2) + } + time.Sleep(time.Millisecond * time.Duration(rand.Intn(10))) + if err1 == nil { + pool.Put(buf1) + } + } + }() + } + + wg.Wait() + t.Log("finished") + }) +} diff --git a/tools/bloom/inspector/main.go b/tools/bloom/inspector/main.go index dfcc7c79cd86d..90aa8c29c2cbb 100644 --- a/tools/bloom/inspector/main.go +++ b/tools/bloom/inspector/main.go @@ -18,7 +18,7 @@ func main() { r := v1.NewDirectoryBlockReader(path) b := v1.NewBlock(r, v1.NewMetrics(nil)) - q := v1.NewBlockQuerier(b, true, v1.DefaultMaxPageSize) + q := v1.NewBlockQuerier(b, &v1.SimpleHeapAllocator{}, v1.DefaultMaxPageSize) md, err := q.Metadata() if err != nil {