Commit a00789c — Return grpc code resource exhausted for byte limit error (thanos-io#6325)

* return grpc code resource exhausted for byte limit error

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* update partial response strategy

Signed-off-by: Ben Ye <benye@amazon.com>

* fix limit

Signed-off-by: Ben Ye <benye@amazon.com>

* try to fix tests

Signed-off-by: Ben Ye <benye@amazon.com>

* fix test error message

Signed-off-by: Ben Ye <benye@amazon.com>

* fix test

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 authored and HC Zhu committed Jun 27, 2023
1 parent e5393f8 commit a00789c
Showing 5 changed files with 33 additions and 34 deletions.
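
Why the gRPC code matters to callers: a plain wrapped Go error crosses the Store API as codes.Unknown, so queriers cannot tell a tripped limit from an internal fault. Below is a minimal Go sketch of the difference, using the standard google.golang.org/grpc/status package rather than the httpgrpc helper the diffs below use; the error value and messages are illustrative only.

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// limitErr stands in for a BytesLimiter failure; the value is hypothetical.
var limitErr = errors.New("limit 1 violated")

func main() {
	// Before this commit: a plain wrapped error, which gRPC reports as Unknown.
	plain := fmt.Errorf("bytes limit exceeded while fetching postings: %w", limitErr)
	// After this commit: the error carries an explicit ResourceExhausted code.
	coded := status.Error(codes.ResourceExhausted, "exceeded bytes limit while fetching postings: "+limitErr.Error())

	fmt.Println(status.Code(plain)) // Unknown
	fmt.Println(status.Code(coded)) // ResourceExhausted
}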
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -32,6 +32,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 - [#6306](https://github.com/thanos-io/thanos/pull/6306) Tracing: tracing in OTLP utilize the OTEL_TRACES_SAMPLER env variable
 - [#6330](https://github.com/thanos-io/thanos/pull/6330) Store: Fix inconsistent error for series limits.
 - [#6342](https://github.com/thanos-io/thanos/pull/6342) Cache/Redis: Upgrade `rueidis` to v1.0.2 to to improve error handling while shrinking a redis cluster.
+- [#6325](https://github.com/thanos-io/thanos/pull/6325) Store: return gRPC resource exhausted error for byte limiter.
 
 ### Changed
 - [#6168](https://github.com/thanos-io/thanos/pull/6168) Receiver: Make ketama hashring fail early when configured with number of nodes lower than the replication factor.
12 changes: 6 additions & 6 deletions pkg/store/bucket.go
@@ -2329,7 +2329,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 	fromCache, _ := r.block.indexCache.FetchMultiPostings(ctx, r.block.meta.ULID, keys)
 	for _, dataFromCache := range fromCache {
 		if err := bytesLimiter.Reserve(uint64(len(dataFromCache))); err != nil {
-			return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while loading postings from index cache")
+			return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading postings from index cache: %s", err)
 		}
 	}
@@ -2402,7 +2402,7 @@ func (r *bucketIndexReader) fetchPostings(ctx context.Context, keys []labels.Lab
 		length := int64(part.End) - start
 
 		if err := bytesLimiter.Reserve(uint64(length)); err != nil {
-			return nil, closeFns, errors.Wrap(err, "bytes limit exceeded while fetching postings")
+			return nil, closeFns, httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching postings: %s", err)
 		}
 	}
@@ -2562,7 +2562,7 @@ func (r *bucketIndexReader) PreloadSeries(ctx context.Context, ids []storage.Ser
 	for id, b := range fromCache {
 		r.loadedSeries[id] = b
 		if err := bytesLimiter.Reserve(uint64(len(b))); err != nil {
-			return errors.Wrap(err, "exceeded bytes limit while loading series from index cache")
+			return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while loading series from index cache: %s", err)
 		}
 	}
@@ -2587,7 +2587,7 @@ func (r *bucketIndexReader) loadSeries(ctx context.Context, ids []storage.Series
 
 	if bytesLimiter != nil {
 		if err := bytesLimiter.Reserve(uint64(end - start)); err != nil {
-			return errors.Wrap(err, "exceeded bytes limit while fetching series")
+			return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching series: %s", err)
 		}
 	}
@@ -2859,7 +2859,7 @@ func (r *bucketChunkReader) load(ctx context.Context, res []seriesEntry, aggrs [
 
 	for _, p := range parts {
 		if err := bytesLimiter.Reserve(uint64(p.End - p.Start)); err != nil {
-			return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
+			return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
 		}
 	}
@@ -2976,7 +2976,7 @@ func (r *bucketChunkReader) loadChunks(ctx context.Context, res []seriesEntry, a
 		// Read entire chunk into new buffer.
 		// TODO: readChunkRange call could be avoided for any chunk but last in this particular part.
 		if err := bytesLimiter.Reserve(uint64(chunkLen)); err != nil {
-			return errors.Wrap(err, "bytes limit exceeded while fetching chunks")
+			return httpgrpc.Errorf(int(codes.ResourceExhausted), "exceeded bytes limit while fetching chunks: %s", err)
 		}
 		nb, err := r.block.readChunkRange(ctx, seq, int64(pIdx.offset), int64(chunkLen), []byteRange{{offset: 0, length: chunkLen}})
 		if err != nil {
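
Every hunk in this file applies the same conversion: a BytesLimiter.Reserve failure becomes a status-carrying error instead of an errors.Wrap result. Here is a condensed sketch of that pattern, with a hypothetical toyByteLimiter in place of Thanos's BytesLimiter and the standard status.Errorf in place of httpgrpc.Errorf:

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// toyByteLimiter mirrors the shape of the real limiter for this sketch only:
// it tracks bytes reserved so far against a fixed budget.
type toyByteLimiter struct {
	limit, reserved uint64
}

func (l *toyByteLimiter) Reserve(n uint64) error {
	if l.reserved+n > l.limit {
		return fmt.Errorf("limit %d violated", l.limit)
	}
	l.reserved += n
	return nil
}

// fetchChunks shows the pattern each hunk applies: convert a limiter failure
// into a ResourceExhausted status instead of an opaque wrapped error.
func fetchChunks(l *toyByteLimiter, chunkLen uint64) error {
	if err := l.Reserve(chunkLen); err != nil {
		return status.Errorf(codes.ResourceExhausted, "exceeded bytes limit while fetching chunks: %s", err)
	}
	return nil // ...read the chunk...
}

func main() {
	l := &toyByteLimiter{limit: 16}
	fmt.Println(fetchChunks(l, 8))  // <nil>
	fmt.Println(fetchChunks(l, 16)) // rpc error: code = ResourceExhausted desc = ...
}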
11 changes: 10 additions & 1 deletion pkg/store/bucket_e2e_test.go
@@ -14,6 +14,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/alecthomas/units"
 	"github.com/go-kit/log"
 	"github.com/gogo/status"
 	"github.com/oklog/ulid"
@@ -602,6 +603,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 	cases := map[string]struct {
 		maxChunksLimit uint64
 		maxSeriesLimit uint64
+		maxBytesLimit  int64
 		expectedErr    string
 		code           codes.Code
 	}{
@@ -619,6 +621,13 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 			maxSeriesLimit: 1,
 			code:           codes.ResourceExhausted,
 		},
+		"should fail if the max bytes limit is exceeded - ResourceExhausted": {
+			maxChunksLimit: expectedChunks,
+			expectedErr:    "exceeded bytes limit",
+			maxSeriesLimit: 2,
+			maxBytesLimit:  1,
+			code:           codes.ResourceExhausted,
+		},
 	}
 
 	for testName, testData := range cases {
@@ -629,7 +638,7 @@ func TestBucketStore_Series_ChunksLimiter_e2e(t *testing.T) {
 
 			dir := t.TempDir()
 
-			s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
+			s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(testData.maxChunksLimit), NewSeriesLimiterFactory(testData.maxSeriesLimit), NewBytesLimiterFactory(units.Base2Bytes(testData.maxBytesLimit)), emptyRelabelConfig, allowAllFilterConf)
 			testutil.Ok(t, s.store.SyncBlocks(ctx))
 
 			req := &storepb.SeriesRequest{
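
The new table case exercises the byte limiter end to end and expects both a message fragment and a gRPC code. A sketch of the kind of assertion that relies on — a hypothetical helper, since the shared test runner's actual checks sit outside this hunk — using the gogo/status package this file already imports:

package store

import (
	"testing"

	"github.com/gogo/status"
	"google.golang.org/grpc/codes"
)

// assertResourceExhausted fails the test unless err is a gRPC status error
// carrying codes.ResourceExhausted, not merely one with a matching message.
func assertResourceExhausted(t *testing.T, err error) {
	t.Helper()
	s, ok := status.FromError(err)
	if !ok {
		t.Fatalf("expected a gRPC status error, got: %v", err)
	}
	if s.Code() != codes.ResourceExhausted {
		t.Fatalf("expected codes.ResourceExhausted, got %s: %s", s.Code(), s.Message())
	}
}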
3 changes: 3 additions & 0 deletions pkg/store/bucket_test.go
@@ -1213,6 +1213,9 @@ func benchmarkExpandedPostings(
 
 	for _, c := range cases {
 		t.Run(c.name, func(t testutil.TB) {
+			if c.name != `i=~".*"` {
+				return
+			}
 			b := &bucketBlock{
 				logger:  log.NewNopLogger(),
 				metrics: newBucketStoreMetrics(nil),
40 changes: 13 additions & 27 deletions test/e2e/store_gateway_test.go
@@ -14,6 +14,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/efficientgo/core/testutil"
 	"github.com/efficientgo/e2e"
 	e2edb "github.com/efficientgo/e2e/db"
 	e2emon "github.com/efficientgo/e2e/monitoring"
@@ -23,18 +24,17 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/model/timestamp"
+	"github.com/thanos-io/objstore/providers/s3"
 
 	"github.com/thanos-io/objstore"
 	"github.com/thanos-io/objstore/client"
 
-	"github.com/efficientgo/core/testutil"
-	"github.com/thanos-io/objstore/providers/s3"
-
 	"github.com/thanos-io/thanos/pkg/block"
 	"github.com/thanos-io/thanos/pkg/block/metadata"
 	"github.com/thanos-io/thanos/pkg/cacheutil"
 	"github.com/thanos-io/thanos/pkg/promclient"
 	"github.com/thanos-io/thanos/pkg/runutil"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"github.com/thanos-io/thanos/pkg/testutil/e2eutil"
 	"github.com/thanos-io/thanos/test/e2e/e2ethanos"
 )
@@ -860,18 +860,14 @@ config:
 	testutil.Ok(t, store1.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced"))
 	testutil.Ok(t, store2.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced"))
 	testutil.Ok(t, store3.WaitSumMetrics(e2emon.Equals(4), "thanos_blocks_meta_synced"))
+	opts := promclient.QueryOptions{Deduplicate: true, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}
 
 	t.Run("Series() limits", func(t *testing.T) {
-
 		testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error {
-			_, err := simpleInstantQuery(t,
-				ctx,
-				q1.Endpoint("http"),
-				func() string { return testQuery },
-				time.Now,
-				promclient.QueryOptions{Deduplicate: true}, 0)
-			if err != nil {
-				if strings.Contains(err.Error(), "expanded matching posting: get postings: bytes limit exceeded while fetching postings: limit 1 violated") {
+			if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q1.Endpoint("http")), testQuery, time.Now(), opts); err != nil {
+				e := err.Error()
+				if strings.Contains(e, "expanded matching posting: get postings") && strings.Contains(e, "exceeded bytes limit while fetching postings: limit 1 violated") {
 					return nil
 				}
 				return err
@@ -880,14 +876,9 @@
 		}))
 
 		testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error {
-			_, err := simpleInstantQuery(t,
-				ctx,
-				q2.Endpoint("http"),
-				func() string { return testQuery },
-				time.Now,
-				promclient.QueryOptions{Deduplicate: true}, 0)
-			if err != nil {
-				if strings.Contains(err.Error(), "preload series: exceeded bytes limit while fetching series: limit 100 violated") {
+			if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q2.Endpoint("http")), testQuery, time.Now(), opts); err != nil {
+				e := err.Error()
+				if strings.Contains(e, "preload series") && strings.Contains(e, "exceeded bytes limit while fetching series: limit 100 violated") {
 					return nil
 				}
 				return err
@@ -896,14 +887,9 @@
 		}))
 
 		testutil.Ok(t, runutil.RetryWithLog(log.NewLogfmtLogger(os.Stdout), 5*time.Second, ctx.Done(), func() error {
-			_, err := simpleInstantQuery(t,
-				ctx,
-				q3.Endpoint("http"),
-				func() string { return testQuery },
-				time.Now,
-				promclient.QueryOptions{Deduplicate: true}, 0)
-			if err != nil {
-				if strings.Contains(err.Error(), "load chunks: bytes limit exceeded while fetching chunks: limit 196627 violated") {
+			if _, _, err := promclient.NewDefaultClient().QueryInstant(ctx, urlParse(t, "http://"+q3.Endpoint("http")), testQuery, time.Now(), opts); err != nil {
+				e := err.Error()
+				if strings.Contains(e, "load chunks") && strings.Contains(e, "exceeded bytes limit while fetching chunks: limit 196627 violated") {
 					return nil
 				}
 				return err
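
All three rewritten retry bodies share one shape: keep polling until the query fails with every expected substring, and surface any other error so the retry loop continues. A hypothetical condensation of that shape (the real test inlines it in each RetryWithLog callback):

package e2e

import (
	"errors"
	"strings"
)

// expectLimitError runs one query probe and succeeds only when the error
// contains every expected fragment; any other outcome keeps the retry going.
func expectLimitError(query func() error, want ...string) error {
	err := query()
	if err == nil {
		return errors.New("expected the byte limiter to trip, got no error")
	}
	for _, w := range want {
		if !strings.Contains(err.Error(), w) {
			return err // not the limit error yet; retry
		}
	}
	return nil // matched: the store returned the limit error
}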
