Skip to content

Commit

Permalink
Use per-tenant limit for global query offset
Browse files Browse the repository at this point in the history
Signed-off-by: Mustafain Ali Khan <mustalik@amazon.com>
  • Loading branch information
mustafain117 committed Jul 26, 2024
1 parent d96cf38 commit 820170a
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* [FEATURE] Ingester: Add `ingester.instance-limits.max-inflight-query-requests` to allow limiting ingester concurrent queries. #6081
* [FEATURE] Distributor: Add `validation.max-native-histogram-buckets` to limit the maximum bucket count. Distributor will try to automatically reduce histogram resolution until it is within the bucket limit or resolution cannot be reduced anymore. #6104
* [FEATURE] Store Gateway: Token bucket limiter. #6016
* [FEATURE] Ruler: Add support for `query_offset` field on RuleGroup and new `ruler_query_offset` per-tenant limit. #6085
* [ENHANCEMENT] rulers: Add support to persist tokens in rulers. #5987
* [ENHANCEMENT] Query Frontend/Querier: Added store gateway postings touched count and touched size in Querier stats and log in Query Frontend. #5892
* [ENHANCEMENT] Query Frontend/Querier: Returns `warnings` on prometheus query responses. #5916
Expand Down
8 changes: 4 additions & 4 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3349,6 +3349,10 @@ query_rejection:
# CLI flag: -ruler.max-rule-groups-per-tenant
[ruler_max_rule_groups_per_tenant: <int> | default = 0]

# Duration to offset all rule evaluation queries per-tenant.
# CLI flag: -ruler.query-offset
[ruler_query_offset: <duration> | default = 0s]

# The default tenant's shard size when the shuffle-sharding strategy is used.
# Must be set when the store-gateway sharding is enabled with the
# shuffle-sharding strategy. When this setting is specified in the per-tenant
Expand Down Expand Up @@ -4147,10 +4151,6 @@ ruler_client:
# CLI flag: -ruler.rule-path
[rule_path: <string> | default = "/rules"]
# Default offset for all rule evaluation queries
# CLI flag: -ruler.rule-query-offset
[rule_query_offset: <duration> | default = 0s]
# Comma-separated list of URL(s) of the Alertmanager(s) to send notifications
# to. Each Alertmanager URL is treated as a separate group in the configuration.
# Multiple Alertmanagers in HA per group can be supported by using DNS
Expand Down
3 changes: 2 additions & 1 deletion pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type RulesLimits interface {
RulerTenantShardSize(userID string) int
RulerMaxRuleGroupsPerTenant(userID string) int
RulerMaxRulesPerRuleGroup(userID string) int
RulerQueryOffset(userID string) time.Duration
DisabledRuleGroups(userID string) validation.DisabledRuleGroups
}

Expand Down Expand Up @@ -359,7 +360,7 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
ConcurrentEvalsEnabled: cfg.ConcurrentEvalsEnabled,
MaxConcurrentEvals: cfg.MaxConcurrentEvals,
DefaultRuleQueryOffset: func() time.Duration {
return cfg.RuleQueryOffset
return overrides.RulerQueryOffset(userID)
},
})
}
Expand Down
6 changes: 2 additions & 4 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ type Config struct {
PollInterval time.Duration `yaml:"poll_interval"`
// Path to store rule files for prom manager.
RulePath string `yaml:"rule_path"`
// Default offset for all rule evaluation queries.
RuleQueryOffset time.Duration `yaml:"rule_query_offset"`

// URL of the Alertmanager to send notifications to.
// If you are configuring the ruler to send to a Cortex Alertmanager,
Expand Down Expand Up @@ -196,7 +194,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&cfg.ExternalURL, "ruler.external.url", "URL of alerts return path.")
f.DurationVar(&cfg.EvaluationInterval, "ruler.evaluation-interval", 1*time.Minute, "How frequently to evaluate rules")
f.DurationVar(&cfg.PollInterval, "ruler.poll-interval", 1*time.Minute, "How frequently to poll for rule changes")
f.DurationVar(&cfg.RuleQueryOffset, "ruler.rule-query-offset", 0*time.Minute, "Default offset for all rule evaluation queries")

