Skip to content

Commit

Permalink
Adding support to filter rules
Browse files Browse the repository at this point in the history
Signed-off-by: Anand Rajagopal <anrajag@amazon.com>
  • Loading branch information
rajagopalanand committed Jun 28, 2023
1 parent 4ed4a6b commit c68e8fa
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 36 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Querier/StoreGateway: Allow the tenant shard sizes to be a percent of total instances. #5393
* [FEATURE] Added the flag `-alertmanager.api-concurrency` to configure alert manager api concurrency limit. #5412
* [FEATURE] Store Gateway: Add `-store-gateway.sharding-ring.keep-instance-in-the-ring-on-shutdown` to skip unregistering instance from the ring in shutdown. #5421
* [FEATURE] Ruler: Support for filtering rules in the API. #5417
* [ENHANCEMENT] Distributor/Ingester: Add span on push path #5319
* [ENHANCEMENT] Support object storage backends for runtime configuration file. #5292
* [ENHANCEMENT] Query Frontend: Reject subquery with too small step size. #5323
Expand All @@ -41,7 +42,6 @@
* [BUGFIX] Compactor: Partial block with only visit marker should be deleted even there is no deletion marker. #5342
* [BUGFIX] KV: Etcd calls will no longer block indefinitely and will now time out after the DialTimeout period. #5392
* [BUGFIX] Ring: Allow RF greater than number of zones to select more than one instance per zone #5411
* [BUGFIX] Distributor: Fix potential data corruption in cases of timeout between distributors and ingesters. #5422

## 1.15.1 2023-04-26

Expand Down
8 changes: 4 additions & 4 deletions docs/contributing/how-integration-tests-work.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ This will locally build the `quay.io/cortexproject/cortex:latest` image used by
Once the Docker image is built, you can run integration tests:

```
go test -v -tags=integration,requires_docker,integration_alertmanager,integration_memberlist,integration_querier,integration_ruler,integration_query_fuzz ./integration/...
go test -v -tags=requires_docker ./integration/...
```

If you want to run a single test you can use a filter. For example, to only run `TestRulerMetricsWhenIngesterFails`:
If you want to run a single test you can use a filter. For example, to only run `TestChunksStorageAllIndexBackends`:

```
go test -v -tags=integration,requires_docker,integration_ruler ./integration/ -run "^TestRulerMetricsWhenIngesterFails$" -count=1
go test -v -tags=requires_docker ./integration -run "^TestChunksStorageAllIndexBackends$"
```

### Supported environment variables
Expand All @@ -46,4 +46,4 @@ Integration tests have `requires_docker` tag (`// +build requires_docker` line f

## Isolation

Each integration test runs in isolation. For each integration test, we do create a Docker network, start Cortex and its dependencies containers, push/query series to/from Cortex and run assertions on it. Once the test has done, both the Docker network and containers are terminated and deleted.
Each integration test runs in isolation. For each integration test, we create a Docker network, start Cortex and its dependency containers, push/query series to/from Cortex, and run assertions on the results. Once the test is done, both the Docker network and the containers are terminated and deleted.
63 changes: 63 additions & 0 deletions integration/e2ecortex/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,69 @@ type ServerStatus struct {
} `json:"data"`
}

// RuleFilter holds the optional criteria used by GetPrometheusRulesWithFilter
// to narrow the rule groups returned by the ruler API. Zero-value fields are
// still sent verbatim as query parameters by the caller.
type RuleFilter struct {
	// Namespaces is sent as the "file[]" query parameter.
	Namespaces []string
	// RuleGroupNames is sent as the "rule_group[]" query parameter.
	RuleGroupNames []string
	// RuleNames is sent as the "rule_name[]" query parameter.
	RuleNames []string
	// RuleType is sent as the "type" query parameter; the API accepts
	// "alert" or "record", and an empty value means both kinds.
	RuleType string
}

func addQueryParams(urlValues url.Values, paramName string, params ...string) {
for _, paramValue := range params {
urlValues.Add(paramName, paramValue)
}
}

