feat: Do not add empty blooms to offsets #14577

Merged
merged 4 commits on Oct 25, 2024
Changes from 3 commits
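This PR stops writing bloom offsets for empty blooms: a series whose bloom comes out empty (typically one with no structured metadata) is still added to the block index, but no offset is recorded for it, and the fused querier now reports such series as "empty" instead of "missed". A minimal sketch of the resulting contract, using assumed names (the real logic lives in MergeBuilder.processNextSeries and FusedQuerier.runSeries below):

// Builder side: every series is indexed, but only non-empty blooms get offsets.
if !creation.Bloom.IsEmpty() {
	offset, err := builder.AddBloom(creation.Bloom)
	// ... record offset in the series' index entry
}

// Querier side: a series with no offsets cannot filter anything, so all
// of its chunks pass through and are counted as "empty".
if len(series.Offsets) == 0 {
	recordEmpty(series) // hypothetical helper; the diff uses recordEmptyFp
	return
}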
47 changes: 30 additions & 17 deletions integration/bloom_building_test.go
@@ -61,15 +61,7 @@ func TestBloomBuilding(t *testing.T) {
cliIngester.Now = now

// We now ingest some logs across many series.
series := make([]labels.Labels, 0, nSeries)
for i := 0; i < nSeries; i++ {
lbs := labels.FromStrings("job", fmt.Sprintf("job-%d", i))
series = append(series, lbs)

for j := 0; j < nLogsPerSeries; j++ {
require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map()))
}
}
series := writeSeries(t, nSeries, nLogsPerSeries, cliDistributor, now, "job")

// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
@@ -122,14 +114,8 @@ func TestBloomBuilding(t *testing.T) {
checkSeriesInBlooms(t, now, tenantID, bloomStore, series)

// Push some more logs so TSDBs need to be updated.
for i := 0; i < nSeries; i++ {
lbs := labels.FromStrings("job", fmt.Sprintf("job-new-%d", i))
series = append(series, lbs)

for j := 0; j < nLogsPerSeries; j++ {
require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map()))
}
}
newSeries := writeSeries(t, nSeries, nLogsPerSeries, cliDistributor, now, "job-new")
series = append(series, newSeries...)

// restart ingester which should flush the chunks and index
require.NoError(t, tIngester.Restart())
@@ -145,6 +131,33 @@ func TestBloomBuilding(t *testing.T) {
checkSeriesInBlooms(t, now, tenantID, bloomStore, series)
}

func writeSeries(t *testing.T, nSeries int, nLogsPerSeries int, cliDistributor *client.Client, now time.Time, seriesPrefix string) []labels.Labels {
series := make([]labels.Labels, 0, nSeries)
for i := 0; i < nSeries; i++ {
lbs := labels.FromStrings("job", fmt.Sprintf("%s-%d", seriesPrefix, i))
series = append(series, lbs)

for j := 0; j < nLogsPerSeries; j++ {
// Only write structured metadata for half of the series
var metadata map[string]string
if i%2 == 0 {
metadata = map[string]string{
"traceID": fmt.Sprintf("%d%d", i, j),
"user": fmt.Sprintf("%d%d", i, j%10),
}
}

require.NoError(t, cliDistributor.PushLogLine(
fmt.Sprintf("log line %d", j),
now,
metadata,
lbs.Map(),
))
}
}
return series
}
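// Note: only even-numbered series get structured metadata above. Per the
// comment in fuse.go further down, series without structured metadata
// yield empty blooms, so this test now exercises both series that get
// bloom offsets and series that are indexed without them.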

func checkCompactionFinished(t *testing.T, cliCompactor *client.Client) {
checkForTimestampMetric(t, cliCompactor, "loki_boltdb_shipper_compact_tables_operation_last_successful_run_timestamp_seconds")
}
7 changes: 5 additions & 2 deletions pkg/bloombuild/builder/spec.go
@@ -137,7 +137,7 @@ func (s *SimpleBloomGenerator) Generate(ctx context.Context) *LazyBlockBuilderIt
)
}

return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
return NewLazyBlockBuilderIterator(ctx, s.opts, s.metrics, s.logger, s.populator(ctx), s.writerReaderFunc, series, s.blocksIter)
}

// LazyBlockBuilderIterator is a lazy iterator over blocks that builds
@@ -146,6 +147,7 @@ type LazyBlockBuilderIterator struct {
ctx context.Context
opts v1.BlockOptions
metrics *v1.Metrics
logger log.Logger
populate v1.BloomPopulatorFunc
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader)
series iter.PeekIterator[*v1.Series]
@@ -160,6 +161,7 @@ func NewLazyBlockBuilderIterator(
ctx context.Context,
opts v1.BlockOptions,
metrics *v1.Metrics,
logger log.Logger,
populate v1.BloomPopulatorFunc,
writerReaderFunc func() (v1.BlockWriter, v1.BlockReader),
series iter.PeekIterator[*v1.Series],
@@ -169,6 +171,7 @@
ctx: ctx,
opts: opts,
metrics: metrics,
logger: logger,
populate: populate,
writerReaderFunc: writerReaderFunc,
series: series,
@@ -196,7 +199,7 @@ func (b *LazyBlockBuilderIterator) Next() bool {
return false
}

mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics)
mergeBuilder := v1.NewMergeBuilder(b.blocks, b.series, b.populate, b.metrics, b.logger)
writer, reader := b.writerReaderFunc()
blockBuilder, err := v1.NewBlockBuilder(b.opts, writer)
if err != nil {
12 changes: 12 additions & 0 deletions pkg/storage/bloom/v1/bloom_builder.go
@@ -28,6 +28,10 @@ func NewBloomBlockBuilder(opts BlockOptions, writer io.WriteCloser) *BloomBlockB
}
}

func (b *BloomBlockBuilder) UnflushedSize() int {
return b.scratch.Len() + b.page.UnflushedSize()
}

func (b *BloomBlockBuilder) Append(bloom *Bloom) (BloomOffset, error) {
if !b.writtenSchema {
if err := b.writeSchema(); err != nil {
@@ -68,6 +72,14 @@ func (b *BloomBlockBuilder) writeSchema() error {
}

func (b *BloomBlockBuilder) Close() (uint32, error) {
if !b.writtenSchema {
// We will get here only if we haven't appended any bloom filters to the block
// This would happen only if all series yielded empty blooms
if err := b.writeSchema(); err != nil {
return 0, errors.Wrap(err, "writing schema")
}
}
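// Writing the schema even for a block with no appended blooms keeps the
// output well-formed; the assumption here is that readers expect the
// schema header regardless of whether any bloom pages follow.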

if b.page.Count() > 0 {
if err := b.flushPage(); err != nil {
return 0, errors.Wrap(err, "flushing final bloom page")
15 changes: 15 additions & 0 deletions pkg/storage/bloom/v1/builder.go
@@ -5,6 +5,8 @@ import (
"hash"
"io"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"

"github.com/grafana/loki/v3/pkg/compression"
@@ -112,6 +114,10 @@ func (w *PageWriter) Reset() {
w.n = 0
}

func (w *PageWriter) UnflushedSize() int {
return w.enc.Len()
}
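// Together with BloomBlockBuilder.UnflushedSize above, this reports bytes
// that are encoded but not yet flushed to a page. The call sites are not
// part of this hunk; presumably they use it to keep block size accounting
// accurate.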

func (w *PageWriter) SpaceFor(numBytes int) bool {
// if a single bloom exceeds the target size, still accept it
// otherwise only accept it if adding it would not exceed the target size
@@ -189,6 +195,7 @@ type MergeBuilder struct {
// Add chunks of a single series to a bloom
populate BloomPopulatorFunc
metrics *Metrics
logger log.Logger
}

type BloomPopulatorFunc func(series *Series, preExistingBlooms iter.SizedIterator[*Bloom], chunksToAdd ChunkRefs, ch chan *BloomCreation)
@@ -202,6 +209,7 @@ func NewMergeBuilder(
store iter.Iterator[*Series],
populate BloomPopulatorFunc,
metrics *Metrics,
logger log.Logger,
) *MergeBuilder {
// combinedSeriesIter handles series with fingerprint collisions:
// because blooms dont contain the label-set (only the fingerprint),
@@ -229,6 +237,7 @@ func NewMergeBuilder(
store: combinedSeriesIter,
populate: populate,
metrics: metrics,
logger: logger,
}
}

@@ -306,6 +315,12 @@ func (mb *MergeBuilder) processNextSeries(
if creation.Err != nil {
return nil, info.sourceBytes, 0, false, false, errors.Wrap(creation.Err, "populating bloom")
}

if creation.Bloom.IsEmpty() {
level.Debug(mb.logger).Log("msg", "received empty bloom. Adding to index but skipping offsets", "fingerprint", nextInStore.Fingerprint)
continue
}
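// The series itself is still written to the block index by the caller;
// only its bloom offset is skipped, so queriers observe
// len(series.Offsets) == 0 for these series (handled in fuse.go below).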

offset, err := builder.AddBloom(creation.Bloom)
if err != nil {
return nil, info.sourceBytes, 0, false, false, errors.Wrapf(
7 changes: 6 additions & 1 deletion pkg/storage/bloom/v1/builder_test.go
@@ -6,6 +6,7 @@ import (
"sort"
"testing"

"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"

@@ -263,7 +264,7 @@ func TestMergeBuilder(t *testing.T) {
)

// Ensure that the merge builder combines all the blocks correctly
mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, populate, NewMetrics(nil))
mergeBuilder := NewMergeBuilder(dedupedBlocks(blocks), storeItr, populate, NewMetrics(nil), log.NewNopLogger())
indexBuf := bytes.NewBuffer(nil)
bloomsBuf := bytes.NewBuffer(nil)
writer := NewMemoryBlockWriter(indexBuf, bloomsBuf)
@@ -350,6 +351,8 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
// We're not testing the ability to extend a bloom in this test
pop := func(_ *Series, _ iter.SizedIterator[*Bloom], _ ChunkRefs, ch chan *BloomCreation) {
bloom := NewBloom()
// Add something to the bloom so it's not empty
bloom.Add([]byte("hello"))
stats := indexingInfo{
sourceBytes: int(bloom.Capacity()) / 8,
indexedFields: NewSetFromLiteral[Field]("__all__"),
@@ -367,6 +370,7 @@ func TestMergeBuilderFingerprintCollision(t *testing.T) {
iter.NewSliceIter(data),
pop,
NewMetrics(nil),
log.NewNopLogger(),
)

_, _, err = mergeBuilder.Build(builder)
@@ -539,6 +543,7 @@ func TestMergeBuilder_Roundtrip(t *testing.T) {
dedupedStore,
pop,
NewMetrics(nil),
log.NewNopLogger(),
)
builder, err := NewBlockBuilder(blockOpts, writer)
require.Nil(t, err)
53 changes: 48 additions & 5 deletions pkg/storage/bloom/v1/fuse.go
@@ -32,6 +32,8 @@ func NewBloomRecorder(ctx context.Context, id string) *BloomRecorder {
chunksSkipped: atomic.NewInt64(0),
seriesMissed: atomic.NewInt64(0),
chunksMissed: atomic.NewInt64(0),
seriesEmpty: atomic.NewInt64(0),
chunksEmpty: atomic.NewInt64(0),
chunksFiltered: atomic.NewInt64(0),
}
}
@@ -45,6 +47,8 @@ type BloomRecorder struct {
seriesSkipped, chunksSkipped *atomic.Int64
// not found in bloom
seriesMissed, chunksMissed *atomic.Int64
// exists in block index but empty offsets
seriesEmpty, chunksEmpty *atomic.Int64
// filtered out
chunksFiltered *atomic.Int64
}
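// A series counts as "empty" when it exists in the block index but has no
// bloom offsets; its chunks are neither tested against a bloom nor
// filtered (see runSeries below).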
@@ -56,6 +60,8 @@ func (r *BloomRecorder) Merge(other *BloomRecorder) {
r.chunksSkipped.Add(other.chunksSkipped.Load())
r.seriesMissed.Add(other.seriesMissed.Load())
r.chunksMissed.Add(other.chunksMissed.Load())
r.seriesEmpty.Add(other.seriesEmpty.Load())
r.chunksEmpty.Add(other.chunksEmpty.Load())
r.chunksFiltered.Add(other.chunksFiltered.Load())
}

@@ -66,13 +72,15 @@ func (r *BloomRecorder) Report(logger log.Logger, metrics *Metrics) {
seriesFound = r.seriesFound.Load()
seriesSkipped = r.seriesSkipped.Load()
seriesMissed = r.seriesMissed.Load()
seriesRequested = seriesFound + seriesSkipped + seriesMissed
seriesEmpty = r.seriesEmpty.Load()
seriesRequested = seriesFound + seriesSkipped + seriesMissed + seriesEmpty

chunksFound = r.chunksFound.Load()
chunksSkipped = r.chunksSkipped.Load()
chunksMissed = r.chunksMissed.Load()
chunksFiltered = r.chunksFiltered.Load()
chunksRequested = chunksFound + chunksSkipped + chunksMissed
chunksEmpty = r.chunksEmpty.Load()
chunksRequested = chunksFound + chunksSkipped + chunksMissed + chunksEmpty
)
level.Debug(logger).Log(
"recorder_msg", "bloom search results",
@@ -82,37 +90,41 @@
"recorder_series_found", seriesFound,
"recorder_series_skipped", seriesSkipped,
"recorder_series_missed", seriesMissed,
"recorder_series_empty", seriesEmpty,

"recorder_chunks_requested", chunksRequested,
"recorder_chunks_found", chunksFound,
"recorder_chunks_skipped", chunksSkipped,
"recorder_chunks_missed", chunksMissed,
"recorder_chunks_empty", chunksEmpty,
"recorder_chunks_filtered", chunksFiltered,
)

if metrics != nil {
metrics.recorderSeries.WithLabelValues(recorderRequested).Add(float64(seriesRequested))
metrics.recorderSeries.WithLabelValues(recorderFound).Add(float64(seriesFound))
metrics.recorderSeries.WithLabelValues(recorderSkipped).Add(float64(seriesSkipped))
metrics.recorderSeries.WithLabelValues(recorderEmpty).Add(float64(seriesEmpty))
metrics.recorderSeries.WithLabelValues(recorderMissed).Add(float64(seriesMissed))

metrics.recorderChunks.WithLabelValues(recorderRequested).Add(float64(chunksRequested))
metrics.recorderChunks.WithLabelValues(recorderFound).Add(float64(chunksFound))
metrics.recorderChunks.WithLabelValues(recorderSkipped).Add(float64(chunksSkipped))
metrics.recorderChunks.WithLabelValues(recorderMissed).Add(float64(chunksMissed))
metrics.recorderChunks.WithLabelValues(recorderEmpty).Add(float64(chunksEmpty))
metrics.recorderChunks.WithLabelValues(recorderFiltered).Add(float64(chunksFiltered))
}
}

func (r *BloomRecorder) record(
seriesFound, chunksFound, seriesSkipped, chunksSkipped, seriesMissed, chunksMissed, chunksFiltered int,
) {
func (r *BloomRecorder) record(seriesFound, chunksFound, seriesSkipped, chunksSkipped, seriesMissed, chunksMissed, seriesEmpty, chunksEmpty, chunksFiltered int) {
r.seriesFound.Add(int64(seriesFound))
r.chunksFound.Add(int64(chunksFound))
r.seriesSkipped.Add(int64(seriesSkipped))
r.chunksSkipped.Add(int64(chunksSkipped))
r.seriesMissed.Add(int64(seriesMissed))
r.chunksMissed.Add(int64(chunksMissed))
r.seriesEmpty.Add(int64(seriesEmpty))
r.chunksEmpty.Add(int64(chunksEmpty))
r.chunksFiltered.Add(int64(chunksFiltered))
}

@@ -170,6 +182,7 @@ func (fq *FusedQuerier) recordMissingFp(
0, 0, // found
0, 0, // skipped
1, len(input.Chks), // missed
0, 0, // empty
0, // chunks filtered
)
})
@@ -184,6 +197,22 @@
0, 0, // found
1, len(input.Chks), // skipped
0, 0, // missed
0, 0, // empty
0, // chunks filtered
)
})
}

func (fq *FusedQuerier) recordEmptyFp(
batch []Request,
fp model.Fingerprint,
) {
fq.noRemovals(batch, fp, func(input Request) {
input.Recorder.record(
0, 0, // found
0, 0, // skipped
0, 0, // missed
1, len(input.Chks), // empty
0, // chunks filtered
)
})
@@ -280,6 +309,19 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque
})
}

if len(series.Offsets) == 0 {
// We end up here for series with no structured metadata fields.
// While building blooms, these series would yield empty blooms.
// We add these series to the index of the block so we don't report them as missing,
// but we don't filter any chunks for them.
level.Debug(fq.logger).Log(
"msg", "series with empty offsets",
"fp", series.Fingerprint,
)
fq.recordEmptyFp(reqs, series.Fingerprint)
return
}

for i, offset := range series.Offsets {
skip := fq.bq.blooms.LoadOffset(offset)
if skip {
@@ -361,6 +403,7 @@ func (fq *FusedQuerier) runSeries(_ Schema, series *SeriesWithMeta, reqs []Reque
1, len(inputs[i].InBlooms), // found
0, 0, // skipped
0, len(inputs[i].Missing), // missed
0, 0, // empty
len(removals), // filtered
)
req.Response <- Output{
3 changes: 2 additions & 1 deletion pkg/storage/bloom/v1/index.go
@@ -153,7 +153,8 @@ func aggregateHeaders(xs []SeriesHeader) SeriesHeader {
fromFp, _ := xs[0].Bounds.Bounds()
_, throughFP := xs[len(xs)-1].Bounds.Bounds()
res := SeriesHeader{
Bounds: NewBounds(fromFp, throughFP),
NumSeries: len(xs),
Bounds: NewBounds(fromFp, throughFP),
}
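// NumSeries was previously left at its zero value here; it is now
// populated from len(xs).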

for i, x := range xs {