Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add concurrent evaluation for ruler #5766

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* [FEATURE] Query Frontend/Scheduler: Add query priority support. #5605
* [FEATURE] Tracing: Add `kuberesolver` to resolve endpoints address with `kubernetes://` prefix as Kubernetes service. #5731
* [FEATURE] Tracing: Add `tracing.otel.round-robin` flag to use `round_robin` gRPC client side LB policy for sending OTLP traces. #5731
* [FEATURE] Ruler: Add `ruler.concurrent-evals-enabled` flag to enable concurrent evaluation within a single rule group for independent rules. Maximum concurrency can be configured via `ruler.max-concurrent-evals`. #5766
* [ENHANCEMENT] Store Gateway: Added `-store-gateway.enabled-tenants` and `-store-gateway.disabled-tenants` to explicitly enable or disable store-gateway for specific tenants. #5638
* [ENHANCEMENT] Compactor: Add new compactor metric `cortex_compactor_start_duration_seconds`. #5683
* [ENHANCEMENT] Upgraded Docker base images to `alpine:3.18`. #5684
Expand Down
10 changes: 10 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3968,6 +3968,16 @@ alertmanager_client:
# CLI flag: -ruler.resend-delay
[resend_delay: <duration> | default = 1m]

# If enabled, rules from a single rule group can be evaluated concurrently if
# there is no dependency between each other. Max concurrency for each rule group
# is controlled via ruler.max-concurrent-evals flag.
# CLI flag: -ruler.concurrent-evals-enabled
[concurrent_evals_enabled: <boolean> | default = false]

# Max concurrency for a single rule group to evaluate independent rules.
# CLI flag: -ruler.max-concurrent-evals
[max_concurrent_evals: <int> | default = 1]