// GetPrometheusRulesWithFilter fetches the rules from the Prometheus endpoint /api/v1/rules.
// GetPrometheusRulesWithFilter fetches the rules from the ruler's
// Prometheus-compatible endpoint /api/prom/api/v1/rules, applying the given
// filter as query parameters. The request is bound to the client's configured
// timeout and authenticated with the client's org ID.
func (c *Client) GetPrometheusRulesWithFilter(filter RuleFilter) ([]*ruler.RuleGroup, error) {
	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
	defer cancel()

	// Create HTTP request carrying the timeout context from the start.
	req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s/api/prom/api/v1/rules", c.rulerAddress), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Scope-OrgID", c.orgID)

	// Encode the filter criteria as query parameters.
	urlValues := req.URL.Query()
	addQueryParams(urlValues, "file[]", filter.Namespaces...)
	addQueryParams(urlValues, "rule_name[]", filter.RuleNames...)
	addQueryParams(urlValues, "rule_group[]", filter.RuleGroupNames...)
	if filter.RuleType != "" {
		// Only send "type" when set, to avoid a dangling "type=" parameter.
		addQueryParams(urlValues, "type", filter.RuleType)
	}
	req.URL.RawQuery = urlValues.Encode()

	// Execute HTTP request.
	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	body, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, err
	}

	// Fail fast on non-2xx responses instead of surfacing a confusing JSON
	// decode error from an HTML/plain-text error body.
	if res.StatusCode/100 != 2 {
		return nil, fmt.Errorf("unexpected status code %d: %s", res.StatusCode, string(body))
	}

	// Decode the response.
	type response struct {
		Status string              `json:"status"`
		Data   ruler.RuleDiscovery `json:"data"`
	}

	decoded := &response{}
	if err := json.Unmarshal(body, decoded); err != nil {
		return nil, err
	}

	if decoded.Status != "success" {
		return nil, fmt.Errorf("unexpected response status '%s'", decoded.Status)
	}

	return decoded.Data.RuleGroups, nil
}

