Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PERF] TSDB: Optimize inverse matching (#14144) #756

Merged
merged 1 commit into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions tsdb/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ type IndexReader interface {
// If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings

// PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name.
// If no postings are found with the label in question, an empty iterator is returned.
PostingsForAllLabelValues(ctx context.Context, name string) index.Postings

// PostingsForMatchers assembles a single postings iterator based on the given matchers.
// The resulting postings are not ordered by series.
// If concurrent hint is set to true, call will be optimized for a (most likely) concurrent call with same matchers,
Expand Down Expand Up @@ -551,6 +555,10 @@ func (r blockIndexReader) PostingsForLabelMatching(ctx context.Context, name str
return r.ir.PostingsForLabelMatching(ctx, name, match)
}

// PostingsForAllLabelValues forwards the call to the underlying reader r.ir
// and hands back whatever postings iterator it produces for name.
func (r blockIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	all := r.ir.PostingsForAllLabelValues(ctx, name)
	return all
}

// PostingsForMatchers forwards the matchers, together with the concurrent
// hint, to the underlying reader r.ir and returns its result unchanged.
func (r blockIndexReader) PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
	postings, err := r.ir.PostingsForMatchers(ctx, concurrent, ms...)
	return postings, err
}
Expand Down
4 changes: 4 additions & 0 deletions tsdb/head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,10 @@ func (h *headIndexReader) PostingsForLabelMatching(ctx context.Context, name str
return h.head.postings.PostingsForLabelMatching(ctx, name, match)
}

// PostingsForAllLabelValues asks the head's postings store for the postings
// of every value of the label called name and returns that iterator.
func (h *headIndexReader) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	all := h.head.postings.PostingsForAllLabelValues(ctx, name)
	return all
}

// PostingsForMatchers resolves the matchers through the head's pfmc field,
// passing this reader itself as the index to query.
func (h *headIndexReader) PostingsForMatchers(ctx context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
	postings, err := h.head.pfmc.PostingsForMatchers(ctx, h, concurrent, ms...)
	return postings, err
}
Expand Down
23 changes: 19 additions & 4 deletions tsdb/index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1794,6 +1794,15 @@ func (r *Reader) Postings(ctx context.Context, name string, values ...string) (P
}

// PostingsForLabelMatching is the exported entry point for filtered lookups:
// it hands name and the match predicate to postingsForLabelMatching.
func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
	matched := r.postingsForLabelMatching(ctx, name, match)
	return matched
}

// PostingsForAllLabelValues calls postingsForLabelMatching with a nil
// predicate, which that helper treats as "accept every value of name".
func (r *Reader) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
	// A nil predicate is the signal for "no filtering".
	var noFilter func(string) bool
	return r.postingsForLabelMatching(ctx, name, noFilter)
}

