Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max-inflight-requests limit to store gateway #5553

Merged
merged 9 commits into from
Sep 13, 2023
Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
* [FEATURE] Compactor: Add `-compactor.ring.tokens-file-path` to store generated tokens locally. #5432
* [FEATURE] Query Frontend: Add `-frontend.retry-on-too-many-outstanding-requests` to re-enqueue 429 requests if there are multiple query-schedulers available. #5496
* [FEATURE] Store Gateway: Add `-blocks-storage.bucket-store.max-inflight-requests` for store gateways to reject further requests upon reaching the limit. #5553
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.max-concurrent
[max_concurrent: <int> | default = 100]

# Max number of inflight queries to execute against the long-term storage.
# The limit is shared across all tenants. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

justinjung04 marked this conversation as resolved.
Show resolved Hide resolved
# Maximum number of concurrent tenants synching blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
5 changes: 5 additions & 0 deletions docs/blocks-storage/store-gateway.md
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,11 @@ blocks_storage:
# CLI flag: -blocks-storage.bucket-store.max-concurrent
[max_concurrent: <int> | default = 100]

# Max number of inflight queries to execute against the long-term storage.
# The limit is shared across all tenants. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Maximum number of concurrent tenants synching blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -1038,6 +1038,11 @@ bucket_store:
# CLI flag: -blocks-storage.bucket-store.max-concurrent
[max_concurrent: <int> | default = 100]

# Max number of inflight queries to execute against the long-term storage. The
# limit is shared across all tenants. 0 to disable.
# CLI flag: -blocks-storage.bucket-store.max-inflight-requests
[max_inflight_requests: <int> | default = 0]

# Maximum number of concurrent tenants synching blocks.
# CLI flag: -blocks-storage.bucket-store.tenant-sync-concurrency
[tenant_sync_concurrency: <int> | default = 10]
Expand Down
2 changes: 2 additions & 0 deletions pkg/querier/blocks_store_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,8 @@ func isRetryableError(err error) bool {
switch status.Code(err) {
case codes.Unavailable:
return true
case codes.ResourceExhausted:
return errors.Is(err, storegateway.ErrTooManyInflightRequests)
justinjung04 marked this conversation as resolved.
Show resolved Hide resolved
default:
return false
}
Expand Down
51 changes: 51 additions & 0 deletions pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
"github.com/cortexproject/cortex/pkg/storegateway"
"github.com/cortexproject/cortex/pkg/storegateway/storegatewaypb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/limiter"
Expand Down Expand Up @@ -708,6 +709,56 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
},
},
},
"multiple store-gateways has the block, but one of them had too many inflight requests": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1",
mockedSeriesErr: storegateway.ErrTooManyInflightRequests,
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
mockHintsResponse(block1),
}}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
queryLimiter: noOpQueryLimiter,
expectedSeries: []seriesResult{
{
lbls: labels.New(metricNameLabel, series1Label),
values: []valueResult{
{t: minT, v: 2},
},
},
},
},
"store gateway returns resource exhausted error other than max inflight request": {
finderResult: bucketindex.Blocks{
{ID: block1},
},
storeSetResponses: []interface{}{
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{
remoteAddr: "1.1.1.1",
mockedSeriesErr: status.Error(codes.ResourceExhausted, "some other resource"),
}: {block1},
},
map[BlocksStoreClient][]ulid.ULID{
&storeGatewayClientMock{remoteAddr: "2.2.2.2", mockedSeriesResponses: []*storepb.SeriesResponse{
mockSeriesResponse(labels.Labels{metricNameLabel, series1Label}, minT, 2),
mockHintsResponse(block1),
}}: {block1},
},
},
limits: &blocksStoreLimitsMock{},
expectedErr: errors.Wrapf(status.Error(codes.ResourceExhausted, "some other resource"), "failed to fetch series from 1.1.1.1"),
},
}