// GetPrometheusRules fetches the rules from the Prometheus endpoint /api/v1/rules.
func (c *Client) GetPrometheusRules() ([]*ruler.RuleGroup, error) {
// Create HTTP request
Expand Down
160 changes: 160 additions & 0 deletions integration/ruler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"crypto/x509/pkix"
"fmt"
"math"
"math/rand"
"net/http"
"os"
"path/filepath"
Expand All @@ -17,6 +18,8 @@ import (
"testing"
"time"

"github.com/cortexproject/cortex/pkg/ruler"

"github.com/cortexproject/cortex/pkg/storage/tsdb"

"github.com/prometheus/common/model"
Expand Down Expand Up @@ -389,6 +392,163 @@ func TestRulerSharding(t *testing.T) {
assert.ElementsMatch(t, expectedNames, actualNames)
}

// TestRulerAPISharding verifies that the ruler API rule filters (rule type,
// namespace, rule name) return the expected subsets when rule groups are
// sharded across two rulers.
func TestRulerAPISharding(t *testing.T) {
	const numRulesGroups = 100

	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	s, err := e2e.NewScenario(networkName)
	require.NoError(t, err)
	defer s.Close()

	// Generate multiple rule groups, with 1 rule each. Roughly half of the
	// groups get an alerting rule and the rest a recording rule, chosen at
	// random; alertCount tracks the actual split for later assertions.
	ruleGroups := make([]rulefmt.RuleGroup, numRulesGroups)
	expectedNames := make([]string, numRulesGroups)
	alertCount := 0
	for i := 0; i < numRulesGroups; i++ {
		num := random.Intn(100)
		var ruleNode yaml.Node
		var exprNode yaml.Node

		ruleNode.SetString(fmt.Sprintf("rule_%d", i))
		exprNode.SetString(strconv.Itoa(i))
		ruleName := fmt.Sprintf("test_%d", i)

		expectedNames[i] = ruleName
		if num%2 == 0 {
			alertCount++
			ruleGroups[i] = rulefmt.RuleGroup{
				Name:     ruleName,
				Interval: 60,
				Rules: []rulefmt.RuleNode{{
					Alert: ruleNode,
					Expr:  exprNode,
				}},
			}
		} else {
			ruleGroups[i] = rulefmt.RuleGroup{
				Name:     ruleName,
				Interval: 60,
				Rules: []rulefmt.RuleNode{{
					Record: ruleNode,
					Expr:   exprNode,
				}},
			}
		}
	}

	// Start dependencies.
	consul := e2edb.NewConsul()
	minio := e2edb.NewMinio(9000, rulestoreBucketName)
	require.NoError(t, s.StartAndWaitReady(consul, minio))

	// Configure the ruler.
	rulerFlags := mergeFlags(
		BlocksStorageFlags(),
		RulerFlags(),
		RulerShardingFlags(consul.NetworkHTTPEndpoint()),
		map[string]string{
			// Since we're not going to run any rule, we don't need the
			// store-gateway to be configured to a valid address.
			"-querier.store-gateway-addresses": "localhost:12345",
			// Enable the bucket index so we can skip the initial bucket scan.
			"-blocks-storage.bucket-store.bucket-index.enabled": "true",
		},
	)

	// Start rulers.
	ruler1 := e2ecortex.NewRuler("ruler-1", consul.NetworkHTTPEndpoint(), rulerFlags, "")
	ruler2 := e2ecortex.NewRuler("ruler-2", consul.NetworkHTTPEndpoint(), rulerFlags, "")
	rulers := e2ecortex.NewCompositeCortexService(ruler1, ruler2)
	require.NoError(t, s.StartAndWaitReady(ruler1, ruler2))

	// Upload rule groups to one of the rulers.
	c, err := e2ecortex.NewClient("", "", "", ruler1.HTTPEndpoint(), "user-1")
	require.NoError(t, err)

	// Spread the rule groups across 5 namespaces at random, recording how
	// many groups land in each namespace for the namespace-filter assertions.
	namespaceNames := []string{"test1", "test2", "test3", "test4", "test5"}
	namespaceNameCount := make([]int, 5)
	nsRand := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, ruleGroup := range ruleGroups {
		index := nsRand.Intn(len(namespaceNames))
		namespaceNameCount[index] = namespaceNameCount[index] + 1
		require.NoError(t, c.SetRuleGroup(ruleGroup, namespaceNames[index]))
	}

	// Wait until rulers have loaded all rules.
	require.NoError(t, rulers.WaitSumMetricsWithOptions(e2e.Equals(numRulesGroups), []string{"cortex_prometheus_rule_group_rules"}, e2e.WaitMissingMetrics))

	// Since rulers have loaded all rules, we expect that rules have been sharded
	// between the two rulers.
	require.NoError(t, ruler1.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))
	require.NoError(t, ruler2.WaitSumMetrics(e2e.Less(numRulesGroups), "cortex_prometheus_rule_group_rules"))

	testCases := map[string]struct {
		filter        e2ecortex.RuleFilter
		resultCheckFn func(assert.TestingT, []*ruler.RuleGroup)
	}{
		"Filter for Alert Rules": {
			filter: e2ecortex.RuleFilter{
				RuleType: "alert",
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				assert.Len(t, ruleGroups, alertCount, "Expected %d rules but got %d", alertCount, len(ruleGroups))
			},
		},
		"Filter for Recording Rules": {
			filter: e2ecortex.RuleFilter{
				RuleType: "record",
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				assert.Len(t, ruleGroups, numRulesGroups-alertCount, "Expected %d rules but got %d", numRulesGroups-alertCount, len(ruleGroups))
			},
		},
		"Filter by Namespace Name": {
			filter: e2ecortex.RuleFilter{
				Namespaces: []string{namespaceNames[2]},
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				assert.Len(t, ruleGroups, namespaceNameCount[2], "Expected %d rules but got %d", namespaceNameCount[2], len(ruleGroups))
			},
		},
		"Filter by Namespace Name and Alert Rules": {
			filter: e2ecortex.RuleFilter{
				RuleType:   "alert",
				Namespaces: []string{namespaceNames[2]},
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				// NOTE(review): this check passes vacuously if the namespace
				// holds no alert rules; with ~50 alerts over 5 namespaces
				// that is unlikely, but consider asserting non-emptiness.
				for _, ruleGroup := range ruleGroups {
					rule := ruleGroup.Rules[0].(map[string]interface{})
					ruleType := rule["type"]
					assert.Equal(t, "alerting", ruleType, "Expected 'alerting' rule type but got %s", ruleType)
				}
			},
		},
		"Filter by Rule Names": {
			filter: e2ecortex.RuleFilter{
				RuleNames: []string{"rule_3", "rule_64", "rule_99"},
			},
			resultCheckFn: func(t assert.TestingT, ruleGroups []*ruler.RuleGroup) {
				ruleNames := []string{}
				for _, ruleGroup := range ruleGroups {
					rule := ruleGroup.Rules[0].(map[string]interface{})
					ruleName := rule["name"]
					ruleNames = append(ruleNames, ruleName.(string))
				}
				// Assert the exact names, not just the count, so an unrelated
				// rule slipping through the filter would be caught.
				assert.ElementsMatch(t, []string{"rule_3", "rule_64", "rule_99"}, ruleNames, "Expected rules %v but got %v", []string{"rule_3", "rule_64", "rule_99"}, ruleNames)
			},
		},
	}
	// For each test case, fetch the rules with configured filters, and ensure the results match.
	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			actualGroups, err := c.GetPrometheusRulesWithFilter(tc.filter)
			require.NoError(t, err)
			tc.resultCheckFn(t, actualGroups)
		})
	}
}

