From a00789c8d42e5eccf5c668fffc8267836f53d056 Mon Sep 17 00:00:00 2001
From: Ben Ye
Date: Mon, 8 May 2023 08:49:48 -0700
Subject: [PATCH] Return grpc code resource exhausted for byte limit error
 (#6325)

* return grpc code resource exhausted for byte limit error

Signed-off-by: Ben Ye

* fix lint

Signed-off-by: Ben Ye

* update partial response strategy

Signed-off-by: Ben Ye

* fix limit

Signed-off-by: Ben Ye

* try to fix tests

Signed-off-by: Ben Ye

* fix test error message

Signed-off-by: Ben Ye

* fix test

Signed-off-by: Ben Ye

---------

Signed-off-by: Ben Ye
---
 CHANGELOG.md                   |  1 +
 pkg/store/bucket.go            | 12 +++++-----
 pkg/store/bucket_e2e_test.go   | 11 +++++++++-
 pkg/store/bucket_test.go       |  3 +++
 test/e2e/store_gateway_test.go | 40 +++++++++++-----------------
 5 files changed, 33 insertions(+), 34 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0c08147575b..dedf12c5b68 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#6306](https://github.com/thanos-io/thanos/pull/6306) Tracing: tracing in OTLP utilize the OTEL_TRACES_SAMPLER env variable
 - [#6330](https://github.com/thanos-io/thanos/pull/6330) Store: Fix inconsistent error for series limits.
 - [#6342](https://github.com/thanos-io/thanos/pull/6342) Cache/Redis: Upgrade `rueidis` to v1.0.2 to to improve error handling while shrinking a redis cluster.
+- [#6325](https://github.com/thanos-io/thanos/pull/6325) Store: return gRPC resource exhausted error for byte limiter.
 
 ### Changed
 - [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor.
diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go
index 876f8220c6d..f95338a9bfb 100644
--- a/pkg/store/bucket.go
+++ b/pkg/store/bucket.go
@@ -2329,7 +2329,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 	fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
 	for _, dataFromCache := range fromCache {
 		if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
-			return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
+			return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
 		}
 	}
 
@@ -2402,7 +2402,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		length := int64(part.End) - start
 
 		if err := bytesLimiter.Reserve(uint64(length)); err != nil {
-			return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while fetching postings")
+			return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err)
 		}
 	}
 
@@ -2562,7 +2562,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
 	for id, b := range fromCache {
 		r.loadedSeries[id] = b
 		if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
-			return errors.Wrap(err, "exceeded bytes limit while loading series from index cache")
+			return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
 		}
 	}
 
@@ -2587,7 +2587,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
 
 	if bytesLimiter != nil {
 		if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
-			return errors.Wrap(err, "exceeded bytes limit while fetching series")
+			return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err)
 		}
 	}
 
@@ -2859,7 +2859,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
 	for _, p := range parts {
 		if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
-			return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
+			return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
 		}
 	}
 
@@ -2976,7 +2976,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 	// Read entire chunk into new buffer.
 	// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
 	if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
