From a98cfa42090ef16c0923e0fc2634ef5184a4fa3b Mon Sep 17 00:00:00 2001 From: Goutham Veeramachaneni Date: Tue, 25 Aug 2020 11:58:12 +0200 Subject: [PATCH] Move regex opt to after lookup (#2973) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Move regex opt to after lookup fixes #2906 When we use the caching client (which is what is used in most cases), we load the entire row (tableName+HashKey) irrespective of what the rangeKey parameters are. Which means with the optimisation, we are loading the same row multiple times and then operating on the same data. This PR moves the optimisation to after the data is loaded which should be faster. Signed-off-by: Goutham Veeramachaneni * Add benchmark Signed-off-by: Goutham Veeramachaneni * Add changelog entry Signed-off-by: Goutham Veeramachaneni * Address feedback Signed-off-by: Goutham Veeramachaneni Co-authored-by: Peter Štibraný --- chunk_store.go | 31 ++++--- chunk_store_test.go | 205 ++++++-------------------------------------- 2 files changed, 45 insertions(+), 191 deletions(-) diff --git a/chunk_store.go b/chunk_store.go index 58d37698a6ec..6fa8ea9de5e6 100644 --- a/chunk_store.go +++ b/chunk_store.go @@ -460,16 +460,6 @@ func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, thro } else if matcher.Type == labels.MatchEqual { labelName = matcher.Name queries, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, matcher.Value) - } else if matcher.Type == labels.MatchRegexp && len(FindSetMatches(matcher.Value)) > 0 { - set := FindSetMatches(matcher.Value) - for _, v := range set { - var qs []IndexQuery - qs, err = c.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, matcher.Name, v) - if err != nil { - break - } - queries = append(queries, qs...) - } } else { labelName = matcher.Name queries, err = c.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, matcher.Name) @@ -550,6 +540,14 @@ func (c *baseStore) parseIndexEntries(_ context.Context, entries []IndexEntry, m return nil, nil } + matchSet := map[string]struct{}{} + if matcher != nil && matcher.Type == labels.MatchRegexp { + set := FindSetMatches(matcher.Value) + for _, v := range set { + matchSet[v] = struct{}{} + } + } + result := make([]string, 0, len(entries)) for _, entry := range entries { chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value) @@ -557,6 +555,19 @@ func (c *baseStore) parseIndexEntries(_ context.Context, entries []IndexEntry, m return nil, err } + // If the matcher is like a set (=~"a|b|c|d|...") and + // the label value is not in that set move on. + if len(matchSet) > 0 { + if _, ok := matchSet[string(labelValue)]; !ok { + continue + } + + // If its in the set, then add it to set, we don't need to run + // matcher on it again. + result = append(result, chunkKey) + continue + } + if matcher != nil && !matcher.Matches(string(labelValue)) { continue } diff --git a/chunk_store_test.go b/chunk_store_test.go index 09fb7846a7c7..1bab30c7a984 100644 --- a/chunk_store_test.go +++ b/chunk_store_test.go @@ -5,8 +5,6 @@ import ( "fmt" "math/rand" "reflect" - "sort" - "sync" "testing" "time" @@ -23,7 +21,6 @@ import ( "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/chunk/encoding" - "github.com/cortexproject/cortex/pkg/querier/astmapper" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/validation" ) @@ -507,6 +504,10 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { `foo{bar=~"beep|baz"}`, []Chunk{chunk1, chunk2}, }, + { + `foo{bar=~"beeping|baz"}`, + []Chunk{chunk1}, + }, { `foo{toms="code", bar=~"beep|baz"}`, []Chunk{chunk1, chunk2}, @@ -546,177 +547,6 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) { } } -// TestChunkStore_verifyRegexSetOptimizations tests if chunks are fetched correctly when we have the metric name -func TestChunkStore_verifyRegexSetOptimizations(t *testing.T) { - ctx := context.Background() - now := model.Now() - - testCases := []struct { - query string - expect []string - }{ - { - `foo`, - []string{"foo"}, - }, - { - `foo{bar="baz"}`, - []string{"foo{bar=\"baz\"}"}, - }, - { - `foo{bar!="baz"}`, - []string{"foo"}, - }, - { - `foo{toms="code", bar="beep"}`, - []string{"foo{bar=\"beep\"}", "foo{toms=\"code\"}"}, - }, - { - `foo{bar=~"beep"}`, - []string{"foo{bar=\"beep\"}"}, - }, - { - `foo{bar=~"beep|baz"}`, - []string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}"}, - }, - { - `foo{toms="code", bar=~"beep|baz"}`, - []string{"foo{bar=\"baz\"}", "foo{bar=\"beep\"}", "foo{toms=\"code\"}"}, - }, - { - `foo{bar=~".+"}`, - []string{"foo{bar}"}, - }, - } - - for _, schema := range schemas { - var storeCfg StoreConfig - flagext.DefaultValues(&storeCfg) - - schemaCfg := DefaultSchemaConfig("", schema, 0) - schemaObj, err := schemaCfg.Configs[0].CreateSchema() - require.NoError(t, err) - - var mockSchema = &mockBaseSchema{schema: schemaObj} - - switch s := schemaObj.(type) { - case StoreSchema: - schemaObj = mockStoreSchema{mockBaseSchema: mockSchema, schema: s} - case SeriesStoreSchema: - schemaObj = mockSeriesStoreSchema{mockBaseSchema: mockSchema, schema: s} - } - - store := newTestChunkStoreConfigWithMockStorage(t, schemaCfg, schemaObj, storeCfg) - defer store.Stop() - - from := now.Add(-time.Hour) - through := now - - for _, tc := range testCases { - t.Run(fmt.Sprintf("%s / %s", tc.query, schema), func(t *testing.T) { - // reset queries for test - mockSchema.resetQueries() - - t.Log("========= Running query", tc.query, "with schema", schema) - matchers, err := parser.ParseMetricSelector(tc.query) - if err != nil { - t.Fatal(err) - } - - _, err = store.Get(ctx, userID, from, through, matchers...) - require.NoError(t, err) - - qs := mockSchema.getQueries() - sort.Strings(qs) - - if !reflect.DeepEqual(tc.expect, qs) { - t.Fatalf("%s: wrong queries - %s", tc.query, test.Diff(tc.expect, qs)) - } - }) - } - } -} - -type mockBaseSchema struct { - schema BaseSchema - - mu sync.Mutex - queries []string -} - -func (m *mockBaseSchema) getQueries() []string { - m.mu.Lock() - defer m.mu.Unlock() - return m.queries -} - -func (m *mockBaseSchema) resetQueries() { - m.mu.Lock() - defer m.mu.Unlock() - m.queries = nil -} - -func (m *mockBaseSchema) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) { - m.mu.Lock() - m.queries = append(m.queries, metricName) - m.mu.Unlock() - - return m.schema.GetReadQueriesForMetric(from, through, userID, metricName) -} - -func (m *mockBaseSchema) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) { - m.mu.Lock() - m.queries = append(m.queries, fmt.Sprintf("%s{%s}", metricName, labelName)) - m.mu.Unlock() - - return m.schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName) -} - -func (m *mockBaseSchema) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) { - m.mu.Lock() - m.queries = append(m.queries, fmt.Sprintf("%s{%s=%q}", metricName, labelName, labelValue)) - m.mu.Unlock() - return m.schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue) -} - -func (m *mockBaseSchema) FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery { - return m.schema.FilterReadQueries(queries, shard) -} - -type mockStoreSchema struct { - *mockBaseSchema - schema StoreSchema -} - -func (m mockStoreSchema) GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) { - return m.schema.GetWriteEntries(from, through, userID, metricName, labels, chunkID) -} - -type mockSeriesStoreSchema struct { - *mockBaseSchema - schema SeriesStoreSchema -} - -func (m mockSeriesStoreSchema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) { - return m.schema.GetCacheKeysAndLabelWriteEntries(from, through, userID, metricName, labels, chunkID) -} - -func (m mockSeriesStoreSchema) GetChunkWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) { - return m.schema.GetChunkWriteEntries(from, through, userID, metricName, labels, chunkID) -} - -func (m mockSeriesStoreSchema) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) { - return m.schema.GetChunksForSeries(from, through, userID, seriesID) -} - -func (m mockSeriesStoreSchema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) { - return m.schema.GetLabelNamesForSeries(from, through, userID, seriesID) -} - -func (m mockSeriesStoreSchema) GetSeriesDeleteEntries(from, through model.Time, userID string, metric labels.Labels, hasChunksForIntervalFunc hasChunksForIntervalFunc) ([]IndexEntry, error) { - return m.schema.GetSeriesDeleteEntries(from, through, userID, metric, hasChunksForIntervalFunc) -} - func mustNewLabelMatcher(matchType labels.MatchType, name string, value string) *labels.Matcher { return labels.MustNewMatcher(matchType, name, value) } @@ -1006,13 +836,13 @@ func TestStoreMaxLookBack(t *testing.T) { require.Equal(t, now, chunks[0].Through) } -func benchmarkParseIndexEntries(i int64, b *testing.B) { +func benchmarkParseIndexEntries(i int64, regex string, b *testing.B) { b.ReportAllocs() b.StopTimer() store := &store{} ctx := context.Background() entries := generateIndexEntries(i) - matcher, err := labels.NewMatcher(labels.MatchRegexp, "", ".*") + matcher, err := labels.NewMatcher(labels.MatchRegexp, "", regex) if err != nil { b.Fatal(err) } @@ -1022,16 +852,29 @@ func benchmarkParseIndexEntries(i int64, b *testing.B) { if err != nil { b.Fatal(err) } - if len(keys) != len(entries)/2 { + if regex == ".*" && len(keys) != len(entries)/2 { b.Fatalf("expected keys:%d got:%d", len(entries)/2, len(keys)) } } } -func BenchmarkParseIndexEntries500(b *testing.B) { benchmarkParseIndexEntries(500, b) } -func BenchmarkParseIndexEntries2500(b *testing.B) { benchmarkParseIndexEntries(2500, b) } -func BenchmarkParseIndexEntries10000(b *testing.B) { benchmarkParseIndexEntries(10000, b) } -func BenchmarkParseIndexEntries50000(b *testing.B) { benchmarkParseIndexEntries(50000, b) } +func BenchmarkParseIndexEntries500(b *testing.B) { benchmarkParseIndexEntries(500, ".*", b) } +func BenchmarkParseIndexEntries2500(b *testing.B) { benchmarkParseIndexEntries(2500, ".*", b) } +func BenchmarkParseIndexEntries10000(b *testing.B) { benchmarkParseIndexEntries(10000, ".*", b) } +func BenchmarkParseIndexEntries50000(b *testing.B) { benchmarkParseIndexEntries(50000, ".*", b) } + +func BenchmarkParseIndexEntriesRegexSet500(b *testing.B) { + benchmarkParseIndexEntries(500, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b) +} +func BenchmarkParseIndexEntriesRegexSet2500(b *testing.B) { + benchmarkParseIndexEntries(2500, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b) +} +func BenchmarkParseIndexEntriesRegexSet10000(b *testing.B) { + benchmarkParseIndexEntries(10000, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b) +} +func BenchmarkParseIndexEntriesRegexSet50000(b *testing.B) { + benchmarkParseIndexEntries(50000, "labelvalue0|labelvalue1|labelvalue2|labelvalue3|labelvalue600", b) +} func generateIndexEntries(n int64) []IndexEntry { res := make([]IndexEntry, 0, n)