From 3835fac1a9f55750abbd4ac3b281eab27942a9f8 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 16:27:43 -0700 Subject: [PATCH 01/11] handle grpc code resource exhausted for store gateway Signed-off-by: Ben Ye --- integration/e2ecortex/client.go | 4 +- integration/querier_test.go | 93 ++++++++++++++++++++++++ pkg/querier/blocks_store_queryable.go | 59 ++++++++++++++- pkg/querier/error_translate_queryable.go | 1 - pkg/util/limiter/query_limiter.go | 40 +++++----- 5 files changed, 170 insertions(+), 27 deletions(-) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index adfc99faf4..04e3d5f3ca 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -259,8 +259,8 @@ func (c *Client) LabelValues(label string, start, end time.Time, matches []strin } // LabelNames gets label names -func (c *Client) LabelNames(start, end time.Time) ([]string, error) { - result, _, err := c.querierClient.LabelNames(context.Background(), nil, start, end) +func (c *Client) LabelNames(start, end time.Time, matchers ...string) ([]string, error) { + result, _, err := c.querierClient.LabelNames(context.Background(), matchers, start, end) return result, err } diff --git a/integration/querier_test.go b/integration/querier_test.go index 651003a9ec..8ce155dbe0 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -818,6 +818,99 @@ func TestQuerierWithBlocksStorageOnMissingBlocksFromStorage(t *testing.T) { assert.Contains(t, err.Error(), "500") } +func TestQuerierWithBlocksStorageLimits(t *testing.T) { + const blockRangePeriod = 5 * time.Second + + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Configure the blocks storage to frequently compact TSDB head + // and ship blocks to the storage. + flags := mergeFlags(BlocksStorageFlags(), map[string]string{ + "-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(), + "-blocks-storage.tsdb.ship-interval": "1s", + "-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(), + }) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Cortex components for the write path. + distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(distributor, ingester)) + + // Wait until the distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + + // Push some series to Cortex. + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1") + require.NoError(t, err) + + seriesTimestamp := time.Now() + series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"}) + series2, _ := generateSeries("series_2", seriesTimestamp, prompb.Label{Name: "job", Value: "test"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2ns series in in the head. + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + + // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check. + storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + "-querier.max-fetched-series-per-query": "1", + }), "") + querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{ + "-blocks-storage.bucket-store.sync-interval": "5s", + "-querier.max-fetched-series-per-query": "1", + }), "") + require.NoError(t, s.StartAndWaitReady(querier, storeGateway)) + + // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total")) + require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount)) + + // Query back the series. + c, err = e2ecortex.NewClient("", querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // We expect all queries hitting 422 exceeded series limit + result, err := c.Query(`{job="test"}`, seriesTimestamp) + require.Error(t, err) + require.Contains(t, err.Error(), "422") + require.Contains(t, err.Error(), "exceeded series limit") + + result, err := c.Series([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Hour), seriesTimestamp) + require.Error(t, err) + require.Contains(t, err.Error(), "422") + require.Contains(t, err.Error(), "exceeded series limit") + + c.LabelNames(seriesTimestamp.Add(-time.Hour), seriesTimestamp, `{job="test"}`) + require.Error(t, err) + require.Contains(t, err.Error(), "422") + require.Contains(t, err.Error(), "exceeded series limit") + + c.LabelValues(seriesTimestamp.Add(-time.Hour), seriesTimestamp, []string{`{job="test"}`}) + require.Error(t, err) + require.Contains(t, err.Error(), "422") + require.Contains(t, err.Error(), "exceeded series limit") +} + func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { const blockRangePeriod = 5 * time.Second diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index 468c3b8e97..c5f8db9f3d 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -639,6 +639,24 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } if err != nil { + s, ok := status.FromError(err) + if !ok { + s, ok = status.FromError(errors.Cause(err)) + } + + if ok { + // Thanos Store Gateway uses it when hitting series/chunk limit. + if s.Code() == codes.ResourceExhausted { + message := s.Message() + // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L937 + if strings.Contains(message, "exceeded series limit") { + return validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, queryLimiter.MaxSeriesPerQuery)) + } else if strings.Contains(message, "exceeded chunks limit") { + // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L1036 + return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) + } + } + } return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress()) } @@ -746,6 +764,7 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( warnings = storage.Warnings(nil) queriedBlocks = []ulid.ULID(nil) spanLog = spanlogger.FromContext(ctx) + queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) // Concurrently fetch series from all clients. @@ -763,10 +782,26 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( namesResp, err := c.LabelNames(gCtx, req) if err != nil { if isRetryableError(err) { - level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label names from %s due to retryable error", c.RemoteAddress())) return nil } - return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) + + s, ok := status.FromError(err) + if !ok { + s, ok = status.FromError(errors.Cause(err)) + } + + if ok { + // Thanos Store Gateway uses it when hitting series/chunk limit. + if s.Code() == codes.ResourceExhausted { + message := s.Message() + // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L937 + if strings.Contains(message, "exceeded series limit") { + return validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, queryLimiter.MaxSeriesPerQuery)) + } + } + } + return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress()) } myQueriedBlocks := []ulid.ULID(nil) @@ -827,6 +862,7 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( warnings = storage.Warnings(nil) queriedBlocks = []ulid.ULID(nil) spanLog = spanlogger.FromContext(ctx) + queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) // Concurrently fetch series from all clients. @@ -844,10 +880,25 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( valuesResp, err := c.LabelValues(gCtx, req) if err != nil { if isRetryableError(err) { - level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch series from %s due to retryable error", c.RemoteAddress())) + level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label values from %s due to retryable error", c.RemoteAddress())) return nil } - return errors.Wrapf(err, "failed to fetch series from %s", c.RemoteAddress()) + s, ok := status.FromError(err) + if !ok { + s, ok = status.FromError(errors.Cause(err)) + } + + if ok { + // Thanos Store Gateway uses it when hitting series/chunk limit. + if s.Code() == codes.ResourceExhausted { + message := s.Message() + // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L937 + if strings.Contains(message, "exceeded series limit") { + return validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, queryLimiter.MaxSeriesPerQuery)) + } + } + } + return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress()) } myQueriedBlocks := []ulid.ULID(nil) diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go index 43ace739b5..5bb63a62e1 100644 --- a/pkg/querier/error_translate_queryable.go +++ b/pkg/querier/error_translate_queryable.go @@ -2,7 +2,6 @@ package querier import ( "context" - "github.com/cortexproject/cortex/pkg/util/validation" "github.com/gogo/status" diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index 84031711e1..b775d55599 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -30,10 +30,10 @@ type QueryLimiter struct { dataBytesCount atomic.Int64 chunkCount atomic.Int64 - maxSeriesPerQuery int - maxChunkBytesPerQuery int - maxDataBytesPerQuery int - maxChunksPerQuery int + MaxSeriesPerQuery int + MaxChunkBytesPerQuery int + MaxDataBytesPerQuery int + MaxChunksPerQuery int } // NewQueryLimiter makes a new per-query limiter. Each query limiter @@ -43,10 +43,10 @@ func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery, maxChunksPerQuery uniqueSeriesMx: sync.Mutex{}, uniqueSeries: map[model.Fingerprint]struct{}{}, - maxSeriesPerQuery: maxSeriesPerQuery, - maxChunkBytesPerQuery: maxChunkBytesPerQuery, - maxChunksPerQuery: maxChunksPerQuery, - maxDataBytesPerQuery: maxDataBytesPerQuery, + MaxSeriesPerQuery: maxSeriesPerQuery, + MaxChunkBytesPerQuery: maxChunkBytesPerQuery, + MaxChunksPerQuery: maxChunksPerQuery, + MaxDataBytesPerQuery: maxDataBytesPerQuery, } } @@ -68,7 +68,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { // AddSeries adds the input series and returns an error if the limit is reached. func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { // If the max series is unlimited just return without managing map - if ql.maxSeriesPerQuery == 0 { + if ql.MaxSeriesPerQuery == 0 { return nil } fingerprint := client.FastFingerprint(seriesLabels) @@ -77,9 +77,9 @@ func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { defer ql.uniqueSeriesMx.Unlock() ql.uniqueSeries[fingerprint] = struct{}{} - if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { + if len(ql.uniqueSeries) > ql.MaxSeriesPerQuery { // Format error with max limit - return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery) + return fmt.Errorf(ErrMaxSeriesHit, ql.MaxSeriesPerQuery) } return nil } @@ -93,33 +93,33 @@ func (ql *QueryLimiter) uniqueSeriesCount() int { // AddChunkBytes adds the input chunk size in bytes and returns an error if the limit is reached. func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error { - if ql.maxChunkBytesPerQuery == 0 { + if ql.MaxChunkBytesPerQuery == 0 { return nil } - if ql.chunkBytesCount.Add(int64(chunkSizeInBytes)) > int64(ql.maxChunkBytesPerQuery) { - return fmt.Errorf(ErrMaxChunkBytesHit, ql.maxChunkBytesPerQuery) + if ql.chunkBytesCount.Add(int64(chunkSizeInBytes)) > int64(ql.MaxChunkBytesPerQuery) { + return fmt.Errorf(ErrMaxChunkBytesHit, ql.MaxChunkBytesPerQuery) } return nil } // AddDataBytes adds the queried data bytes and returns an error if the limit is reached. func (ql *QueryLimiter) AddDataBytes(dataSizeInBytes int) error { - if ql.maxDataBytesPerQuery == 0 { + if ql.MaxDataBytesPerQuery == 0 { return nil } - if ql.dataBytesCount.Add(int64(dataSizeInBytes)) > int64(ql.maxDataBytesPerQuery) { - return fmt.Errorf(ErrMaxDataBytesHit, ql.maxDataBytesPerQuery) + if ql.dataBytesCount.Add(int64(dataSizeInBytes)) > int64(ql.MaxDataBytesPerQuery) { + return fmt.Errorf(ErrMaxDataBytesHit, ql.MaxDataBytesPerQuery) } return nil } func (ql *QueryLimiter) AddChunks(count int) error { - if ql.maxChunksPerQuery == 0 { + if ql.MaxChunksPerQuery == 0 { return nil } - if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) { - return fmt.Errorf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery) + if ql.chunkCount.Add(int64(count)) > int64(ql.MaxChunksPerQuery) { + return fmt.Errorf(ErrMaxChunksPerQueryLimit, ql.MaxChunksPerQuery) } return nil } From 8ba1015a6bf3f3354e905bfc586971586608d0d9 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 16:42:24 -0700 Subject: [PATCH 02/11] fix lint Signed-off-by: Ben Ye --- integration/querier_test.go | 8 ++++---- pkg/querier/error_translate_queryable.go | 3 ++- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/integration/querier_test.go b/integration/querier_test.go index 8ce155dbe0..1f2bf82b50 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -890,22 +890,22 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { require.NoError(t, err) // We expect all queries hitting 422 exceeded series limit - result, err := c.Query(`{job="test"}`, seriesTimestamp) + _, err = c.Query(`{job="test"}`, seriesTimestamp) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") - result, err := c.Series([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Hour), seriesTimestamp) + _, err = c.Series([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Hour), seriesTimestamp) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") - c.LabelNames(seriesTimestamp.Add(-time.Hour), seriesTimestamp, `{job="test"}`) + _, err = c.LabelNames(seriesTimestamp.Add(-time.Hour), seriesTimestamp, `{job="test"}`) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") - c.LabelValues(seriesTimestamp.Add(-time.Hour), seriesTimestamp, []string{`{job="test"}`}) + _, err = c.LabelValues("__name__", seriesTimestamp.Add(-time.Hour), seriesTimestamp, []string{`{job="test"}`}) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") diff --git a/pkg/querier/error_translate_queryable.go b/pkg/querier/error_translate_queryable.go index 5bb63a62e1..c0a14a3723 100644 --- a/pkg/querier/error_translate_queryable.go +++ b/pkg/querier/error_translate_queryable.go @@ -2,13 +2,14 @@ package querier import ( "context" - "github.com/cortexproject/cortex/pkg/util/validation" "github.com/gogo/status" "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" + + "github.com/cortexproject/cortex/pkg/util/validation" ) // TranslateToPromqlAPIError converts error to one of promql.Errors for consumption in PromQL API. From 15922b088c6f8d1dc57c42e7379019923f9b198a Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 17:09:26 -0700 Subject: [PATCH 03/11] update changelog Signed-off-by: Ben Ye --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44abcbd5ed..bd1ab53dda 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## master / unreleased * [BUGFIX] Ruler: Validate if rule group can be safely converted back to rule group yaml from protobuf message #5265 +* [BUGFIX] Querier: Convert gRPC `ResourceExhausted` status code from store gateway to 422 limit error. #5286 ## 1.15.0 2023-04-19 From 160f3b2beb93db94e565e068c25d032a91302703 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 17:34:24 -0700 Subject: [PATCH 04/11] try fixing test Signed-off-by: Ben Ye --- integration/querier_test.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/integration/querier_test.go b/integration/querier_test.go index 1f2bf82b50..793e7dbe0e 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -851,8 +851,9 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { require.NoError(t, err) seriesTimestamp := time.Now() + series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2) series1, _ := generateSeries("series_1", seriesTimestamp, prompb.Label{Name: "job", Value: "test"}) - series2, _ := generateSeries("series_2", seriesTimestamp, prompb.Label{Name: "job", Value: "test"}) + series2, _ := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "job", Value: "test"}) res, err := c.Push(series1) require.NoError(t, err) @@ -890,22 +891,22 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { require.NoError(t, err) // We expect all queries hitting 422 exceeded series limit - _, err = c.Query(`{job="test"}`, seriesTimestamp) + _, err = c.Query(`{job="test"}`, series2Timestamp) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") - _, err = c.Series([]string{`{job="test"}`}, seriesTimestamp.Add(-time.Hour), seriesTimestamp) + _, err = c.Series([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") - _, err = c.LabelNames(seriesTimestamp.Add(-time.Hour), seriesTimestamp, `{job="test"}`) + _, err = c.LabelNames(series2Timestamp.Add(-time.Hour), series2Timestamp, `{job="test"}`) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") - _, err = c.LabelValues("__name__", seriesTimestamp.Add(-time.Hour), seriesTimestamp, []string{`{job="test"}`}) + _, err = c.LabelValues("__name__", series2Timestamp.Add(-time.Hour), series2Timestamp, []string{`{job="test"}`}) require.Error(t, err) require.Contains(t, err.Error(), "422") require.Contains(t, err.Error(), "exceeded series limit") From e72c50dd1bea1b29fc0f7cb26b49b6ea534f444d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 19:41:46 -0700 Subject: [PATCH 05/11] try to fix E2E test Signed-off-by: Ben Ye --- integration/e2ecortex/client.go | 84 +++++++++++++++++++++++++++++++-- integration/querier_test.go | 32 +++++++------ integration/query_fuzz_test.go | 1 - 3 files changed, 98 insertions(+), 19 deletions(-) diff --git a/integration/e2ecortex/client.go b/integration/e2ecortex/client.go index 04e3d5f3ca..7912f28c51 100644 --- a/integration/e2ecortex/client.go +++ b/integration/e2ecortex/client.go @@ -150,10 +150,88 @@ func (c *Client) QueryRangeRaw(query string, start, end time.Time, step time.Dur } // QueryRaw runs a query directly against the querier API. -func (c *Client) QueryRaw(query string) (*http.Response, []byte, error) { - addr := fmt.Sprintf("http://%s/api/prom/api/v1/query?query=%s", c.querierAddress, url.QueryEscape(query)) +func (c *Client) QueryRaw(query string, ts time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/query", c.querierAddress), + } + q := u.Query() + q.Set("query", query) - return c.query(addr) + if !ts.IsZero() { + q.Set("time", FormatTime(ts)) + } + u.RawQuery = q.Encode() + return c.query(u.String()) +} + +// SeriesRaw runs a series request directly against the querier API. +func (c *Client) SeriesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/series", c.querierAddress), + } + q := u.Query() + + for _, m := range matches { + q.Add("match[]", m) + } + + if !startTime.IsZero() { + q.Set("start", FormatTime(startTime)) + } + if !endTime.IsZero() { + q.Set("end", FormatTime(endTime)) + } + + u.RawQuery = q.Encode() + return c.query(u.String()) +} + +// LabelNamesRaw runs a label names request directly against the querier API. +func (c *Client) LabelNamesRaw(matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/labels", c.querierAddress), + } + q := u.Query() + + for _, m := range matches { + q.Add("match[]", m) + } + + if !startTime.IsZero() { + q.Set("start", FormatTime(startTime)) + } + if !endTime.IsZero() { + q.Set("end", FormatTime(endTime)) + } + + u.RawQuery = q.Encode() + return c.query(u.String()) +} + +// LabelValuesRaw runs a label values request directly against the querier API. +func (c *Client) LabelValuesRaw(label string, matches []string, startTime, endTime time.Time) (*http.Response, []byte, error) { + u := &url.URL{ + Scheme: "http", + Path: fmt.Sprintf("%s/api/prom/api/v1/label/%s/values", c.querierAddress, label), + } + q := u.Query() + + for _, m := range matches { + q.Add("match[]", m) + } + + if !startTime.IsZero() { + q.Set("start", FormatTime(startTime)) + } + if !endTime.IsZero() { + q.Set("end", FormatTime(endTime)) + } + + u.RawQuery = q.Encode() + return c.query(u.String()) } // RemoteRead runs a remote read query. diff --git a/integration/querier_test.go b/integration/querier_test.go index 793e7dbe0e..1433cc3eee 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -5,6 +5,7 @@ package integration import ( "fmt" + "net/http" "strconv" "strings" "testing" @@ -891,25 +892,26 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { require.NoError(t, err) // We expect all queries hitting 422 exceeded series limit - _, err = c.Query(`{job="test"}`, series2Timestamp) - require.Error(t, err) - require.Contains(t, err.Error(), "422") - require.Contains(t, err.Error(), "exceeded series limit") + resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), "exceeded series limit") + resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) _, err = c.Series([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) - require.Error(t, err) - require.Contains(t, err.Error(), "422") - require.Contains(t, err.Error(), "exceeded series limit") + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), "max number of series limit") - _, err = c.LabelNames(series2Timestamp.Add(-time.Hour), series2Timestamp, `{job="test"}`) - require.Error(t, err) - require.Contains(t, err.Error(), "422") - require.Contains(t, err.Error(), "exceeded series limit") + resp, body, err = c.LabelNamesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), "max number of series limit") - _, err = c.LabelValues("__name__", series2Timestamp.Add(-time.Hour), series2Timestamp, []string{`{job="test"}`}) - require.Error(t, err) - require.Contains(t, err.Error(), "422") - require.Contains(t, err.Error(), "exceeded series limit") + resp, body, err = c.LabelValuesRaw("__name__", []string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) + require.NoError(t, err) + require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) + require.Contains(t, string(body), "max number of series limit") } func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index 7b19321c32..1562dbf2ee 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -133,7 +133,6 @@ func TestVerticalShardingFuzz(t *testing.T) { opts := []promqlsmith.Option{ promqlsmith.WithEnableOffset(true), promqlsmith.WithEnableAtModifier(true), - promqlsmith.WithEnableVectorMatching(true), } ps := promqlsmith.New(rnd, lbls, opts...) From 48a7b321a6f9f7900f487b450ce3313870d05d9d Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 19:49:40 -0700 Subject: [PATCH 06/11] lint Signed-off-by: Ben Ye --- integration/query_frontend_test.go | 4 ++-- integration/zone_aware_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/integration/query_frontend_test.go b/integration/query_frontend_test.go index 5177eb1973..cd93908cd6 100644 --- a/integration/query_frontend_test.go +++ b/integration/query_frontend_test.go @@ -293,7 +293,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on missing metric name for each user. if userID == 0 && cfg.testMissingMetricName { - res, body, err := c.QueryRaw("{instance=~\"hello.*\"}") + res, body, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now()) require.NoError(t, err) require.Equal(t, 422, res.StatusCode) require.Contains(t, string(body), "query must contain metric name") @@ -317,7 +317,7 @@ func runQueryFrontendTest(t *testing.T, cfg queryFrontendTestConfig) { // No need to repeat the test on Server-Timing header for each user. if userID == 0 && cfg.queryStatsEnabled { - res, _, err := c.QueryRaw("{instance=~\"hello.*\"}") + res, _, err := c.QueryRaw("{instance=~\"hello.*\"}", time.Now()) require.NoError(t, err) require.Regexp(t, "querier_wall_time;dur=[0-9.]*, response_time;dur=[0-9.]*$", res.Header.Values("Server-Timing")[0]) } diff --git a/integration/zone_aware_test.go b/integration/zone_aware_test.go index 7cb0772213..c4d7937478 100644 --- a/integration/zone_aware_test.go +++ b/integration/zone_aware_test.go @@ -135,7 +135,7 @@ func TestZoneAwareReplication(t *testing.T) { require.NoError(t, ingester3.Kill()) // Query back any series => fail (either because of a timeout or 500) - result, _, err := client.QueryRaw("series_1") + result, _, err := client.QueryRaw("series_1", time.Now()) if !errors.Is(err, context.DeadlineExceeded) { require.NoError(t, err) require.Equal(t, 500, result.StatusCode) From 5da5bb40f2645179e9b52911453cbeb9a9b88cb0 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 20:07:23 -0700 Subject: [PATCH 07/11] try again Signed-off-by: Ben Ye --- integration/querier_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/integration/querier_test.go b/integration/querier_test.go index 1433cc3eee..9eabf6b0cf 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -898,7 +898,6 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { require.Contains(t, string(body), "exceeded series limit") resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) - _, err = c.Series([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) require.NoError(t, err) require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) require.Contains(t, string(body), "max number of series limit") From 20252692cf9fbed81df8a6759ed97d04d65b7671 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 20:33:47 -0700 Subject: [PATCH 08/11] fix message Signed-off-by: Ben Ye --- integration/querier_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration/querier_test.go b/integration/querier_test.go index 9eabf6b0cf..1ccc4a9f6e 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -895,7 +895,7 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { resp, body, err := c.QueryRaw(`{job="test"}`, series2Timestamp) require.NoError(t, err) require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) - require.Contains(t, string(body), "exceeded series limit") + require.Contains(t, string(body), "max number of series limit") resp, body, err = c.SeriesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) require.NoError(t, err) From 8397f306e56963579e936c080edb5e170529100a Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Mon, 24 Apr 2023 22:57:14 -0700 Subject: [PATCH 09/11] remove labels API Signed-off-by: Ben Ye --- integration/querier_test.go | 10 -------- pkg/querier/blocks_store_queryable.go | 33 --------------------------- 2 files changed, 43 deletions(-) diff --git a/integration/querier_test.go b/integration/querier_test.go index 1ccc4a9f6e..26d0e62c21 100644 --- a/integration/querier_test.go +++ b/integration/querier_test.go @@ -901,16 +901,6 @@ func TestQuerierWithBlocksStorageLimits(t *testing.T) { require.NoError(t, err) require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) require.Contains(t, string(body), "max number of series limit") - - resp, body, err = c.LabelNamesRaw([]string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) - require.NoError(t, err) - require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) - require.Contains(t, string(body), "max number of series limit") - - resp, body, err = c.LabelValuesRaw("__name__", []string{`{job="test"}`}, series2Timestamp.Add(-time.Hour), series2Timestamp) - require.NoError(t, err) - require.Equal(t, http.StatusUnprocessableEntity, resp.StatusCode) - require.Contains(t, string(body), "max number of series limit") } func TestQueryLimitsWithBlocksStorageRunningInMicroServices(t *testing.T) { diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c5f8db9f3d..f4c260d746 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -645,7 +645,6 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( } if ok { - // Thanos Store Gateway uses it when hitting series/chunk limit. if s.Code() == codes.ResourceExhausted { message := s.Message() // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L937 @@ -764,7 +763,6 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( warnings = storage.Warnings(nil) queriedBlocks = []ulid.ULID(nil) spanLog = spanlogger.FromContext(ctx) - queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) // Concurrently fetch series from all clients. @@ -786,21 +784,6 @@ func (q *blocksStoreQuerier) fetchLabelNamesFromStore( return nil } - s, ok := status.FromError(err) - if !ok { - s, ok = status.FromError(errors.Cause(err)) - } - - if ok { - // Thanos Store Gateway uses it when hitting series/chunk limit. - if s.Code() == codes.ResourceExhausted { - message := s.Message() - // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L937 - if strings.Contains(message, "exceeded series limit") { - return validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, queryLimiter.MaxSeriesPerQuery)) - } - } - } return errors.Wrapf(err, "failed to fetch label names from %s", c.RemoteAddress()) } @@ -862,7 +845,6 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( warnings = storage.Warnings(nil) queriedBlocks = []ulid.ULID(nil) spanLog = spanlogger.FromContext(ctx) - queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx) ) // Concurrently fetch series from all clients. @@ -883,21 +865,6 @@ func (q *blocksStoreQuerier) fetchLabelValuesFromStore( level.Warn(spanLog).Log("err", errors.Wrapf(err, "failed to fetch label values from %s due to retryable error", c.RemoteAddress())) return nil } - s, ok := status.FromError(err) - if !ok { - s, ok = status.FromError(errors.Cause(err)) - } - - if ok { - // Thanos Store Gateway uses it when hitting series/chunk limit. - if s.Code() == codes.ResourceExhausted { - message := s.Message() - // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L937 - if strings.Contains(message, "exceeded series limit") { - return validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, queryLimiter.MaxSeriesPerQuery)) - } - } - } return errors.Wrapf(err, "failed to fetch label values from %s", c.RemoteAddress()) } From e3b0853733a0f4bdc426c8ae4d2dfb1d8f2bd977 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 25 Apr 2023 09:38:10 -0700 Subject: [PATCH 10/11] remove logic to check string contains Signed-off-by: Ben Ye --- pkg/querier/blocks_store_queryable.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index f4c260d746..aa100bf683 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -646,14 +646,7 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores( if ok { if s.Code() == codes.ResourceExhausted { - message := s.Message() - // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L937 - if strings.Contains(message, "exceeded series limit") { - return validation.LimitError(fmt.Sprintf(limiter.ErrMaxSeriesHit, queryLimiter.MaxSeriesPerQuery)) - } else if strings.Contains(message, "exceeded chunks limit") { - // https://github.com/thanos-io/thanos/blob/3c0c9ffaed6ab0a7c52991dd8d7c695c49cff8ee/pkg/store/bucket.go#L1036 - return validation.LimitError(fmt.Sprintf(errMaxChunksPerQueryLimit, util.LabelMatchersToString(matchers), maxChunksLimit)) - } + return validation.LimitError(s.Message()) } } return errors.Wrapf(err, "failed to receive series from %s", c.RemoteAddress()) From 17e46b5fbfd07b780b0d4539f8f32d3892793105 Mon Sep 17 00:00:00 2001 From: Ben Ye Date: Tue, 25 Apr 2023 09:41:26 -0700 Subject: [PATCH 11/11] make limiter vars private Signed-off-by: Ben Ye --- pkg/util/limiter/query_limiter.go | 40 +++++++++++++++---------------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/util/limiter/query_limiter.go b/pkg/util/limiter/query_limiter.go index b775d55599..84031711e1 100644 --- a/pkg/util/limiter/query_limiter.go +++ b/pkg/util/limiter/query_limiter.go @@ -30,10 +30,10 @@ type QueryLimiter struct { dataBytesCount atomic.Int64 chunkCount atomic.Int64 - MaxSeriesPerQuery int - MaxChunkBytesPerQuery int - MaxDataBytesPerQuery int - MaxChunksPerQuery int + maxSeriesPerQuery int + maxChunkBytesPerQuery int + maxDataBytesPerQuery int + maxChunksPerQuery int } // NewQueryLimiter makes a new per-query limiter. Each query limiter @@ -43,10 +43,10 @@ func NewQueryLimiter(maxSeriesPerQuery, maxChunkBytesPerQuery, maxChunksPerQuery uniqueSeriesMx: sync.Mutex{}, uniqueSeries: map[model.Fingerprint]struct{}{}, - MaxSeriesPerQuery: maxSeriesPerQuery, - MaxChunkBytesPerQuery: maxChunkBytesPerQuery, - MaxChunksPerQuery: maxChunksPerQuery, - MaxDataBytesPerQuery: maxDataBytesPerQuery, + maxSeriesPerQuery: maxSeriesPerQuery, + maxChunkBytesPerQuery: maxChunkBytesPerQuery, + maxChunksPerQuery: maxChunksPerQuery, + maxDataBytesPerQuery: maxDataBytesPerQuery, } } @@ -68,7 +68,7 @@ func QueryLimiterFromContextWithFallback(ctx context.Context) *QueryLimiter { // AddSeries adds the input series and returns an error if the limit is reached. func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { // If the max series is unlimited just return without managing map - if ql.MaxSeriesPerQuery == 0 { + if ql.maxSeriesPerQuery == 0 { return nil } fingerprint := client.FastFingerprint(seriesLabels) @@ -77,9 +77,9 @@ func (ql *QueryLimiter) AddSeries(seriesLabels []cortexpb.LabelAdapter) error { defer ql.uniqueSeriesMx.Unlock() ql.uniqueSeries[fingerprint] = struct{}{} - if len(ql.uniqueSeries) > ql.MaxSeriesPerQuery { + if len(ql.uniqueSeries) > ql.maxSeriesPerQuery { // Format error with max limit - return fmt.Errorf(ErrMaxSeriesHit, ql.MaxSeriesPerQuery) + return fmt.Errorf(ErrMaxSeriesHit, ql.maxSeriesPerQuery) } return nil } @@ -93,33 +93,33 @@ func (ql *QueryLimiter) uniqueSeriesCount() int { // AddChunkBytes adds the input chunk size in bytes and returns an error if the limit is reached. func (ql *QueryLimiter) AddChunkBytes(chunkSizeInBytes int) error { - if ql.MaxChunkBytesPerQuery == 0 { + if ql.maxChunkBytesPerQuery == 0 { return nil } - if ql.chunkBytesCount.Add(int64(chunkSizeInBytes)) > int64(ql.MaxChunkBytesPerQuery) { - return fmt.Errorf(ErrMaxChunkBytesHit, ql.MaxChunkBytesPerQuery) + if ql.chunkBytesCount.Add(int64(chunkSizeInBytes)) > int64(ql.maxChunkBytesPerQuery) { + return fmt.Errorf(ErrMaxChunkBytesHit, ql.maxChunkBytesPerQuery) } return nil } // AddDataBytes adds the queried data bytes and returns an error if the limit is reached. func (ql *QueryLimiter) AddDataBytes(dataSizeInBytes int) error { - if ql.MaxDataBytesPerQuery == 0 { + if ql.maxDataBytesPerQuery == 0 { return nil } - if ql.dataBytesCount.Add(int64(dataSizeInBytes)) > int64(ql.MaxDataBytesPerQuery) { - return fmt.Errorf(ErrMaxDataBytesHit, ql.MaxDataBytesPerQuery) + if ql.dataBytesCount.Add(int64(dataSizeInBytes)) > int64(ql.maxDataBytesPerQuery) { + return fmt.Errorf(ErrMaxDataBytesHit, ql.maxDataBytesPerQuery) } return nil } func (ql *QueryLimiter) AddChunks(count int) error { - if ql.MaxChunksPerQuery == 0 { + if ql.maxChunksPerQuery == 0 { return nil } - if ql.chunkCount.Add(int64(count)) > int64(ql.MaxChunksPerQuery) { - return fmt.Errorf(ErrMaxChunksPerQueryLimit, ql.MaxChunksPerQuery) + if ql.chunkCount.Add(int64(count)) > int64(ql.maxChunksPerQuery) { + return fmt.Errorf(ErrMaxChunksPerQueryLimit, ql.maxChunksPerQuery) } return nil }