f.StringVar(&cfg.AlertmanagerURL, "ruler.alertmanager-url", "", "Comma-separated list of URL(s) of the Alertmanager(s) to send notifications to. Each Alertmanager URL is treated as a separate group in the configuration. Multiple Alertmanagers in HA per group can be supported by using DNS resolution via -ruler.alertmanager-discovery.")
f.BoolVar(&cfg.AlertmanagerDiscovery, "ruler.alertmanager-discovery", false, "Use DNS SRV records to discover Alertmanager hosts.")
Expand Down Expand Up @@ -914,14 +911,15 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest, includeB
}
interval := group.Interval()

queryOffset := group.QueryOffset()
groupDesc := &GroupStateDesc{
Group: &rulespb.RuleGroupDesc{
Name: group.Name(),
Namespace: string(decodedNamespace),
Interval: interval,
User: userID,
Limit: int64(group.Limit()),
QueryOffset: group.QueryOffset(),
QueryOffset: &queryOffset,
},

EvaluationTimestamp: group.GetLastEvaluation(),
Expand Down
11 changes: 9 additions & 2 deletions pkg/ruler/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ruleLimits struct {
maxRuleGroups int
disabledRuleGroups validation.DisabledRuleGroups
maxQueryLength time.Duration
queryOffset time.Duration
}

func (r ruleLimits) EvaluationDelay(_ string) time.Duration {
Expand All @@ -112,6 +113,10 @@ func (r ruleLimits) DisabledRuleGroups(userID string) validation.DisabledRuleGro

// MaxQueryLength returns the maxQueryLength stored on this ruleLimits value.
// The tenant ID argument is ignored.
func (r ruleLimits) MaxQueryLength(_ string) time.Duration {
	return r.maxQueryLength
}

// RulerQueryOffset returns the queryOffset stored on this ruleLimits value.
// The tenant ID argument is ignored, so every tenant sees the same offset.
func (r ruleLimits) RulerQueryOffset(_ string) time.Duration {
	offset := r.queryOffset
	return offset
}

func newEmptyQueryable() storage.Queryable {
return storage.QueryableFunc(func(mint, maxt int64) (storage.Querier, error) {
return emptyQuerier{}, nil
Expand Down Expand Up @@ -2550,7 +2555,8 @@ func TestRuler_QueryOffset(t *testing.T) {
compareRuleGroupDescToStateDesc(t, expectedRg, rg)

// test default query offset=0 when not defined at group level
require.Equal(t, time.Duration(0), rg.GetGroup().QueryOffset)
gotOffset := rg.GetGroup().QueryOffset
require.Equal(t, time.Duration(0), *gotOffset)

ctx = user.InjectOrgID(context.Background(), "user2")
rls, err = r.Rules(ctx, &RulesRequest{})
Expand All @@ -2561,5 +2567,6 @@ func TestRuler_QueryOffset(t *testing.T) {
compareRuleGroupDescToStateDesc(t, expectedRg, rg)

// test group query offset is set
require.Equal(t, time.Minute*2, rg.GetGroup().QueryOffset)
gotOffset = rg.GetGroup().QueryOffset
require.Equal(t, time.Minute*2, *gotOffset)
}
10 changes: 7 additions & 3 deletions pkg/ruler/rulespb/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import (

// ToProto transforms a formatted prometheus rulegroup to a rule group protobuf
func ToProto(user string, namespace string, rl rulefmt.RuleGroup) *RuleGroupDesc {
queryOffset := time.Duration(0)
var queryOffset *time.Duration
if rl.QueryOffset != nil {
queryOffset = time.Duration(*rl.QueryOffset)
offset := time.Duration(*rl.QueryOffset)
queryOffset = &offset
}
rg := RuleGroupDesc{
Name: rl.Name,
Expand Down Expand Up @@ -48,7 +49,10 @@ func formattedRuleToProto(rls []rulefmt.RuleNode) []*RuleDesc {

// FromProto generates a rulefmt RuleGroup
func FromProto(rg *RuleGroupDesc) rulefmt.RuleGroup {
queryOffset := model.Duration(rg.QueryOffset)
var queryOffset model.Duration
if rg.QueryOffset != nil {
queryOffset = model.Duration(*rg.QueryOffset)
}
formattedRuleGroup := rulefmt.RuleGroup{
Name: rg.GetName(),
Interval: model.Duration(rg.Interval),
Expand Down
2 changes: 1 addition & 1 deletion pkg/ruler/rulespb/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestProto(t *testing.T) {
desc := ToProto("test", "namespace", rg)

assert.Equal(t, len(rules), len(desc.Rules))
assert.Equal(t, 30*time.Second, desc.QueryOffset)
assert.Equal(t, 30*time.Second, *desc.QueryOffset)

ruleDesc := desc.Rules[0]

Expand Down
93 changes: 53 additions & 40 deletions pkg/ruler/rulespb/rules.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/ruler/rulespb/rules.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ message RuleGroupDesc {
repeated google.protobuf.Any options = 9;
int64 limit =10;
google.protobuf.Duration queryOffset = 11
[(gogoproto.nullable) = false, (gogoproto.stdduration) = true];
[(gogoproto.nullable) = true, (gogoproto.stdduration) = true];
}

// RuleDesc is a proto representation of a Prometheus Rule
Expand Down
3 changes: 2 additions & 1 deletion pkg/ruler/store_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ var (
},
},
}
queryOffset = 2 * time.Minute
mockRulesQueryOffset = map[string]rulespb.RuleGroupList{
"user1": {
&rulespb.RuleGroupDesc{
Expand Down Expand Up @@ -164,7 +165,7 @@ var (
},
},
Interval: interval,
QueryOffset: 2 * time.Minute,
QueryOffset: &queryOffset,
},
},
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ type Limits struct {
RulerTenantShardSize int `yaml:"ruler_tenant_shard_size" json:"ruler_tenant_shard_size"`
RulerMaxRulesPerRuleGroup int `yaml:"ruler_max_rules_per_rule_group" json:"ruler_max_rules_per_rule_group"`
RulerMaxRuleGroupsPerTenant int `yaml:"ruler_max_rule_groups_per_tenant" json:"ruler_max_rule_groups_per_tenant"`
RulerQueryOffset model.Duration `yaml:"ruler_query_offset" json:"ruler_query_offset"`

// Store-gateway.
StoreGatewayTenantShardSize float64 `yaml:"store_gateway_tenant_shard_size" json:"store_gateway_tenant_shard_size"`
Expand Down Expand Up @@ -264,10 +265,10 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {

f.IntVar(&l.MaxOutstandingPerTenant, "frontend.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per request queue (either query frontend or query scheduler); requests beyond this error with HTTP 429.")

f.Var(&l.RulerEvaluationDelay, "ruler.evaluation-delay-duration", "Duration to delay the evaluation of rules to ensure the underlying metrics have been pushed to Cortex.")
f.IntVar(&l.RulerTenantShardSize, "ruler.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by ruler. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
f.IntVar(&l.RulerMaxRulesPerRuleGroup, "ruler.max-rules-per-rule-group", 0, "Maximum number of rules per rule group per-tenant. 0 to disable.")
f.IntVar(&l.RulerMaxRuleGroupsPerTenant, "ruler.max-rule-groups-per-tenant", 0, "Maximum number of rule groups per-tenant. 0 to disable.")
f.Var(&l.RulerQueryOffset, "ruler.query-offset", "Duration to offset all rule evaluation queries per-tenant.")

f.Var(&l.CompactorBlocksRetentionPeriod, "compactor.blocks-retention-period", "Delete blocks containing samples older than the specified retention period. 0 to disable.")
f.IntVar(&l.CompactorTenantShardSize, "compactor.tenant-shard-size", 0, "The default tenant's shard size when the shuffle-sharding strategy is used by the compactor. When this setting is specified in the per-tenant overrides, a value of 0 disables shuffle sharding for the tenant.")
Expand Down Expand Up @@ -791,6 +792,11 @@ func (o *Overrides) RulerMaxRuleGroupsPerTenant(userID string) int {
return o.GetOverridesForUser(userID).RulerMaxRuleGroupsPerTenant
}

// RulerQueryOffset reports the offset applied to rule evaluation queries for
// the given user. The limit is stored as a model.Duration and is converted to
// a time.Duration before being returned.
func (o *Overrides) RulerQueryOffset(userID string) time.Duration {
	limits := o.GetOverridesForUser(userID)
	return time.Duration(limits.RulerQueryOffset)
}

// StoreGatewayTenantShardSize returns the store-gateway shard size for a given user.
func (o *Overrides) StoreGatewayTenantShardSize(userID string) float64 {
return o.GetOverridesForUser(userID).StoreGatewayTenantShardSize
Expand Down

0 comments on commit 820170a

Please sign in to comment.