# Distribute rule evaluation using ring backend
# CLI flag: -ruler.enable-sharding
[enable_sharding: <boolean> | default = false]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ require (
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.46.0
// Prometheus maps version 2.x.y to tags v0.x.y.
github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce
github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6
github.com/segmentio/fasthash v1.0.3
github.com/sony/gobreaker v0.5.0
github.com/spf13/afero v1.9.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1320,8 +1320,8 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce h1:gHCPxX6dJZJOZh/nYy0DmnTu2PbgWjs8hY0eLgofPfA=
github.com/prometheus/prometheus v0.49.2-0.20240202164002-aa845f7c12ce/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6 h1:E1dnG12fSlUeHST75LpGqPpd/YCOSNqKD2CUmm3Em90=
github.com/prometheus/prometheus v0.49.2-0.20240205174859-6005ac6f9dc6/go.mod h1:FvE8dtQ1Ww63IlyKBn1V4s+zMwF9kHkVNkQBR1pM4CU=
github.com/redis/rueidis v1.0.14-go1.18 h1:dGir5z8w8X1ex7JWO/Zx2FMBrZgQ8Yjm+lw9fPLSNGw=
github.com/redis/rueidis v1.0.14-go1.18/go.mod h1:HGekzV3HbmzFmRK6j0xic8Z9119+ECoGMjeN1TV1NYU=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
Expand Down
17 changes: 14 additions & 3 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"os"
"path"
"path/filepath"
Expand All @@ -30,7 +31,6 @@ import (
"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/compact"
thanos_testutil "github.com/thanos-io/thanos/pkg/testutil/e2eutil"
"gopkg.in/yaml.v2"

"github.com/cortexproject/cortex/pkg/ring"
Expand Down Expand Up @@ -1065,8 +1065,19 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) {
b1 := createTSDBBlock(t, bucketClient, "user-1", 10, 20, map[string]string{"__name__": "Teste"})
b2 := createTSDBBlock(t, bucketClient, "user-1", 20, 30, map[string]string{"__name__": "Teste"})

err := thanos_testutil.PutOutOfOrderIndex(path.Join(tmpDir, "user-1", b1.String()), 10, 20)
// Read bad index file.
indexFile, err := os.Open("testdata/out_of_order_chunks/index")
require.NoError(t, err)
indexFileStat, err := indexFile.Stat()
require.NoError(t, err)

dir := path.Join(tmpDir, "user-1", b1.String())
outputFile, err := os.OpenFile(path.Join(dir, "index"), os.O_RDWR|os.O_TRUNC, 0755)
require.NoError(t, err)

n, err := io.Copy(outputFile, indexFile)
require.NoError(t, err)
require.Equal(t, indexFileStat.Size(), n)

cfg := prepareConfig()
cfg.SkipBlocksWithOutOfOrderChunksEnabled = true
Expand Down Expand Up @@ -1097,7 +1108,7 @@ func TestCompactor_ShouldSkipOutOrOrderBlocks(t *testing.T) {

// Wait until a run has completed.
cortex_testutil.Poll(t, 5*time.Second, true, func() interface{} {
if _, err := os.Stat(path.Join(tmpDir, "user-1", b1.String(), "no-compact-mark.json")); err == nil {
if _, err := os.Stat(path.Join(dir, "no-compact-mark.json")); err == nil {
return true
}
return false
Expand Down
Binary file added pkg/compactor/testdata/out_of_order_chunks/index
Binary file not shown.
24 changes: 13 additions & 11 deletions pkg/ruler/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,17 +325,19 @@ func DefaultTenantManagerFactory(cfg Config, p Pusher, q storage.Queryable, engi
metricsQueryFunc := MetricsQueryFunc(engineQueryFunc, totalQueries, failedQueries)

return rules.NewManager(&rules.ManagerOptions{
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Queryable: q,
QueryFunc: RecordAndReportRuleQueryMetrics(metricsQueryFunc, queryTime, logger),
Context: user.InjectOrgID(ctx, userID),
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
Logger: log.With(logger, "user", userID),
Registerer: reg,
OutageTolerance: cfg.OutageTolerance,
ForGracePeriod: cfg.ForGracePeriod,
ResendDelay: cfg.ResendDelay,
Appendable: NewPusherAppendable(p, userID, overrides, totalWrites, failedWrites),
Queryable: q,
QueryFunc: RecordAndReportRuleQueryMetrics(metricsQueryFunc, queryTime, logger),
Context: user.InjectOrgID(ctx, userID),
ExternalURL: cfg.ExternalURL.URL,
NotifyFunc: SendAlerts(notifier, cfg.ExternalURL.URL.String()),
Logger: log.With(logger, "user", userID),
Registerer: reg,
OutageTolerance: cfg.OutageTolerance,
ForGracePeriod: cfg.ForGracePeriod,
ResendDelay: cfg.ResendDelay,
ConcurrentEvalsEnabled: cfg.ConcurrentEvalsEnabled,
MaxConcurrentEvals: cfg.MaxConcurrentEvals,
})
}
}
Expand Down
16 changes: 13 additions & 3 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ var (
supportedShardingStrategies = []string{util.ShardingStrategyDefault, util.ShardingStrategyShuffle}

// Validation errors.
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
errInvalidShardingStrategy = errors.New("invalid sharding strategy")
errInvalidTenantShardSize = errors.New("invalid tenant shard size, the value must be greater than 0")
errInvalidMaxConcurrentEvals = errors.New("invalid max concurrent evals, the value must be greater than 0")
)

const (
Expand Down Expand Up @@ -95,7 +96,7 @@ type Config struct {
RulePath string `yaml:"rule_path"`

// URL of the Alertmanager to send notifications to.
// If your are configuring the ruler to send to a Cortex Alertmanager,
// If you are configuring the ruler to send to a Cortex Alertmanager,
// ensure this includes any path set in the Alertmanager external URL.
AlertmanagerURL string `yaml:"alertmanager_url"`
// Whether to use DNS SRV records to discover Alertmanager.
Expand All @@ -118,6 +119,9 @@ type Config struct {
// Minimum amount of time to wait before resending an alert to Alertmanager.
ResendDelay time.Duration `yaml:"resend_delay"`

ConcurrentEvalsEnabled bool `yaml:"concurrent_evals_enabled"`
MaxConcurrentEvals int64 `yaml:"max_concurrent_evals"`

// Enable sharding rule groups.
EnableSharding bool `yaml:"enable_sharding"`
ShardingStrategy string `yaml:"sharding_strategy"`
Expand Down Expand Up @@ -149,6 +153,10 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error {
if err := cfg.ClientTLSConfig.Validate(log); err != nil {
return errors.Wrap(err, "invalid ruler gRPC client config")
}

if cfg.ConcurrentEvalsEnabled && cfg.MaxConcurrentEvals <= 0 {
return errInvalidMaxConcurrentEvals
}
return nil
}

Expand Down Expand Up @@ -188,6 +196,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.OutageTolerance, "ruler.for-outage-tolerance", time.Hour, `Max time to tolerate outage for restoring "for" state of alert.`)
f.DurationVar(&cfg.ForGracePeriod, "ruler.for-grace-period", 10*time.Minute, `Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period.`)
f.DurationVar(&cfg.ResendDelay, "ruler.resend-delay", time.Minute, `Minimum amount of time to wait before resending an alert to Alertmanager.`)
f.BoolVar(&cfg.ConcurrentEvalsEnabled, "ruler.concurrent-evals-enabled", false, `If enabled, rules from a single rule group can be evaluated concurrently if there is no dependency between each other. Max concurrency for each rule group is controlled via ruler.max-concurrent-evals flag.`)
f.Int64Var(&cfg.MaxConcurrentEvals, "ruler.max-concurrent-evals", 1, `Max concurrency for a single rule group to evaluate independent rules.`)

f.Var(&cfg.EnabledTenants, "ruler.enabled-tenants", "Comma separated list of tenants whose rules this ruler can evaluate. If specified, only these tenants will be handled by ruler, otherwise this ruler can process rules from all tenants. Subject to sharding.")
f.Var(&cfg.DisabledTenants, "ruler.disabled-tenants", "Comma separated list of tenants whose rules this ruler cannot evaluate. If specified, a ruler that would normally pick the specified tenant(s) for processing will ignore them instead. Subject to sharding.")
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 26 additions & 4 deletions vendor/github.com/prometheus/prometheus/tsdb/index/index.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

This file was deleted.

This file was deleted.

Loading
Loading