diff --git a/integration/bloom_building_test.go b/integration/bloom_building_test.go index 2c4662eef4cb3..678ee281d6fda 100644 --- a/integration/bloom_building_test.go +++ b/integration/bloom_building_test.go @@ -61,15 +61,7 @@ func TestBloomBuilding(t *testing.T) { cliIngester.Now = now // We now ingest some logs across many series. - series := make([]labels.Labels, 0, nSeries) - for i := 0; i < nSeries; i++ { - lbs := labels.FromStrings("job", fmt.Sprintf("job-%d", i)) - series = append(series, lbs) - - for j := 0; j < nLogsPerSeries; j++ { - require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map())) - } - } + series := writeSeries(t, nSeries, nLogsPerSeries, cliDistributor, now, "job") // restart ingester which should flush the chunks and index require.NoError(t, tIngester.Restart()) @@ -122,14 +114,8 @@ func TestBloomBuilding(t *testing.T) { checkSeriesInBlooms(t, now, tenantID, bloomStore, series) // Push some more logs so TSDBs need to be updated. - for i := 0; i < nSeries; i++ { - lbs := labels.FromStrings("job", fmt.Sprintf("job-new-%d", i)) - series = append(series, lbs) - - for j := 0; j < nLogsPerSeries; j++ { - require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map())) - } - } + newSeries := writeSeries(t, nSeries, nLogsPerSeries, cliDistributor, now, "job-new") + series = append(series, newSeries...) // restart ingester which should flush the chunks and index require.NoError(t, tIngester.Restart()) @@ -145,6 +131,33 @@ func TestBloomBuilding(t *testing.T) { checkSeriesInBlooms(t, now, tenantID, bloomStore, series) } +func writeSeries(t *testing.T, nSeries int, nLogsPerSeries int, cliDistributor *client.Client, now time.Time, seriesPrefix string) []labels.Labels { + series := make([]labels.Labels, 0, nSeries) + for i := 0; i < nSeries; i++ { + lbs := labels.FromStrings("job", fmt.Sprintf("%s-%d", seriesPrefix, i)) + series = append(series, lbs) + + for j := 0; j < nLogsPerSeries; j++ { + // Only write wtructured metadata for half of the series + var metadata map[string]string + if i%2 == 0 { + metadata = map[string]string{ + "traceID": fmt.Sprintf("%d%d", i, j), + "user": fmt.Sprintf("%d%d", i, j%10), + } + } + + require.NoError(t, cliDistributor.PushLogLine( + fmt.Sprintf("log line %d", j), + now, + metadata, + lbs.Map(), + )) + } + } + return series +} + func checkCompactionFinished(t *testing.T, cliCompactor *client.Client) { checkForTimestampMetric(t, cliCompactor, "loki_boltdb_shipper_compact_tables_operation_last_successful_run_timestamp_seconds") } diff --git a/pkg/bloombuild/builder/spec.go b/pkg/bloombuild/builder/spec.go index 781a0ca04872e..f7c147fb0a2f8 100644 --- a/pkg/bloombuild/builder/spec.go +++ b/pkg/bloombuild/builder/spec.go @@ -137,7 +137,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt ) } - return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter) + return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.logger, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter) } // LazyBlockBuilderIterator is a lazy iterator over blocks that builds @@ -146,6 +146,7 @@ type LazyBlockBuilderIterator struct { ctx context.Context opts v1.BlockOptions metrics *v1.Metrics + logger log.Logger populate v1.BloomPopulatorFunc writerReaderFunc func() (v1.BlockWriter, v1.BlockReader) series iter.PeekIterator[*v1.Series] @@ -160,6 +161,7 @@ func NewLazyBlockBuilderIterator( ctx context.Context, opts v1.BlockOptions, metrics *v1.Metrics, + logger log.Logger, populate v1.BloomPopulatorFunc, writerReaderFunc func() (v1.BlockWriter, v1.BlockReader), series iter.PeekIterator[*v1.Series], @@ -169,6 +171,7 @@ func NewLazyBlockBuilderIterator( ctx: ctx, opts: opts, metrics: metrics, + logger: logger, populate: populate, writerReaderFunc: writerReaderFunc, series: series, @@ -196,7 +199,7 @@ func (b *LazyBlockBuilderIterator) Next() bool { return false } - mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics) + mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics, b.logger) writer, reader := b.writerReaderFunc() blockBuilder, err := v1.NewBlockBuilder(b.opts, writer) if err != nil { diff --git a/pkg/storage/bloom/v1/bloom_builder.go b/pkg/storage/bloom/v1/bloom_builder.go index 9829d9ffc380a..c327f5d6bfd95 100644 --- a/pkg/storage/bloom/v1/bloom_builder.go +++ b/pkg/storage/bloom/v1/bloom_builder.go @@ -28,6 +28,10 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB } } +func (b *BloomBlockBuilder) UnflushedSize() int { + return b.scratch.Len() + b.page.UnflushedSize() +} + func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) { if !b.writtenSchema { if err := b.writeSchema(); err != nil { @@ -68,6 +72,14 @@ func (b *BloomBlockBuilder) writeSchema() error { } func (b *BloomBlockBuilder) Close() (uint32, error) { + if !b.writtenSchema { + // We will get here only if we haven't appended any bloom filters to the block + // This would happen only if all series yielded empty blooms + if err := b.writeSchema(); err != nil { + return 0, errors.Wrap(err, "writing schema") + } + } + if b.page.Count() > 0 { if err := b.flushPage(); err != nil { return 0, errors.Wrap(err, "flushing final bloom page") diff --git a/pkg/storage/bloom/v1/builder.go b/pkg/storage/bloom/v1/builder.go index fa5f5aa047a7b..466687aa44b9a 100644 --- a/pkg/storage/bloom/v1/builder.go +++ b/pkg/storage/bloom/v1/builder.go @@ -5,6 +5,8 @@ import ( "hash" "io" + "github.com/go-kit/log" + "github.com/go-kit/log/level" "github.com/pkg/errors" "github.com/grafana/loki/v3/pkg/compression" @@ -112,6 +114,10 @@ func (w *PageWriter) Reset() { w.n = 0 } +func (w *PageWriter) UnflushedSize() int { + return w.enc.Len() +} + func (w *PageWriter) SpaceFor(numBytes int) bool { // if a single bloom exceeds the target size, still accept it // otherwise only accept it if adding it would not exceed the target size @@ -189,6 +195,7 @@ type MergeBuilder struct { // Add chunks of a single series to a bloom populate BloomPopulatorFunc metrics *Metrics + logger log.Logger } type BloomPopulatorFunc func(series *Series, preExistingBlooms iter.SizedIterator[*Bloom], chunksToAdd ChunkRefs, ch chan *BloomCreation) @@ -202,6 +209,7 @@ func NewMergeBuilder( store iter.Iterator[*Series], populate BloomPopulatorFunc, metrics *Metrics, + logger log.Logger, ) *MergeBuilder { // combinedSeriesIter handles series with fingerprint collisions: // because blooms dont contain the label-set (only the fingerprint), @@ -229,6 +237,7 @@ func NewMergeBuilder( store: combinedSeriesIter, populate: populate, metrics: metrics, + logger: logger, } } @@ -306,6 +315,12 @@ func (mb *MergeBuilder) processNextSeries( if creation.Err != nil { return nil, info.sourceBytes, 0, false, false, errors.Wrap(creation.Err, "populating bloom") } + + if creation.Bloom.IsEmpty() { + level.Debug(mb.logger).Log("msg", "received empty bloom. Adding to index but skipping offsets", "fingerprint", nextInStore.Fingerprint) + continue + } + offset, err := builder.AddBloom(creation.Bloom) if err != nil { return nil, info.sourceBytes, 0, false, false, errors.Wrapf( diff --git a/pkg/storage/bloom/v1/builder_test.go b/pkg/storage/bloom/v1/builder_test.go index a1917bc096a7c..fa8ccbc87a3d9 100644 --- a/pkg/storage/bloom/v1/builder_test.go +++ b/pkg/storage/bloom/v1/builder_test.go @@ -6,6 +6,7 @@ import ( "sort" "testing" + "github.com/go-kit/log" "github.com/prometheus/common/model" "github.com/stretchr/testify/require" @@ -263,7 +264,7 @@ func TestMergeBuilder(t *testing.T) { ) // Ensure that the merge builder combines all the blocks correctly - mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, populate, NewMetrics(nil)) + mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, populate, NewMetrics(nil), log.NewNopLogger()) indexBuf := bytes.NewBuffer(nil) bloomsBuf := bytes.NewBuffer(nil) writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) @@ -350,6 +351,8 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) { // We're not testing the ability to extend a bloom in this test pop := func(_ *Series, _ iter.SizedIterator[*Bloom], _ ChunkRefs, ch chan *BloomCreation) { bloom := NewBloom() + // Add something to the bloom so it's not empty + bloom.Add([]byte("hello")) stats := indexingInfo{ sourceBytes: int(bloom.Capacity()) / 8, indexedFields: NewSetFromLiteral[Field]("__all__"), @@ -367,6 +370,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) { iter.NewSliceIter(data), pop, NewMetrics(nil), + log.NewNopLogger(), ) _, _, err = mergeBuilder.Build(builder) @@ -539,6 +543,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) { dedupedStore, pop, NewMetrics(nil), + log.NewNopLogger(), ) builder, err := NewBlockBuilder(blockOpts, writer) require.Nil(t, err) diff --git a/pkg/storage/bloom/v1/fuse.go b/pkg/storage/bloom/v1/fuse.go index 147a81502c336..b25743ea4c541 100644 --- a/pkg/storage/bloom/v1/fuse.go +++ b/pkg/storage/bloom/v1/fuse.go @@ -32,6 +32,8 @@ func NewBloomRecorder(ctx context.Context, id string) *BloomRecorder { chunksSkipped: atomic.NewInt64(0), seriesMissed: atomic.NewInt64(0), chunksMissed: atomic.NewInt64(0), + seriesEmpty: atomic.NewInt64(0), + chunksEmpty: atomic.NewInt64(0), chunksFiltered: atomic.NewInt64(0), } } @@ -45,6 +47,8 @@ type BloomRecorder struct { seriesSkipped, chunksSkipped *atomic.Int64 // not found in bloom seriesMissed, chunksMissed *atomic.Int64 + // exists in block index but empty offsets + seriesEmpty, chunksEmpty *atomic.Int64 // filtered out chunksFiltered *atomic.Int64 } @@ -56,6 +60,8 @@ func (r *BloomRecorder) Merge(other *BloomRecorder) { r.chunksSkipped.Add(other.chunksSkipped.Load()) r.seriesMissed.Add(other.seriesMissed.Load()) r.chunksMissed.Add(other.chunksMissed.Load()) + r.seriesEmpty.Add(other.seriesEmpty.Load()) + r.chunksEmpty.Add(other.chunksEmpty.Load()) r.chunksFiltered.Add(other.chunksFiltered.Load()) } @@ -66,13 +72,15 @@ func (r *BloomRecorder) Report(logger log.Logger, metrics *Metrics) { seriesFound = r.seriesFound.Load() seriesSkipped = r.seriesSkipped.Load() seriesMissed = r.seriesMissed.Load() - seriesRequested = seriesFound + seriesSkipped + seriesMissed + seriesEmpty = r.seriesEmpty.Load() + seriesRequested = seriesFound + seriesSkipped + seriesMissed + seriesEmpty chunksFound = r.chunksFound.Load() chunksSkipped = r.chunksSkipped.Load() chunksMissed = r.chunksMissed.Load() chunksFiltered = r.chunksFiltered.Load() - chunksRequested = chunksFound + chunksSkipped + chunksMissed + chunksEmpty = r.chunksEmpty.Load() + chunksRequested = chunksFound + chunksSkipped + chunksMissed + chunksEmpty ) level.Debug(logger).Log( "recorder_msg", "bloom search results", @@ -82,11 +90,13 @@ func (r *BloomRecorder) Report(logger log.Logger, metrics *Metrics) { "recorder_series_found", seriesFound, "recorder_series_skipped", seriesSkipped, "recorder_series_missed", seriesMissed, + "recorder_series_empty", seriesEmpty, "recorder_chunks_requested", chunksRequested, "recorder_chunks_found", chunksFound, "recorder_chunks_skipped", chunksSkipped, "recorder_chunks_missed", chunksMissed, + "recorder_chunks_empty", chunksEmpty, "recorder_chunks_filtered", chunksFiltered, ) @@ -94,25 +104,27 @@ func (r *BloomRecorder) Report(logger log.Logger, metrics *Metrics) { metrics.recorderSeries.WithLabelValues(recorderRequested).Add(float64(seriesRequested)) metrics.recorderSeries.WithLabelValues(recorderFound).Add(float64(seriesFound)) metrics.recorderSeries.WithLabelValues(recorderSkipped).Add(float64(seriesSkipped)) + metrics.recorderSeries.WithLabelValues(recorderEmpty).Add(float64(seriesEmpty)) metrics.recorderSeries.WithLabelValues(recorderMissed).Add(float64(seriesMissed)) metrics.recorderChunks.WithLabelValues(recorderRequested).Add(float64(chunksRequested)) metrics.recorderChunks.WithLabelValues(recorderFound).Add(float64(chunksFound)) metrics.recorderChunks.WithLabelValues(recorderSkipped).Add(float64(chunksSkipped)) metrics.recorderChunks.WithLabelValues(recorderMissed).Add(float64(chunksMissed)) + metrics.recorderChunks.WithLabelValues(recorderEmpty).Add(float64(chunksEmpty)) metrics.recorderChunks.WithLabelValues(recorderFiltered).Add(float64(chunksFiltered)) } } -func (r *BloomRecorder) record( - seriesFound, chunksFound, seriesSkipped, chunksSkipped, seriesMissed, chunksMissed, chunksFiltered int, -) { +func (r *BloomRecorder) record(seriesFound, chunksFound, seriesSkipped, chunksSkipped, seriesMissed, chunksMissed, seriesEmpty, chunksEmpty, chunksFiltered int) { r.seriesFound.Add(int64(seriesFound)) r.chunksFound.Add(int64(chunksFound)) r.seriesSkipped.Add(int64(seriesSkipped)) r.chunksSkipped.Add(int64(chunksSkipped)) r.seriesMissed.Add(int64(seriesMissed)) r.chunksMissed.Add(int64(chunksMissed)) + r.seriesEmpty.Add(int64(seriesEmpty)) + r.chunksEmpty.Add(int64(chunksEmpty)) r.chunksFiltered.Add(int64(chunksFiltered)) } @@ -170,6 +182,7 @@ func (fq *FusedQuerier) recordMissingFp( 0, 0, // found 0, 0, // skipped 1, len(input.Chks), // missed + 0, 0, // empty 0, // chunks filtered ) }) @@ -184,6 +197,22 @@ func (fq *FusedQuerier) recordSkippedFp( 0, 0, // found 1, len(input.Chks), // skipped 0, 0, // missed + 0, 0, // empty + 0, // chunks filtered + ) + }) +} + +func (fq *FusedQuerier) recordEmptyFp( + batch []Request, + fp model.Fingerprint, +) { + fq.noRemovals(batch, fp, func(input Request) { + input.Recorder.record( + 0, 0, // found + 0, 0, // skipped + 0, 0, // missed + 1, len(input.Chks), // empty 0, // chunks filtered ) }) @@ -280,6 +309,19 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque }) } + if len(series.Offsets) == 0 { + // We end up here for series with no structured metadata fields. + // While building blooms, these series would yield empty blooms. + // We add these series to the index of the block so we don't report them as missing, + // but we don't filter any chunks for them. + level.Debug(fq.logger).Log( + "msg", "series with empty offsets", + "fp", series.Fingerprint, + ) + fq.recordEmptyFp(reqs, series.Fingerprint) + return + } + for i, offset := range series.Offsets { skip := fq.bq.blooms.LoadOffset(offset) if skip { @@ -361,6 +403,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque 1, len(inputs[i].InBlooms), // found 0, 0, // skipped 0, len(inputs[i].Missing), // missed + 0, 0, // empty len(removals), // filtered ) req.Response <- Output{ diff --git a/pkg/storage/bloom/v1/index.go b/pkg/storage/bloom/v1/index.go index a9e03efc41af9..8be1a45d35c21 100644 --- a/pkg/storage/bloom/v1/index.go +++ b/pkg/storage/bloom/v1/index.go @@ -153,7 +153,8 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader { fromFp, _ := xs[0].Bounds.Bounds() _, throughFP := xs[len(xs)-1].Bounds.Bounds() res := SeriesHeader{ - Bounds: NewBounds(fromFp, throughFP), + NumSeries: len(xs), + Bounds: NewBounds(fromFp, throughFP), } for i, x := range xs { diff --git a/pkg/storage/bloom/v1/index_builder.go b/pkg/storage/bloom/v1/index_builder.go index 067a79ad03f4e..9703177f1200b 100644 --- a/pkg/storage/bloom/v1/index_builder.go +++ b/pkg/storage/bloom/v1/index_builder.go @@ -35,6 +35,10 @@ func NewIndexBuilder(opts BlockOptions, writer io.WriteCloser) *IndexBuilder { } } +func (b *IndexBuilder) UnflushedSize() int { + return b.scratch.Len() + b.page.UnflushedSize() +} + func (b *IndexBuilder) WriteOpts() error { b.scratch.Reset() b.opts.Encode(b.scratch) diff --git a/pkg/storage/bloom/v1/metrics.go b/pkg/storage/bloom/v1/metrics.go index e2ce99a4702d1..0ad86848df1b1 100644 --- a/pkg/storage/bloom/v1/metrics.go +++ b/pkg/storage/bloom/v1/metrics.go @@ -56,6 +56,7 @@ const ( recorderFound = "found" recorderSkipped = "skipped" recorderMissed = "missed" + recorderEmpty = "empty" recorderFiltered = "filtered" ) diff --git a/pkg/storage/bloom/v1/test_util.go b/pkg/storage/bloom/v1/test_util.go index c2f1f3e8f8d30..f040ab4297282 100644 --- a/pkg/storage/bloom/v1/test_util.go +++ b/pkg/storage/bloom/v1/test_util.go @@ -132,9 +132,11 @@ func CompareIterators[A, B any]( a iter.Iterator[A], b iter.Iterator[B], ) { + var i int for a.Next() { - require.True(t, b.Next()) + require.Truef(t, b.Next(), "'a' has %dth element but 'b' does not'", i) f(t, a.At(), b.At()) + i++ } require.False(t, b.Next()) require.NoError(t, a.Err()) diff --git a/pkg/storage/bloom/v1/versioned_builder.go b/pkg/storage/bloom/v1/versioned_builder.go index 8844ddf43eb11..960c6bcdde928 100644 --- a/pkg/storage/bloom/v1/versioned_builder.go +++ b/pkg/storage/bloom/v1/versioned_builder.go @@ -125,10 +125,35 @@ func (b *V3Builder) AddSeries(series Series, offsets []BloomOffset, fields Set[F return false, errors.Wrapf(err, "writing index for series %v", series.Fingerprint) } - full, _, err := b.writer.Full(b.opts.BlockSize) + full, err := b.full() if err != nil { return false, errors.Wrap(err, "checking if block is full") } return full, nil } + +func (b *V3Builder) full() (bool, error) { + if b.opts.BlockSize == 0 { + // Unlimited block size + return false, nil + } + + full, writtenSize, err := b.writer.Full(b.opts.BlockSize) + if err != nil { + return false, errors.Wrap(err, "checking if block writer is full") + } + if full { + return true, nil + } + + // Even if the block writer is not full, we may have unflushed data in the bloom builders. + // Check if by flushing these, we would exceed the block size. + unflushedIndexSize := b.index.UnflushedSize() + unflushedBloomSize := b.blooms.UnflushedSize() + if uint64(writtenSize+unflushedIndexSize+unflushedBloomSize) > b.opts.BlockSize { + return true, nil + } + + return false, nil +} diff --git a/pkg/storage/bloom/v1/versioned_builder_test.go b/pkg/storage/bloom/v1/versioned_builder_test.go index 1e4a0f5a93b26..6d2cc621be459 100644 --- a/pkg/storage/bloom/v1/versioned_builder_test.go +++ b/pkg/storage/bloom/v1/versioned_builder_test.go @@ -4,6 +4,7 @@ import ( "bytes" "testing" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" "github.com/grafana/loki/v3/pkg/compression" @@ -17,7 +18,7 @@ import ( func smallBlockOpts(v Version, enc compression.Codec) BlockOptions { return BlockOptions{ Schema: NewSchema(v, enc), - SeriesPageSize: 100, + SeriesPageSize: 4 << 10, BloomPageSize: 2 << 10, BlockSize: 0, // unlimited } @@ -78,3 +79,103 @@ func TestV3Roundtrip(t *testing.T) { querier, ) } + +func seriesWithBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms { + series, _ := MkBasicSeriesWithBlooms(nSeries, fromFp, throughFp, 0, 10000) + return series +} + +func seriesWithoutBlooms(nSeries int, fromFp, throughFp model.Fingerprint) []SeriesWithBlooms { + series := seriesWithBlooms(nSeries, fromFp, throughFp) + + // remove blooms from series + for i := range series { + series[i].Blooms = v2.NewEmptyIter[*Bloom]() + } + + return series +} +func TestFullBlock(t *testing.T) { + opts := smallBlockOpts(V3, compression.None) + minBlockSize := opts.SeriesPageSize // 1 index page, 4KB + const maxEmptySeriesPerBlock = 47 + for _, tc := range []struct { + name string + maxBlockSize uint64 + series []SeriesWithBlooms + expected []SeriesWithBlooms + }{ + { + name: "only series without blooms", + maxBlockSize: minBlockSize, + // +1 so we test adding the last series that fills the block + series: seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff), + expected: seriesWithoutBlooms(maxEmptySeriesPerBlock+1, 0, 0xffff), + }, + { + name: "series without blooms and one with blooms", + maxBlockSize: minBlockSize, + series: append( + seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff), + seriesWithBlooms(50, 0x8000, 0xffff)..., + ), + expected: append( + seriesWithoutBlooms(maxEmptySeriesPerBlock, 0, 0x7fff), + seriesWithBlooms(1, 0x8000, 0x8001)..., + ), + }, + { + name: "only one series with bloom", + maxBlockSize: minBlockSize, + series: seriesWithBlooms(10, 0, 0xffff), + expected: seriesWithBlooms(1, 0, 1), + }, + { + name: "one huge series with bloom and then series without", + maxBlockSize: minBlockSize, + series: append( + seriesWithBlooms(1, 0, 1), + seriesWithoutBlooms(100, 1, 0xffff)..., + ), + expected: seriesWithBlooms(1, 0, 1), + }, + { + name: "big block", + maxBlockSize: 1 << 20, // 1MB + series: seriesWithBlooms(100, 0, 0xffff), + expected: seriesWithBlooms(100, 0, 0xffff), + }, + } { + t.Run(tc.name, func(t *testing.T) { + indexBuf := bytes.NewBuffer(nil) + bloomsBuf := bytes.NewBuffer(nil) + writer := NewMemoryBlockWriter(indexBuf, bloomsBuf) + reader := NewByteReader(indexBuf, bloomsBuf) + opts.BlockSize = tc.maxBlockSize + + b, err := NewBlockBuilderV3(opts, writer) + require.NoError(t, err) + + _, err = b.BuildFrom(v2.NewSliceIter(tc.series)) + require.NoError(t, err) + + block := NewBlock(reader, NewMetrics(nil)) + querier := NewBlockQuerier(block, &mempool.SimpleHeapAllocator{}, DefaultMaxPageSize).Iter() + + CompareIterators( + t, + func(t *testing.T, a SeriesWithBlooms, b *SeriesWithBlooms) { + require.Equal(t, a.Series.Fingerprint, b.Series.Fingerprint) + require.ElementsMatch(t, a.Series.Chunks, b.Series.Chunks) + bloomsA, err := v2.Collect(a.Blooms) + require.NoError(t, err) + bloomsB, err := v2.Collect(b.Blooms) + require.NoError(t, err) + require.Equal(t, len(bloomsB), len(bloomsA)) + }, + v2.NewSliceIter(tc.expected), + querier, + ) + }) + } +} diff --git a/pkg/storage/stores/shipper/bloomshipper/client.go b/pkg/storage/stores/shipper/bloomshipper/client.go index 1c66e500a6b9c..627016c63c025 100644 --- a/pkg/storage/stores/shipper/bloomshipper/client.go +++ b/pkg/storage/stores/shipper/bloomshipper/client.go @@ -225,13 +225,16 @@ func newBlockRefWithEncoding(ref Ref, enc compression.Codec) BlockRef { } func BlockFrom(enc compression.Codec, tenant, table string, blk *v1.Block) (Block, error) { - md, _ := blk.Metadata() + md, err := blk.Metadata() + if err != nil { + return Block{}, errors.Wrap(err, "decoding index") + } + ref := newBlockRefWithEncoding(newRefFrom(tenant, table, md), enc) // TODO(owen-d): pool buf := bytes.NewBuffer(nil) - err := v1.TarCompress(ref.Codec, buf, blk.Reader()) - + err = v1.TarCompress(ref.Codec, buf, blk.Reader()) if err != nil { return Block{}, err }