From c36b1142c7acd6a13a3634ddbef71254040cff73 Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Wed, 24 Apr 2024 13:23:42 +0200 Subject: [PATCH] fix(blooms): Correctly return unfiltered chunks for series that are not mapped to any block (#12774) This PR fixes a conceptual mistake in the code of resolving blocks on the index gateways. Currently, a series does not resolve to any block is discarded instead of being kept for the response. This change adds the chunks of the skipped series to the bloom querier response. Signed-off-by: Christian Haudum --- pkg/bloomgateway/querier.go | 5 +- pkg/bloomgateway/querier_test.go | 4 +- pkg/bloomgateway/resolver.go | 35 ++++++- pkg/bloomgateway/resolver_test.go | 156 ++++++++++++++++++++++++++++-- 4 files changed, 184 insertions(+), 16 deletions(-) diff --git a/pkg/bloomgateway/querier.go b/pkg/bloomgateway/querier.go index bbb9f7495d8ef..a6209f9ccf34a 100644 --- a/pkg/bloomgateway/querier.go +++ b/pkg/bloomgateway/querier.go @@ -99,7 +99,7 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from // only covers a single day, and if not, it's at most two days. for _, s := range partitionSeriesByDay(from, through, grouped) { day := bloomshipper.NewInterval(s.day.Time, s.day.Time.Add(Day)) - blocks, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series) + blocks, skipped, err := bq.blockResolver.Resolve(ctx, tenant, day, s.series) if err != nil { return nil, err } @@ -121,6 +121,9 @@ func (bq *BloomQuerier) FilterChunkRefs(ctx context.Context, tenant string, from return nil, err } + // add chunk refs from series that were not mapped to any blocks + refs = append(refs, skipped...) + for i := range refs { seriesSeen[refs[i].Fingerprint] = struct{}{} for _, ref := range refs[i].Refs { diff --git a/pkg/bloomgateway/querier_test.go b/pkg/bloomgateway/querier_test.go index a27d90a021241..516f1cd403bb3 100644 --- a/pkg/bloomgateway/querier_test.go +++ b/pkg/bloomgateway/querier_test.go @@ -40,7 +40,7 @@ func (c *noopClient) FilterChunks(_ context.Context, _ string, _ bloomshipper.In type mockBlockResolver struct{} // Resolve implements BlockResolver. -func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) { +func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) { day := truncateDay(interval.Start) first, last := getFirstLast(series) block := bloomshipper.BlockRef{ @@ -53,7 +53,7 @@ func (*mockBlockResolver) Resolve(_ context.Context, tenant string, interval blo Checksum: 0, }, } - return []blockWithSeries{{block: block, series: series}}, nil + return []blockWithSeries{{block: block, series: series}}, nil, nil } var _ BlockResolver = &mockBlockResolver{} diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index 3c5d8853d9ab5..c10ebc33dff3e 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -15,7 +15,7 @@ import ( ) type BlockResolver interface { - Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) + Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) } type blockWithSeries struct { @@ -28,7 +28,7 @@ type defaultBlockResolver struct { logger log.Logger } -func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, error) { +func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) { minFp, maxFp := getFirstLast(series) metaSearch := bloomshipper.MetaSearchParams{ TenantID: tenant, @@ -52,10 +52,12 @@ func (r *defaultBlockResolver) Resolve(ctx context.Context, tenant string, inter ) if err != nil { - return nil, err + return nil, series, err } - return blocksMatchingSeries(metas, interval, series), nil + mapped := blocksMatchingSeries(metas, interval, series) + skipped := unassignedSeries(mapped, series) + return mapped, skipped, nil } func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) []blockWithSeries { @@ -96,6 +98,31 @@ func blocksMatchingSeries(metas []bloomshipper.Meta, interval bloomshipper.Inter return result } +func unassignedSeries(mapped []blockWithSeries, series []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { + skipped := make([]*logproto.GroupedChunkRefs, len(series)) + _ = copy(skipped, series) + + for _, block := range mapped { + minFp, maxFp := getFirstLast(block.series) + + minIdx := sort.Search(len(skipped), func(i int) bool { + return skipped[i].Fingerprint >= minFp.Fingerprint + }) + + maxIdx := sort.Search(len(skipped), func(i int) bool { + return skipped[i].Fingerprint >= maxFp.Fingerprint + }) + + if minIdx == len(skipped) || maxIdx == 0 || minIdx == maxIdx { + continue + } + + skipped = append(skipped[0:minIdx], skipped[maxIdx+1:]...) + } + + return skipped +} + func NewBlockResolver(store bloomshipper.Store, logger log.Logger) BlockResolver { return &defaultBlockResolver{ store: store, diff --git a/pkg/bloomgateway/resolver_test.go b/pkg/bloomgateway/resolver_test.go index a2cd422e1594f..7214537d6885c 100644 --- a/pkg/bloomgateway/resolver_test.go +++ b/pkg/bloomgateway/resolver_test.go @@ -11,18 +11,22 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" ) +func makeBlockRef(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.BlockRef { + return bloomshipper.BlockRef{ + Ref: bloomshipper.Ref{ + TenantID: "tenant", + TableName: "table", + Bounds: v1.NewBounds(minFp, maxFp), + StartTimestamp: from, + EndTimestamp: through, + }, + } +} + func makeMeta(minFp, maxFp model.Fingerprint, from, through model.Time) bloomshipper.Meta { return bloomshipper.Meta{ Blocks: []bloomshipper.BlockRef{ - { - Ref: bloomshipper.Ref{ - TenantID: "tenant", - TableName: "table", - Bounds: v1.NewBounds(minFp, maxFp), - StartTimestamp: from, - EndTimestamp: through, - }, - }, + makeBlockRef(minFp, maxFp, from, through), }, } } @@ -113,3 +117,137 @@ func TestBlockResolver_BlocksMatchingSeries(t *testing.T) { require.Equal(t, expected, res) }) } + +func TestBlockResolver_UnassignedSeries(t *testing.T) { + series := []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + } + + testCases := []struct { + desc string + mapped []blockWithSeries + expected []*logproto.GroupedChunkRefs + }{ + { + desc: "no blocks - all unassigned", + mapped: []blockWithSeries{}, + expected: series, + }, + { + desc: "block has no overlapping series - all unassigned", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0xf0}, + {Fingerprint: 0xff}, + }, + }, + }, + expected: series, + }, + { + desc: "single block covering all series - no unassigned", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{}, + }, + { + desc: "multiple blocks covering all series - no unassigned", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + }, + }, + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + }, + }, + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{}, + }, + { + desc: "single block overlapping some series", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + {Fingerprint: 0xe0}, + }, + }, + { + desc: "multiple blocks overlapping some series", + mapped: []blockWithSeries{ + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x20}, + {Fingerprint: 0x40}, + {Fingerprint: 0x60}, + }, + }, + { + series: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x80}, + {Fingerprint: 0xa0}, + {Fingerprint: 0xc0}, + }, + }, + }, + expected: []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00}, + {Fingerprint: 0xe0}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + result := unassignedSeries(tc.mapped, series) + require.Equal(t, result, tc.expected) + }) + } +}