func TestRulerAlertmanager(t *testing.T) {
var namespaceOne = "test_/encoded_+namespace/?"
ruleGroup := createTestRuleGroup(t)
Expand Down
17 changes: 6 additions & 11 deletions pkg/ruler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,6 @@ type recordingRule struct {
EvaluationTime float64 `json:"evaluationTime"`
}

const (
AlertingRuleFilter string = "alert"
RecordingRuleFilter string = "record"
)

func respondError(logger log.Logger, w http.ResponseWriter, msg string) {
b, err := json.Marshal(&response{
Status: "error",
Expand All @@ -125,10 +120,10 @@ func respondError(logger log.Logger, w http.ResponseWriter, msg string) {
}
}

func respondClientError(logger log.Logger, w http.ResponseWriter, msg string) {
func respondBadRequest(logger log.Logger, w http.ResponseWriter, msg string) {
b, err := json.Marshal(&response{
Status: "error",
ErrorType: v1.ErrServer,
ErrorType: v1.ErrBadData,
Error: msg,
Data: nil,
})
Expand Down Expand Up @@ -173,13 +168,13 @@ func (a *API) PrometheusRules(w http.ResponseWriter, req *http.Request) {

if err := req.ParseForm(); err != nil {
level.Error(logger).Log("msg", "error parsing form/query params", "err", err)
respondClientError(logger, w, err.Error())
respondBadRequest(logger, w, "error parsing form/query params")
return
}

typ := strings.ToLower(req.URL.Query().Get("type"))
if typ != "" && typ != AlertingRuleFilter && typ != RecordingRuleFilter {
respondClientError(logger, w, fmt.Sprintf("not supported value %q", typ))
if typ != "" && typ != alertingRuleFilter && typ != recordingRuleFilter {
respondBadRequest(logger, w, fmt.Sprintf("unsupported rule type %q", typ))
return
}

Expand Down Expand Up @@ -284,7 +279,7 @@ func (a *API) PrometheusAlerts(w http.ResponseWriter, req *http.Request) {

w.Header().Set("Content-Type", "application/json")
rulesRequest := RulesRequest{
Type: AlertingRuleFilter,
Type: alertingRuleFilter,
}
rgs, err := a.ruler.GetRules(req.Context(), rulesRequest)

Expand Down
20 changes: 11 additions & 9 deletions pkg/ruler/ruler.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ const (

// errors
errListAllUser = "unable to list the ruler users"

alertingRuleFilter string = "alert"
recordingRuleFilter string = "record"
)

// Config is the configuration for the recording rules server.
Expand Down Expand Up @@ -677,12 +680,17 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*Grou
fileSet := sliceToSet(rulesRequest.Files)
ruleType := rulesRequest.Type

returnAlerts := ruleType == "" || ruleType == "alert"
returnRecording := ruleType == "" || ruleType == "record"
returnAlerts := ruleType == "" || ruleType == alertingRuleFilter
returnRecording := ruleType == "" || ruleType == recordingRuleFilter

for _, group := range groups {
// The mapped filename is url path escaped encoded to make handling `/` characters easier
decodedNamespace, err := url.PathUnescape(strings.TrimPrefix(group.File(), prefix))
if err != nil {
return nil, errors.Wrap(err, "unable to decode rule filename")
}
if len(fileSet) > 0 {
if _, OK := fileSet[group.File()]; !OK {
if _, OK := fileSet[decodedNamespace]; !OK {
continue
}
}
Expand All @@ -694,12 +702,6 @@ func (r *Ruler) getLocalRules(userID string, rulesRequest RulesRequest) ([]*Grou
}
interval := group.Interval()

// The mapped filename is url path escaped encoded to make handling `/` characters easier
decodedNamespace, err := url.PathUnescape(strings.TrimPrefix(group.File(), prefix))
if err != nil {
return nil, errors.Wrap(err, "unable to decode rule filename")
}

groupDesc := &GroupStateDesc{
Group: &rulespb.RuleGroupDesc{
Name: group.Name(),
Expand Down
Loading

0 comments on commit c68e8fa

Please sign in to comment.