From 3bf2d1fea08593bdf10dc8a6827998a6d8a8243c Mon Sep 17 00:00:00 2001 From: Christian Haudum Date: Thu, 25 Apr 2024 18:11:45 +0200 Subject: [PATCH] fix(blooms): Deduplicate filtered series and chunks (#12791) Signed-off-by: Christian Haudum --- pkg/bloomgateway/cache.go | 55 +++-------------- pkg/bloomgateway/cache_test.go | 10 +-- pkg/bloomgateway/client.go | 106 ++++++++++++++++++++++++++------ pkg/bloomgateway/client_test.go | 37 +++++++++++ pkg/bloomgateway/resolver.go | 2 +- 5 files changed, 139 insertions(+), 71 deletions(-) diff --git a/pkg/bloomgateway/cache.go b/pkg/bloomgateway/cache.go index 60124f353e2a6..cec615b393fd7 100644 --- a/pkg/bloomgateway/cache.go +++ b/pkg/bloomgateway/cache.go @@ -3,12 +3,10 @@ package bloomgateway import ( "context" "flag" - "sort" "time" "github.com/go-kit/log" "github.com/prometheus/common/model" - "golang.org/x/exp/slices" "google.golang.org/grpc" "github.com/grafana/loki/v3/pkg/logproto" @@ -95,58 +93,21 @@ func newMerger() merger { // We merge all chunks grouped by their fingerprint. func (m merger) MergeResponse(responses ...resultscache.Response) (resultscache.Response, error) { var size int - for _, r := range responses { - res := r.(*logproto.FilterChunkRefResponse) - size += len(res.ChunkRefs) - } - chunkRefs := make([]*logproto.GroupedChunkRefs, 0, size) + unmerged := make([][]*logproto.GroupedChunkRefs, 0, len(responses)) for _, r := range responses { res := r.(*logproto.FilterChunkRefResponse) - chunkRefs = append(chunkRefs, res.ChunkRefs...) - } - - return &logproto.FilterChunkRefResponse{ - ChunkRefs: mergeGroupedChunkRefs(chunkRefs), - }, nil -} - -// Merge duplicated fingerprints by: -// 1. Sort the chunkRefs by their stream fingerprint -// 2. Remove duplicated FPs appending all chunks into the first fingerprint's chunk list. 
-func mergeGroupedChunkRefs(chunkRefs []*logproto.GroupedChunkRefs) []*logproto.GroupedChunkRefs { - if len(chunkRefs) <= 1 { - return chunkRefs - } - - sort.Slice(chunkRefs, func(i, j int) bool { - return chunkRefs[i].Fingerprint < chunkRefs[j].Fingerprint - }) - - var lastDiffFP int - for i := 1; i < len(chunkRefs); i++ { - if chunkRefs[lastDiffFP].Fingerprint == chunkRefs[i].Fingerprint { - chunkRefs[lastDiffFP].Refs = mergeShortRefs(append(chunkRefs[lastDiffFP].Refs, chunkRefs[i].Refs...)) - } else { - lastDiffFP++ - chunkRefs[lastDiffFP] = chunkRefs[i] - } + unmerged = append(unmerged, res.ChunkRefs) + size += len(res.ChunkRefs) } - return chunkRefs[:lastDiffFP+1] -} -// mergeShortRefs merges short-refs by removing duplicated checksums. -func mergeShortRefs(refs []*logproto.ShortRef) []*logproto.ShortRef { - if len(refs) <= 1 { - return refs + buf := make([]*logproto.GroupedChunkRefs, 0, size) + deduped, err := mergeSeries(unmerged, buf) + if err != nil { + return nil, err } - sort.Slice(refs, func(i, j int) bool { - return refs[i].Checksum < refs[j].Checksum - }) - return slices.CompactFunc(refs, func(a, b *logproto.ShortRef) bool { - return a.Checksum == b.Checksum - }) + return &logproto.FilterChunkRefResponse{ChunkRefs: deduped}, nil } type ClientCache struct { diff --git a/pkg/bloomgateway/cache_test.go b/pkg/bloomgateway/cache_test.go index 81124f86a54bf..841c155482718 100644 --- a/pkg/bloomgateway/cache_test.go +++ b/pkg/bloomgateway/cache_test.go @@ -288,6 +288,11 @@ func TestMerge(t *testing.T) { Fingerprint: 2, Tenant: "fake", Refs: []*logproto.ShortRef{ + { + From: 700, + Through: 1000, + Checksum: 40, + }, { From: 1000, Through: 1500, @@ -303,11 +308,6 @@ func TestMerge(t *testing.T) { Through: 2500, Checksum: 30, }, - { - From: 700, - Through: 1000, - Checksum: 40, - }, { From: 2000, Through: 2700, diff --git a/pkg/bloomgateway/client.go b/pkg/bloomgateway/client.go index f9a6c2dd57407..0ef6f498a59cf 100644 --- a/pkg/bloomgateway/client.go +++ 
b/pkg/bloomgateway/client.go @@ -3,7 +3,6 @@ package bloomgateway import ( "context" "flag" - "fmt" "io" "math" "sort" @@ -15,6 +14,7 @@ import ( ringclient "github.com/grafana/dskit/ring/client" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/v3/pkg/logqlmodel/stats" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/queue" + v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1" "github.com/grafana/loki/v3/pkg/storage/chunk/cache" "github.com/grafana/loki/v3/pkg/storage/chunk/cache/resultscache" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" @@ -258,17 +259,6 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva return rs.groups[i].Fingerprint < rs.groups[j].Fingerprint }) - level.Info(c.logger).Log( - "msg", "do FilterChunkRefs for addresses", - "part", fmt.Sprintf("%d/%d", i+1, len(servers)), - "addr", rs.addr, - "from", interval.Start.Time(), - "through", interval.End.Time(), - "series", len(rs.groups), - "blocks", len(rs.blocks), - "tenant", tenant, - ) - return c.doForAddrs([]string{rs.addr}, func(client logproto.BloomGatewayClient) error { req := &logproto.FilterChunkRefRequest{ From: interval.Start, @@ -290,15 +280,95 @@ func (c *GatewayClient) FilterChunks(ctx context.Context, tenant string, interva if err != nil { return nil, err } - return flatten(results, count), nil + + buf := make([]*logproto.GroupedChunkRefs, 0, count) + return mergeSeries(results, buf) } -func flatten(input [][]*logproto.GroupedChunkRefs, n int) []*logproto.GroupedChunkRefs { - result := make([]*logproto.GroupedChunkRefs, 0, n) - for _, res := range input { - result = append(result, res...) +// mergeSeries combines responses from multiple FilterChunkRefs calls and deduplicates +// chunks from series that appear in multiple responses. 
+// To avoid allocations, an optional slice can be passed as second argument. +func mergeSeries(input [][]*logproto.GroupedChunkRefs, buf []*logproto.GroupedChunkRefs) ([]*logproto.GroupedChunkRefs, error) { + // clear provided buffer + buf = buf[:0] + + iters := make([]v1.PeekingIterator[*logproto.GroupedChunkRefs], 0, len(input)) + for _, inp := range input { + iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) } - return result + + heapIter := v1.NewHeapIterator[*logproto.GroupedChunkRefs]( + func(a, b *logproto.GroupedChunkRefs) bool { + return a.Fingerprint < b.Fingerprint + }, + iters..., + ) + + dedupeIter := v1.NewDedupingIter[*logproto.GroupedChunkRefs, *logproto.GroupedChunkRefs]( + // eq + func(a, b *logproto.GroupedChunkRefs) bool { return a.Fingerprint == b.Fingerprint }, + // from + v1.Identity[*logproto.GroupedChunkRefs], + // merge + func(a, b *logproto.GroupedChunkRefs) *logproto.GroupedChunkRefs { + return &logproto.GroupedChunkRefs{ + Fingerprint: a.Fingerprint, + Tenant: a.Tenant, + Refs: mergeChunks(a.Refs, b.Refs), + } + }, + // iterator + v1.NewPeekingIter(heapIter), + ) + + return v1.CollectInto(dedupeIter, buf) +} + +func mergeChunks(inputs ...[]*logproto.ShortRef) []*logproto.ShortRef { + if len(inputs) == 0 { + return nil + } + + if len(inputs) == 1 { + slices.SortFunc( + inputs[0], + func(a, b *logproto.ShortRef) int { + if a.Equal(b) { + return 0 + } + if a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) { + return -1 + } + return 1 + }, + ) + return inputs[0] + } + + iters := make([]v1.PeekingIterator[*logproto.ShortRef], 0, len(inputs)) + for _, inp := range inputs { + iters = append(iters, v1.NewPeekingIter(v1.NewSliceIter(inp))) + } + + chunkDedupe := v1.NewDedupingIter[*logproto.ShortRef, *logproto.ShortRef]( + // eq + func(a, b *logproto.ShortRef) bool { return a.Equal(b) }, + // from + v1.Identity[*logproto.ShortRef], + // merge + func(a, b *logproto.ShortRef) *logproto.ShortRef { return a 
}, + // iterator + v1.NewPeekingIter[*logproto.ShortRef]( + v1.NewHeapIterator[*logproto.ShortRef]( + func(a, b *logproto.ShortRef) bool { + return a.From.Before(b.From) || (a.From.Equal(b.From) && a.Through.Before(b.Through)) + }, + iters..., + ), + ), + ) + merged, _ := v1.Collect(chunkDedupe) + return merged } // doForAddrs sequetially calls the provided callback function fn for each diff --git a/pkg/bloomgateway/client_test.go b/pkg/bloomgateway/client_test.go index b9ac9273beefa..db0a0adcff47b 100644 --- a/pkg/bloomgateway/client_test.go +++ b/pkg/bloomgateway/client_test.go @@ -7,8 +7,10 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" "github.com/stretchr/testify/require" + "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/querier/plan" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper" @@ -33,3 +35,38 @@ func TestBloomGatewayClient(t *testing.T) { require.Equal(t, 0, len(res)) }) } + +func shortRef(f, t model.Time, c uint32) *logproto.ShortRef { + return &logproto.ShortRef{ + From: f, + Through: t, + Checksum: c, + } +} + +func TestGatewayClient_MergeSeries(t *testing.T) { + inputs := [][]*logproto.GroupedChunkRefs{ + // response 1 + { + {Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping + {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks + {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6)}}, // partially overlapping chunks + }, + // response 2 + { + {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks + {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks + {Fingerprint: 0x03, Refs: 
[]*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping + }, + } + + expected := []*logproto.GroupedChunkRefs{ + {Fingerprint: 0x00, Refs: []*logproto.ShortRef{shortRef(0, 1, 1), shortRef(1, 2, 2)}}, // not overlapping + {Fingerprint: 0x01, Refs: []*logproto.ShortRef{shortRef(0, 1, 3), shortRef(1, 2, 4)}}, // fully overlapping chunks + {Fingerprint: 0x02, Refs: []*logproto.ShortRef{shortRef(0, 1, 5), shortRef(1, 2, 6), shortRef(2, 3, 7)}}, // partially overlapping chunks + {Fingerprint: 0x03, Refs: []*logproto.ShortRef{shortRef(0, 1, 8), shortRef(1, 2, 9)}}, // not overlapping + } + + result, _ := mergeSeries(inputs, nil) + require.Equal(t, expected, result) +} diff --git a/pkg/bloomgateway/resolver.go b/pkg/bloomgateway/resolver.go index c5b24115a211f..62ec5836cc136 100644 --- a/pkg/bloomgateway/resolver.go +++ b/pkg/bloomgateway/resolver.go @@ -15,7 +15,7 @@ import ( ) type BlockResolver interface { - Resolve(context.Context, string, bloomshipper.Interval, []*logproto.GroupedChunkRefs) ([]blockWithSeries, []*logproto.GroupedChunkRefs, error) + Resolve(ctx context.Context, tenant string, interval bloomshipper.Interval, series []*logproto.GroupedChunkRefs) (blocks []blockWithSeries, skipped []*logproto.GroupedChunkRefs, err error) } type blockWithSeries struct {