From 8406d7b954bc09fa6f91899a1d7edaff335931b4 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Tue, 19 Nov 2024 16:49:01 +0100 Subject: [PATCH] [PERF] TSDB: Optimize inverse matching (#14144) Simple follow-up to #13620. Modify `tsdb.PostingsForMatchers` to use the optimized tsdb.IndexReader.PostingsForLabelMatching method also for inverse matching. Introduce method `PostingsForAllLabelValues`, to avoid changing the existing method. The performance is much improved for a subset of the cases; there are up to ~60% CPU gains and ~12.5% reduction in memory usage. Remove `TestReader_InversePostingsForMatcherHonorsContextCancel` since `inversePostingsForMatcher` only passes `ctx` to `IndexReader` implementations now. Signed-off-by: Arve Knudsen --- tsdb/block.go | 8 +++++++ tsdb/head_read.go | 4 ++++ tsdb/index/index.go | 23 +++++++++++++++---- tsdb/index/index_test.go | 46 +++++++++++++++++++++++++++++++++++++ tsdb/index/postings.go | 16 +++++++++++++ tsdb/index/postings_test.go | 15 ++++++++++++ tsdb/ooo_head_read.go | 4 ++++ tsdb/querier.go | 27 ++++++---------------- tsdb/querier_test.go | 29 ++++++++++++++--------- 9 files changed, 137 insertions(+), 35 deletions(-) diff --git a/tsdb/block.go b/tsdb/block.go index bfb04c3f43..007564c273 100644 --- a/tsdb/block.go +++ b/tsdb/block.go @@ -83,6 +83,10 @@ type IndexReader interface { // If no postings are found having at least one matching label, an empty iterator is returned. PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings + // PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name. + // If no postings are found with the label in question, an empty iterator is returned. + PostingsForAllLabelValues(ctx context.Context, name string) index.Postings + // PostingsForMatchers assembles a single postings iterator based on the given matchers. // The resulting postings are not ordered by series. // If concurrent hint is set to true, call will be optimized for a (most likely) concurrent call with same matchers, @@ -551,6 +555,10 @@ func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name str return r.ir.PostingsForLabelMatching(ctx, name, match) } +func (r blockIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + return r.ir.PostingsForAllLabelValues(ctx, name) +} + func (r blockIndexReader) PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { return r.ir.PostingsForMatchers(ctx, concurrent, ms...) } diff --git a/tsdb/head_read.go b/tsdb/head_read.go index f6215c5206..a3cd7b653d 100644 --- a/tsdb/head_read.go +++ b/tsdb/head_read.go @@ -123,6 +123,10 @@ func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name str return h.head.postings.PostingsForLabelMatching(ctx, name, match) } +func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + return h.head.postings.PostingsForAllLabelValues(ctx, name) +} + func (h *headIndexReader) PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { return h.head.pfmc.PostingsForMatchers(ctx, h, concurrent, ms...) } diff --git a/tsdb/index/index.go b/tsdb/index/index.go index 957106b604..48ee98c9ff 100644 --- a/tsdb/index/index.go +++ b/tsdb/index/index.go @@ -1794,6 +1794,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P } func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { + return r.postingsForLabelMatching(ctx, name, match) +} + +func (r *Reader) PostingsForAllLabelValues(ctx context.Context, name string) Postings { + return r.postingsForLabelMatching(ctx, name, nil) +} + +// postingsForLabelMatching implements PostingsForLabelMatching if match is non-nil, and PostingsForAllLabelValues otherwise. +func (r *Reader) postingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings { if r.version == FormatV1 { return r.postingsForLabelMatchingV1(ctx, name, match) } @@ -1803,11 +1812,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc return EmptyPostings() } + postingsEstimate := 0 + if match == nil { + // The caller wants all postings for name. + postingsEstimate = len(e) * symbolFactor + } + lastVal := e[len(e)-1].value - var its []Postings + its := make([]Postings, 0, postingsEstimate) if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) { - if match(val) { - // We want this postings iterator since the value is a match + if match == nil || match(val) { + // We want this postings iterator since the value is a match. postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable) _, p, err := r.dec.PostingsFromDecbuf(postingsDec) if err != nil { @@ -1836,7 +1851,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma return ErrPostings(ctx.Err()) } count++ - if !match(val) { + if match != nil && !match(val) { continue } diff --git a/tsdb/index/index_test.go b/tsdb/index/index_test.go index 9647a3d5fe..94e91eb1de 100644 --- a/tsdb/index/index_test.go +++ b/tsdb/index/index_test.go @@ -616,6 +616,52 @@ func TestChunksTimeOrdering(t *testing.T) { require.NoError(t, idx.Close()) } +func TestReader_PostingsForLabelMatching(t *testing.T) { + const seriesCount = 9 + var input indexWriterSeriesSlice + for i := 1; i <= seriesCount; i++ { + input = append(input, &indexWriterSeries{ + labels: labels.FromStrings("__name__", strconv.Itoa(i)), + chunks: []chunks.Meta{ + {Ref: 1, MinTime: 0, MaxTime: 10}, + }, + }) + } + ir, _, _ := createFileReader(context.Background(), t, input) + + p := ir.PostingsForLabelMatching(context.Background(), "__name__", func(v string) bool { + iv, err := strconv.Atoi(v) + if err != nil { + panic(err) + } + return iv%2 == 0 + }) + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, refs) +} + +func TestReader_PostingsForAllLabelValues(t *testing.T) { + const seriesCount = 9 + var input indexWriterSeriesSlice + for i := 1; i <= seriesCount; i++ { + input = append(input, &indexWriterSeries{ + labels: labels.FromStrings("__name__", strconv.Itoa(i)), + chunks: []chunks.Meta{ + {Ref: 1, MinTime: 0, MaxTime: 10}, + }, + }) + } + ir, _, _ := createFileReader(context.Background(), t, input) + + p := ir.PostingsForAllLabelValues(context.Background(), "__name__") + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + require.Equal(t, []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}, refs) +} + func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { const seriesCount = 1000 var input indexWriterSeriesSlice diff --git a/tsdb/index/postings.go b/tsdb/index/postings.go index 15536c69c9..a987132513 100644 --- a/tsdb/index/postings.go +++ b/tsdb/index/postings.go @@ -447,6 +447,22 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string, return Merge(ctx, its...) } +func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings { + p.mtx.RLock() + + e := p.m[name] + its := make([]Postings, 0, len(e)) + for _, refs := range e { + if len(refs) > 0 { + its = append(its, NewListPostings(refs)) + } + } + + // Let the mutex go before merging. + p.mtx.RUnlock() + return Merge(ctx, its...) +} + // labelValues returns a slice of label values for the given label name. // It will take the read lock. func (p *MemPostings) labelValues(name string) []string { diff --git a/tsdb/index/postings_test.go b/tsdb/index/postings_test.go index 93fba7f065..cfa176f791 100644 --- a/tsdb/index/postings_test.go +++ b/tsdb/index/postings_test.go @@ -1571,6 +1571,21 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) { require.Equal(t, []storage.SeriesRef{2, 4}, refs) } +func TestMemPostings_PostingsForAllLabelValues(t *testing.T) { + mp := NewMemPostings() + mp.Add(1, labels.FromStrings("foo", "1")) + mp.Add(2, labels.FromStrings("foo", "2")) + mp.Add(3, labels.FromStrings("foo", "3")) + mp.Add(4, labels.FromStrings("foo", "4")) + + p := mp.PostingsForAllLabelValues(context.Background(), "foo") + require.NoError(t, p.Err()) + refs, err := ExpandPostings(p) + require.NoError(t, err) + // All postings for the label should be returned. + require.Equal(t, []storage.SeriesRef{1, 2, 3, 4}, refs) +} + func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { memP := NewMemPostings() seriesCount := 10 * checkContextEveryNIterations diff --git a/tsdb/ooo_head_read.go b/tsdb/ooo_head_read.go index 1f5b7c951f..745cd5d5fe 100644 --- a/tsdb/ooo_head_read.go +++ b/tsdb/ooo_head_read.go @@ -456,6 +456,10 @@ func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context return index.ErrPostings(errors.New("not supported")) } +func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(context.Context, string) index.Postings { + return index.ErrPostings(errors.New("not supported")) +} + func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings { // This will already be sorted from the Postings() call above. return p diff --git a/tsdb/querier.go b/tsdb/querier.go index e506153eba..44a542e376 100644 --- a/tsdb/querier.go +++ b/tsdb/querier.go @@ -362,29 +362,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *l return ix.Postings(ctx, m.Name, m.Value) } - vals, err := ix.LabelValues(ctx, m.Name) - if err != nil { - return nil, err - } - - res := vals[:0] - // If the match before inversion was !="" or !~"", we just want all the values. + // If the matcher being inverted is =~"" or ="", we just want all the values. if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) { - res = vals - } else { - count := 1 - for _, val := range vals { - if count%checkContextEveryNIterations == 0 && ctx.Err() != nil { - return nil, ctx.Err() - } - count++ - if !m.Matches(val) { - res = append(res, val) - } - } + it := ix.PostingsForAllLabelValues(ctx, m.Name) + return it, it.Err() } - return ix.Postings(ctx, m.Name, res...) + it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool { + return !m.Matches(s) + }) + return it, it.Err() } const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series. diff --git a/tsdb/querier_test.go b/tsdb/querier_test.go index b06ba1f263..aedcf0b69a 100644 --- a/tsdb/querier_test.go +++ b/tsdb/querier_test.go @@ -2380,6 +2380,16 @@ func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, ma return index.Merge(ctx, res...) } +func (m mockIndex) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings { + var res []index.Postings + for l, srs := range m.postings { + if l.Name == name { + res = append(res, index.NewListPostings(srs)) + } + } + return index.Merge(ctx, res...) +} + func (m mockIndex) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) { var ps []storage.SeriesRef for p, s := range m.series { @@ -3400,6 +3410,10 @@ func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called")) } +func (m mockMatcherIndex) PostingsForAllLabelValues(context.Context, string) index.Postings { + return index.ErrPostings(errors.New("PostingsForAllLabelValues called")) +} + func TestPostingsForMatcher(t *testing.T) { ctx := context.Background() @@ -3928,17 +3942,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) { require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. } -func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) { - ir := mockReaderOfLabels{} - - failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations) - ctx := &testutil.MockContextErrAfter{FailAfter: failAfter} - _, err := inversePostingsForMatcher(ctx, ir, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")) - - require.Error(t, err) - require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result. -} - type mockReaderOfLabels struct{} const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10 @@ -3971,6 +3974,10 @@ func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, fu panic("PostingsForLabelMatching called") } +func (m mockReaderOfLabels) PostingsForAllLabelValues(context.Context, string) index.Postings { + panic("PostingsForAllLabelValues called") +} + func (m mockReaderOfLabels) Postings(context.Context, string, ...string) (index.Postings, error) { panic("Postings called") }