Ruler: remote rule evaluation (#8744)
**What this PR does / why we need it**:
Adds the ability to evaluate recording and alerting rules against a given
`query-frontend`, so that these queries are executed with the same
parallelisation and optimisation as regular ad hoc queries. This matters
because with `local` evaluation all queries are single-threaded, and rules
that evaluate a large range or volume of data may time out or OOM the
`ruler` itself, leading to missed metrics or alerts.

When `remote` evaluation mode is enabled, the `ruler` effectively becomes
a gRPC client for the `query-frontend`, which should substantially improve
the reliability of the `ruler` and reduce its resource requirements.

**Which issue(s) this PR fixes**:
This PR implements the feature discussed in
#8129 (**LID 0002: Remote Rule
Evaluation**).
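
To make the two evaluation modes concrete, here is a minimal sketch of how a mode setting like the one this PR documents could be validated. `EvaluationConfig`, `Validate`, and `UsesClientSideLB` are hypothetical names for illustration, not Loki's actual types, and requiring an address in `remote` mode is an assumption of this sketch rather than something the PR text states.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// EvaluationConfig is a hypothetical, simplified mirror of the `evaluation`
// block documented in this PR. It is not Loki's actual type.
type EvaluationConfig struct {
	Mode                 string // "local" or "remote"; empty is treated as "local"
	QueryFrontendAddress string // example value: "dns:///query-frontend:9095"
}

// Validate rejects unknown modes. Assumption of this sketch: remote mode
// needs a query-frontend address to connect to.
func (c EvaluationConfig) Validate() error {
	switch c.Mode {
	case "", "local":
		return nil
	case "remote":
		if c.QueryFrontendAddress == "" {
			return errors.New("remote evaluation requires a query-frontend address")
		}
		return nil
	default:
		return fmt.Errorf("invalid evaluation mode %q: must be 'local' or 'remote'", c.Mode)
	}
}

// UsesClientSideLB reports whether the address carries the dns:/// prefix
// that the config docs require for client-side load balancing.
func (c EvaluationConfig) UsesClientSideLB() bool {
	return strings.HasPrefix(c.QueryFrontendAddress, "dns:///")
}

func main() {
	cfg := EvaluationConfig{Mode: "remote", QueryFrontendAddress: "dns:///query-frontend:9095"}
	fmt.Println(cfg.Validate(), cfg.UsesClientSideLB())
}
```

Treating an empty mode as `local` follows the documented default; everything else here is illustrative.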
Danny Kopping committed Mar 13, 2023
1 parent 540380f commit 33e44ed
Showing 22 changed files with 996 additions and 125 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -6,6 +6,7 @@

##### Enhancements

* [8744](https://github.com/grafana/loki/pull/8744) **dannykopping**: Ruler: remote rule evaluation.
* [8727](https://github.com/grafana/loki/pull/8727) **cstyan** **jeschkies**: Propagate per-request limit header to querier.
* [8682](https://github.com/grafana/loki/pull/8682) **dannykopping**: Add fetched chunk size distribution metric `loki_chunk_fetcher_fetched_size_bytes`.
* [8532](https://github.com/grafana/loki/pull/8532) **justcompile**: Adds Storage Class option to S3 objects
81 changes: 81 additions & 0 deletions docs/sources/configuration/_index.md
@@ -1090,6 +1090,87 @@ remote_write:
# -limits.per-user-override-period.
# CLI flag: -ruler.remote-write.config-refresh-period
[config_refresh_period: <duration> | default = 10s]

# Configuration for rule evaluation.
evaluation:
  # The evaluation mode for the ruler. Can be either 'local' or 'remote'. If set
  # to 'local', the ruler will evaluate rules locally. If set to 'remote', the
  # ruler will evaluate rules remotely. If unset, the ruler will evaluate rules
  # locally.
  # CLI flag: -ruler.evaluation.mode
  [mode: <string> | default = "local"]

  query_frontend:
    # GRPC listen address of the query-frontend(s). Must be a DNS address
    # (prefixed with dns:///) to enable client side load balancing.
    # CLI flag: -ruler.evaluation.query-frontend.address
    [address: <string> | default = ""]

    # Set to true if query-frontend connection requires TLS.
    # CLI flag: -ruler.evaluation.query-frontend.tls-enabled
    [tls_enabled: <boolean> | default = false]

    # Path to the client certificate file, which will be used for authenticating
    # with the server. Also requires the key path to be configured.
    # CLI flag: -ruler.evaluation.query-frontend.tls-cert-path
    [tls_cert_path: <string> | default = ""]

    # Path to the key file for the client certificate. Also requires the client
    # certificate to be configured.
    # CLI flag: -ruler.evaluation.query-frontend.tls-key-path
    [tls_key_path: <string> | default = ""]

    # Path to the CA certificates file to validate server certificate against.
    # If not set, the host's root CA certificates are used.
    # CLI flag: -ruler.evaluation.query-frontend.tls-ca-path
    [tls_ca_path: <string> | default = ""]

    # Override the expected name on the server certificate.
    # CLI flag: -ruler.evaluation.query-frontend.tls-server-name
    [tls_server_name: <string> | default = ""]

    # Skip validating server certificate.
    # CLI flag: -ruler.evaluation.query-frontend.tls-insecure-skip-verify
    [tls_insecure_skip_verify: <boolean> | default = false]

    # Override the default cipher suite list (separated by commas). Allowed
    # values:
    #
    # Secure Ciphers:
    # - TLS_RSA_WITH_AES_128_CBC_SHA
    # - TLS_RSA_WITH_AES_256_CBC_SHA
    # - TLS_RSA_WITH_AES_128_GCM_SHA256
    # - TLS_RSA_WITH_AES_256_GCM_SHA384
    # - TLS_AES_128_GCM_SHA256
    # - TLS_AES_256_GCM_SHA384
    # - TLS_CHACHA20_POLY1305_SHA256
    # - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA
    # - TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA
    # - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA
    # - TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA
    # - TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256
    # - TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384
    # - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
    # - TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
    # - TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256
    # - TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256
    #
    # Insecure Ciphers:
    # - TLS_RSA_WITH_RC4_128_SHA
    # - TLS_RSA_WITH_3DES_EDE_CBC_SHA
    # - TLS_RSA_WITH_AES_128_CBC_SHA256
    # - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA
    # - TLS_ECDHE_RSA_WITH_RC4_128_SHA
    # - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA
    # - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256
    # - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256
    # CLI flag: -ruler.evaluation.query-frontend.tls-cipher-suites
    [tls_cipher_suites: <string> | default = ""]

    # Override the default minimum TLS version. Allowed values: VersionTLS10,
    # VersionTLS11, VersionTLS12, VersionTLS13
    # CLI flag: -ruler.evaluation.query-frontend.tls-min-version
    [tls_min_version: <string> | default = ""]
```

### ingester_client
2 changes: 1 addition & 1 deletion integration/client/client.go
@@ -434,7 +434,7 @@ func (c *Client) GetRules(ctx context.Context) (*RulesResponse, error) {
resp := RulesResponse{}
err = json.Unmarshal(buf, &resp)
if err != nil {
return nil, fmt.Errorf("error parsing response data: %w", err)
return nil, fmt.Errorf("error parsing response data %q: %w", buf, err)
}

return &resp, err
132 changes: 55 additions & 77 deletions integration/cluster/cluster.go
@@ -1,6 +1,7 @@
package cluster

import (
"bytes"
"context"
"errors"
"flag"
@@ -19,6 +20,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"gopkg.in/yaml.v2"

"github.com/grafana/loki/integration/util"

"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/util/cfg"
"github.com/grafana/loki/pkg/validation"
@@ -86,43 +89,20 @@ ingester:
querier:
multi_tenant_queries_enabled: true
{{if .remoteWriteUrls}}
ruler:
wal:
dir: {{.rulerWALPath}}
storage:
type: local
local:
directory: {{.rulesPath}}
rule_path: {{.sharedDataPath}}/rule
enable_api: true
ring:
kvstore:
store: inmemory
remote_write:
enabled: true
clients:
remote_client1:
url: {{index .remoteWriteUrls 0}}/api/v1/write
remote_client2:
url: {{index .remoteWriteUrls 1}}/api/v1/write
{{end}}
`))
wal:
dir: {{.sharedDataPath}}/ruler-wal
storage:
type: local
local:
directory: {{.sharedDataPath}}/rules
rule_path: {{.sharedDataPath}}/prom-rule
rulesConfig = `
groups:
- name: always-firing
interval: 1s
rules:
- alert: fire
expr: |
1 > 0
for: 0m
labels:
severity: warning
annotations:
summary: test
`
`))
)

func wrapRegistry() {
@@ -265,16 +245,18 @@ type Component struct {
flags []string

configFile string
extraConfigs []string
overridesFile string
dataPath string
rulerWALPath string
rulesPath string
RulesTenant string

running bool
wg sync.WaitGroup
}

RemoteWriteUrls []string
// ClusterSharedPath returns the path to the shared directory between all components in the cluster.
// This path will be removed once the cluster is stopped.
func (c *Component) ClusterSharedPath() string {
return c.cluster.sharedPath
}

func (c *Component) HTTPURL() string {
@@ -285,6 +267,14 @@ func (c *Component) GRPCURL() string {
return fmt.Sprintf("localhost:%s", port(c.loki.Server.GRPCListenAddr().String()))
}

func (c *Component) WithExtraConfig(cfg string) {
if c.running {
panic("cannot set extra config after component is running")
}

c.extraConfigs = append(c.extraConfigs, cfg)
}

func port(addr string) string {
parts := strings.Split(addr, ":")
return parts[len(parts)-1]
@@ -303,44 +293,12 @@ func (c *Component) writeConfig() error {
return fmt.Errorf("error creating data path: %w", err)
}

if len(c.RemoteWriteUrls) > 0 {
c.rulesPath, err = os.MkdirTemp(c.cluster.sharedPath, "rules")
if err != nil {
return fmt.Errorf("error creating rules path: %w", err)
}

fakeDir, err := os.MkdirTemp(c.rulesPath, "fake")
if err != nil {
return fmt.Errorf("error creating rules/fake path: %w", err)
}

s := strings.Split(fakeDir, "/")
c.RulesTenant = s[len(s)-1]

c.rulerWALPath, err = os.MkdirTemp(c.cluster.sharedPath, "ruler-wal")
if err != nil {
return fmt.Errorf("error creating ruler-wal path: %w", err)
}

rulesConfigFile, err := os.CreateTemp(fakeDir, "rules*.yaml")
if err != nil {
return fmt.Errorf("error creating rules config file: %w", err)
}

if _, err = rulesConfigFile.Write([]byte(rulesConfig)); err != nil {
return fmt.Errorf("error writing to rules config file: %w", err)
}

rulesConfigFile.Close()
mergedConfig, err := c.MergedConfig()
if err != nil {
return fmt.Errorf("error getting merged config: %w", err)
}

if err := configTemplate.Execute(configFile, map[string]interface{}{
"dataPath": c.dataPath,
"sharedDataPath": c.cluster.sharedPath,
"remoteWriteUrls": c.RemoteWriteUrls,
"rulesPath": c.rulesPath,
"rulerWALPath": c.rulerWALPath,
}); err != nil {
if err := os.WriteFile(configFile.Name(), mergedConfig, 0644); err != nil {
return fmt.Errorf("error writing config file: %w", err)
}

@@ -351,6 +309,32 @@ func (c *Component) writeConfig() error {
return nil
}

// MergedConfig merges the base config template with any additional config that has been provided
func (c *Component) MergedConfig() ([]byte, error) {
var sb bytes.Buffer

if err := configTemplate.Execute(&sb, map[string]interface{}{
"dataPath": c.dataPath,
"sharedDataPath": c.cluster.sharedPath,
}); err != nil {
return nil, fmt.Errorf("error writing config file: %w", err)
}

merger := util.NewYAMLMerger()
merger.AddFragment(sb.Bytes())

for _, extra := range c.extraConfigs {
merger.AddFragment([]byte(extra))
}

merged, err := merger.Merge()
if err != nil {
return nil, fmt.Errorf("failed to marshal merged config to YAML: %w", err)
}

return merged, nil
}

func (c *Component) run() error {
c.running = true

@@ -440,12 +424,6 @@ func (c *Component) cleanup() (files []string, dirs []string) {
if c.dataPath != "" {
dirs = append(dirs, c.dataPath)
}
if c.rulerWALPath != "" {
dirs = append(dirs, c.rulerWALPath)
}
if c.rulesPath != "" {
dirs = append(dirs, c.rulesPath)
}

return files, dirs
}
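
`MergedConfig` above layers each extra config fragment over the rendered base template. The implementation of `util.NewYAMLMerger` is not part of the hunks shown here, so the following is only a sketch of the general technique such a merger might use: a recursive merge over already-decoded maps. `mergeMaps` is a hypothetical helper, and the choice that non-map values from the later fragment overwrite earlier ones is an assumption.

```go
package main

import "fmt"

// mergeMaps recursively merges src into dst and returns dst.
// When both sides hold a nested map under the same key, the maps are merged
// key by key; otherwise the value from src replaces the value in dst.
func mergeMaps(dst, src map[string]interface{}) map[string]interface{} {
	for key, srcVal := range src {
		srcMap, srcIsMap := srcVal.(map[string]interface{})
		dstMap, dstIsMap := dst[key].(map[string]interface{})
		if srcIsMap && dstIsMap {
			dst[key] = mergeMaps(dstMap, srcMap)
			continue
		}
		dst[key] = srcVal
	}
	return dst
}

func main() {
	// Stand-in for the decoded base template.
	base := map[string]interface{}{
		"ruler": map[string]interface{}{"enable_api": true},
	}
	// Stand-in for a decoded fragment passed to WithExtraConfig.
	extra := map[string]interface{}{
		"ruler": map[string]interface{}{
			"remote_write": map[string]interface{}{"enabled": true},
		},
	}
	fmt.Println(mergeMaps(base, extra))
}
```

A merge of this shape is what lets helpers contribute separate `ruler.remote_write` fragments without clobbering each other's sibling keys.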
53 changes: 53 additions & 0 deletions integration/cluster/ruler.go
@@ -0,0 +1,53 @@
package cluster

import (
"fmt"
"os"
"path/filepath"
"strings"
)

func (c *Component) WithRulerRemoteWrite(name, url string) {

	// ensure remote-write is enabled
	c.WithExtraConfig(`
ruler:
  remote_write:
    enabled: true
`)

	c.WithExtraConfig(fmt.Sprintf(`
ruler:
  remote_write:
    clients:
      %s:
        url: %s/api/v1/write
        queue_config:
          # send immediately as soon as a sample is generated
          capacity: 1
          batch_send_deadline: 0s
`, name, url))
}

func (c *Component) WithTenantRules(tenantFilesMap map[string]map[string]string) error {
	sharedPath := c.ClusterSharedPath()
	rulesPath := filepath.Join(sharedPath, "rules")

	if err := os.Mkdir(rulesPath, 0755); err != nil {
		return fmt.Errorf("error creating rules path: %w", err)
	}

	for tenant, files := range tenantFilesMap {
		// create the tenant directory once, before writing its files; calling
		// os.Mkdir per file would fail on a tenant's second rule file
		path := filepath.Join(rulesPath, tenant)
		if err := os.Mkdir(path, 0755); err != nil {
			return fmt.Errorf("error creating tenant %s rules path: %w", tenant, err)
		}
		for filename, file := range files {
			if err := os.WriteFile(filepath.Join(path, filename), []byte(strings.TrimSpace(file)), 0644); err != nil {
				return fmt.Errorf("error creating rule file at path %s: %w", path, err)
			}
		}
	}

	return nil
}