for testName, testData := range tests {
Expand Down
2 changes: 2 additions & 0 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ type BucketStoreConfig struct {
SyncDir string `yaml:"sync_dir"`
SyncInterval time.Duration `yaml:"sync_interval"`
MaxConcurrent int `yaml:"max_concurrent"`
MaxInflightRequests int `yaml:"max_inflight_requests"`
TenantSyncConcurrency int `yaml:"tenant_sync_concurrency"`
BlockSyncConcurrency int `yaml:"block_sync_concurrency"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
Expand Down Expand Up @@ -291,6 +292,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ChunkPoolMinBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-min-bucket-size-bytes", ChunkPoolDefaultMinBucketSize, "Size - in bytes - of the smallest chunks pool bucket.")
f.IntVar(&cfg.ChunkPoolMaxBucketSizeBytes, "blocks-storage.bucket-store.chunk-pool-max-bucket-size-bytes", ChunkPoolDefaultMaxBucketSize, "Size - in bytes - of the largest chunks pool bucket.")
f.IntVar(&cfg.MaxConcurrent, "blocks-storage.bucket-store.max-concurrent", 100, "Max number of concurrent queries to execute against the long-term storage. The limit is shared across all tenants.")
f.IntVar(&cfg.MaxInflightRequests, "blocks-storage.bucket-store.max-inflight-requests", 0, "Max number of inflight queries to execute against the long-term storage. The limit is shared across all tenants. 0 to disable.")
f.IntVar(&cfg.TenantSyncConcurrency, "blocks-storage.bucket-store.tenant-sync-concurrency", 10, "Maximum number of concurrent tenants synching blocks.")
f.IntVar(&cfg.BlockSyncConcurrency, "blocks-storage.bucket-store.block-sync-concurrency", 20, "Maximum number of concurrent blocks synching per tenant.")
f.IntVar(&cfg.MetaSyncConcurrency, "blocks-storage.bucket-store.meta-sync-concurrency", 20, "Number of Go routines to use when syncing block meta files from object storage per tenant.")
Expand Down
35 changes: 35 additions & 0 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/weaveworks/common/logging"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/cortexproject/cortex/pkg/storage/bucket"
"github.com/cortexproject/cortex/pkg/storage/tsdb"
Expand Down Expand Up @@ -72,13 +73,19 @@ type BucketStores struct {
storesErrorsMu sync.RWMutex
storesErrors map[string]error

// Keeps number of inflight requests
inflightRequestCnt int
inflightRequestMu sync.RWMutex

// Metrics.
syncTimes prometheus.Histogram
syncLastSuccess prometheus.Gauge
tenantsDiscovered prometheus.Gauge
tenantsSynced prometheus.Gauge
}

// ErrTooManyInflightRequests is a gRPC status error with code ResourceExhausted.
// BucketStores.Series returns it when the max inflight requests limit is enabled
// and the current number of inflight requests has already reached that limit.
var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway")

// NewBucketStores makes a new BucketStores.
func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.Bucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) {
cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, bucketClient, logger, reg)
Expand Down Expand Up @@ -313,6 +320,16 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
return nil
}

maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests
if maxInflightRequests > 0 {
if u.getInflightRequestCnt() >= maxInflightRequests {
return ErrTooManyInflightRequests
}

u.incrementInflightRequestCnt()
defer u.decrementInflightRequestCnt()
}

err = store.Series(req, spanSeriesServer{
Store_SeriesServer: srv,
ctx: spanCtx,
Expand All @@ -321,6 +338,24 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
return err
}

// getInflightRequestCnt returns the number of inflight requests currently
// tracked, read while holding the read lock.
//
// The returned value is only a snapshot: the lock is released before
// returning, so a check on this value followed by a separate call to
// incrementInflightRequestCnt is not a single atomic step.
func (u *BucketStores) getInflightRequestCnt() int {
	u.inflightRequestMu.RLock()
	current := u.inflightRequestCnt
	u.inflightRequestMu.RUnlock()

	return current
}

// incrementInflightRequestCnt adds one to the inflight request counter while
// holding the write lock.
func (u *BucketStores) incrementInflightRequestCnt() {
	u.inflightRequestMu.Lock()
	defer u.inflightRequestMu.Unlock()

	u.inflightRequestCnt++
}

// decrementInflightRequestCnt subtracts one from the inflight request counter
// while holding the write lock.
func (u *BucketStores) decrementInflightRequestCnt() {
	u.inflightRequestMu.Lock()
	defer u.inflightRequestMu.Unlock()

	u.inflightRequestCnt--
}

// LabelNames implements the Storegateway proto service.
func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames")
Expand Down
42 changes: 42 additions & 0 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,48 @@ func testBucketStoresSeriesShouldCorrectlyQuerySeriesSpanningMultipleChunks(t *t
}
}

// TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached checks
// that a Series query fails with ErrTooManyInflightRequests, and yields no
// series or warnings, when the inflight counter already equals the configured
// MaxInflightRequests.
func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) {
	storageCfg := prepareStorageConfig(t)
	storageCfg.BucketStore.MaxInflightRequests = 10

	// Write one block for the tenant to a filesystem-backed bucket.
	blocksDir := t.TempDir()
	generateStorageBlock(t, blocksDir, "user_id", "series_1", 0, 100, 15)
	bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: blocksDir})
	require.NoError(t, err)

	stores, err := NewBucketStores(
		storageCfg,
		NewNoShardingStrategy(),
		bucketClient,
		defaultLimitsOverrides(t),
		mockLoggingLevel(),
		log.NewNopLogger(),
		prometheus.NewPedanticRegistry(),
	)
	require.NoError(t, err)
	require.NoError(t, stores.InitialSync(context.Background()))

	// Force the counter up to the configured limit before querying.
	stores.inflightRequestMu.Lock()
	stores.inflightRequestCnt = 10
	stores.inflightRequestMu.Unlock()

	gotSeries, gotWarnings, err := querySeries(stores, "user_id", "series_1", 0, 100)
	assert.ErrorIs(t, err, ErrTooManyInflightRequests)
	assert.Empty(t, gotSeries)
	assert.Empty(t, gotWarnings)
}

// TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled
// checks that a Series query succeeds and returns the stored series when
// MaxInflightRequests is left at the value from prepareStorageConfig, even
// though the inflight counter has been set to a non-zero value.
func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) {
	storageCfg := prepareStorageConfig(t)

	// Write one block for the tenant to a filesystem-backed bucket.
	blocksDir := t.TempDir()
	generateStorageBlock(t, blocksDir, "user_id", "series_1", 0, 100, 15)
	bucketClient, err := filesystem.NewBucketClient(filesystem.Config{Directory: blocksDir})
	require.NoError(t, err)

	stores, err := NewBucketStores(
		storageCfg,
		NewNoShardingStrategy(),
		bucketClient,
		defaultLimitsOverrides(t),
		mockLoggingLevel(),
		log.NewNopLogger(),
		prometheus.NewPedanticRegistry(),
	)
	require.NoError(t, err)
	require.NoError(t, stores.InitialSync(context.Background()))

	// max_inflight_requests defaults to 0, which disables the limit, so this
	// counter value must not cause the query to be rejected.
	stores.inflightRequestMu.Lock()
	stores.inflightRequestCnt = 10
	stores.inflightRequestMu.Unlock()

	gotSeries, _, err := querySeries(stores, "user_id", "series_1", 0, 100)
	require.NoError(t, err)
	assert.Equal(t, 1, len(gotSeries))
}

func prepareStorageConfig(t *testing.T) cortex_tsdb.BlocksStorageConfig {
cfg := cortex_tsdb.BlocksStorageConfig{}
flagext.DefaultValues(&cfg)
Expand Down
20 changes: 20 additions & 0 deletions pkg/storegateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ const (
// ringAutoForgetUnhealthyPeriods is how many consecutive timeout periods an unhealthy instance
// in the ring will be automatically removed.
ringAutoForgetUnhealthyPeriods = 10

instanceLimitsMetric = "cortex_storegateway_instance_limits"
instanceLimitsMetricHelp = "Instance limits used by this store gateway."
limitLabel = "limit"
)

var (
Expand Down Expand Up @@ -142,6 +146,22 @@ func newStoreGateway(gatewayCfg Config, storageCfg cortex_tsdb.BlocksStorageConf
g.bucketSync.WithLabelValues(syncReasonPeriodic)
g.bucketSync.WithLabelValues(syncReasonRingChange)

promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
ConstLabels: map[string]string{limitLabel: "max_inflight_requests"},
}).Set(float64(storageCfg.BucketStore.MaxInflightRequests))
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
ConstLabels: map[string]string{limitLabel: "max_concurrent"},
}).Set(float64(storageCfg.BucketStore.MaxConcurrent))
promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: instanceLimitsMetric,
Help: instanceLimitsMetricHelp,
ConstLabels: map[string]string{limitLabel: "max_chunk_pool_bytes"},
}).Set(float64(storageCfg.BucketStore.MaxChunkPoolBytes))

// Init sharding strategy.
var shardingStrategy ShardingStrategy

Expand Down
Loading