expose flag for max store gateway consistency check attempts (#6276)
yeya24 authored Oct 17, 2024
1 parent 1e5b01f commit 3865567
Showing 6 changed files with 62 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -21,6 +21,7 @@
* [ENHANCEMENT] Ingester: Add matchers to ingester LabelNames() and LabelNamesStream() RPC. #6209
* [ENHANCEMENT] Ingester/Store Gateway Clients: Introduce an experimental HealthCheck handler to quickly fail requests directed to unhealthy targets. #6225 #6257
* [ENHANCEMENT] Upgrade build image and Go version to 1.23.2. #6261 #6262
* [ENHANCEMENT] Querier/Ruler: Expose `store_gateway_consistency_check_max_attempts` to configure the maximum number of retries when querying store gateways during the consistency check. #6276
* [BUGFIX] Runtime-config: Handle absolute file paths when working directory is not / #6224

## 1.18.1 2024-10-14
6 changes: 6 additions & 0 deletions docs/blocks-storage/querier.md
@@ -226,6 +226,12 @@ querier:
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]

# The maximum number of times we attempt fetching missing blocks from
# different store-gateways. If no more store-gateways are left (i.e. due to a
# lower replication factor), the retries end earlier.
# CLI flag: -querier.store-gateway-consistency-check-max-attempts
[store_gateway_consistency_check_max_attempts: <int> | default = 3]

# When distributor's sharding strategy is shuffle-sharding and this setting is
# > 0, queriers fetch in-memory series from the minimum set of required
# ingesters, selecting only ingesters which may have received series since
6 changes: 6 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -3872,6 +3872,12 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-query-stats-enabled
[store_gateway_query_stats: <boolean> | default = true]

# The maximum number of times we attempt fetching missing blocks from different
# store-gateways. If no more store-gateways are left (i.e. due to a lower
# replication factor), the retries end earlier.
# CLI flag: -querier.store-gateway-consistency-check-max-attempts
[store_gateway_consistency_check_max_attempts: <int> | default = 3]

# When distributor's sharding strategy is shuffle-sharding and this setting is >
# 0, queriers fetch in-memory series from the minimum set of required ingesters,
# selecting only ingesters which may have received series since 'now - lookback
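For readers unfamiliar with how such reference entries map to code, the following is a minimal, hypothetical sketch — the struct, package layout, and use of gopkg.in/yaml.v3 are assumptions; only the key name and default of 3 come from the reference above — showing the new YAML key being unmarshalled onto a Go config field:

```go
package main

import (
	"fmt"
	"log"

	"gopkg.in/yaml.v3"
)

// querierConfig is an illustrative stand-in for the querier config section
// shown in the reference above; it is not the actual Cortex struct.
type querierConfig struct {
	StoreGatewayQueryStats                  bool `yaml:"store_gateway_query_stats"`
	StoreGatewayConsistencyCheckMaxAttempts int  `yaml:"store_gateway_consistency_check_max_attempts"`
}

func main() {
	// Pre-populate the defaults from the reference: stats enabled, 3 attempts.
	cfg := querierConfig{
		StoreGatewayQueryStats:                  true,
		StoreGatewayConsistencyCheckMaxAttempts: 3,
	}

	// Only keys present in the YAML override the defaults above.
	raw := []byte("store_gateway_consistency_check_max_attempts: 5\n")
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		log.Fatal(err)
	}

	fmt.Println(cfg.StoreGatewayConsistencyCheckMaxAttempts) // 5
}
```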
55 changes: 30 additions & 25 deletions pkg/querier/blocks_store_queryable.go
@@ -141,7 +141,8 @@ type BlocksStoreQueryable struct {
metrics *blocksStoreQueryableMetrics
limits BlocksStoreLimits

storeGatewayQueryStatsEnabled bool
storeGatewayQueryStatsEnabled bool
storeGatewayConsistencyCheckMaxAttempts int

// Subservices manager.
subservices *services.Manager
@@ -153,8 +154,7 @@ func NewBlocksStoreQueryable(
finder BlocksFinder,
consistency *BlocksConsistencyChecker,
limits BlocksStoreLimits,
queryStoreAfter time.Duration,
storeGatewayQueryStatsEnabled bool,
config Config,
logger log.Logger,
reg prometheus.Registerer,
) (*BlocksStoreQueryable, error) {
@@ -164,16 +164,17 @@
}

q := &BlocksStoreQueryable{
stores: stores,
finder: finder,
consistency: consistency,
queryStoreAfter: queryStoreAfter,
logger: logger,
subservices: manager,
subservicesWatcher: services.NewFailureWatcher(),
metrics: newBlocksStoreQueryableMetrics(reg),
limits: limits,
storeGatewayQueryStatsEnabled: storeGatewayQueryStatsEnabled,
stores: stores,
finder: finder,
consistency: consistency,
queryStoreAfter: config.QueryStoreAfter,
logger: logger,
subservices: manager,
subservicesWatcher: services.NewFailureWatcher(),
metrics: newBlocksStoreQueryableMetrics(reg),
limits: limits,
storeGatewayQueryStatsEnabled: config.StoreGatewayQueryStatsEnabled,
storeGatewayConsistencyCheckMaxAttempts: config.StoreGatewayConsistencyCheckMaxAttempts,
}

q.Service = services.NewBasicService(q.starting, q.running, q.stopping)
@@ -264,7 +265,7 @@ func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegatewa
reg,
)

return NewBlocksStoreQueryable(stores, finder, consistency, limits, querierCfg.QueryStoreAfter, querierCfg.StoreGatewayQueryStatsEnabled, logger, reg)
return NewBlocksStoreQueryable(stores, finder, consistency, limits, querierCfg, logger, reg)
}

func (q *BlocksStoreQueryable) starting(ctx context.Context) error {
@@ -299,16 +300,17 @@ func (q *BlocksStoreQueryable) Querier(mint, maxt int64) (storage.Querier, error
}

return &blocksStoreQuerier{
minT: mint,
maxT: maxt,
finder: q.finder,
stores: q.stores,
metrics: q.metrics,
limits: q.limits,
consistency: q.consistency,
logger: q.logger,
queryStoreAfter: q.queryStoreAfter,
storeGatewayQueryStatsEnabled: q.storeGatewayQueryStatsEnabled,
minT: mint,
maxT: maxt,
finder: q.finder,
stores: q.stores,
metrics: q.metrics,
limits: q.limits,
consistency: q.consistency,
logger: q.logger,
queryStoreAfter: q.queryStoreAfter,
storeGatewayQueryStatsEnabled: q.storeGatewayQueryStatsEnabled,
storeGatewayConsistencyCheckMaxAttempts: q.storeGatewayConsistencyCheckMaxAttempts,
}, nil
}

@@ -328,6 +330,9 @@ type blocksStoreQuerier struct {
// If enabled, query stats of store gateway requests will be logged
// using `info` level.
storeGatewayQueryStatsEnabled bool

// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
storeGatewayConsistencyCheckMaxAttempts int
}

// Select implements storage.Querier interface.
@@ -534,7 +539,7 @@ func (q *blocksStoreQuerier) queryWithConsistencyCheck(ctx context.Context, logg
retryableError error
)

for attempt := 1; attempt <= maxFetchSeriesAttempts; attempt++ {
for attempt := 1; attempt <= q.storeGatewayConsistencyCheckMaxAttempts; attempt++ {
// Find the set of store-gateway instances having the blocks. The exclude parameter is the
// map of blocks queried so far, with the list of store-gateway addresses for each block.
clients, err := q.stores.GetClientsFor(userID, remainingBlocks, attemptedBlocks, attemptedBlocksZones)
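To make the bounded retry behaviour concrete, here is a small self-contained sketch, not the actual Cortex implementation: the generic fetchFunc, block names, and helper names are assumptions, while the loop capped by a configurable max-attempts value mirrors the `q.storeGatewayConsistencyCheckMaxAttempts` loop above:

```go
package main

import "fmt"

// fetchFunc attempts to fetch the given missing blocks and returns the blocks
// that are still missing afterwards. It is an illustrative stand-in for a
// round of store-gateway requests, not the real Cortex client call.
type fetchFunc func(missing []string) (stillMissing []string, err error)

// queryWithBoundedRetries keeps fetching until no blocks are missing or the
// configured maximum number of attempts is reached, in the spirit of the
// storeGatewayConsistencyCheckMaxAttempts bound introduced in this change.
func queryWithBoundedRetries(blocks []string, maxAttempts int, fetch fetchFunc) error {
	remaining := blocks
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var err error
		remaining, err = fetch(remaining)
		if err != nil {
			return err
		}
		if len(remaining) == 0 {
			return nil // consistency check satisfied
		}
	}
	return fmt.Errorf("consistency check failed: %d block(s) still missing after %d attempts", len(remaining), maxAttempts)
}

func main() {
	attempts := 0
	err := queryWithBoundedRetries([]string{"block-1", "block-2"}, 3, func(missing []string) ([]string, error) {
		attempts++
		return missing[1:], nil // pretend exactly one block is fetched per attempt
	})
	fmt.Println(err, attempts) // <nil> 2
}
```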
11 changes: 10 additions & 1 deletion pkg/querier/blocks_store_queryable_test.go
@@ -1552,6 +1552,8 @@ func TestBlocksStoreQuerier_Select(t *testing.T) {
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(reg),
limits: testData.limits,

storeGatewayConsistencyCheckMaxAttempts: 3,
}

matchers := []*labels.Matcher{
@@ -2148,6 +2150,8 @@ func TestBlocksStoreQuerier_Labels(t *testing.T) {
logger: log.NewNopLogger(),
metrics: newBlocksStoreQueryableMetrics(reg),
limits: &blocksStoreLimitsMock{},

storeGatewayConsistencyCheckMaxAttempts: 3,
}

if testFunc == "LabelNames" {
@@ -2371,7 +2375,12 @@ func TestBlocksStoreQuerier_PromQLExecution(t *testing.T) {
}

// Instance the querier that will be executed to run the query.
queryable, err := NewBlocksStoreQueryable(stores, finder, NewBlocksConsistencyChecker(0, 0, logger, nil), &blocksStoreLimitsMock{}, 0, false, logger, nil)
cfg := Config{
QueryStoreAfter: 0,
StoreGatewayQueryStatsEnabled: false,
StoreGatewayConsistencyCheckMaxAttempts: 3,
}
queryable, err := NewBlocksStoreQueryable(stores, finder, NewBlocksConsistencyChecker(0, 0, logger, nil), &blocksStoreLimitsMock{}, cfg, logger, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), queryable))
defer services.StopAndAwaitTerminated(context.Background(), queryable) // nolint:errcheck
9 changes: 9 additions & 0 deletions pkg/querier/querier.go
@@ -79,6 +79,9 @@ type Config struct {
StoreGatewayClient ClientConfig `yaml:"store_gateway_client"`
StoreGatewayQueryStatsEnabled bool `yaml:"store_gateway_query_stats"`

// The maximum number of times we attempt fetching missing blocks from different Store Gateways.
StoreGatewayConsistencyCheckMaxAttempts int `yaml:"store_gateway_consistency_check_max_attempts"`

ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"`

// Experimental. Use https://github.com/thanos-io/promql-engine rather than
@@ -94,6 +97,7 @@
errShuffleShardingLookbackLessThanQueryStoreAfter = errors.New("the shuffle-sharding lookback period should be greater or equal than the configured 'query store after'")
errEmptyTimeRange = errors.New("empty time range")
errUnsupportedResponseCompression = errors.New("unsupported response compression. Supported compression 'gzip' and '' (disable compression)")
errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater than or equal to 1")
)

// RegisterFlags adds the flags required to config this to the given FlagSet.
@@ -122,6 +126,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.")
f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
f.BoolVar(&cfg.StoreGatewayQueryStatsEnabled, "querier.store-gateway-query-stats-enabled", true, "If enabled, store gateway query stats will be logged using `info` log level.")
f.IntVar(&cfg.StoreGatewayConsistencyCheckMaxAttempts, "querier.store-gateway-consistency-check-max-attempts", maxFetchSeriesAttempts, "The maximum number of times we attempt fetching missing blocks from different store-gateways. If no more store-gateways are left (i.e. due to a lower replication factor), the retries end earlier.")
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
@@ -148,6 +153,10 @@ func (cfg *Config) Validate() error {
}
}

if cfg.StoreGatewayConsistencyCheckMaxAttempts < 1 {
return errInvalidConsistencyCheckAttempts
}

return nil
}
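As a standalone illustration of the register-then-validate pattern used above — the flag name, default of 3, and minimum-of-1 check come from the change, while the tiny program and helper names around them are assumptions:

```go
package main

import (
	"errors"
	"flag"
	"fmt"
	"log"
)

// Illustrative sketch only; not the actual Cortex querier config.
var errInvalidConsistencyCheckAttempts = errors.New("store gateway consistency check max attempts should be greater than or equal to 1")

type config struct {
	StoreGatewayConsistencyCheckMaxAttempts int
}

// registerFlags mirrors the RegisterFlags pattern above: bind the field to a
// CLI flag with a default of 3 attempts.
func (c *config) registerFlags(f *flag.FlagSet) {
	f.IntVar(&c.StoreGatewayConsistencyCheckMaxAttempts,
		"querier.store-gateway-consistency-check-max-attempts", 3,
		"The maximum number of times we attempt fetching missing blocks from different store-gateways.")
}

// validate rejects values below 1, as in the Validate() hunk above.
func (c *config) validate() error {
	if c.StoreGatewayConsistencyCheckMaxAttempts < 1 {
		return errInvalidConsistencyCheckAttempts
	}
	return nil
}

func main() {
	var cfg config
	fs := flag.NewFlagSet("querier", flag.ExitOnError)
	cfg.registerFlags(fs)

	// A value of 0 would fail validation; 5 is accepted.
	if err := fs.Parse([]string{"-querier.store-gateway-consistency-check-max-attempts=5"}); err != nil {
		log.Fatal(err)
	}
	if err := cfg.validate(); err != nil {
		log.Fatal(err)
	}
	fmt.Println(cfg.StoreGatewayConsistencyCheckMaxAttempts) // 5
}
```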