-		return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
+		return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
 	}
 	nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
 	if err != nil {
diff --git a/pkg/store/bucket_e2e_test.go b/pkg/store/bucket_e2e_test.go
index 1847e336b70..9e1fd17ca44 100644
--- a/pkg/store/bucket_e2e_test.go
+++ b/pkg/store/bucket_e2e_test.go
@@ -14,6 +14,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/alecthomas/units"
 	"github.com/go-kit/log"
 	"github.com/gogo/status"
 	"github.com/oklog/ulid"
@@ -602,6 +603,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 	cases := map[string]struct {
 		maxChunksLimit uint64
 		maxSeriesLimit uint64
+		maxBytesLimit  int64
 		expectedErr    string
 		code           codes.Code
 	}{
@@ -619,6 +621,13 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 			maxSeriesLimit: 1,
 			code:           codes.ResourceExhausted,
 		},
+		"should fail if the max bytes limit is exceeded - ResourceExhausted": {
+			maxChunksLimit: expectedChunks,
+			expectedErr:    "exceeded bytes limit",
+			maxSeriesLimit: 2,
+			maxBytesLimit:  1,
+			code:           codes.ResourceExhausted,
+		},
 	}
 
 	for testName, testData := range cases {
@@ -629,7 +638,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 
 			dir := t.TempDir()
 
-			s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
+			s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf)
 			testutil.Ok(t, s.store.SyncBlocks(ctx))
 
 			req := &storepb.SeriesRequest{
diff --git a/pkg/store/bucket_test.go b/pkg/store/bucket_test.go
index 28b96025dba..90ba08feef6 100644
--- a/pkg/store/bucket_test.go
+++ b/pkg/store/bucket_test.go
@@ -1213,6 +1213,9 @@ func benchmarkExpandedPostings(
 
 	for _, c := range cases {
 		t.Run(c.name, func(t testutil.TB) {
+			if c.name != `i=~".*"` {
+				return
+			}
 			b := &bucketBlock{
 				logger:            log.NewNopLogger(),
 				metrics:           newBucketStoreMetrics(nil),
diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go
index d82eac5585b..efcf1773666 100644
--- a/test/e2e/store_gateway_test.go
+++ b/test/e2e/store_gateway_test.go
@@ -14,6 +14,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/efficientgo/core/testutil"
 	"github.com/efficientgo/e2e"
 	e2edb "github.com/efficientgo/e2e/db"
 	e2emon "github.com/efficientgo/e2e/monitoring"
"github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/model/timestamp" - "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/client" - - "github.com/efficientgo/core/testutil" + "github.com/thanos-io/objstore/providers/s3" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/cacheutil" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" "github.com/thanos-io/thanos/test/e2e/e2ethanos" ) @@ -860,18 +860,14 @@ config: testutil.Ok(t, store1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) testutil.Ok(t, store2.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) testutil.Ok(t, store3.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced")) + opts := promclient.QueryOptions{Deduplicate: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT} t.Run("Series() limits", func(t *testing.T) { testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { - _, err := simpleInstantQuery(t, - ctx, - q1.Endpoint("http"), - func() string { return testQuery }, - time.Now, - promclient.QueryOptions{Deduplicate: true}, 0) - if err != nil { - if strings.Contains(err.Error(), "expanded matching posting: get postings: bytes limit exceeded while fetching postings: limit 1 violated") { + if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q1.Endpoint("http")), testQuery, time.Now(), opts); err != nil { + e := err.Error() + if strings.Contains(e, "expanded matching posting: get postings") && strings.Contains(e, "exceeded bytes limit while fetching postings: limit 1 violated") { return nil } return err @@ -880,14 +876,9 @@ config: })) testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { - _, err := simpleInstantQuery(t, - ctx, - q2.Endpoint("http"), - func() string { return testQuery }, - time.Now, - promclient.QueryOptions{Deduplicate: true}, 0) - if err != nil { - if strings.Contains(err.Error(), "preload series: exceeded bytes limit while fetching series: limit 100 violated") { + if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q2.Endpoint("http")), testQuery, time.Now(), opts); err != nil { + e := err.Error() + if strings.Contains(e, "preload series") && strings.Contains(e, "exceeded bytes limit while fetching series: limit 100 violated") { return nil } return err @@ -896,14 +887,9 @@ config: })) testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error { - _, err := simpleInstantQuery(t, - ctx, - q3.Endpoint("http"), - func() string { return testQuery }, - time.Now, - promclient.QueryOptions{Deduplicate: true}, 0) - if err != nil { - if strings.Contains(err.Error(), "load chunks: bytes limit exceeded while fetching chunks: limit 196627 violated") { + if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q3.Endpoint("http")), testQuery, time.Now(), opts); err != nil { + e := err.Error() + if strings.Contains(e, "load chunks") && strings.Contains(e, "exceeded bytes limit while fetching chunks: limit 196627 violated") { return nil } return err