From 81e9d4f2322990f974608ecf5fa909e64c56b87f Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 14:52:05 -0500 Subject: [PATCH 01/29] WIP matchers for label endpoints --- src/query/api/v1/handler/prometheus/common.go | 84 ++++++++++++++++--- 1 file changed, 71 insertions(+), 13 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 87859f9caf..1f00c1c23d 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -211,7 +211,10 @@ func ParseSeriesMatchQuery( parseOpts xpromql.ParseOptions, tagOptions models.TagOptions, ) ([]*storage.FetchQuery, error) { - r.ParseForm() + if err := r.ParseForm(); err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + matcherValues := r.Form["match[]"] if len(matcherValues) == 0 { return nil, xerrors.NewInvalidParamsError(errors.ErrInvalidMatchers) @@ -222,19 +225,17 @@ func ParseSeriesMatchQuery( return nil, err } - queries := make([]*storage.FetchQuery, len(matcherValues)) - fn := parseOpts.MetricSelectorFn() - for i, s := range matcherValues { - promMatchers, err := fn(s) - if err != nil { - return nil, xerrors.NewInvalidParamsError(err) - } - - matchers, err := xpromql.LabelMatchersToModelMatcher(promMatchers, tagOptions) - if err != nil { - return nil, xerrors.NewInvalidParamsError(err) - } + matchers, ok, err := ParseMatch(r, parseOpts, tagOptions) + if err != nil { + return nil, err + } + if !ok { + return nil, xerrors.NewInvalidParamsError( + fmt.Errorf("need more than one matcher: expected>=1, actual=%d", len(matchers))) + } + queries := make([]*storage.FetchQuery, 0, len(matcherValues)) + for _, m := range matcherValues { queries[i] = &storage.FetchQuery{ Raw: fmt.Sprintf("match[]=%s", s), TagMatchers: matchers, @@ -246,6 +247,63 @@ func ParseSeriesMatchQuery( return queries, nil } +// ParsedMatch is a parsed matched. +type ParsedMatch struct { + Match string + Matchers []models.Matchers +} + +// ParseMatch parses all match params from the GET request. +func ParseMatch( + r *http.Request, + parseOpts xpromql.ParseOptions, + tagOptions models.TagOptions, +) ([]ParsedMatch, bool, error) { + if err := r.ParseForm(); err != nil { + return nil, false, xerrors.NewInvalidParamsError(err) + } + + matcherValues := r.Form["match[]"] + if len(matcherValues) == 0 { + return nil, false, nil + } + + matchers := make([]models.Matchers, 0, len(matcherValues)) + for _, str := range matcherValues { + m, err := parseMatch(r, parseOpts, tagOptions, str) + if err != nil { + return nil, false, err + } + matchers = append(matchers, ParsedMatch{ + Match: str, + Matchers: m, + }) + } + + return matchers, true, nil +} + +func parseMatch( + r *http.Request, + parseOpts xpromql.ParseOptions, + tagOptions models.TagOptions, + matcher string, +) (models.Matchers, error) { + fn := parseOpts.MetricSelectorFn() + + promMatchers, err := fn(matcher) + if err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + + matchers, err := xpromql.LabelMatchersToModelMatcher(promMatchers, tagOptions) + if err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + + return matchers, nil +} + func renderNameOnlyTagCompletionResultsJSON( w io.Writer, results []consolidators.CompletedTag, From bb694af36a9f101303c8a80f1284b6fc14f89e9c Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 15:02:17 -0500 Subject: [PATCH 02/29] Add parsing in endpoints --- src/query/api/v1/handler/prometheus/common.go | 14 +++---- .../v1/handler/prometheus/native/list_tags.go | 23 ++++++++++- .../handler/prometheus/remote/tag_values.go | 41 ++++++++++++++----- 3 files changed, 59 insertions(+), 19 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 1f00c1c23d..8b4f5ce90b 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -235,13 +235,13 @@ func ParseSeriesMatchQuery( } queries := make([]*storage.FetchQuery, 0, len(matcherValues)) - for _, m := range matcherValues { - queries[i] = &storage.FetchQuery{ - Raw: fmt.Sprintf("match[]=%s", s), - TagMatchers: matchers, + for _, m := range matchers { + queries = append(queries, &storage.FetchQuery{ + Raw: fmt.Sprintf("match[]=%s", m.Match), + TagMatchers: m.Matchers, Start: start, End: end, - } + }) } return queries, nil @@ -250,7 +250,7 @@ func ParseSeriesMatchQuery( // ParsedMatch is a parsed matched. type ParsedMatch struct { Match string - Matchers []models.Matchers + Matchers models.Matchers } // ParseMatch parses all match params from the GET request. @@ -268,7 +268,7 @@ func ParseMatch( return nil, false, nil } - matchers := make([]models.Matchers, 0, len(matcherValues)) + matchers := make([]ParsedMatch, 0, len(matcherValues)) for _, str := range matcherValues { m, err := parseMatch(r, parseOpts, tagOptions, str) if err != nil { diff --git a/src/query/api/v1/handler/prometheus/native/list_tags.go b/src/query/api/v1/handler/prometheus/native/list_tags.go index 23aa9268c8..33e77fad9a 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags.go @@ -22,6 +22,7 @@ package native import ( "context" + "fmt" "net/http" "github.com/m3db/m3/src/query/api/v1/handler" @@ -32,6 +33,7 @@ import ( "github.com/m3db/m3/src/query/parser/promql" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/util/logging" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -54,6 +56,7 @@ type ListTagsHandler struct { fetchOptionsBuilder handleroptions.FetchOptionsBuilder parseOpts promql.ParseOptions instrumentOpts instrument.Options + tagOpts models.TagOptions } // NewListTagsHandler returns a new instance of handler. @@ -63,6 +66,7 @@ func NewListTagsHandler(opts options.HandlerOptions) http.Handler { fetchOptionsBuilder: opts.FetchOptionsBuilder(), parseOpts: promql.NewParseOptions().SetNowFn(opts.NowFn()), instrumentOpts: opts.InstrumentOpts(), + tagOpts: opts.TagOptions(), } } @@ -77,9 +81,26 @@ func (h *ListTagsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + tagMatchers := models.Matchers{{Type: models.MatchAll}} + reqTagMatchers, ok, err := prometheus.ParseMatch(r, h.parseOpts, h.tagOpts) + if err != nil { + err = xerrors.NewInvalidParamsError(err) + xhttp.WriteError(w, err) + return + } + if ok { + if n := len(reqTagMatchers); n != 1 { + err = xerrors.NewInvalidParamsError(fmt.Errorf( + "only single tag matcher allowed: actual=%d", n)) + xhttp.WriteError(w, err) + return + } + tagMatchers = reqTagMatchers[0].Matchers + } + query := &storage.CompleteTagsQuery{ CompleteNameOnly: true, - TagMatchers: models.Matchers{{Type: models.MatchAll}}, + TagMatchers: tagMatchers, Start: start, End: end, } diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values.go b/src/query/api/v1/handler/prometheus/remote/tag_values.go index 61c5935130..01d9e7910b 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values.go @@ -22,6 +22,7 @@ package remote import ( "context" + "fmt" "net/http" "github.com/m3db/m3/src/query/api/v1/handler" @@ -34,6 +35,7 @@ import ( "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/storage/m3/consolidators" "github.com/m3db/m3/src/query/util/logging" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xhttp "github.com/m3db/m3/src/x/net/http" @@ -59,6 +61,7 @@ type TagValuesHandler struct { fetchOptionsBuilder handleroptions.FetchOptionsBuilder parseOpts promql.ParseOptions instrumentOpts instrument.Options + tagOpts models.TagOptions } // TagValuesResponse is the response that gets returned to the user @@ -67,12 +70,13 @@ type TagValuesResponse struct { } // NewTagValuesHandler returns a new instance of handler. -func NewTagValuesHandler(options options.HandlerOptions) http.Handler { +func NewTagValuesHandler(opts options.HandlerOptions) http.Handler { return &TagValuesHandler{ - storage: options.Storage(), - fetchOptionsBuilder: options.FetchOptionsBuilder(), - parseOpts: promql.NewParseOptions().SetNowFn(options.NowFn()), - instrumentOpts: options.InstrumentOpts(), + storage: opts.Storage(), + fetchOptionsBuilder: opts.FetchOptionsBuilder(), + parseOpts: promql.NewParseOptions().SetNowFn(opts.NowFn()), + instrumentOpts: opts.InstrumentOpts(), + tagOpts: opts.TagOptions(), } } @@ -125,16 +129,31 @@ func (h *TagValuesHandler) parseTagValuesToQuery( } nameBytes := []byte(name) + + tagMatchers := models.Matchers{ + models.Matcher{ + Type: models.MatchField, + Name: nameBytes, + }, + } + reqTagMatchers, ok, err := prometheus.ParseMatch(r, h.parseOpts, h.tagOpts) + if err != nil { + return nil, xerrors.NewInvalidParamsError(err) + } + if ok { + if n := len(reqTagMatchers); n != 1 { + err := xerrors.NewInvalidParamsError(fmt.Errorf( + "only single tag matcher allowed: actual=%d", n)) + return nil, err + } + tagMatchers = reqTagMatchers[0].Matchers + } + return &storage.CompleteTagsQuery{ Start: start, End: end, CompleteNameOnly: false, FilterNameTags: [][]byte{nameBytes}, - TagMatchers: models.Matchers{ - models.Matcher{ - Type: models.MatchField, - Name: nameBytes, - }, - }, + TagMatchers: tagMatchers, }, nil } From f06ce87ad0a32547a1ae0d72e2c1e030d01c81ac Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 16:05:01 -0500 Subject: [PATCH 03/29] Label / label values integration test --- .../prometheus/test.sh | 44 +++++++++++++++++++ src/query/api/v1/handler/prometheus/common.go | 3 +- 2 files changed, 45 insertions(+), 2 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 1d3482c232..10c614511f 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -391,6 +391,48 @@ function test_series { '[[ $(curl -s "0.0.0.0:7201/api/v1/series?match[]=prometheus_remote_storage_samples_total&start=-292273086-05-16T16:47:06Z&end=292277025-08-18T07:12:54.999999999Z" | jq -r ".data | length") -eq 1 ]]' } +function test_labels { + TAG_NAME_0="name_0" TAG_VALUE_0="value_0_1" \ + TAG_NAME_1="name_1" TAG_VALUE_1="value_1_1" \ + TAG_NAME_2="name_2" TAG_VALUE_1="value_2_1" \ + prometheus_remote_write \ + label_metric now 42.42 \ + true "Expected request to succeed" \ + 200 "Expected request to return status code 200" + + TAG_NAME_0="name_0" TAG_VALUE_0="value_0_2" \ + TAG_NAME_1="name_1" TAG_VALUE_1="value_1_2" \ + prometheus_remote_write \ + label_metric_2 now 42.42 \ + true "Expected request to succeed" \ + 200 "Expected request to return status code 200" + + sleep 100000 + # Test label search with match + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 77 ]]' # 77 withou a match + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 3 ]]' + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 2 ]]' + + # Test label values search with match + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 2 ]]' # two values without a match + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 1 ]]' + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data[0]") = "value_1_1" ]]' + + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 1 ]]' + ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data[0]") = "value_1_2" ]]' +} + echo "Running readiness test" test_readiness @@ -409,6 +451,8 @@ test_prometheus_query_native_timeout test_query_restrict_tags test_prometheus_remote_write_map_tags test_series +test_labels +test_label_values echo "Running function correctness tests" test_correctness diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 8b4f5ce90b..5f4d4082d1 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -270,7 +270,7 @@ func ParseMatch( matchers := make([]ParsedMatch, 0, len(matcherValues)) for _, str := range matcherValues { - m, err := parseMatch(r, parseOpts, tagOptions, str) + m, err := parseMatch(parseOpts, tagOptions, str) if err != nil { return nil, false, err } @@ -284,7 +284,6 @@ func ParseMatch( } func parseMatch( - r *http.Request, parseOpts xpromql.ParseOptions, tagOptions models.TagOptions, matcher string, From 2d09465e1f5a6167f92350b2c63f17c2ddcff2f7 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 17:21:28 -0500 Subject: [PATCH 04/29] Add list tags unit test --- .../prometheus/test.sh | 3 +- .../prometheus/native/list_tags_test.go | 63 ++++++++++++++----- src/query/parser/promql/matchers.go | 1 - 3 files changed, 49 insertions(+), 18 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 10c614511f..608462671e 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -394,7 +394,7 @@ function test_series { function test_labels { TAG_NAME_0="name_0" TAG_VALUE_0="value_0_1" \ TAG_NAME_1="name_1" TAG_VALUE_1="value_1_1" \ - TAG_NAME_2="name_2" TAG_VALUE_1="value_2_1" \ + TAG_NAME_2="name_2" TAG_VALUE_2="value_2_1" \ prometheus_remote_write \ label_metric now 42.42 \ true "Expected request to succeed" \ @@ -408,6 +408,7 @@ function test_labels { 200 "Expected request to return status code 200" sleep 100000 + # Test label search with match ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 77 ]]' # 77 withou a match diff --git a/src/query/api/v1/handler/prometheus/native/list_tags_test.go b/src/query/api/v1/handler/prometheus/native/list_tags_test.go index f0e2e24c4b..c4e8a3cf53 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags_test.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags_test.go @@ -116,32 +116,63 @@ func testListTags(t *testing.T, meta block.ResultMetadata, header string) { opts := options.EmptyHandlerOptions(). SetStorage(store). SetFetchOptionsBuilder(fb). + SetTagOptions(models.NewTagOptions()). SetNowFn(nowFn) h := NewListTagsHandler(opts) for _, method := range []string{"GET", "POST"} { - matcher := &listTagsMatcher{start: time.Unix(0, 0), end: now} - store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()). - Return(storeResult, nil) + testListTagsWithMatch(t, now, store, storeResult, method, header, h, false) + testListTagsWithMatch(t, now, store, storeResult, method, header, h, true) + } +} - req := httptest.NewRequest(method, "/labels", nil) - w := httptest.NewRecorder() +func testListTagsWithMatch( + t *testing.T, + now time.Time, + store *storage.MockStorage, + storeResult *consolidators.CompleteTagsResult, + method string, + header string, + h http.Handler, + withMatchOverride bool, +) { + tagMatcher := models.Matchers{{Type: models.MatchAll}} + target := "/labels" + if withMatchOverride { + tagMatcher = models.Matchers{{ + Type: models.MatchEqual, + Name: []byte("__name__"), + Value: []byte("testing"), + }} + target = "/labels?match[]=testing" + } - h.ServeHTTP(w, req) + matcher := &storage.CompleteTagsQuery{ + CompleteNameOnly: true, + TagMatchers: tagMatcher, + Start: time.Unix(0, 0), + End: now, + } + store.EXPECT().CompleteTags(gomock.Any(), gomock.Eq(matcher), gomock.Any()). + Return(storeResult, nil) - require.Equal(t, http.StatusOK, w.Result().StatusCode) + req := httptest.NewRequest(method, target, nil) + w := httptest.NewRecorder() - body := w.Result().Body - defer body.Close() + h.ServeHTTP(w, req) - r, err := ioutil.ReadAll(body) - require.NoError(t, err) + require.Equal(t, http.StatusOK, w.Result().StatusCode) - ex := `{"status":"success","data":["bar","baz","foo"]}` - require.Equal(t, ex, string(r)) + body := w.Result().Body + defer body.Close() - actual := w.Header().Get(headers.LimitHeader) - assert.Equal(t, header, actual) - } + r, err := ioutil.ReadAll(body) + require.NoError(t, err) + + ex := `{"status":"success","data":["bar","baz","foo"]}` + require.Equal(t, ex, string(r)) + + actual := w.Header().Get(headers.LimitHeader) + assert.Equal(t, header, actual) } func TestListErrorTags(t *testing.T) { diff --git a/src/query/parser/promql/matchers.go b/src/query/parser/promql/matchers.go index c25e618a61..dec2c38168 100644 --- a/src/query/parser/promql/matchers.go +++ b/src/query/parser/promql/matchers.go @@ -419,7 +419,6 @@ func LabelMatchersToModelMatcher( ) (models.Matchers, error) { matchers := make(models.Matchers, 0, len(lMatchers)) for _, m := range lMatchers { - // here. matchType, err := promTypeToM3(m.Type) if err != nil { return nil, err From e50225f74edaab943b5adf6549432c3bf8fda99c Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 17:50:16 -0500 Subject: [PATCH 05/29] Ensure name matcher is included in tag values API --- .../handler/prometheus/remote/tag_values.go | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values.go b/src/query/api/v1/handler/prometheus/remote/tag_values.go index 01d9e7910b..29861d7774 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values.go @@ -21,6 +21,7 @@ package remote import ( + "bytes" "context" "fmt" "net/http" @@ -130,12 +131,11 @@ func (h *TagValuesHandler) parseTagValuesToQuery( nameBytes := []byte(name) - tagMatchers := models.Matchers{ - models.Matcher{ - Type: models.MatchField, - Name: nameBytes, - }, + nameMatcher := models.Matcher{ + Type: models.MatchField, + Name: nameBytes, } + tagMatchers := models.Matchers{nameMatcher} reqTagMatchers, ok, err := prometheus.ParseMatch(r, h.parseOpts, h.tagOpts) if err != nil { return nil, xerrors.NewInvalidParamsError(err) @@ -146,7 +146,15 @@ func (h *TagValuesHandler) parseTagValuesToQuery( "only single tag matcher allowed: actual=%d", n)) return nil, err } - tagMatchers = reqTagMatchers[0].Matchers + + reqTagMatcher := reqTagMatchers[0] + + for _, m := range reqTagMatcher.Matchers { + // add all matchers that don't match the default name matcher. + if m.Type != nameMatcher.Type || !bytes.Equal(m.Name, nameMatcher.Name) { + tagMatchers = append(tagMatchers, m) + } + } } return &storage.CompleteTagsQuery{ From 654728a876f40ccf9c49796b7f231e7da125b4a1 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 18:09:33 -0500 Subject: [PATCH 06/29] Add tag values unit test for match[] --- .../prometheus/remote/tag_values_test.go | 104 ++++++++++++------ 1 file changed, 69 insertions(+), 35 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go index cf617060ae..e6244ae104 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go @@ -111,6 +111,7 @@ func TestTagValues(t *testing.T) { opts := options.EmptyHandlerOptions(). SetStorage(store). SetNowFn(nowFn). + SetTagOptions(models.NewTagOptions()). SetFetchOptionsBuilder(fb) valueHandler := NewTagValuesHandler(opts) @@ -123,50 +124,83 @@ func TestTagValues(t *testing.T) { url := fmt.Sprintf("/label/{%s}/values", NameReplace) for _, tt := range names { - path := fmt.Sprintf("/label/%s/values?start=100", tt.name) - req, err := http.NewRequest("GET", path, nil) - if err != nil { - t.Fatal(err) - } + testTagValuesWithMatch(t, now, store, tt.name, url, valueHandler, false) + testTagValuesWithMatch(t, now, store, tt.name, url, valueHandler, true) + } +} - rr := httptest.NewRecorder() - router := mux.NewRouter() - matcher := &tagValuesMatcher{ - start: time.Unix(100, 0), - end: now, - filterTag: tt.name, +func testTagValuesWithMatch( + t *testing.T, + now time.Time, + store *storage.MockStorage, + name string, + url string, + valueHandler http.Handler, + withMatchOverride bool, +) { + path := fmt.Sprintf("/label/%s/values?start=100", name) + nameMatcher := models.Matcher{ + Type: models.MatchField, + Name: []byte(name), + } + matchers := models.Matchers{nameMatcher} + if withMatchOverride { + path = fmt.Sprintf("/label/%s/values?start=100&match[]=testing", name) + matchers = models.Matchers{ + nameMatcher, + { + Type: models.MatchEqual, + Name: []byte("__name__"), + Value: []byte("testing"), + }, } + } - storeResult := &consolidators.CompleteTagsResult{ - CompleteNameOnly: false, - CompletedTags: []consolidators.CompletedTag{ - { - Name: b(tt.name), - Values: bs("a", "b", "c", tt.name), - }, - }, - Metadata: block.ResultMetadata{ - Exhaustive: false, - Warnings: []block.Warning{{Name: "foo", Message: "bar"}}, + matcher := &storage.CompleteTagsQuery{ + Start: time.Unix(100, 0), + End: now, + CompleteNameOnly: false, + FilterNameTags: [][]byte{[]byte(name)}, + TagMatchers: matchers, + } + + req, err := http.NewRequest("GET", path, nil) + if err != nil { + t.Fatal(err) + } + + rr := httptest.NewRecorder() + router := mux.NewRouter() + + storeResult := &consolidators.CompleteTagsResult{ + CompleteNameOnly: false, + CompletedTags: []consolidators.CompletedTag{ + { + Name: b(name), + Values: bs("a", "b", "c", name), }, - } + }, + Metadata: block.ResultMetadata{ + Exhaustive: false, + Warnings: []block.Warning{{Name: "foo", Message: "bar"}}, + }, + } - store.EXPECT().CompleteTags(gomock.Any(), matcher, gomock.Any()). - Return(storeResult, nil) + store.EXPECT().CompleteTags(gomock.Any(), gomock.Eq(matcher), gomock.Any()). + Return(storeResult, nil) - router.HandleFunc(url, valueHandler.ServeHTTP) - router.ServeHTTP(rr, req) + router.HandleFunc(url, valueHandler.ServeHTTP) + router.ServeHTTP(rr, req) - read, err := ioutil.ReadAll(rr.Body) - require.NoError(t, err) + read, err := ioutil.ReadAll(rr.Body) + require.NoError(t, err) - ex := fmt.Sprintf(`{"status":"success","data":["a","b","c","%s"]}`, tt.name) - assert.Equal(t, ex, string(read)) + ex := fmt.Sprintf(`{"status":"success","data":["a","b","c","%s"]}`, name) + assert.Equal(t, ex, string(read)) - warning := rr.Header().Get(headers.LimitHeader) - exWarn := fmt.Sprintf("%s,foo_bar", headers.LimitHeaderSeriesLimitApplied) - assert.Equal(t, exWarn, warning) - } + warning := rr.Header().Get(headers.LimitHeader) + exWarn := fmt.Sprintf("%s,foo_bar", headers.LimitHeaderSeriesLimitApplied) + assert.Equal(t, exWarn, warning) } func TestTagValueErrors(t *testing.T) { From 06b3eec01a44426760cd47136bad6237e312fc86 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 18:19:19 -0500 Subject: [PATCH 07/29] Add ability to skip fields altogether if does not match restrict by query for aggregate result --- src/dbnode/storage/index/block.go | 4 +- .../storage/index/fields_terms_iterator.go | 35 ++++++++++--- .../storage/index/filter_fields_iterator.go | 52 ++++++++++++------- .../storage/index/read_through_segment.go | 5 ++ src/m3ninx/index/segment/fst/segment.go | 29 +++++++++++ src/m3ninx/index/segment/mem/reader.go | 4 ++ src/m3ninx/index/segment/mem/types.go | 1 + src/m3ninx/index/segment/types.go | 1 + 8 files changed, 103 insertions(+), 28 deletions(-) diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index 1b02e60b53..5f64fb6447 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -626,7 +626,7 @@ func (b *block) aggregateWithSpan( } return aggOpts.FieldFilter.Allow(field) }, - fieldIterFn: func(r segment.Reader) (segment.FieldsIterator, error) { + fieldIterFn: func(r segment.Reader) (segment.FieldsPostingsListIterator, error) { // NB(prateek): we default to using the regular (FST) fields iterator // unless we have a predefined list of fields we know we need to restrict // our search to, in which case we iterate that list and check if known values @@ -638,7 +638,7 @@ func (b *block) aggregateWithSpan( // to this function is expected to have (FieldsFilter) pretty small. If that changes // in the future, we can revisit this. if len(aggOpts.FieldFilter) == 0 { - return r.Fields() + return r.FieldsPostingsList() } return newFilterFieldsIterator(r, aggOpts.FieldFilter) }, diff --git a/src/dbnode/storage/index/fields_terms_iterator.go b/src/dbnode/storage/index/fields_terms_iterator.go index 186c32d47b..1761933397 100644 --- a/src/dbnode/storage/index/fields_terms_iterator.go +++ b/src/dbnode/storage/index/fields_terms_iterator.go @@ -49,23 +49,23 @@ func (o fieldsAndTermsIteratorOpts) allow(f []byte) bool { return o.allowFn(f) } -func (o fieldsAndTermsIteratorOpts) newFieldIter(r segment.Reader) (segment.FieldsIterator, error) { +func (o fieldsAndTermsIteratorOpts) newFieldIter(r segment.Reader) (segment.FieldsPostingsListIterator, error) { if o.fieldIterFn == nil { - return r.Fields() + return r.FieldsPostingsList() } return o.fieldIterFn(r) } type allowFn func(field []byte) bool -type newFieldIterFn func(r segment.Reader) (segment.FieldsIterator, error) +type newFieldIterFn func(r segment.Reader) (segment.FieldsPostingsListIterator, error) type fieldsAndTermsIter struct { reader segment.Reader opts fieldsAndTermsIteratorOpts err error - fieldIter segment.FieldsIterator + fieldIter segment.FieldsPostingsListIterator termIter segment.TermsIterator current struct { @@ -145,10 +145,33 @@ func (fti *fieldsAndTermsIter) setNextField() bool { } for fieldIter.Next() { - field := fieldIter.Current() + field, pl := fieldIter.Current() if !fti.opts.allow(field) { continue } + if fti.restrictByPostings == nil { + // No restrictions. + fti.current.field = field + return true + } + + bitmap, ok := roaring.BitmapFromPostingsList(pl) + if !ok { + fti.err = errUnpackBitmapFromPostingsList + return false + } + + // Check field is part of at least some of the documents we're + // restricted to providing results for based on intersection + // count. + // Note: IntersectionCount is significantly faster than intersecting and + // counting results and also does not allocate. + if n := fti.restrictByPostings.IntersectionCount(bitmap); n < 1 { + // No match, not next result. + continue + } + + // Matches, this is next result. fti.current.field = field return true } @@ -213,7 +236,7 @@ func (fti *fieldsAndTermsIter) nextTermsIterResult() (bool, error) { return false, errUnpackBitmapFromPostingsList } - // Check term isn part of at least some of the documents we're + // Check term is part of at least some of the documents we're // restricted to providing results for based on intersection // count. // Note: IntersectionCount is significantly faster than intersecting and diff --git a/src/dbnode/storage/index/filter_fields_iterator.go b/src/dbnode/storage/index/filter_fields_iterator.go index 96ac7570e2..7f94f03d6a 100644 --- a/src/dbnode/storage/index/filter_fields_iterator.go +++ b/src/dbnode/storage/index/filter_fields_iterator.go @@ -21,9 +21,11 @@ package index import ( + "bytes" "errors" "github.com/m3db/m3/src/m3ninx/index/segment" + "github.com/m3db/m3/src/m3ninx/postings" ) var ( @@ -33,54 +35,64 @@ var ( func newFilterFieldsIterator( reader segment.Reader, fields AggregateFieldFilter, -) (segment.FieldsIterator, error) { +) (segment.FieldsPostingsListIterator, error) { if len(fields) == 0 { return nil, errNoFiltersSpecified } + fieldsIter, err := reader.FieldsPostingsList() + if err != nil { + return nil, err + } return &filterFieldsIterator{ reader: reader, + fieldsIter: fieldsIter, fields: fields, currentIdx: -1, }, nil } type filterFieldsIterator struct { - reader segment.Reader - fields AggregateFieldFilter + reader segment.Reader + fieldsIter segment.FieldsPostingsListIterator + fields AggregateFieldFilter err error currentIdx int } -var _ segment.FieldsIterator = &filterFieldsIterator{} +var _ segment.FieldsPostingsListIterator = &filterFieldsIterator{} func (f *filterFieldsIterator) Next() bool { if f.err != nil { return false } - f.currentIdx++ // required because we start at -1 - for f.currentIdx < len(f.fields) { - field := f.fields[f.currentIdx] + for f.fieldsIter.Next() { + field, _ := f.fieldsIter.Current() - ok, err := f.reader.ContainsField(field) - if err != nil { - f.err = err - return false + found := false + for _, f := range f.fields { + if bytes.Equal(field, f) { + found = true + break + } } - - // i.e. we found a field from the filter list contained in the segment. - if ok { + if found { return true } - - // the current field is unsuitable, so we skip to the next possiblity. - f.currentIdx++ } return false } -func (f *filterFieldsIterator) Current() []byte { return f.fields[f.currentIdx] } -func (f *filterFieldsIterator) Err() error { return f.err } -func (f *filterFieldsIterator) Close() error { return nil } +func (f *filterFieldsIterator) Current() ([]byte, postings.List) { + return f.fieldsIter.Current() +} + +func (f *filterFieldsIterator) Err() error { + return f.err +} + +func (f *filterFieldsIterator) Close() error { + return nil +} diff --git a/src/dbnode/storage/index/read_through_segment.go b/src/dbnode/storage/index/read_through_segment.go index 089fd546bc..38d5a6e4c4 100644 --- a/src/dbnode/storage/index/read_through_segment.go +++ b/src/dbnode/storage/index/read_through_segment.go @@ -287,6 +287,11 @@ func (s *readThroughSegmentReader) Fields() (segment.FieldsIterator, error) { return s.reader.Fields() } +// FieldsPostingsList is a pass through call. +func (s *readThroughSegmentReader) FieldsPostingsList() (segment.FieldsPostingsListIterator, error) { + return s.reader.FieldsPostingsList() +} + // ContainsField is a pass through call. func (s *readThroughSegmentReader) ContainsField(field []byte) (bool, error) { return s.reader.ContainsField(field) diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index fd041df720..5c33ab3f7f 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -391,6 +391,22 @@ func (i *termsIterable) termsNotClosedMaybeFinalizedWithRLock( return i.postingsIter, nil } +func (i *termsIterable) fieldsNotClosedMaybeFinalizedWithRLock() (sgmt.FieldsPostingsListIterator, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if i.r.finalized { + return nil, errReaderFinalized + } + + i.fieldsIter.reset(fstTermsIterOpts{ + seg: i.r, + fst: i.r.fieldsFST, + finalizeFST: false, + }) + i.postingsIter.reset(i.r, i.fieldsIter) + return i.postingsIter, nil +} + func (r *fsSegment) UnmarshalPostingsListBitmap(b *pilosaroaring.Bitmap, offset uint64) error { r.RLock() defer r.RUnlock() @@ -879,6 +895,19 @@ func (sr *fsSegmentReader) ContainsField(field []byte) (bool, error) { return sr.fsSegment.fieldsFST.Contains(field) } +func (sr *fsSegmentReader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator, error) { + if sr.closed { + return nil, errReaderClosed + } + if sr.termsIterable == nil { + sr.termsIterable = newTermsIterable(sr.fsSegment) + } + sr.fsSegment.RLock() + iter, err := sr.termsIterable.fieldsNotClosedMaybeFinalizedWithRLock() + sr.fsSegment.RUnlock() + return iter, err +} + func (sr *fsSegmentReader) Terms(field []byte) (sgmt.TermsIterator, error) { if sr.closed { return nil, errReaderClosed diff --git a/src/m3ninx/index/segment/mem/reader.go b/src/m3ninx/index/segment/mem/reader.go index 998a184452..93a42824f9 100644 --- a/src/m3ninx/index/segment/mem/reader.go +++ b/src/m3ninx/index/segment/mem/reader.go @@ -70,6 +70,10 @@ func (r *reader) Terms(field []byte) (sgmt.TermsIterator, error) { return r.segment.Terms(field) } +func (r *reader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator, error) { + return r.segment.FieldsPostingsList() +} + func (r *reader) MatchField(field []byte) (postings.List, error) { // falling back to regexp .* as this segment implementation is only used in tests. return r.MatchRegexp(field, index.DotStarCompiledRegex()) diff --git a/src/m3ninx/index/segment/mem/types.go b/src/m3ninx/index/segment/mem/types.go index 9a5268e0b3..30479fa85f 100644 --- a/src/m3ninx/index/segment/mem/types.go +++ b/src/m3ninx/index/segment/mem/types.go @@ -72,6 +72,7 @@ type ReadableSegment interface { Fields() (sgmt.FieldsIterator, error) ContainsField(field []byte) (bool, error) Terms(field []byte) (sgmt.TermsIterator, error) + FieldsPostingsList() (sgmt.FieldsPostingsListIterator, error) matchTerm(field, term []byte) (postings.List, error) matchRegexp(field []byte, compiled *re.Regexp) (postings.List, error) getDoc(id postings.ID) (doc.Metadata, error) diff --git a/src/m3ninx/index/segment/types.go b/src/m3ninx/index/segment/types.go index 0b176dc6de..61ba1d3cd3 100644 --- a/src/m3ninx/index/segment/types.go +++ b/src/m3ninx/index/segment/types.go @@ -69,6 +69,7 @@ type Reader interface { index.Reader FieldsIterable TermsIterable + FieldsPostingsListIterable // ContainsField returns a bool indicating if the Segment contains the provided field. ContainsField(field []byte) (bool, error) From 01d7605b52bfa1f59748e9817fb8938933213d71 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 18:26:37 -0500 Subject: [PATCH 08/29] Remove sleep --- scripts/docker-integration-tests/prometheus/test.sh | 2 -- 1 file changed, 2 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 608462671e..8fbe710858 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -407,8 +407,6 @@ function test_labels { true "Expected request to succeed" \ 200 "Expected request to return status code 200" - sleep 100000 - # Test label search with match ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 77 ]]' # 77 withou a match From bda7d06d490b3854a0c1ebbea6931507769695f7 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 18:49:11 -0500 Subject: [PATCH 09/29] Use a separate iterator for fields postings list iterator --- src/m3ninx/index/segment/fst/segment.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index 5c33ab3f7f..d90d6ba24d 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -851,10 +851,11 @@ var _ sgmt.Reader = (*fsSegmentReader)(nil) // fsSegmentReader is not thread safe for use and relies on the underlying // segment for synchronization. type fsSegmentReader struct { - closed bool - ctx context.Context - fsSegment *fsSegment - termsIterable *termsIterable + closed bool + ctx context.Context + fsSegment *fsSegment + fieldsIterable *termsIterable + termsIterable *termsIterable } func newReader( @@ -872,6 +873,12 @@ func (sr *fsSegmentReader) Fields() (sgmt.FieldsIterator, error) { return nil, errReaderClosed } + sr.fsSegment.RLock() + defer sr.fsSegment.RUnlock() + if sr.fsSegment.finalized { + return nil, errReaderFinalized + } + iter := newFSTTermsIter() iter.reset(fstTermsIterOpts{ seg: sr.fsSegment, @@ -899,11 +906,11 @@ func (sr *fsSegmentReader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator if sr.closed { return nil, errReaderClosed } - if sr.termsIterable == nil { - sr.termsIterable = newTermsIterable(sr.fsSegment) + if sr.fieldsIterable == nil { + sr.fieldsIterable = newTermsIterable(sr.fsSegment) } sr.fsSegment.RLock() - iter, err := sr.termsIterable.fieldsNotClosedMaybeFinalizedWithRLock() + iter, err := sr.fieldsIterable.fieldsNotClosedMaybeFinalizedWithRLock() sr.fsSegment.RUnlock() return iter, err } From 5f0460daf463feb21f6e0a503699e4ee43491a5b Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 19:15:50 -0500 Subject: [PATCH 10/29] Return new terms iterable each time --- src/m3ninx/index/segment/fst/segment.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index d90d6ba24d..24d42e1082 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -851,11 +851,10 @@ var _ sgmt.Reader = (*fsSegmentReader)(nil) // fsSegmentReader is not thread safe for use and relies on the underlying // segment for synchronization. type fsSegmentReader struct { - closed bool - ctx context.Context - fsSegment *fsSegment - fieldsIterable *termsIterable - termsIterable *termsIterable + closed bool + ctx context.Context + fsSegment *fsSegment + termsIterable *termsIterable } func newReader( @@ -906,11 +905,9 @@ func (sr *fsSegmentReader) FieldsPostingsList() (sgmt.FieldsPostingsListIterator if sr.closed { return nil, errReaderClosed } - if sr.fieldsIterable == nil { - sr.fieldsIterable = newTermsIterable(sr.fsSegment) - } + fieldsIterable := newTermsIterable(sr.fsSegment) sr.fsSegment.RLock() - iter, err := sr.fieldsIterable.fieldsNotClosedMaybeFinalizedWithRLock() + iter, err := fieldsIterable.fieldsNotClosedMaybeFinalizedWithRLock() sr.fsSegment.RUnlock() return iter, err } From 7f96eb0f0cd5d4f083cf647659bc40255c850d70 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 20:16:28 -0500 Subject: [PATCH 11/29] Lookup field postings list from the field data offset --- .../index/segment/fst/fst_terms_iterator.go | 1 + .../fst/fst_terms_postings_iterator.go | 16 +++++- src/m3ninx/index/segment/fst/segment.go | 49 +++++++++++++------ 3 files changed, 50 insertions(+), 16 deletions(-) diff --git a/src/m3ninx/index/segment/fst/fst_terms_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_iterator.go index 39327dc673..734ebbd911 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_iterator.go @@ -31,6 +31,7 @@ type fstTermsIterOpts struct { seg *fsSegment fst *vellum.FST finalizeFST bool + fieldsFST bool } func (o fstTermsIterOpts) Close() error { diff --git a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go index b0c2c25224..411bd5fbfd 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go @@ -21,6 +21,7 @@ package fst import ( + "github.com/m3db/m3/src/m3ninx/generated/proto/fswriter" sgmt "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" postingsroaring "github.com/m3db/m3/src/m3ninx/postings/roaring" @@ -92,8 +93,19 @@ func (f *fstTermsPostingsIter) Next() bool { currOffset := f.termsIter.CurrentOffset() f.seg.RLock() - f.err = f.seg.unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(f.bitmap, - currOffset) + if f.termsIter.opts.fieldsFST { + var fieldsData fswriter.FieldData + fieldsData, f.err = f.seg.unmarshalFieldDataNotClosedMaybeFinalizedWithRLock(currOffset) + if f.err != nil { + currOffset = fieldsData.FieldPostingsListOffset + } + } + if f.err != nil { + // Only attempt if the previous unmarshal definitely succeeded + // if we are operating on a fields FST. + f.err = f.seg.unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(f.bitmap, + currOffset) + } f.seg.RUnlock() return f.err == nil diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index 24d42e1082..e6b27b09c6 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -46,15 +46,16 @@ import ( ) var ( - errReaderClosed = errors.New("segment is closed") - errReaderFinalized = errors.New("segment is finalized") - errReaderNilRegexp = errors.New("nil regexp provided") - errUnsupportedMajorVersion = errors.New("unsupported major version") - errDocumentsDataUnset = errors.New("documents data bytes are not set") - errDocumentsIdxUnset = errors.New("documents index bytes are not set") - errPostingsDataUnset = errors.New("postings data bytes are not set") - errFSTTermsDataUnset = errors.New("fst terms data bytes are not set") - errFSTFieldsDataUnset = errors.New("fst fields data bytes are not set") + errReaderClosed = errors.New("segment is closed") + errReaderFinalized = errors.New("segment is finalized") + errReaderNilRegexp = errors.New("nil regexp provided") + errUnsupportedMajorVersion = errors.New("unsupported major version") + errDocumentsDataUnset = errors.New("documents data bytes are not set") + errDocumentsIdxUnset = errors.New("documents index bytes are not set") + errPostingsDataUnset = errors.New("postings data bytes are not set") + errFSTTermsDataUnset = errors.New("fst terms data bytes are not set") + errFSTFieldsDataUnset = errors.New("fst fields data bytes are not set") + errUnsupportedFeatureFieldsPostingsList = errors.New("fst unsupported operation on old segment version: missing field postings list") ) // SegmentData represent the collection of required parameters to construct a Segment. @@ -402,6 +403,7 @@ func (i *termsIterable) fieldsNotClosedMaybeFinalizedWithRLock() (sgmt.FieldsPos seg: i.r, fst: i.r.fieldsFST, finalizeFST: false, + fieldsFST: true, }) i.postingsIter.reset(i.r, i.fieldsIter) return i.postingsIter, nil @@ -454,18 +456,37 @@ func (r *fsSegment) matchFieldNotClosedMaybeFinalizedWithRLock( return r.opts.PostingsListPool().Get(), nil } - protoBytes, _, err := r.retrieveTermsBytesWithRLock(r.data.FSTTermsData.Bytes, termsFSTOffset) + fieldData, err := r.unmarshalFieldDataNotClosedMaybeFinalizedWithRLock(termsFSTOffset) if err != nil { return nil, err } + postingsOffset := fieldData.FieldPostingsListOffset + return r.retrievePostingsListWithRLock(postingsOffset) +} + +func (r *fsSegment) unmarshalFieldDataNotClosedMaybeFinalizedWithRLock( + fieldDataOffset uint64, +) (fswriter.FieldData, error) { + // NB(r): Not closed, but could be finalized (i.e. closed segment reader) + // calling match field after this segment is finalized. + if r.finalized { + return fswriter.FieldData{}, errReaderFinalized + } + if !r.data.Version.supportsFieldPostingsList() { + return fswriter.FieldData{}, errUnsupportedFeatureFieldsPostingsList + } + + protoBytes, _, err := r.retrieveTermsBytesWithRLock(r.data.FSTTermsData.Bytes, fieldDataOffset) + if err != nil { + return fswriter.FieldData{}, err + } + var fieldData fswriter.FieldData if err := fieldData.Unmarshal(protoBytes); err != nil { - return nil, err + return fswriter.FieldData{}, err } - - postingsOffset := fieldData.FieldPostingsListOffset - return r.retrievePostingsListWithRLock(postingsOffset) + return fieldData, nil } func (r *fsSegment) matchTermNotClosedMaybeFinalizedWithRLock( From d3db7a8a0d446e88be591113aab404e8d87b7f18 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 20:31:48 -0500 Subject: [PATCH 12/29] Use curr offset always --- src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go index 411bd5fbfd..e16ce21f56 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go @@ -96,9 +96,7 @@ func (f *fstTermsPostingsIter) Next() bool { if f.termsIter.opts.fieldsFST { var fieldsData fswriter.FieldData fieldsData, f.err = f.seg.unmarshalFieldDataNotClosedMaybeFinalizedWithRLock(currOffset) - if f.err != nil { - currOffset = fieldsData.FieldPostingsListOffset - } + currOffset = fieldsData.FieldPostingsListOffset } if f.err != nil { // Only attempt if the previous unmarshal definitely succeeded From 083b3d70ebca5860d0c674a653cb954910c373ff Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Fri, 5 Feb 2021 20:38:37 -0500 Subject: [PATCH 13/29] Unmarshal if err == nil --- src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go index e16ce21f56..b01e998858 100644 --- a/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go +++ b/src/m3ninx/index/segment/fst/fst_terms_postings_iterator.go @@ -98,7 +98,7 @@ func (f *fstTermsPostingsIter) Next() bool { fieldsData, f.err = f.seg.unmarshalFieldDataNotClosedMaybeFinalizedWithRLock(currOffset) currOffset = fieldsData.FieldPostingsListOffset } - if f.err != nil { + if f.err == nil { // Only attempt if the previous unmarshal definitely succeeded // if we are operating on a fields FST. f.err = f.seg.unmarshalPostingsListBitmapNotClosedMaybeFinalizedWithLock(f.bitmap, From 4fbfa66697f6501bd4a42c32a3b6d092eef88e9e Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 21:16:09 -0500 Subject: [PATCH 14/29] Fixup integration test assertions --- scripts/docker-integration-tests/prometheus/test.sh | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 8fbe710858..4e9a44092c 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -409,13 +409,13 @@ function test_labels { # Test label search with match ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 77 ]]' # 77 withou a match + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?start=0&end=9999999999999.99999" | jq -r ".data | length") -gt 3 ]]' ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 3 ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 4 ]]' ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 2 ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 3 ]]' # Test label values search with match ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ @@ -451,7 +451,6 @@ test_query_restrict_tags test_prometheus_remote_write_map_tags test_series test_labels -test_label_values echo "Running function correctness tests" test_correctness From e96015d6f8e38fb439a934580e3538b8082130bc Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 21:30:10 -0500 Subject: [PATCH 15/29] Close fieldsIter on filterFieldsIterator.Close() --- src/dbnode/storage/index/filter_fields_iterator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/storage/index/filter_fields_iterator.go b/src/dbnode/storage/index/filter_fields_iterator.go index 7f94f03d6a..c65a77e680 100644 --- a/src/dbnode/storage/index/filter_fields_iterator.go +++ b/src/dbnode/storage/index/filter_fields_iterator.go @@ -94,5 +94,5 @@ func (f *filterFieldsIterator) Err() error { } func (f *filterFieldsIterator) Close() error { - return nil + return f.fieldsIter.Close() } From 518005f3107340dad259e7783d4f89cc52b9713e Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 21:40:57 -0500 Subject: [PATCH 16/29] Update mocks --- src/m3ninx/index/segment/mem/mem_mock.go | 17 ++++++++++++++++- src/m3ninx/index/segment/segment_mock.go | 15 +++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/src/m3ninx/index/segment/mem/mem_mock.go b/src/m3ninx/index/segment/mem/mem_mock.go index a51c4f759a..b7f5c7a114 100644 --- a/src/m3ninx/index/segment/mem/mem_mock.go +++ b/src/m3ninx/index/segment/mem/mem_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/m3ninx/index/segment/mem (interfaces: ReadableSegment) -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -88,6 +88,21 @@ func (mr *MockReadableSegmentMockRecorder) Fields() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Fields", reflect.TypeOf((*MockReadableSegment)(nil).Fields)) } +// FieldsPostingsList mocks base method +func (m *MockReadableSegment) FieldsPostingsList() (segment.FieldsPostingsListIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FieldsPostingsList") + ret0, _ := ret[0].(segment.FieldsPostingsListIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FieldsPostingsList indicates an expected call of FieldsPostingsList +func (mr *MockReadableSegmentMockRecorder) FieldsPostingsList() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FieldsPostingsList", reflect.TypeOf((*MockReadableSegment)(nil).FieldsPostingsList)) +} + // Terms mocks base method func (m *MockReadableSegment) Terms(arg0 []byte) (segment.TermsIterator, error) { m.ctrl.T.Helper() diff --git a/src/m3ninx/index/segment/segment_mock.go b/src/m3ninx/index/segment/segment_mock.go index 7c86c01359..36b4e3efbe 100644 --- a/src/m3ninx/index/segment/segment_mock.go +++ b/src/m3ninx/index/segment/segment_mock.go @@ -360,6 +360,21 @@ func (mr *MockReaderMockRecorder) Terms(field interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Terms", reflect.TypeOf((*MockReader)(nil).Terms), field) } +// FieldsPostingsList mocks base method +func (m *MockReader) FieldsPostingsList() (FieldsPostingsListIterator, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FieldsPostingsList") + ret0, _ := ret[0].(FieldsPostingsListIterator) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FieldsPostingsList indicates an expected call of FieldsPostingsList +func (mr *MockReaderMockRecorder) FieldsPostingsList() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FieldsPostingsList", reflect.TypeOf((*MockReader)(nil).FieldsPostingsList)) +} + // ContainsField mocks base method func (m *MockReader) ContainsField(field []byte) (bool, error) { m.ctrl.T.Helper() From c283d364d5696cd33f1d4be44da1ca479265cf14 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Fri, 5 Feb 2021 23:10:38 -0500 Subject: [PATCH 17/29] Fixup filter field iterator test --- .../storage/index/filter_fields_iterator.go | 11 +- .../index/filter_fields_iterator_test.go | 114 ++++++++++-------- 2 files changed, 75 insertions(+), 50 deletions(-) diff --git a/src/dbnode/storage/index/filter_fields_iterator.go b/src/dbnode/storage/index/filter_fields_iterator.go index c65a77e680..e10e9e70c3 100644 --- a/src/dbnode/storage/index/filter_fields_iterator.go +++ b/src/dbnode/storage/index/filter_fields_iterator.go @@ -24,6 +24,8 @@ import ( "bytes" "errors" + xerrors "github.com/m3db/m3/src/x/errors" + "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" ) @@ -94,5 +96,12 @@ func (f *filterFieldsIterator) Err() error { } func (f *filterFieldsIterator) Close() error { - return f.fieldsIter.Close() + multiErr := xerrors.NewMultiError() + if err := f.reader.Close(); err != nil { + multiErr = multiErr.Add(err) + } + if err := f.fieldsIter.Close(); err != nil { + multiErr = multiErr.Add(err) + } + return multiErr.FinalError() } diff --git a/src/dbnode/storage/index/filter_fields_iterator_test.go b/src/dbnode/storage/index/filter_fields_iterator_test.go index 9e584011a4..f4aa267ce3 100644 --- a/src/dbnode/storage/index/filter_fields_iterator_test.go +++ b/src/dbnode/storage/index/filter_fields_iterator_test.go @@ -26,7 +26,6 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment" xtest "github.com/m3db/m3/src/x/test" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" ) @@ -45,10 +44,15 @@ func TestNewFilterFieldsIteratorNoMatchesInSegment(t *testing.T) { filters := AggregateFieldFilter{[]byte("a"), []byte("b")} r := segment.NewMockReader(ctrl) + f := segment.NewMockFieldsPostingsListIterator(ctrl) + r.EXPECT().FieldsPostingsList().Return(f, nil) iter, err := newFilterFieldsIterator(r, filters) require.NoError(t, err) - r.EXPECT().ContainsField(gomock.Any()).Return(false, nil).AnyTimes() + f.EXPECT().Next().Return(false).Times(1) + r.EXPECT().Close().Return(nil).Times(1) + f.EXPECT().Close().Return(nil).Times(1) + require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -60,16 +64,21 @@ func TestNewFilterFieldsIteratorFirstMatch(t *testing.T) { filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} r := segment.NewMockReader(ctrl) + f := segment.NewMockFieldsPostingsListIterator(ctrl) + + r.EXPECT().FieldsPostingsList().Return(f, nil) + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("a"), nil).Times(2) + f.EXPECT().Next().Return(false) + r.EXPECT().Close().Return(nil).Times(1) + f.EXPECT().Close().Return(nil).Times(1) + iter, err := newFilterFieldsIterator(r, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(true, nil), - r.EXPECT().ContainsField([]byte("b")).Return(false, nil), - r.EXPECT().ContainsField([]byte("c")).Return(false, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "a", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "a", string(val)) require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -81,16 +90,23 @@ func TestNewFilterFieldsIteratorMiddleMatch(t *testing.T) { filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} r := segment.NewMockReader(ctrl) + f := segment.NewMockFieldsPostingsListIterator(ctrl) + + r.EXPECT().FieldsPostingsList().Return(f, nil) + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("d"), nil).Times(1) + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("b"), nil).Times(2) + f.EXPECT().Next().Return(false) + r.EXPECT().Close().Return(nil).Times(1) + f.EXPECT().Close().Return(nil).Times(1) + iter, err := newFilterFieldsIterator(r, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(false, nil), - r.EXPECT().ContainsField([]byte("b")).Return(true, nil), - r.EXPECT().ContainsField([]byte("c")).Return(false, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "b", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "b", string(val)) require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -102,16 +118,25 @@ func TestNewFilterFieldsIteratorEndMatch(t *testing.T) { filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} r := segment.NewMockReader(ctrl) + f := segment.NewMockFieldsPostingsListIterator(ctrl) + + r.EXPECT().FieldsPostingsList().Return(f, nil) + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("d"), nil).Times(1) + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("e"), nil).Times(1) + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("c"), nil).Times(2) + f.EXPECT().Next().Return(false) + r.EXPECT().Close().Return(nil).Times(1) + f.EXPECT().Close().Return(nil).Times(1) + iter, err := newFilterFieldsIterator(r, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(false, nil), - r.EXPECT().ContainsField([]byte("b")).Return(false, nil), - r.EXPECT().ContainsField([]byte("c")).Return(true, nil), - ) require.True(t, iter.Next()) - require.Equal(t, "c", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "c", string(val)) require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) @@ -123,43 +148,34 @@ func TestNewFilterFieldsIteratorAllMatch(t *testing.T) { filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} r := segment.NewMockReader(ctrl) + f := segment.NewMockFieldsPostingsListIterator(ctrl) + + r.EXPECT().FieldsPostingsList().Return(f, nil) iter, err := newFilterFieldsIterator(r, filters) require.NoError(t, err) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(true, nil), - r.EXPECT().ContainsField([]byte("b")).Return(true, nil), - r.EXPECT().ContainsField([]byte("c")).Return(true, nil), - ) - require.True(t, iter.Next()) - require.Equal(t, "a", string(iter.Current())) + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("a"), nil).Times(2) require.True(t, iter.Next()) - require.Equal(t, "b", string(iter.Current())) + val, _ := iter.Current() + require.Equal(t, "a", string(val)) + + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("b"), nil).Times(2) require.True(t, iter.Next()) - require.Equal(t, "c", string(iter.Current())) - require.False(t, iter.Next()) - require.NoError(t, iter.Err()) - require.NoError(t, iter.Close()) -} + val, _ = iter.Current() + require.Equal(t, "b", string(val)) -func TestNewFilterFieldsIteratorRandomMatch(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() + f.EXPECT().Next().Return(true) + f.EXPECT().Current().Return([]byte("c"), nil).Times(2) + require.True(t, iter.Next()) + val, _ = iter.Current() + require.Equal(t, "c", string(val)) - filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - iter, err := newFilterFieldsIterator(r, filters) - require.NoError(t, err) + f.EXPECT().Next().Return(false) + r.EXPECT().Close().Return(nil).Times(1) + f.EXPECT().Close().Return(nil).Times(1) - gomock.InOrder( - r.EXPECT().ContainsField([]byte("a")).Return(true, nil), - r.EXPECT().ContainsField([]byte("b")).Return(false, nil), - r.EXPECT().ContainsField([]byte("c")).Return(true, nil), - ) - require.True(t, iter.Next()) - require.Equal(t, "a", string(iter.Current())) - require.True(t, iter.Next()) - require.Equal(t, "c", string(iter.Current())) require.False(t, iter.Next()) require.NoError(t, iter.Err()) require.NoError(t, iter.Close()) From 277f845ea777cfb0b683b01fbac0cf5c6b7c67cb Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 10:44:10 -0500 Subject: [PATCH 18/29] Lint --- src/dbnode/storage/index/filter_fields_iterator.go | 3 +-- src/m3ninx/index/segment/fst/segment.go | 5 +++-- src/query/api/v1/handler/prometheus/common.go | 1 + src/query/api/v1/handler/prometheus/native/list_tags_test.go | 2 +- src/query/api/v1/handler/prometheus/remote/tag_values.go | 1 + .../api/v1/handler/prometheus/remote/tag_values_test.go | 1 + 6 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/dbnode/storage/index/filter_fields_iterator.go b/src/dbnode/storage/index/filter_fields_iterator.go index e10e9e70c3..7b24eb2ad9 100644 --- a/src/dbnode/storage/index/filter_fields_iterator.go +++ b/src/dbnode/storage/index/filter_fields_iterator.go @@ -24,10 +24,9 @@ import ( "bytes" "errors" - xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" + xerrors "github.com/m3db/m3/src/x/errors" ) var ( diff --git a/src/m3ninx/index/segment/fst/segment.go b/src/m3ninx/index/segment/fst/segment.go index e6b27b09c6..883a440e05 100644 --- a/src/m3ninx/index/segment/fst/segment.go +++ b/src/m3ninx/index/segment/fst/segment.go @@ -49,13 +49,14 @@ var ( errReaderClosed = errors.New("segment is closed") errReaderFinalized = errors.New("segment is finalized") errReaderNilRegexp = errors.New("nil regexp provided") - errUnsupportedMajorVersion = errors.New("unsupported major version") errDocumentsDataUnset = errors.New("documents data bytes are not set") errDocumentsIdxUnset = errors.New("documents index bytes are not set") errPostingsDataUnset = errors.New("postings data bytes are not set") errFSTTermsDataUnset = errors.New("fst terms data bytes are not set") errFSTFieldsDataUnset = errors.New("fst fields data bytes are not set") - errUnsupportedFeatureFieldsPostingsList = errors.New("fst unsupported operation on old segment version: missing field postings list") + errUnsupportedFeatureFieldsPostingsList = errors.New( + "fst unsupported operation on old segment version: missing field postings list", + ) ) // SegmentData represent the collection of required parameters to construct a Segment. diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 5f4d4082d1..287c3e8ae9 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -235,6 +235,7 @@ func ParseSeriesMatchQuery( } queries := make([]*storage.FetchQuery, 0, len(matcherValues)) + // nolint:rangeValCopy (the set of matchers is small) for _, m := range matchers { queries = append(queries, &storage.FetchQuery{ Raw: fmt.Sprintf("match[]=%s", m.Match), diff --git a/src/query/api/v1/handler/prometheus/native/list_tags_test.go b/src/query/api/v1/handler/prometheus/native/list_tags_test.go index c4e8a3cf53..e7bf3b2afe 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags_test.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags_test.go @@ -163,7 +163,7 @@ func testListTagsWithMatch( require.Equal(t, http.StatusOK, w.Result().StatusCode) body := w.Result().Body - defer body.Close() + defer body.Close() // nolint:errcheck r, err := ioutil.ReadAll(body) require.NoError(t, err) diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values.go b/src/query/api/v1/handler/prometheus/remote/tag_values.go index 29861d7774..d5b1814756 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values.go @@ -149,6 +149,7 @@ func (h *TagValuesHandler) parseTagValuesToQuery( reqTagMatcher := reqTagMatchers[0] + //nolint:gocritic for _, m := range reqTagMatcher.Matchers { // add all matchers that don't match the default name matcher. if m.Type != nameMatcher.Type || !bytes.Equal(m.Name, nameMatcher.Name) { diff --git a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go index e6244ae104..8fa1d17a40 100644 --- a/src/query/api/v1/handler/prometheus/remote/tag_values_test.go +++ b/src/query/api/v1/handler/prometheus/remote/tag_values_test.go @@ -164,6 +164,7 @@ func testTagValuesWithMatch( TagMatchers: matchers, } + // nolint:noctx req, err := http.NewRequest("GET", path, nil) if err != nil { t.Fatal(err) From 240bde41b9e914753dd934d10a34e4a0e1d1d776 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 11:03:11 -0500 Subject: [PATCH 19/29] Update fields terms iter tests --- .../index/fields_terms_iterator_prop_test.go | 5 ++- .../index/fields_terms_iterator_test.go | 38 ++++++++++++++++++- src/query/api/v1/handler/prometheus/common.go | 2 +- .../prometheus/native/list_tags_test.go | 2 +- 4 files changed, 41 insertions(+), 6 deletions(-) diff --git a/src/dbnode/storage/index/fields_terms_iterator_prop_test.go b/src/dbnode/storage/index/fields_terms_iterator_prop_test.go index 91224e7e60..1a657768b6 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_prop_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_prop_test.go @@ -35,6 +35,7 @@ import ( "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" + "github.com/m3db/m3/src/m3ninx/index/segment" xtest "github.com/m3db/m3/src/x/test" ) @@ -149,9 +150,9 @@ func genIterableSegment(ctrl *gomock.Controller) gopter.Gen { r := segment.NewMockReader(ctrl) - fieldIterator := &stubFieldIterator{points: fields} + fieldsPostingsListIterator := &stubFieldsPostingsListIterator{points: fields} - r.EXPECT().Fields().Return(fieldIterator, nil).AnyTimes() + r.EXPECT().FieldsPostingsList().Return(fieldsPostingsListIterator, nil).AnyTimes() for f, values := range tagValues { sort.Slice(values, func(i, j int) bool { diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go index 706d49d68e..1d00a2a6d2 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -282,9 +282,9 @@ func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string }) r := segment.NewMockReader(ctrl) - fieldIterator := &stubFieldIterator{points: fields} + fieldsPostingsListIterator := &stubFieldsPostingsListIterator{points: fields} - r.EXPECT().Fields().Return(fieldIterator, nil).AnyTimes() + r.EXPECT().FieldsPostingsList().Return(fieldsPostingsListIterator, nil).AnyTimes() for _, f := range fields { termValues := tagValues[f.value] @@ -302,6 +302,40 @@ func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string return r } +type stubFieldsPostingsListIterator struct { + current iterpoint + points []iterpoint +} + +func (s *stubFieldsPostingsListIterator) Next() bool { + if len(s.points) == 0 { + return false + } + s.current = s.points[0] + s.points = s.points[1:] + return true +} + +func (s *stubFieldsPostingsListIterator) Current() ([]byte, postings.List) { + return []byte(s.current.value), nil +} + +func (s *stubFieldsPostingsListIterator) Err() error { + return s.current.err +} + +func (s *stubFieldsPostingsListIterator) Close() error { + if s.current.err != nil { + return s.current.err + } + for s.Next() { + if err := s.Err(); err != nil { + return err + } + } + return nil +} + type stubTermIterator struct { current iterpoint points []iterpoint diff --git a/src/query/api/v1/handler/prometheus/common.go b/src/query/api/v1/handler/prometheus/common.go index 287c3e8ae9..f1a9ba13d7 100644 --- a/src/query/api/v1/handler/prometheus/common.go +++ b/src/query/api/v1/handler/prometheus/common.go @@ -235,7 +235,7 @@ func ParseSeriesMatchQuery( } queries := make([]*storage.FetchQuery, 0, len(matcherValues)) - // nolint:rangeValCopy (the set of matchers is small) + // nolint:gocritic for _, m := range matchers { queries = append(queries, &storage.FetchQuery{ Raw: fmt.Sprintf("match[]=%s", m.Match), diff --git a/src/query/api/v1/handler/prometheus/native/list_tags_test.go b/src/query/api/v1/handler/prometheus/native/list_tags_test.go index e7bf3b2afe..a240f9535f 100644 --- a/src/query/api/v1/handler/prometheus/native/list_tags_test.go +++ b/src/query/api/v1/handler/prometheus/native/list_tags_test.go @@ -160,7 +160,7 @@ func testListTagsWithMatch( h.ServeHTTP(w, req) - require.Equal(t, http.StatusOK, w.Result().StatusCode) + require.Equal(t, http.StatusOK, w.Result().StatusCode) // nolint:bodyclose body := w.Result().Body defer body.Close() // nolint:errcheck From 560063e30ea3808a48b72dd4800732be456d8a92 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 11:18:17 -0500 Subject: [PATCH 20/29] Use fake fields postings list iterator for filter fields iterator test --- .../index/fields_terms_iterator_test.go | 2 +- .../index/filter_fields_iterator_test.go | 100 +++++++----------- 2 files changed, 42 insertions(+), 60 deletions(-) diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go index 1d00a2a6d2..294febbe5c 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -270,7 +270,7 @@ func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) { }, slice) } -func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string) segment.Reader { +func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string) *segment.MockReader { fields := make([]iterpoint, 0, len(tagValues)) for k := range tagValues { fields = append(fields, iterpoint{ diff --git a/src/dbnode/storage/index/filter_fields_iterator_test.go b/src/dbnode/storage/index/filter_fields_iterator_test.go index f4aa267ce3..2d9b42956f 100644 --- a/src/dbnode/storage/index/filter_fields_iterator_test.go +++ b/src/dbnode/storage/index/filter_fields_iterator_test.go @@ -43,15 +43,11 @@ func TestNewFilterFieldsIteratorNoMatchesInSegment(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b")} - r := segment.NewMockReader(ctrl) - f := segment.NewMockFieldsPostingsListIterator(ctrl) - r.EXPECT().FieldsPostingsList().Return(f, nil) - iter, err := newFilterFieldsIterator(r, filters) - require.NoError(t, err) + reader := newMockSegmentReader(ctrl, map[string][]string{}) + reader.EXPECT().Close().Return(nil).Times(1) - f.EXPECT().Next().Return(false).Times(1) - r.EXPECT().Close().Return(nil).Times(1) - f.EXPECT().Close().Return(nil).Times(1) + iter, err := newFilterFieldsIterator(reader, filters) + require.NoError(t, err) require.False(t, iter.Next()) require.NoError(t, iter.Err()) @@ -63,17 +59,10 @@ func TestNewFilterFieldsIteratorFirstMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - f := segment.NewMockFieldsPostingsListIterator(ctrl) + reader := newMockSegmentReader(ctrl, map[string][]string{"a": nil}) + reader.EXPECT().Close().Return(nil).Times(1) - r.EXPECT().FieldsPostingsList().Return(f, nil) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("a"), nil).Times(2) - f.EXPECT().Next().Return(false) - r.EXPECT().Close().Return(nil).Times(1) - f.EXPECT().Close().Return(nil).Times(1) - - iter, err := newFilterFieldsIterator(r, filters) + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) require.True(t, iter.Next()) @@ -89,19 +78,10 @@ func TestNewFilterFieldsIteratorMiddleMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - f := segment.NewMockFieldsPostingsListIterator(ctrl) - - r.EXPECT().FieldsPostingsList().Return(f, nil) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("d"), nil).Times(1) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("b"), nil).Times(2) - f.EXPECT().Next().Return(false) - r.EXPECT().Close().Return(nil).Times(1) - f.EXPECT().Close().Return(nil).Times(1) - - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string][]string{"d": nil, "b": nil, "e": nil}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) require.True(t, iter.Next()) @@ -117,21 +97,10 @@ func TestNewFilterFieldsIteratorEndMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - f := segment.NewMockFieldsPostingsListIterator(ctrl) - - r.EXPECT().FieldsPostingsList().Return(f, nil) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("d"), nil).Times(1) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("e"), nil).Times(1) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("c"), nil).Times(2) - f.EXPECT().Next().Return(false) - r.EXPECT().Close().Return(nil).Times(1) - f.EXPECT().Close().Return(nil).Times(1) - - iter, err := newFilterFieldsIterator(r, filters) + reader := newMockSegmentReader(ctrl, map[string][]string{"d": nil, "e": nil, "c": nil}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) require.True(t, iter.Next()) @@ -147,34 +116,47 @@ func TestNewFilterFieldsIteratorAllMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - r := segment.NewMockReader(ctrl) - f := segment.NewMockFieldsPostingsListIterator(ctrl) + reader := newMockSegmentReader(ctrl, map[string][]string{"a": nil, "b": nil, "c": nil}) + reader.EXPECT().Close().Return(nil).Times(1) - r.EXPECT().FieldsPostingsList().Return(f, nil) - iter, err := newFilterFieldsIterator(r, filters) + iter, err := newFilterFieldsIterator(reader, filters) require.NoError(t, err) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("a"), nil).Times(2) require.True(t, iter.Next()) val, _ := iter.Current() require.Equal(t, "a", string(val)) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("b"), nil).Times(2) require.True(t, iter.Next()) val, _ = iter.Current() require.Equal(t, "b", string(val)) - f.EXPECT().Next().Return(true) - f.EXPECT().Current().Return([]byte("c"), nil).Times(2) require.True(t, iter.Next()) val, _ = iter.Current() require.Equal(t, "c", string(val)) - f.EXPECT().Next().Return(false) - r.EXPECT().Close().Return(nil).Times(1) - f.EXPECT().Close().Return(nil).Times(1) + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) + require.NoError(t, iter.Close()) +} + +func TestNewFilterFieldsIteratorRandomMatch(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} + reader := newMockSegmentReader(ctrl, map[string][]string{"a": nil, "c": nil}) + reader.EXPECT().Close().Return(nil).Times(1) + + iter, err := newFilterFieldsIterator(reader, filters) + require.NoError(t, err) + + require.True(t, iter.Next()) + val, _ := iter.Current() + require.Equal(t, "a", string(val)) + + require.True(t, iter.Next()) + val, _ = iter.Current() + require.Equal(t, "c", string(val)) require.False(t, iter.Next()) require.NoError(t, iter.Err()) From 5f52d8ea26924ea551e2e5de68388381dd3b6c02 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 11:36:01 -0500 Subject: [PATCH 21/29] Add ParseMatch unit test --- .../api/v1/handler/prometheus/common_test.go | 82 +++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/src/query/api/v1/handler/prometheus/common_test.go b/src/query/api/v1/handler/prometheus/common_test.go index b8479ae4ab..fe59b17118 100644 --- a/src/query/api/v1/handler/prometheus/common_test.go +++ b/src/query/api/v1/handler/prometheus/common_test.go @@ -193,3 +193,85 @@ func TestParseStartAndEnd(t *testing.T) { }) } } + +// TestParseMatch tests the parsing / construction logic around ParseMatch(). +// matcher_test.go has more comprehensive testing on parsing details. +func TestParseMatch(t *testing.T) { + parseOpts := promql.NewParseOptions() + tagOpts := models.NewTagOptions() + + tests := []struct { + querystring string + exMatch []ParsedMatch + exErr bool + exEmpty bool + }{ + {exEmpty: true}, + { + querystring: "match[]=eq_label", + exMatch: []ParsedMatch{{ + Match: "eq_label", + Matchers: models.Matchers{ + { + Type: models.MatchEqual, + Name: []byte("__name__"), + Value: []byte("eq_label"), + }, + }, + }}}, + {querystring: "match[]=illegal%match", exErr: true}, + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("GET_%s", tt.querystring), func(t *testing.T) { + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, + fmt.Sprintf("/?%s", tt.querystring), nil) + require.NoError(t, err) + + parsedMatches, ok, err := ParseMatch(req, parseOpts, tagOpts) + + if tt.exErr { + require.Error(t, err) + require.False(t, ok) + require.Empty(t, parsedMatches) + return + } + + require.NoError(t, err) + if tt.exEmpty { + require.False(t, ok) + require.Empty(t, parsedMatches) + } else { + require.True(t, ok) + require.Equal(t, tt.exMatch, parsedMatches) + } + }) + } + + for _, tt := range tests { + t.Run(fmt.Sprintf("POST_%s", tt.querystring), func(t *testing.T) { + b := bytes.NewBuffer([]byte(tt.querystring)) + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, "/", b) + require.NoError(t, err) + req.Header.Add(xhttp.HeaderContentType, xhttp.ContentTypeFormURLEncoded) + + parsedMatches, ok, err := ParseMatch(req, parseOpts, tagOpts) + + if tt.exErr { + require.Error(t, err) + require.False(t, ok) + require.Empty(t, parsedMatches) + return + } + + require.NoError(t, err) + if tt.exEmpty { + require.False(t, ok) + require.Empty(t, parsedMatches) + } else { + require.True(t, ok) + require.Equal(t, tt.exMatch, parsedMatches) + } + }) + } +} From 9758a9ca8514c8a81067a9853730732b7b7cdad9 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 12:05:10 -0500 Subject: [PATCH 22/29] Lint --- .../api/v1/handler/prometheus/common_test.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/query/api/v1/handler/prometheus/common_test.go b/src/query/api/v1/handler/prometheus/common_test.go index fe59b17118..056aea1a9e 100644 --- a/src/query/api/v1/handler/prometheus/common_test.go +++ b/src/query/api/v1/handler/prometheus/common_test.go @@ -209,16 +209,19 @@ func TestParseMatch(t *testing.T) { {exEmpty: true}, { querystring: "match[]=eq_label", - exMatch: []ParsedMatch{{ - Match: "eq_label", - Matchers: models.Matchers{ - { - Type: models.MatchEqual, - Name: []byte("__name__"), - Value: []byte("eq_label"), + exMatch: []ParsedMatch{ + { + Match: "eq_label", + Matchers: models.Matchers{ + { + Type: models.MatchEqual, + Name: []byte("__name__"), + Value: []byte("eq_label"), + }, }, }, - }}}, + }, + }, {querystring: "match[]=illegal%match", exErr: true}, } From fdfe85453a9ad4fa3f3c55986d033a5d10721053 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 12:18:45 -0500 Subject: [PATCH 23/29] Add unit tests for fst reader fields postings list --- .../index/segment/fst/writer_reader_test.go | 60 +++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go index eef2fbdf5f..fef79a7d70 100644 --- a/src/m3ninx/index/segment/fst/writer_reader_test.go +++ b/src/m3ninx/index/segment/fst/writer_reader_test.go @@ -312,6 +312,34 @@ func TestPostingsListEqualForMatchField(t *testing.T) { }) } } + +func TestPostingsListEqualForMatchFieldWithFieldsPostingsList(t *testing.T) { + for _, test := range testDocuments { + t.Run(test.name, func(t *testing.T) { + for _, tc := range newTestCases(t, test.docs) { + t.Run(tc.name, func(t *testing.T) { + expSeg, obsSeg := tc.expected, tc.observed + expReader, err := expSeg.Reader() + require.NoError(t, err) + obsReader, err := obsSeg.Reader() + require.NoError(t, err) + + obsFieldsPostingsIter, err := obsReader.FieldsPostingsList() + require.NoError(t, err) + + for obsFieldsPostingsIter.Next() { + f, obsPl := obsFieldsPostingsIter.Current() + expPl, err := expReader.MatchField(f) + require.NoError(t, err) + require.True(t, expPl.Equal(obsPl), + fmt.Sprintf("field[%s] - [%v] != [%v]", string(f), pprintIter(expPl), pprintIter(obsPl))) + } + }) + } + }) + } +} + func TestPostingsListEqualForMatchTerm(t *testing.T) { for _, test := range testDocuments { t.Run(test.name, func(t *testing.T) { @@ -343,6 +371,38 @@ func TestPostingsListEqualForMatchTerm(t *testing.T) { } } +func TestPostingsListEqualForMatchTermWithFieldsPostingsList(t *testing.T) { + for _, test := range testDocuments { + t.Run(test.name, func(t *testing.T) { + memSeg, fstSeg := newTestSegments(t, test.docs) + memReader, err := memSeg.Reader() + require.NoError(t, err) + fstReader, err := fstSeg.Reader() + require.NoError(t, err) + + fstFieldsPostingsIter, err := fstReader.FieldsPostingsList() + require.NoError(t, err) + + for fstFieldsPostingsIter.Next() { + f, _ := fstFieldsPostingsIter.Current() + + memTermsIter, err := memSeg.Terms(f) + require.NoError(t, err) + memTerms := toTermPostings(t, memTermsIter) + + for term := range memTerms { + memPl, err := memReader.MatchTerm(f, []byte(term)) + require.NoError(t, err) + fstPl, err := fstReader.MatchTerm(f, []byte(term)) + require.NoError(t, err) + require.True(t, memPl.Equal(fstPl), + fmt.Sprintf("%s:%s - [%v] != [%v]", string(f), term, pprintIter(memPl), pprintIter(fstPl))) + } + } + }) + } +} + func TestPostingsListContainsID(t *testing.T) { for _, test := range testDocuments { t.Run(test.name, func(t *testing.T) { From bef0c3b909f2985bc512b3d05607432a9036e041 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 13:57:55 -0500 Subject: [PATCH 24/29] More explicit label assertion --- .../docker-integration-tests/prometheus/test.sh | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index 4a57dce416..ec889cc244 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -409,27 +409,27 @@ function test_labels { # Test label search with match ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?start=0&end=9999999999999.99999" | jq -r ".data | length") -gt 3 ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels" | jq -r "[.data[] | select(index(\"name_0\", \"name_1\", \"name_2\"))] | length") -eq 3 ]]' ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 4 ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric" | jq -r ".data | length") -eq 4 ]]' ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 3 ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/labels?match[]=label_metric_2" | jq -r ".data | length") -eq 3 ]]' # Test label values search with match ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 2 ]]' # two values without a match + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values" | jq -r ".data | length") -eq 2 ]]' # two values without a match ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 1 ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric" | jq -r ".data | length") -eq 1 ]]' ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric&start=0&end=9999999999999.99999" | jq -r ".data[0]") = "value_1_1" ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric" | jq -r ".data[0]") = "value_1_1" ]]' ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data | length") -eq 1 ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2" | jq -r ".data | length") -eq 1 ]]' ATTEMPTS=5 TIMEOUT=2 MAX_TIMEOUT=4 retry_with_backoff \ - '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2&start=0&end=9999999999999.99999" | jq -r ".data[0]") = "value_1_2" ]]' + '[[ $(curl -s "0.0.0.0:7201/api/v1/label/name_1/values?match[]=label_metric_2" | jq -r ".data[0]") = "value_1_2" ]]' } echo "Running readiness test" From 9f0a962043d346bd5ac9fa5fa65b71253f129561 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Sun, 7 Feb 2021 14:00:36 -0500 Subject: [PATCH 25/29] Lint --- src/m3ninx/index/segment/fst/writer_reader_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/m3ninx/index/segment/fst/writer_reader_test.go b/src/m3ninx/index/segment/fst/writer_reader_test.go index fef79a7d70..50339643db 100644 --- a/src/m3ninx/index/segment/fst/writer_reader_test.go +++ b/src/m3ninx/index/segment/fst/writer_reader_test.go @@ -317,6 +317,7 @@ func TestPostingsListEqualForMatchFieldWithFieldsPostingsList(t *testing.T) { for _, test := range testDocuments { t.Run(test.name, func(t *testing.T) { for _, tc := range newTestCases(t, test.docs) { + tc := tc t.Run(tc.name, func(t *testing.T) { expSeg, obsSeg := tc.expected, tc.observed expReader, err := expSeg.Reader() From eba69621f3950822b27b392dadfb84ac0cd30b08 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sun, 7 Feb 2021 23:47:35 -0500 Subject: [PATCH 26/29] Add test for fields iterator postings list match --- .../index/fields_terms_iterator_test.go | 80 +++++++++++++++---- .../index/filter_fields_iterator_test.go | 12 +-- 2 files changed, 71 insertions(+), 21 deletions(-) diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go index 294febbe5c..90ec7b032c 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -23,6 +23,7 @@ package index import ( "bytes" "fmt" + "github.com/m3db/m3/src/m3ninx/postings/roaring" "sort" "strings" "testing" @@ -173,8 +174,8 @@ func TestFieldsTermsIteratorEmptyTerm(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() - reader := newMockSegmentReader(ctrl, map[string][]string{ - "a": nil, + reader := newMockSegmentReader(ctrl, map[string]terms{ + "a": {}, }) iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{iterateTerms: false}) require.NoError(t, err) @@ -182,12 +183,45 @@ func TestFieldsTermsIteratorEmptyTerm(t *testing.T) { requireSlicesEqual(t, []pair{{"a", ""}}, slice) } +func TestFieldsTermsIteratorRestrictByQueryFields(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{T: t}) + defer ctrl.Finish() + + pl0 := roaring.NewPostingsList() + require.NoError(t, pl0.Insert(postings.ID(42))) + + pl1 := roaring.NewPostingsList() + require.NoError(t, pl1.Insert(postings.ID(1))) + + pl2 := roaring.NewPostingsList() + require.NoError(t, pl2.Insert(postings.ID(2))) + + reader := newMockSegmentReader(ctrl, map[string]terms{ + "foo": {values: []term{{value: "foo_0"}}, postings: pl0}, + "bar": {values: []term{{value: "bar_0"}}, postings: pl1}, + "baz": {values: []term{{value: "baz_0"}}, postings: pl2}, + }) + + // Simulate term query for "bar": + reader.EXPECT().MatchField([]byte("bar")).Return(pl1, nil) + + iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{ + iterateTerms: false, + restrictByQuery: &Query{ + Query: idx.NewFieldQuery([]byte("bar")), + }, + }) + require.NoError(t, err) + slice := toSlice(t, iter) + requireSlicesEqual(t, []pair{{"bar", ""}}, slice) +} + func TestFieldsTermsIteratorEmptyTermInclude(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() - reader := newMockSegmentReader(ctrl, map[string][]string{ - "a": nil, + reader := newMockSegmentReader(ctrl, map[string]terms{ + "a": {}, }) iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{iterateTerms: true}) require.NoError(t, err) @@ -270,11 +304,22 @@ func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) { }, slice) } -func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string) *segment.MockReader { - fields := make([]iterpoint, 0, len(tagValues)) - for k := range tagValues { +type terms struct { + values []term + postings postings.List +} + +type term struct { + value string + postings postings.List +} + +func newMockSegmentReader(ctrl *gomock.Controller, termValues map[string]terms) *segment.MockReader { + fields := make([]iterpoint, 0, len(termValues)) + for field, terms := range termValues { fields = append(fields, iterpoint{ - value: k, + value: field, + postings: terms.postings, }) } sort.Slice(fields, func(i, j int) bool { @@ -287,12 +332,15 @@ func newMockSegmentReader(ctrl *gomock.Controller, tagValues map[string][]string r.EXPECT().FieldsPostingsList().Return(fieldsPostingsListIterator, nil).AnyTimes() for _, f := range fields { - termValues := tagValues[f.value] - sort.Strings(termValues) + termValues := termValues[f.value].values + sort.Slice(termValues, func(i, j int) bool { + return termValues[i].value < termValues[j].value + }) terms := make([]iterpoint, 0, len(termValues)) for _, t := range termValues { terms = append(terms, iterpoint{ - value: t, + value: t.value, + postings: t.postings, }) } termIterator := &stubTermIterator{points: terms} @@ -317,7 +365,7 @@ func (s *stubFieldsPostingsListIterator) Next() bool { } func (s *stubFieldsPostingsListIterator) Current() ([]byte, postings.List) { - return []byte(s.current.value), nil + return []byte(s.current.value), s.current.postings } func (s *stubFieldsPostingsListIterator) Err() error { @@ -351,7 +399,7 @@ func (s *stubTermIterator) Next() bool { } func (s *stubTermIterator) Current() ([]byte, postings.List) { - return []byte(s.current.value), nil + return []byte(s.current.value), s.current.postings } func (s *stubTermIterator) Err() error { @@ -405,8 +453,9 @@ func (s *stubFieldIterator) Close() error { } type iterpoint struct { - err error - value string + err error + value string + postings postings.List } type pair struct { @@ -475,6 +524,7 @@ func toSlice(t *testing.T, iter fieldsAndTermsIterator) []pair { Value: string(v), }) } + require.NoError(t, iter.Err()) return pairs } diff --git a/src/dbnode/storage/index/filter_fields_iterator_test.go b/src/dbnode/storage/index/filter_fields_iterator_test.go index 2d9b42956f..db60a0e39d 100644 --- a/src/dbnode/storage/index/filter_fields_iterator_test.go +++ b/src/dbnode/storage/index/filter_fields_iterator_test.go @@ -43,7 +43,7 @@ func TestNewFilterFieldsIteratorNoMatchesInSegment(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b")} - reader := newMockSegmentReader(ctrl, map[string][]string{}) + reader := newMockSegmentReader(ctrl, map[string]terms{}) reader.EXPECT().Close().Return(nil).Times(1) iter, err := newFilterFieldsIterator(reader, filters) @@ -59,7 +59,7 @@ func TestNewFilterFieldsIteratorFirstMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - reader := newMockSegmentReader(ctrl, map[string][]string{"a": nil}) + reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}}) reader.EXPECT().Close().Return(nil).Times(1) iter, err := newFilterFieldsIterator(reader, filters) @@ -78,7 +78,7 @@ func TestNewFilterFieldsIteratorMiddleMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - reader := newMockSegmentReader(ctrl, map[string][]string{"d": nil, "b": nil, "e": nil}) + reader := newMockSegmentReader(ctrl, map[string]terms{"d": {}, "b": {}, "e": {}}) reader.EXPECT().Close().Return(nil).Times(1) iter, err := newFilterFieldsIterator(reader, filters) @@ -97,7 +97,7 @@ func TestNewFilterFieldsIteratorEndMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - reader := newMockSegmentReader(ctrl, map[string][]string{"d": nil, "e": nil, "c": nil}) + reader := newMockSegmentReader(ctrl, map[string]terms{"d": {}, "e": {}, "c": {}}) reader.EXPECT().Close().Return(nil).Times(1) iter, err := newFilterFieldsIterator(reader, filters) @@ -116,7 +116,7 @@ func TestNewFilterFieldsIteratorAllMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - reader := newMockSegmentReader(ctrl, map[string][]string{"a": nil, "b": nil, "c": nil}) + reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}, "b": {}, "c": {}}) reader.EXPECT().Close().Return(nil).Times(1) iter, err := newFilterFieldsIterator(reader, filters) @@ -144,7 +144,7 @@ func TestNewFilterFieldsIteratorRandomMatch(t *testing.T) { defer ctrl.Finish() filters := AggregateFieldFilter{[]byte("a"), []byte("b"), []byte("c")} - reader := newMockSegmentReader(ctrl, map[string][]string{"a": nil, "c": nil}) + reader := newMockSegmentReader(ctrl, map[string]terms{"a": {}, "c": {}}) reader.EXPECT().Close().Return(nil).Times(1) iter, err := newFilterFieldsIterator(reader, filters) From 74a3e3f1459f9f69e88b995db8a21445487a3396 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 8 Feb 2021 00:10:24 -0500 Subject: [PATCH 27/29] Fix lint --- src/dbnode/storage/index/fields_terms_iterator.go | 3 ++- .../storage/index/fields_terms_iterator_test.go | 12 ++++++------ src/dbnode/storage/index/results.go | 3 +-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/dbnode/storage/index/fields_terms_iterator.go b/src/dbnode/storage/index/fields_terms_iterator.go index 1761933397..44f6cc4685 100644 --- a/src/dbnode/storage/index/fields_terms_iterator.go +++ b/src/dbnode/storage/index/fields_terms_iterator.go @@ -23,11 +23,12 @@ package index import ( "errors" + pilosaroaring "github.com/m3dbx/pilosa/roaring" + "github.com/m3db/m3/src/m3ninx/index/segment" "github.com/m3db/m3/src/m3ninx/postings" "github.com/m3db/m3/src/m3ninx/postings/roaring" xerrors "github.com/m3db/m3/src/x/errors" - pilosaroaring "github.com/m3dbx/pilosa/roaring" ) var ( diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go index 90ec7b032c..dc7af4afc8 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -23,11 +23,13 @@ package index import ( "bytes" "fmt" - "github.com/m3db/m3/src/m3ninx/postings/roaring" "sort" "strings" "testing" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" @@ -35,11 +37,9 @@ import ( "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/index/segment/mem" "github.com/m3db/m3/src/m3ninx/postings" + "github.com/m3db/m3/src/m3ninx/postings/roaring" "github.com/m3db/m3/src/m3ninx/util" xtest "github.com/m3db/m3/src/x/test" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/require" ) var ( @@ -316,10 +316,10 @@ type term struct { func newMockSegmentReader(ctrl *gomock.Controller, termValues map[string]terms) *segment.MockReader { fields := make([]iterpoint, 0, len(termValues)) - for field, terms := range termValues { + for field := range termValues { fields = append(fields, iterpoint{ value: field, - postings: terms.postings, + postings: termValues[field].postings, }) } sort.Slice(fields, func(i, j int) bool { diff --git a/src/dbnode/storage/index/results.go b/src/dbnode/storage/index/results.go index 65a0875c04..58a30a0f6c 100644 --- a/src/dbnode/storage/index/results.go +++ b/src/dbnode/storage/index/results.go @@ -51,8 +51,7 @@ type results struct { idPool ident.Pool bytesPool pool.CheckedBytesPool - pool QueryResultsPool - noFinalize bool + pool QueryResultsPool } // NewQueryResults returns a new query results object. From 56f0235e400578f58760ba3731c0ca7b514fa761 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 8 Feb 2021 00:35:51 -0500 Subject: [PATCH 28/29] Fix lint --- .../index/fields_terms_iterator_prop_test.go | 6 ++-- .../index/fields_terms_iterator_test.go | 29 ++++++++++++------- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/dbnode/storage/index/fields_terms_iterator_prop_test.go b/src/dbnode/storage/index/fields_terms_iterator_prop_test.go index 1a657768b6..75220065f5 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_prop_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_prop_test.go @@ -35,6 +35,7 @@ import ( "github.com/leanovate/gopter" "github.com/leanovate/gopter/gen" "github.com/leanovate/gopter/prop" + "github.com/stretchr/testify/require" "github.com/m3db/m3/src/m3ninx/index/segment" xtest "github.com/m3db/m3/src/x/test" @@ -63,7 +64,8 @@ func TestFieldsTermsIteratorPropertyTest(t *testing.T) { if err != nil { return false, err } - observed := toSlice(t, iter) + observed, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, expected, observed) return true, nil }, @@ -98,7 +100,7 @@ func TestFieldsTermsIteratorPropertyTestNoPanic(t *testing.T) { if err != nil { return false, err } - toSlice(t, iter) + _, _ = toSlice(iter) return true, nil }, genIterableSegment(ctrl), diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go index dc7af4afc8..dab5e7a2cd 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -92,7 +92,8 @@ func TestFieldsTermsIteratorReuse(t *testing.T) { }, }) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"d", "e"}, {"d", "f"}, @@ -107,7 +108,8 @@ func TestFieldsTermsIteratorReuse(t *testing.T) { }, }) require.NoError(t, err) - slice = toSlice(t, iter) + slice, err = toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"a", "b"}, {"a", "c"}, @@ -136,7 +138,8 @@ func TestFieldsTermsIteratorSimpleSkip(t *testing.T) { }, }) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"d", "e"}, {"d", "f"}, @@ -160,7 +163,8 @@ func TestFieldsTermsIteratorTermsOnly(t *testing.T) { iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{}) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"a", ""}, {"d", ""}, @@ -179,7 +183,8 @@ func TestFieldsTermsIteratorEmptyTerm(t *testing.T) { }) iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{iterateTerms: false}) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{{"a", ""}}, slice) } @@ -212,7 +217,8 @@ func TestFieldsTermsIteratorRestrictByQueryFields(t *testing.T) { }, }) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{{"bar", ""}}, slice) } @@ -225,7 +231,8 @@ func TestFieldsTermsIteratorEmptyTermInclude(t *testing.T) { }) iter, err := newFieldsAndTermsIterator(reader, fieldsAndTermsIteratorOpts{iterateTerms: true}) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{}, slice) } @@ -295,7 +302,8 @@ func TestFieldsTermsIteratorIterateTermsAndRestrictByQuery(t *testing.T) { }, }) require.NoError(t, err) - slice := toSlice(t, iter) + slice, err := toSlice(iter) + require.NoError(t, err) requireSlicesEqual(t, []pair{ {"color", "red"}, {"color", "yellow"}, @@ -512,7 +520,7 @@ func (s *fieldsTermsIterSetup) requireEquals(t *testing.T, iter fieldsAndTermsIt require.NoError(t, iter.Close()) } -func toSlice(t *testing.T, iter fieldsAndTermsIterator) []pair { +func toSlice(iter fieldsAndTermsIterator) ([]pair, error) { var pairs []pair for iter.Next() { n, v := iter.Current() @@ -524,8 +532,7 @@ func toSlice(t *testing.T, iter fieldsAndTermsIterator) []pair { Value: string(v), }) } - require.NoError(t, iter.Err()) - return pairs + return pairs, nil } func requireSlicesEqual(t *testing.T, a, b []pair) { From 98f75e7a03c33228074fd9f3292ad3b6a26572d9 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 8 Feb 2021 01:15:27 -0500 Subject: [PATCH 29/29] Fix not returning err from toSlice --- src/dbnode/storage/index/fields_terms_iterator_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/storage/index/fields_terms_iterator_test.go b/src/dbnode/storage/index/fields_terms_iterator_test.go index dab5e7a2cd..243f5336ca 100644 --- a/src/dbnode/storage/index/fields_terms_iterator_test.go +++ b/src/dbnode/storage/index/fields_terms_iterator_test.go @@ -532,7 +532,7 @@ func toSlice(iter fieldsAndTermsIterator) ([]pair, error) { Value: string(v), }) } - return pairs, nil + return pairs, iter.Err() } func requireSlicesEqual(t *testing.T, a, b []pair) {