Skip to content

Commit

Permalink
Added enable flag
Browse files Browse the repository at this point in the history
Signed-off-by: Justin Jung <jungjust@amazon.com>
  • Loading branch information
justinjung04 committed Jun 14, 2024
1 parent 8ff710e commit 6c15c92
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 43 deletions.
28 changes: 17 additions & 11 deletions pkg/storage/tsdb/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,12 @@ type BucketStoreConfig struct {
SeriesBatchSize int `yaml:"series_batch_size"`

// Token bucket configs
PodDataBytesRateLimit int64 `yaml:"pod_data_bytes_rate_limit"`
TokenBucketLimiter TokenBucketLimiterConfig `yaml:"token_bucket_limiter"`
}

type TokenBucketLimiterConfig struct {
Enabled bool `yaml:"enabled"`
DryRun bool `yaml:"dry_run"`
PodTokenBucketSize int64 `yaml:"pod_token_bucket_size"`
UserTokenBucketSize int64 `yaml:"user_token_bucket_size"`
RequestTokenBucketSize int64 `yaml:"request_token_bucket_size"`
Expand Down Expand Up @@ -337,16 +342,17 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LazyExpandedPostingsEnabled, "blocks-storage.bucket-store.lazy-expanded-postings-enabled", false, "If true, Store Gateway will estimate postings size and try to lazily expand postings if it downloads less data than expanding all postings.")
f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.")
f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.")
f.Int64Var(&cfg.PodDataBytesRateLimit, "blocks-storage.bucket-store.pod-data-bytes-rate-limit", int64(1*units.Gibibyte), "Overall data bytes rate limit for a pod")
f.Int64Var(&cfg.PodTokenBucketSize, "blocks-storage.bucket-store.pod-token-bucket-size", int64(820*units.Mebibyte), "Pod token bucket size")
f.Int64Var(&cfg.UserTokenBucketSize, "blocks-storage.bucket-store.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size")
f.Int64Var(&cfg.RequestTokenBucketSize, "blocks-storage.bucket-store.request-token-bucket-size", int64(4*units.Mebibyte), "Request token bucket size")
f.Float64Var(&cfg.FetchedPostingsTokenFactor, "blocks-storage.bucket-store.fetched-postings-token-factor", 2, "Multiplication factor used for fetched postings token")
f.Float64Var(&cfg.TouchedPostingsTokenFactor, "blocks-storage.bucket-store.touched-postings-token-factor", 2, "Multiplication factor used for touched postings token")
f.Float64Var(&cfg.FetchedSeriesTokenFactor, "blocks-storage.bucket-store.fetched-series-token-factor", 2.5, "Multiplication factor used for fetched series token")
f.Float64Var(&cfg.TouchedSeriesTokenFactor, "blocks-storage.bucket-store.touched-series-token-factor", 10, "Multiplication factor used for touched series token")
f.Float64Var(&cfg.FetchedChunksTokenFactor, "blocks-storage.bucket-store.fetched-chunks-token-factor", 0.5, "Multiplication factor used for fetched chunks token")
f.Float64Var(&cfg.TouchedChunksTokenFactor, "blocks-storage.bucket-store.touched-chunks-token-factor", 0.5, "Multiplication factor used for touched chunks token")
f.BoolVar(&cfg.TokenBucketLimiter.Enabled, "blocks-storage.bucket-store.token-bucket-limiter.enabled", false, "Whether token bucket limiter is enabled")
f.BoolVar(&cfg.TokenBucketLimiter.DryRun, "blocks-storage.bucket-store.token-bucket-limiter.dry-run", false, "Whether the token bucket limiter is in dry run mode")
f.Int64Var(&cfg.TokenBucketLimiter.PodTokenBucketSize, "blocks-storage.bucket-store.token-bucket-limiter.pod-token-bucket-size", int64(820*units.Mebibyte), "Pod token bucket size")
f.Int64Var(&cfg.TokenBucketLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size")
f.Int64Var(&cfg.TokenBucketLimiter.RequestTokenBucketSize, "blocks-storage.bucket-store.token-bucket-limiter.request-token-bucket-size", int64(4*units.Mebibyte), "Request token bucket size")
f.Float64Var(&cfg.TokenBucketLimiter.FetchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-limiter.fetched-postings-token-factor", 2, "Multiplication factor used for fetched postings token")
f.Float64Var(&cfg.TokenBucketLimiter.TouchedPostingsTokenFactor, "blocks-storage.bucket-store.token-bucket-limiter.touched-postings-token-factor", 2, "Multiplication factor used for touched postings token")
f.Float64Var(&cfg.TokenBucketLimiter.FetchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-limiter.fetched-series-token-factor", 2.5, "Multiplication factor used for fetched series token")
f.Float64Var(&cfg.TokenBucketLimiter.TouchedSeriesTokenFactor, "blocks-storage.bucket-store.token-bucket-limiter.touched-series-token-factor", 10, "Multiplication factor used for touched series token")
f.Float64Var(&cfg.TokenBucketLimiter.FetchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-limiter.fetched-chunks-token-factor", 0.5, "Multiplication factor used for fetched chunks token")
f.Float64Var(&cfg.TokenBucketLimiter.TouchedChunksTokenFactor, "blocks-storage.bucket-store.token-bucket-limiter.touched-chunks-token-factor", 0.5, "Multiplication factor used for touched chunks token")
}

// Validate the config.
Expand Down
32 changes: 18 additions & 14 deletions pkg/storegateway/bucket_stores.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
metaFetcherMetrics: NewMetadataFetcherMetrics(),
queryGate: queryGate,
partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg),
podTokenBucket: util.NewTokenBucket(cfg.BucketStore.PodTokenBucketSize, cfg.BucketStore.PodTokenBucketSize, promauto.With(reg).NewGauge(prometheus.GaugeOpts{
podTokenBucket: util.NewTokenBucket(cfg.BucketStore.TokenBucketLimiter.PodTokenBucketSize, cfg.BucketStore.TokenBucketLimiter.PodTokenBucketSize, promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "cortex_bucket_stores_pod_token_bucket_remaining",
Help: "Number of tokens left in pod token bucket.",
})),
Expand Down Expand Up @@ -485,9 +485,11 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error {
unlockInDefer = false
u.storesMu.Unlock()

u.userTokenBucketsMu.Lock()
delete(u.userTokenBuckets, userID)
u.userTokenBucketsMu.Unlock()
if u.cfg.BucketStore.TokenBucketLimiter.Enabled {
u.userTokenBucketsMu.Lock()
delete(u.userTokenBuckets, userID)
u.userTokenBucketsMu.Unlock()
}

u.metaFetcherMetrics.RemoveUserRegistry(userID)
u.bucketStoreMetrics.RemoveUserRegistry(userID)
Expand Down Expand Up @@ -626,17 +628,19 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
}

u.userTokenBucketsMu.Lock()
u.userTokenBuckets[userID] = util.NewTokenBucket(u.cfg.BucketStore.UserTokenBucketSize, u.cfg.BucketStore.UserTokenBucketSize, nil)
u.userTokenBucketsMu.Unlock()
if u.cfg.BucketStore.TokenBucketLimiter.Enabled {
u.userTokenBucketsMu.Lock()
u.userTokenBuckets[userID] = util.NewTokenBucket(u.cfg.BucketStore.TokenBucketLimiter.UserTokenBucketSize, u.cfg.BucketStore.TokenBucketLimiter.UserTokenBucketSize, nil)
u.userTokenBucketsMu.Unlock()
}

bs, err := store.NewBucketStore(
userBkt,
fetcher,
u.syncDirForUser(userID),
newChunksLimiterFactory(u.limits, userID),
newSeriesLimiterFactory(u.limits, userID),
newBytesLimiterFactory(u.limits, userID, u.podTokenBucket, u.getUserTokenBucket(userID), u.getTokensToRetrieve, u.cfg.BucketStore.RequestTokenBucketSize),
newBytesLimiterFactory(u.limits, userID, u.podTokenBucket, u.getUserTokenBucket(userID), u.cfg.BucketStore.TokenBucketLimiter, u.getTokensToRetrieve),
u.partitioner,
u.cfg.BucketStore.BlockSyncConcurrency,
false, // No need to enable backward compatibility with Thanos pre 0.8.0 queriers
Expand Down Expand Up @@ -708,17 +712,17 @@ func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDa
tokensToRetrieve := float64(tokens)
switch dataType {
case store.PostingsFetched:
tokensToRetrieve *= u.cfg.BucketStore.FetchedPostingsTokenFactor
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketLimiter.FetchedPostingsTokenFactor
case store.PostingsTouched:
tokensToRetrieve *= u.cfg.BucketStore.TouchedPostingsTokenFactor
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketLimiter.TouchedPostingsTokenFactor
case store.SeriesFetched:
tokensToRetrieve *= u.cfg.BucketStore.FetchedSeriesTokenFactor
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketLimiter.FetchedSeriesTokenFactor
case store.SeriesTouched:
tokensToRetrieve *= u.cfg.BucketStore.TouchedSeriesTokenFactor
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketLimiter.TouchedSeriesTokenFactor
case store.ChunksFetched:
tokensToRetrieve *= u.cfg.BucketStore.FetchedChunksTokenFactor
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketLimiter.FetchedChunksTokenFactor
case store.ChunksTouched:
tokensToRetrieve *= u.cfg.BucketStore.TouchedChunksTokenFactor
tokensToRetrieve *= u.cfg.BucketStore.TokenBucketLimiter.TouchedChunksTokenFactor
}
return int64(tokensToRetrieve)
}
Expand Down
76 changes: 62 additions & 14 deletions pkg/storegateway/bucket_stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ func TestBucketStores_InitialSync(t *testing.T) {
}

require.NoError(t, stores.InitialSync(ctx))
assert.NotNil(t, stores.getUserTokenBucket("user-1"))
assert.NotNil(t, stores.getUserTokenBucket("user-2"))

// Query series after the initial sync.
for userID, metricName := range userToMetric {
Expand Down Expand Up @@ -720,8 +718,6 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
sharding.users = []string{user1}
require.NoError(t, stores.SyncBlocks(ctx))
require.Equal(t, []string{user1}, getUsersInDir(t, cfg.BucketStore.SyncDir))
assert.NotNil(t, stores.getUserTokenBucket(user1))
assert.Nil(t, stores.getUserTokenBucket(user2))

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_store_block_drops_total Total number of local blocks that were dropped.
Expand All @@ -739,8 +735,6 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
sharding.users = nil
require.NoError(t, stores.SyncBlocks(ctx))
require.Equal(t, []string(nil), getUsersInDir(t, cfg.BucketStore.SyncDir))
assert.Nil(t, stores.getUserTokenBucket(user1))
assert.Nil(t, stores.getUserTokenBucket(user2))

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_bucket_store_block_drops_total Total number of local blocks that were dropped.
Expand Down Expand Up @@ -769,22 +763,76 @@ func TestBucketStores_deleteLocalFilesForExcludedTenants(t *testing.T) {
`), metricNames...))
}

// TestBucketStores_getUserTokenBucket verifies that per-user token buckets are
// created and removed in lockstep with tenant sync when the token bucket
// limiter is enabled, and that no buckets are created at all when it is
// disabled.
func TestBucketStores_getUserTokenBucket(t *testing.T) {
	const (
		user1 = "user-1"
		user2 = "user-2"
	)

	ctx := context.Background()
	cfg := prepareStorageConfig(t)
	cfg.BucketStore.TokenBucketLimiter.Enabled = true

	storageDir := t.TempDir()
	userToMetric := map[string]string{
		user1: "series_1",
		user2: "series_2",
	}
	for userID, metricName := range userToMetric {
		generateStorageBlock(t, storageDir, userID, metricName, 10, 100, 15)
	}

	sharding := userShardingStrategy{}
	sharding.users = []string{user1, user2}

	bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
	assert.NoError(t, err)

	reg := prometheus.NewPedanticRegistry()
	stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
	assert.NoError(t, err)

	// With the limiter enabled, every synced tenant gets a token bucket.
	assert.NoError(t, stores.InitialSync(ctx))
	assert.NotNil(t, stores.getUserTokenBucket(user1))
	assert.NotNil(t, stores.getUserTokenBucket(user2))

	// Dropping a tenant from the sharding strategy removes its bucket on the
	// next sync.
	sharding.users = []string{user1}
	assert.NoError(t, stores.SyncBlocks(ctx))
	assert.NotNil(t, stores.getUserTokenBucket(user1))
	assert.Nil(t, stores.getUserTokenBucket(user2))

	sharding.users = []string{}
	assert.NoError(t, stores.SyncBlocks(ctx))
	assert.Nil(t, stores.getUserTokenBucket(user1))
	assert.Nil(t, stores.getUserTokenBucket(user2))

	// With the limiter disabled, syncing tenants must not create any buckets.
	cfg.BucketStore.TokenBucketLimiter.Enabled = false
	sharding.users = []string{user1, user2}
	reg = prometheus.NewPedanticRegistry()
	stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
	assert.NoError(t, err)

	assert.NoError(t, stores.InitialSync(ctx))
	assert.Nil(t, stores.getUserTokenBucket(user1))
	assert.Nil(t, stores.getUserTokenBucket(user2))
}

func TestBucketStores_getTokensToRetrieve(t *testing.T) {
cfg := prepareStorageConfig(t)
cfg.BucketStore.FetchedPostingsTokenFactor = 1
cfg.BucketStore.TouchedPostingsTokenFactor = 2
cfg.BucketStore.FetchedSeriesTokenFactor = 3
cfg.BucketStore.TouchedSeriesTokenFactor = 4
cfg.BucketStore.FetchedChunksTokenFactor = 0
cfg.BucketStore.TouchedChunksTokenFactor = 0.5
cfg.BucketStore.TokenBucketLimiter.FetchedPostingsTokenFactor = 1
cfg.BucketStore.TokenBucketLimiter.TouchedPostingsTokenFactor = 2
cfg.BucketStore.TokenBucketLimiter.FetchedSeriesTokenFactor = 3
cfg.BucketStore.TokenBucketLimiter.TouchedSeriesTokenFactor = 4
cfg.BucketStore.TokenBucketLimiter.FetchedChunksTokenFactor = 0
cfg.BucketStore.TokenBucketLimiter.TouchedChunksTokenFactor = 0.5

storageDir := t.TempDir()
bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
require.NoError(t, err)
assert.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
require.NoError(t, err)
assert.NoError(t, err)

assert.Equal(t, int64(2), stores.getTokensToRetrieve(2, store.PostingsFetched))
assert.Equal(t, int64(4), stores.getTokensToRetrieve(2, store.PostingsTouched))
Expand Down
14 changes: 11 additions & 3 deletions pkg/storegateway/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -46,6 +47,7 @@ type tokenBucketLimiter struct {
userTokenBucket *util.TokenBucket
requestTokenBucket *util.TokenBucket
getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64
dryRun bool
}

func (t *tokenBucketLimiter) Reserve(_ uint64) error {
Expand Down Expand Up @@ -78,12 +80,13 @@ func (t *tokenBucketLimiter) ReserveWithType(num uint64, dataType store.StoreDat
return nil
}

func newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket *util.TokenBucket, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64) *tokenBucketLimiter {
// newTokenBucketLimiter constructs a tokenBucketLimiter that draws from the
// supplied pod-, user-, and request-scoped token buckets. getTokensToRetrieve
// maps a raw item count plus its store data type to the number of tokens to
// consume. dryRun is stored for use during reservation (NOTE(review): the
// enforcement path is outside this view — presumably dry-run skips rejection;
// confirm in ReserveWithType).
func newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket *util.TokenBucket, dryRun bool, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64) *tokenBucketLimiter {
	limiter := &tokenBucketLimiter{
		dryRun:              dryRun,
		getTokensToRetrieve: getTokensToRetrieve,
	}
	limiter.podTokenBucket = podTokenBucket
	limiter.userTokenBucket = userTokenBucket
	limiter.requestTokenBucket = requestTokenBucket
	return limiter
}

Expand All @@ -107,13 +110,18 @@ func newSeriesLimiterFactory(limits *validation.Overrides, userID string) store.
}
}

func newBytesLimiterFactory(limits *validation.Overrides, userID string, podTokenBucket, userTokenBucket *util.TokenBucket, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64, requestTokenBucketSize int64) store.BytesLimiterFactory {
func newBytesLimiterFactory(limits *validation.Overrides, userID string, podTokenBucket, userTokenBucket *util.TokenBucket, tokenBucketLimiterCfg tsdb.TokenBucketLimiterConfig, getTokensToRetrieve func(tokens uint64, dataType store.StoreDataType) int64) store.BytesLimiterFactory {
return func(failedCounter prometheus.Counter) store.BytesLimiter {
limiters := []store.BytesLimiter{}
// Since limit overrides could be live reloaded, we have to get the current user's limit
// each time a new limiter is instantiated.
limiters = append(limiters, store.NewLimiter(uint64(limits.MaxDownloadedBytesPerRequest(userID)), failedCounter))
limiters = append(limiters, newTokenBucketLimiter(podTokenBucket, userTokenBucket, util.NewTokenBucket(requestTokenBucketSize, requestTokenBucketSize, nil), getTokensToRetrieve))

if tokenBucketLimiterCfg.Enabled {
requestTokenBucket := util.NewTokenBucket(tokenBucketLimiterCfg.RequestTokenBucketSize, tokenBucketLimiterCfg.RequestTokenBucketSize, nil)
limiters = append(limiters, newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, tokenBucketLimiterCfg.DryRun, getTokensToRetrieve))
}

return &compositeLimiter{
limiters: limiters,
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestNewTokenBucketLimiter(t *testing.T) {
podTokenBucket := util.NewTokenBucket(3, 3, nil)
userTokenBucket := util.NewTokenBucket(2, 2, nil)
requestTokenBucket := util.NewTokenBucket(1, 1, nil)
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, func(tokens uint64, dataType store.StoreDataType) int64 {
l := newTokenBucketLimiter(podTokenBucket, userTokenBucket, requestTokenBucket, true, func(tokens uint64, dataType store.StoreDataType) int64 {
if dataType == store.SeriesFetched {
return int64(tokens) * 5
}
Expand Down Expand Up @@ -71,3 +71,4 @@ func TestNewTokenBucketLimiter(t *testing.T) {
userTokenBucket.ForceRetrieve(2)
assert.NoError(t, l.ReserveWithType(1, store.PostingsFetched))
}

0 comments on commit 6c15c92

Please sign in to comment.