// postingsForLabelMatching implements PostingsForLabelMatching if match is non-nil, and PostingsForAllLabelValues otherwise.
func (r *Reader) postingsForLabelMatching(ctx context.Context, name string, match func(string) bool) Postings {
if r.version == FormatV1 {
return r.postingsForLabelMatchingV1(ctx, name, match)
}
Expand All @@ -1803,11 +1812,17 @@ func (r *Reader) PostingsForLabelMatching(ctx context.Context, name string, matc
return EmptyPostings()
}

postingsEstimate := 0
if match == nil {
// The caller wants all postings for name.
postingsEstimate = len(e) * symbolFactor
}

lastVal := e[len(e)-1].value
var its []Postings
its := make([]Postings, 0, postingsEstimate)
if err := r.traversePostingOffsets(ctx, e[0].off, func(val string, postingsOff uint64) (bool, error) {
if match(val) {
// We want this postings iterator since the value is a match
if match == nil || match(val) {
// We want this postings iterator since the value is a match.
postingsDec := encoding.NewDecbufAt(r.b, int(postingsOff), castagnoliTable)
_, p, err := r.dec.PostingsFromDecbuf(postingsDec)
if err != nil {
Expand Down Expand Up @@ -1836,7 +1851,7 @@ func (r *Reader) postingsForLabelMatchingV1(ctx context.Context, name string, ma
return ErrPostings(ctx.Err())
}
count++
if !match(val) {
if match != nil && !match(val) {
continue
}

Expand Down
46 changes: 46 additions & 0 deletions tsdb/index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,52 @@ func TestChunksTimeOrdering(t *testing.T) {
require.NoError(t, idx.Close())
}

// TestReader_PostingsForLabelMatching writes nine series whose __name__ values
// are "1" through "9" and checks that a predicate accepting only even values
// yields the series refs 4, 6, 8 and 10.
func TestReader_PostingsForLabelMatching(t *testing.T) {
	const seriesCount = 9
	var input indexWriterSeriesSlice
	for i := 1; i <= seriesCount; i++ {
		input = append(input, &indexWriterSeries{
			labels: labels.FromStrings("__name__", strconv.Itoa(i)),
			chunks: []chunks.Meta{
				{Ref: 1, MinTime: 0, MaxTime: 10},
			},
		})
	}
	ir, _, _ := createFileReader(context.Background(), t, input)

	p := ir.PostingsForLabelMatching(context.Background(), "__name__", func(v string) bool {
		iv, err := strconv.Atoi(v)
		// Report an unexpected non-numeric value as a regular test failure
		// rather than panicking, which would abort the whole test binary.
		require.NoError(t, err)
		return iv%2 == 0
	})
	require.NoError(t, p.Err())
	refs, err := ExpandPostings(p)
	require.NoError(t, err)
	require.Equal(t, []storage.SeriesRef{4, 6, 8, 10}, refs)
}

// TestReader_PostingsForAllLabelValues writes nine series whose __name__
// values are "1" through "9" and checks that asking for all values of
// __name__ yields the series refs 3 through 11.
func TestReader_PostingsForAllLabelValues(t *testing.T) {
	const seriesCount = 9
	ctx := context.Background()

	var input indexWriterSeriesSlice
	for n := 1; n <= seriesCount; n++ {
		series := &indexWriterSeries{
			labels: labels.FromStrings("__name__", strconv.Itoa(n)),
			chunks: []chunks.Meta{{Ref: 1, MinTime: 0, MaxTime: 10}},
		}
		input = append(input, series)
	}
	ir, _, _ := createFileReader(ctx, t, input)

	postings := ir.PostingsForAllLabelValues(ctx, "__name__")
	require.NoError(t, postings.Err())

	got, err := ExpandPostings(postings)
	require.NoError(t, err)

	want := []storage.SeriesRef{3, 4, 5, 6, 7, 8, 9, 10, 11}
	require.Equal(t, want, got)
}

func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
const seriesCount = 1000
var input indexWriterSeriesSlice
Expand Down
16 changes: 16 additions & 0 deletions tsdb/index/postings.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,22 @@ func (p *MemPostings) PostingsForLabelMatching(ctx context.Context, name string,
return Merge(ctx, its...)
}

// PostingsForAllLabelValues collects the non-empty postings lists stored under
// every value of the label called name and passes them to Merge.
// The read lock is held only while the lists are gathered, not while merging.
func (p *MemPostings) PostingsForAllLabelValues(ctx context.Context, name string) Postings {
	p.mtx.RLock()

	byValue := p.m[name]
	lists := make([]Postings, 0, len(byValue))
	for _, refs := range byValue {
		if len(refs) == 0 {
			// Nothing to contribute for this value.
			continue
		}
		lists = append(lists, NewListPostings(refs))
	}

	// Release the read lock first; the merge runs without it.
	p.mtx.RUnlock()
	return Merge(ctx, lists...)
}

// labelValues returns a slice of label values for the given label name.
// It will take the read lock.
func (p *MemPostings) labelValues(name string) []string {
Expand Down
15 changes: 15 additions & 0 deletions tsdb/index/postings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1571,6 +1571,21 @@ func TestMemPostings_PostingsForLabelMatching(t *testing.T) {
require.Equal(t, []storage.SeriesRef{2, 4}, refs)
}

// TestMemPostings_PostingsForAllLabelValues adds four series, each with a
// distinct value of label "foo", and checks that asking for all values of
// "foo" returns all four refs.
func TestMemPostings_PostingsForAllLabelValues(t *testing.T) {
	ctx := context.Background()

	memP := NewMemPostings()
	memP.Add(1, labels.FromStrings("foo", "1"))
	memP.Add(2, labels.FromStrings("foo", "2"))
	memP.Add(3, labels.FromStrings("foo", "3"))
	memP.Add(4, labels.FromStrings("foo", "4"))

	postings := memP.PostingsForAllLabelValues(ctx, "foo")
	require.NoError(t, postings.Err())

	got, err := ExpandPostings(postings)
	require.NoError(t, err)

	// Every series carrying the label must show up in the result.
	want := []storage.SeriesRef{1, 2, 3, 4}
	require.Equal(t, want, got)
}

func TestMemPostings_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
memP := NewMemPostings()
seriesCount := 10 * checkContextEveryNIterations
Expand Down
4 changes: 4 additions & 0 deletions tsdb/ooo_head_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ func (ir *OOOCompactionHeadIndexReader) PostingsForLabelMatching(context.Context
return index.ErrPostings(errors.New("not supported"))
}

// PostingsForAllLabelValues is not supported by this reader: both arguments
// are ignored and the returned postings carry a "not supported" error.
func (ir *OOOCompactionHeadIndexReader) PostingsForAllLabelValues(_ context.Context, _ string) index.Postings {
	errUnsupported := errors.New("not supported")
	return index.ErrPostings(errUnsupported)
}

func (ir *OOOCompactionHeadIndexReader) SortedPostings(p index.Postings) index.Postings {
// This will already be sorted from the Postings() call above.
return p
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ type IndexPostingsReader interface {
// PostingsForLabelMatching returns a sorted iterator over postings having a label with the given name and a value for which match returns true.
// If no postings are found having at least one matching label, an empty iterator is returned.
PostingsForLabelMatching(ctx context.Context, name string, match func(value string) bool) index.Postings

// PostingsForAllLabelValues returns a sorted iterator over all postings having a label with the given name.
// If no postings are found with the label in question, an empty iterator is returned.
PostingsForAllLabelValues(ctx context.Context, name string) index.Postings
}

// NewPostingsForMatchersCache creates a new PostingsForMatchersCache.
Expand Down
4 changes: 4 additions & 0 deletions tsdb/postings_for_matchers_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,10 @@ func (idx indexForPostingsMock) PostingsForLabelMatching(context.Context, string
panic("implement me")
}

// PostingsForAllLabelValues is an unimplemented stub on this mock; calling it
// always panics.
func (idx indexForPostingsMock) PostingsForAllLabelValues(_ context.Context, _ string) index.Postings {
	panic("implement me")
}

// timeNowMock offers a mockable time.Now() implementation
// empty value is ready to be used, and it should not be copied (use a reference).
type timeNowMock struct {
Expand Down
27 changes: 7 additions & 20 deletions tsdb/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,29 +362,16 @@ func inversePostingsForMatcher(ctx context.Context, ix IndexPostingsReader, m *l
return ix.Postings(ctx, m.Name, m.Value)
}

vals, err := ix.LabelValues(ctx, m.Name)
if err != nil {
return nil, err
}

res := vals[:0]
// If the match before inversion was !="" or !~"", we just want all the values.
// If the matcher being inverted is =~"" or ="", we just want all the values.
if m.Value == "" && (m.Type == labels.MatchRegexp || m.Type == labels.MatchEqual) {
res = vals
} else {
count := 1
for _, val := range vals {
if count%checkContextEveryNIterations == 0 && ctx.Err() != nil {
return nil, ctx.Err()
}
count++
if !m.Matches(val) {
res = append(res, val)
}
}
it := ix.PostingsForAllLabelValues(ctx, m.Name)
return it, it.Err()
}

return ix.Postings(ctx, m.Name, res...)
it := ix.PostingsForLabelMatching(ctx, m.Name, func(s string) bool {
return !m.Matches(s)
})
return it, it.Err()
}

const maxExpandedPostingsFactor = 100 // Division factor for maximum number of matched series.
Expand Down
29 changes: 18 additions & 11 deletions tsdb/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2380,6 +2380,16 @@ func (m mockIndex) PostingsForLabelMatching(ctx context.Context, name string, ma
return index.Merge(ctx, res...)
}

// PostingsForAllLabelValues gathers the postings of every entry in m.postings
// whose label name equals name and passes them to index.Merge.
func (m mockIndex) PostingsForAllLabelValues(ctx context.Context, name string) index.Postings {
	var lists []index.Postings
	for lbl, refs := range m.postings {
		if lbl.Name != name {
			continue
		}
		lists = append(lists, index.NewListPostings(refs))
	}
	return index.Merge(ctx, lists...)
}

func (m mockIndex) PostingsForMatchers(_ context.Context, concurrent bool, ms ...*labels.Matcher) (index.Postings, error) {
var ps []storage.SeriesRef
for p, s := range m.series {
Expand Down Expand Up @@ -3400,6 +3410,10 @@ func (m mockMatcherIndex) PostingsForLabelMatching(context.Context, string, func
return index.ErrPostings(fmt.Errorf("PostingsForLabelMatching called"))
}

// PostingsForAllLabelValues ignores its arguments and returns postings that
// carry an error naming this method, so a caller can tell that it was invoked.
func (m mockMatcherIndex) PostingsForAllLabelValues(_ context.Context, _ string) index.Postings {
	errCalled := errors.New("PostingsForAllLabelValues called")
	return index.ErrPostings(errCalled)
}

func TestPostingsForMatcher(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -3928,17 +3942,6 @@ func TestReader_PostingsForLabelMatchingHonorsContextCancel(t *testing.T) {
require.Equal(t, failAfter+1, ctx.Count()) // Plus one for the Err() call that puts the error in the result.
}

// TestReader_InversePostingsForMatcherHonorsContextCancel runs
// inversePostingsForMatcher with a mock context configured with a FailAfter
// threshold and checks that an error is returned and that the context's
// counter ends at failAfter+1.
func TestReader_InversePostingsForMatcherHonorsContextCancel(t *testing.T) {
	reader := mockReaderOfLabels{}
	matcher := labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".*")

	failAfter := uint64(mockReaderOfLabelsSeriesCount / 2 / checkContextEveryNIterations)
	ctx := &testutil.MockContextErrAfter{FailAfter: failAfter}

	_, err := inversePostingsForMatcher(ctx, reader, matcher)
	require.Error(t, err)

	// Plus one for the Err() call that puts the error in the result.
	wantCalls := failAfter + 1
	require.Equal(t, wantCalls, ctx.Count())
}

// mockReaderOfLabels is a field-less test double used by the context-cancellation tests.
// The methods visible here (PostingsForLabelMatching, PostingsForAllLabelValues, Postings) panic when called.
type mockReaderOfLabels struct{}

// mockReaderOfLabelsSeriesCount is ten times checkContextEveryNIterations.
const mockReaderOfLabelsSeriesCount = checkContextEveryNIterations * 10
Expand Down Expand Up @@ -3971,6 +3974,10 @@ func (m mockReaderOfLabels) PostingsForLabelMatching(context.Context, string, fu
panic("PostingsForLabelMatching called")
}

// PostingsForAllLabelValues always panics with a message naming this method,
// so any call to it surfaces immediately in the test run.
func (m mockReaderOfLabels) PostingsForAllLabelValues(_ context.Context, _ string) index.Postings {
	panic("PostingsForAllLabelValues called")
}

// Postings always panics with the message "Postings called", so any call to
// it surfaces immediately in the test run.
func (m mockReaderOfLabels) Postings(_ context.Context, _ string, _ ...string) (index.Postings, error) {
	panic("Postings called")
}
Expand Down
Loading