From d88a0d0a1d51fbc52a305f92b77ada4de6d8e32d Mon Sep 17 00:00:00 2001 From: Bartek Plotka Date: Mon, 25 Mar 2019 18:21:10 +0000 Subject: [PATCH] ruler: Added support for strict rule groups that does not allow partial_response Signed-off-by: Bartek Plotka --- CHANGELOG.md | 4 + cmd/thanos/rule.go | 177 +++++++++++++++--------- docs/components/query.md | 22 ++- docs/components/rule.md | 135 +++++++++++++++--- pkg/promclient/promclient.go | 70 +++++++--- pkg/rule/api/v1.go | 83 ++++++------ pkg/rule/api/v1_test.go | 18 ++- pkg/rule/rule.go | 170 +++++++++++++++++++++++ pkg/rule/rule_test.go | 125 +++++++++++++++++ pkg/store/prompb/remote.pb.go | 46 +++++-- pkg/store/storepb/custom.go | 8 ++ pkg/store/storepb/rpc.pb.go | 248 +++++++++++++++++++++++----------- pkg/store/storepb/rpc.proto | 28 +++- pkg/store/storepb/types.pb.go | 31 +++-- pkg/ui/rule.go | 28 ++-- scripts/genproto.sh | 2 +- test/e2e/rule_test.go | 54 +++++++- test/e2e/spinup_test.go | 38 +++--- 18 files changed, 1009 insertions(+), 278 deletions(-) create mode 100644 pkg/rule/rule.go create mode 100644 pkg/rule/rule_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index fa6032cdb7c..49f9f930654 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,10 @@ New tracing span: :warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query. +- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` for `Ruler`. +### Changed +- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated partial_response_disabled proto field. Added partial_response_strategy instead. Both in gRPC and Query API. 
+ ### Fixed - [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` now does not appear when no blocks have been uploaded so far - [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about overlapping blocks, that overlap `0s` diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index e9b4a87a0e3..6ccfbd4af69 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -19,6 +19,8 @@ import ( "syscall" "time" + thanosrule "github.com/improbable-eng/thanos/pkg/rule" + "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/improbable-eng/thanos/pkg/alert" @@ -227,13 +229,20 @@ func runRule( Name: "thanos_rule_loaded_rules", Help: "Loaded rules partitioned by file and group", }, - []string{"file", "group"}, + []string{"part_resp_strategy", "file", "group"}, + ) + ruleEvalWarnings := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "thanos_rule_evaluation_with_warnings_total", + Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.", + }, []string{"strategy"}, ) reg.MustRegister(configSuccess) reg.MustRegister(configSuccessTime) reg.MustRegister(duplicatedQuery) reg.MustRegister(alertMngrAddrResolutionErrors) reg.MustRegister(rulesLoaded) + reg.MustRegister(ruleEvalWarnings) for _, addr := range queryAddrs { if addr == "" { @@ -263,54 +272,13 @@ func runRule( extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis", reg), ) - // Hit the HTTP query API of query peers in randomized order until we get a result - // back or the context get canceled. - queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { - var addrs []string - - // Add addresses from gossip. 
- peers := peer.PeerStates(cluster.PeerTypeQuery) - var ids []string - for id := range peers { - ids = append(ids, id) - } - sort.Slice(ids, func(i int, j int) bool { - return strings.Compare(ids[i], ids[j]) < 0 - }) - for _, id := range ids { - addrs = append(addrs, peers[id].QueryAPIAddr) - } - - // Add DNS resolved addresses from static flags and file SD. - // TODO(bwplotka): Consider generating addresses in *url.URL - addrs = append(addrs, dnsProvider.Addresses()...) - - removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs) - - for _, i := range rand.Perm(len(addrs)) { - u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i])) - if err != nil { - return nil, errors.Wrapf(err, "url parse %s", addrs[i]) - } - - span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]") - v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true) - span.Finish() - return v, err - } - return nil, errors.Errorf("no query peer reachable") - } - // Run rule evaluation and alert notifications. 
var ( alertmgrs = newAlertmanagerSet(alertmgrURLs) alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - mgr *rules.Manager + ruleMgrs = thanosrule.Managers{} ) { - ctx, cancel := context.WithCancel(context.Background()) - ctx = tracing.ContextWithTracer(ctx, tracer) - notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { res := make([]*alert.Alert, 0, len(alerts)) for _, alrt := range alerts { @@ -331,26 +299,38 @@ func runRule( } alertQ.Push(res) } - st := tsdb.Adapter(db, 0) - mgr = rules.NewManager(&rules.ManagerOptions{ - Context: ctx, - QueryFunc: queryFn, + + opts := rules.ManagerOptions{ NotifyFunc: notify, Logger: log.With(logger, "component", "rules"), Appendable: st, - Registerer: reg, ExternalURL: nil, TSDB: st, - }) - g.Add(func() error { - mgr.Run() - <-ctx.Done() - mgr.Stop() - return nil - }, func(error) { - cancel() - }) + } + + for _, strategy := range storepb.PartialResponseStrategy_value { + s := storepb.PartialResponseStrategy(strategy) + + ctx, cancel := context.WithCancel(context.Background()) + ctx = tracing.ContextWithTracer(ctx, tracer) + + opts := opts + opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg) + opts.Context = ctx + opts.QueryFunc = queryFunc(logger, peer, dnsProvider, duplicatedQuery, ruleEvalWarnings, s) + + ruleMgrs[s] = rules.NewManager(&opts) + g.Add(func() error { + ruleMgrs[s].Run() + <-ctx.Done() + + return nil + }, func(error) { + cancel() + ruleMgrs[s].Stop() + }) + } } { var storeLset []storepb.Label @@ -469,11 +449,13 @@ func runRule( level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err) continue } + files = append(files, fs...) 
} level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files)) - if err := mgr.Update(evalInterval, files); err != nil { + + if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil { configSuccess.Set(0) level.Error(logger).Log("msg", "reloading rules failed", "err", err) continue @@ -483,9 +465,12 @@ func runRule( configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9) rulesLoaded.Reset() - for _, group := range mgr.RuleGroups() { - rulesLoaded.WithLabelValues(group.File(), group.Name()).Set(float64(len(group.Rules()))) + for s, mgr := range ruleMgrs { + for _, group := range mgr.RuleGroups() { + rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules()))) + } } + } }, func(error) { close(cancel) @@ -569,9 +554,9 @@ func runRule( "web.prefix-header": webPrefixHeaderName, } - ui.NewRuleUI(logger, mgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix)) + ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix)) - api := v1.NewAPI(logger, mgr) + api := v1.NewAPI(logger, ruleMgrs) api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger) mux := http.NewServeMux() @@ -767,3 +752,71 @@ func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus. } return deduplicated } + +// queryFunc returns query function that hits the HTTP query API of query peers in randomized order until we get a result +// back or the context get canceled. 
+func queryFunc( + logger log.Logger, + peer cluster.Peer, + dnsProvider *dns.Provider, + duplicatedQuery prometheus.Counter, + ruleEvalWarnings *prometheus.CounterVec, + partialResponseStrategy storepb.PartialResponseStrategy, +) rules.QueryFunc { + var spanID string + + switch partialResponseStrategy { + case storepb.PartialResponseStrategy_WARN: + spanID = "/rule_instant_query HTTP[client]" + case storepb.PartialResponseStrategy_ABORT: + spanID = "/rule_instant_query_part_resp_abort HTTP[client]" + default: + // Programming error will be caught by tests. + panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error()) + } + + return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) { + var addrs []string + + // Add addresses from gossip. + peers := peer.PeerStates(cluster.PeerTypeQuery) + var ids []string + for id := range peers { + ids = append(ids, id) + } + sort.Slice(ids, func(i int, j int) bool { + return strings.Compare(ids[i], ids[j]) < 0 + }) + for _, id := range ids { + addrs = append(addrs, peers[id].QueryAPIAddr) + } + + // Add DNS resolved addresses from static flags and file SD. + // TODO(bwplotka): Consider generating addresses in *url.URL + addrs = append(addrs, dnsProvider.Addresses()...) 
+ + removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs) + + for _, i := range rand.Perm(len(addrs)) { + u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i])) + if err != nil { + return nil, errors.Wrapf(err, "url parse %s", addrs[i]) + } + + span, ctx := tracing.StartSpan(ctx, spanID) + v, warns, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, promclient.ThanosQueryParams{ + Deduplicate: true, + PartialResponseStrategy: partialResponseStrategy, + }) + span.Finish() + + if err != nil && len(warns) > 0 { + ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc() + // TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ): + level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q) + } + return v, err + } + return nil, errors.Errorf("no query peer reachable") + } +} diff --git a/docs/components/query.md b/docs/components/query.md index 972796eecaf..b3ed80e3900 100644 --- a/docs/components/query.md +++ b/docs/components/query.md @@ -53,8 +53,22 @@ This logic can also be controlled via parameter on QueryAPI. More details below. Overall QueryAPI exposed by Thanos is guaranteed to be compatible with Prometheus 2.x. -However, for additional Thanos features, Thanos, on top of Prometheus adds several -additional parameters listed below as well as custom response fields. +However, for additional Thanos features, Thanos, on top of Prometheus adds +* partial response behaviour +* several additional parameters listed below +* custom response fields. + +### Partial Response + +QueryAPI and StoreAPI has additional behaviour controlled via query parameter called [PartialResponseStrategy](../../pkg/store/storepb/rpc.pb.go). + +Partial response is a potentially missed result within query against QueryAPI or StoreAPI. This can happen if one +of StoreAPIs is returning error or timeout whereas couple of others returns success. 
It does not mean you are missing data, +you might lucky enough that you actually get the correct data as the broken StoreAPI did not have anything for your query. + +If partial response happen QueryAPI returns human readable warnings explained [here](query.md#CustomResponseFields) + +See [this](query.md#PartialResponseStrategy) on how to control this behaviour. ### Deduplication Enabled @@ -77,7 +91,9 @@ Max source resolution is max resolution in seconds we want to use for data we qu * 5m -> we will use max 5m downsampling. * 1h -> we will use max 1h downsampling. -### Partial Response / Error Enabled +### Partial Response Strategy + +// TODO(bwplotka): Update. This will change to "strategy" soon as [PartialResponseStrategy enum here](../../pkg/store/storepb/rpc.proto) | HTTP URL/FORM parameter | Type | Default | Example | |----|----|----|----| diff --git a/docs/components/rule.md b/docs/components/rule.md index 60b856223b2..21789b33195 100644 --- a/docs/components/rule.md +++ b/docs/components/rule.md @@ -1,36 +1,139 @@ -# Rule +# Rule (aka Ruler) -_**NOTE:** The rule component is experimental since it has conceptual tradeoffs that might not be favorable for most use cases. It is recommended to keep deploying rules to the relevant Prometheus servers._ +_**NOTE:** It is recommended to keep deploying rules inside the relevant Prometheus servers locally. Use ruler only on specific cases. Read details[below](rule.md#Risk) why._ _The rule component should in particular not be used to circumvent solving rule deployment properly at the configuration management level._ -The rule component evaluates Prometheus recording and alerting rules against random query nodes in its cluster. Rule results are written back to disk in the Prometheus 2.0 storage format. Rule nodes at the same time participate in the cluster themselves as source store nodes and upload their generated TSDB blocks to an object store. 
+The rule component evaluates Prometheus recording and alerting rules against chosen query API via repeated `--query` (or FileSD via `--query.sd`). If more then one query is passed, round robin balancing is performed. + +Rule results are written back to disk in the Prometheus 2.0 storage format. Rule nodes at the same time participate in the system as source store nodes, which means that they expose StoreAPI and upload their generated TSDB blocks to an object store. -The data of each rule node can be labeled to satisfy the clusters labeling scheme. High-availability pairs can be run in parallel and should be distinguished by the designated replica label, just like regular Prometheus servers. +You can think of Rule as a simplified Prometheus that does not require a sidecar and does not scrape and do PromQL evaluation (no QueryAPI). + +The data of each Rule node can be labeled to satisfy the clusters labeling scheme. High-availability pairs can be run in parallel and should be distinguished by the designated replica label, just like regular Prometheus servers. +Read more about Ruler in HA in [here](rule.md#Ruler_HA) ``` $ thanos rule \ - --data-dir "/path/to/data" \ - --eval-interval "30s" \ - --rule-file "/path/to/rules/*.rules.yaml" \ - --alert.query-url "http://0.0.0.0:9090" \ - --alertmanagers.url "alert.thanos.io" \ - --cluster.peers "thanos-cluster.example.org" \ - --objstore.config-file "bucket.yml" + --data-dir "/path/to/data" \ + --eval-interval "30s" \ + --rule-file "/path/to/rules/*.rules.yaml" \ + --alert.query-url "http://0.0.0.0:9090" \ # This tells what query URL to link to in UI. + --alertmanagers.url "alert.thanos.io" \ + --query "query.example.org" \ + --query "query2.example.org" \ + --objstore.config-file "bucket.yml" \ + --label 'monitor_cluster="cluster1"' + --label 'replica="A" ``` -The content of `bucket.yml`: +## Risk + +Ruler has conceptual tradeoffs that might not be favorable for most use cases. 
The main tradeoff is its dependence on +query reliability. For Prometheus it is unlikely to have alert/recording rule evaluation failure as evaluation is local. + +For Ruler the read path is distributed, since most likely ruler is querying Thanos Querier which gets data from remote Store APIs. + +This means that **query failure** are more likely to happen, that's why clear strategy on what will happen to alert and during query +unavailability is the key. + +## Partial Response + +See [this](query.md#PartialResponse) on initial info. + +Rule allows to specify rule groups with additional field that controls PartialResponseStrategy e.g: ```yaml -type: GCS -config: - bucket: example-bucket +groups: +- name: "warn strategy" + partial_response_strategy: "warn" + rules: + - alert: "some" + expr: "up" +- name: "abort strategy" + partial_response_strategy: "abort" + rules: + - alert: "some" + expr: "up" +- name: "by default strategy is abort" + rules: + - alert: "some" + expr: "up" ``` +It is recommended to keep partial response to `abort` for alerts and that is the default as well. + +Essentially for alerting having partial response can result in symptom being missed by Rule's alert. + +## Must have: essential Ruler alerts! + +To be sure that alerting works it is essential to monitor Ruler and alert from another **Scraper (Prometheus + sidecar)** that sits in same cluster. + +The most important metrics to alert on are: + +* `thanos_alert_sender_alerts_dropped_total`. If greater than 0 it means that rule triggered alerts are not being sent to alertmanager which might +indicate connection, incompatibility or misconfiguration problems. + +* `prometheus_rule_evaluation_failures_total`. If greater than 0 it means that rule failed to be evaluated which results in +either gap in rule or potentially ignored alert. Alert heavily on this if this happens for longer than your alert thresholds. 
+`strategy` label will tell you if failures comes from rules that tolerates [partial response](rule.md#PartialResponse) or not. + +* `prometheus_rule_group_last_duration_seconds < prometheus_rule_group_interval_seconds` If the difference is heavy it means +that rule evaluation took more time than scheduled interval. It can indicate your query backend (e.g Querier) takes too much time +to evaluate the query, that is not fast enough to fill the rule. This might indicate other problems like slow StoreAPis or +too complex query expression in rule. + +* `thanos_rule_evaluation_with_warnings_total`. If you choose to use Rules and Alerts with [partial response strategy](rule.md#PartialResponse) +equals "warn", this metric will tell you how many evaluation ended up with some kind of warning. To see the actual warnings +see WARN log level. This might suggest that those evaluations returns partial response and might be or not accurate. + +Those metrics are important for vanilla Prometheus as well, but even more important when we rely on (sometimes WAN) network. + +// TODO(bwplotka): Rereview them after recent changes in metrics. +See [alerts](/examples/alerts.md#Ruler) for more example alerts for ruler. + +NOTE: It is also recommend to set an mocked Alert on Ruler that checks if query is up. This might be something simple like `vector(1)` query, just +to check if Querier is live. + +## Performance. + As rule nodes outsource query processing to query nodes, they should generally experience little load. If necessary, functional sharding can be applied by splitting up the sets of rules between HA pairs. Rules are processed with deduplicated data according to the replica label configured on query nodes. -## Deployment +## External labels + +It is *mandatory* to add certain external labels to indicate the ruler origin (e.g `label='replica="A"'` or for `cluster`). +Otherwise running multiple ruler replicas will be not possible, resulting in clash during compaction. 
+ +NOTE: It is advised to put different external labels than labels given by other sources we are recording or alerting against. + +For example: + +* Ruler is in cluster `mon1` and we have Prometheus in cluster `eu1` +* By default we could try having consistent labels so we have `cluster=eu1` for Prometheus and `cluster=mon1` for Ruler. +* We configure `ScraperIsDown` alert that monitors service from `work1` cluster. +* When triggered this alert results in `ScraperIsDown{cluster=mon1}` since external labels always *replace* source labels. + +This effectively drops the important metadata and makes it impossible to tell in what exactly `cluster` the `ScraperIsDown` alert found problem +without falling back to manual query. + +## Ruler UI + +On HTTP address ruler exposes its UI that shows mainly Alerts and Rules page (similar to Prometheus Alerts page). +Each alert is linked to query that alert is performing that you can click to navigate to configured `alert.query-url`. + +## Ruler HA + +Ruler aims to use similar approach as Prometheus does. You can configure external labels, as well as simple relabelling. + +In case of Ruler in HA you need to make sure you have following labelling setup: + +* Labels that identifies the HA group ruler and replica label with different value for each ruler instance, e.g: +`cluster="eu1", replica="A"` and `cluster=eu1, replica="B"` by using `--label` flag. +* Labels that needs to be dropped just before sending to alermanager in order for alertmanger to deduplicate alerts e.g +`--alertmanager.label-drop="replica"`. 
+ +Full relabelling is planned to be done in future and is tracked here: https://github.com/improbable-eng/thanos/issues/660 ## Flags diff --git a/pkg/promclient/promclient.go b/pkg/promclient/promclient.go index e70fbb245b9..ad9dd4e75c3 100644 --- a/pkg/promclient/promclient.go +++ b/pkg/promclient/promclient.go @@ -17,6 +17,8 @@ import ( "strings" "time" + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/go-kit/kit/log" "github.com/improbable-eng/thanos/pkg/runutil" "github.com/improbable-eng/thanos/pkg/tracing" @@ -25,7 +27,7 @@ import ( promlabels "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" "github.com/prometheus/tsdb/labels" - "gopkg.in/yaml.v2" + yaml "gopkg.in/yaml.v2" ) // IsWALFileAccesible returns no error if WAL dir can be found. This helps to tell @@ -242,24 +244,53 @@ func Snapshot(ctx context.Context, logger log.Logger, base *url.URL, skipHead bo return path.Join("snapshots", d.Data.Name), nil } +type ThanosQueryParams struct { + Deduplicate bool + PartialResponseStrategy storepb.PartialResponseStrategy +} + +func (p *ThanosQueryParams) AddTo(values url.Values) error { + values.Add("dedup", fmt.Sprintf("%v", p.Deduplicate)) + + var partialResponseValue string + switch p.PartialResponseStrategy { + case storepb.PartialResponseStrategy_WARN: + partialResponseValue = strconv.FormatBool(true) + case storepb.PartialResponseStrategy_ABORT: + partialResponseValue = strconv.FormatBool(false) + default: + return errors.Errorf("unknown partial response strategy %v", p.PartialResponseStrategy) + } + + // TODO(bwplotka): Apply change from bool to strategy in Query API as well. + values.Add("partial_response", partialResponseValue) + + return nil +} + // QueryInstant performs instant query and returns results in model.Vector type. 
-func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, dedup bool) (model.Vector, error) { +func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, extra ThanosQueryParams) (model.Vector, []string, error) { if logger == nil { logger = log.NewNopLogger() } - u := *base - u.Path = path.Join(u.Path, "/api/v1/query") - - params := url.Values{} + params, err := url.ParseQuery(base.RawQuery) + if err != nil { + return nil, nil, errors.Wrapf(err, "parse raw query %s", base.RawQuery) + } params.Add("query", query) params.Add("time", t.Format(time.RFC3339Nano)) - params.Add("dedup", fmt.Sprintf("%v", dedup)) + if err := extra.AddTo(params); err != nil { + return nil, nil, errors.Wrap(err, "add thanos extra query params") + } + + u := *base + u.Path = path.Join(u.Path, "/api/v1/query") u.RawQuery = params.Encode() req, err := http.NewRequest("GET", u.String(), nil) if err != nil { - return nil, err + return nil, nil, err } req = req.WithContext(ctx) @@ -269,7 +300,7 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s } resp, err := client.Do(req) if err != nil { - return nil, err + return nil, nil, err } defer runutil.CloseWithLogOnErr(logger, resp.Body, "query body") @@ -280,10 +311,13 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s ResultType string `json:"resultType"` Result json.RawMessage `json:"result"` } `json:"data"` + + // Extra field supported by Thanos Querier. 
+ Warnings []string `json:"warnings"` } if err = json.NewDecoder(resp.Body).Decode(&m); err != nil { - return nil, err + return nil, nil, err } var vectorResult model.Vector @@ -293,24 +327,24 @@ func QueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query s switch m.Data.ResultType { case promql.ValueTypeVector: if err = json.Unmarshal(m.Data.Result, &vectorResult); err != nil { - return nil, err + return nil, nil, err } case promql.ValueTypeScalar: vectorResult, err = convertScalarJSONToVector(m.Data.Result) if err != nil { - return nil, err + return nil, nil, err } default: - return nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType) + return nil, nil, errors.Errorf("unknown response type: '%q'", m.Data.ResultType) } - return vectorResult, nil + return vectorResult, m.Warnings, nil } // PromqlQueryInstant performs instant query and returns results in promql.Vector type that is compatible with promql package. -func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, dedup bool) (promql.Vector, error) { - vectorResult, err := QueryInstant(ctx, logger, base, query, t, dedup) +func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, query string, t time.Time, extra ThanosQueryParams) (promql.Vector, []string, error) { + vectorResult, warnings, err := QueryInstant(ctx, logger, base, query, t, extra) if err != nil { - return nil, err + return nil, nil, err } vec := make(promql.Vector, 0, len(vectorResult)) @@ -332,7 +366,7 @@ func PromqlQueryInstant(ctx context.Context, logger log.Logger, base *url.URL, q }) } - return vec, nil + return vec, warnings, nil } // Scalar response consists of array with mixed types so it needs to be diff --git a/pkg/rule/api/v1.go b/pkg/rule/api/v1.go index be2dc5abf48..8ad52bd8e4f 100644 --- a/pkg/rule/api/v1.go +++ b/pkg/rule/api/v1.go @@ -5,32 +5,36 @@ import ( "net/http" "time" + thanosrule 
"github.com/improbable-eng/thanos/pkg/rule" + + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/NYTimes/gziphandler" qapi "github.com/improbable-eng/thanos/pkg/query/api" "github.com/improbable-eng/thanos/pkg/tracing" "github.com/prometheus/client_golang/prometheus" "github.com/go-kit/kit/log" - "github.com/opentracing/opentracing-go" + opentracing "github.com/opentracing/opentracing-go" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/rules" ) type API struct { - logger log.Logger - now func() time.Time - rulesRetriever rulesRetriever + logger log.Logger + now func() time.Time + ruleRetriever RulesRetriever } func NewAPI( logger log.Logger, - rr rulesRetriever, + ruleRetriever RulesRetriever, ) *API { return &API{ - logger: logger, - now: time.Now, - rulesRetriever: rr, + logger: logger, + now: time.Now, + ruleRetriever: ruleRetriever, } } @@ -54,20 +58,20 @@ func (api *API) Register(r *route.Router, tracer opentracing.Tracer, logger log. 
} -type rulesRetriever interface { - RuleGroups() []*rules.Group - AlertingRules() []*rules.AlertingRule +type RulesRetriever interface { + RuleGroups() []thanosrule.Group + AlertingRules() []thanosrule.AlertingRule } func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { - ruleGroups := api.rulesRetriever.RuleGroups() - res := &RuleDiscovery{RuleGroups: make([]*RuleGroup, len(ruleGroups))} - for i, grp := range ruleGroups { + res := &RuleDiscovery{} + for _, grp := range api.ruleRetriever.RuleGroups() { apiRuleGroup := &RuleGroup{ - Name: grp.Name(), - File: grp.File(), - Interval: grp.Interval().Seconds(), - Rules: []rule{}, + Name: grp.Name(), + File: grp.File(), + Interval: grp.Interval().Seconds(), + Rules: []rule{}, + PartialResponseStrategy: grp.PartialResponseStrategy.String(), } for _, r := range grp.Rules() { @@ -86,7 +90,7 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { Duration: rule.Duration().Seconds(), Labels: rule.Labels(), Annotations: rule.Annotations(), - Alerts: rulesAlertsToAPIAlerts(rule.ActiveAlerts()), + Alerts: rulesAlertsToAPIAlerts(grp.PartialResponseStrategy, rule.ActiveAlerts()), Health: rule.Health(), LastError: lastError, Type: "alerting", @@ -107,22 +111,20 @@ func (api *API) rules(r *http.Request) (interface{}, []error, *qapi.ApiError) { apiRuleGroup.Rules = append(apiRuleGroup.Rules, enrichedRule) } - res.RuleGroups[i] = apiRuleGroup + res.RuleGroups = append(res.RuleGroups, apiRuleGroup) } + return res, nil, nil } func (api *API) alerts(r *http.Request) (interface{}, []error, *qapi.ApiError) { - alertingRules := api.rulesRetriever.AlertingRules() - alerts := []*Alert{} - - for _, alertingRule := range alertingRules { + var alerts []*Alert + for _, alertingRule := range api.ruleRetriever.AlertingRules() { alerts = append( alerts, - rulesAlertsToAPIAlerts(alertingRule.ActiveAlerts())..., + rulesAlertsToAPIAlerts(alertingRule.PartialResponseStrategy, 
alertingRule.ActiveAlerts())..., ) } - res := &AlertDiscovery{Alerts: alerts} return res, nil, nil @@ -133,22 +135,24 @@ type AlertDiscovery struct { } type Alert struct { - Labels labels.Labels `json:"labels"` - Annotations labels.Labels `json:"annotations"` - State string `json:"state"` - ActiveAt *time.Time `json:"activeAt,omitempty"` - Value float64 `json:"value"` + Labels labels.Labels `json:"labels"` + Annotations labels.Labels `json:"annotations"` + State string `json:"state"` + ActiveAt *time.Time `json:"activeAt,omitempty"` + Value float64 `json:"value"` + PartialResponseStrategy string `json:"partial_response_strategy"` } -func rulesAlertsToAPIAlerts(rulesAlerts []*rules.Alert) []*Alert { +func rulesAlertsToAPIAlerts(s storepb.PartialResponseStrategy, rulesAlerts []*rules.Alert) []*Alert { apiAlerts := make([]*Alert, len(rulesAlerts)) for i, ruleAlert := range rulesAlerts { apiAlerts[i] = &Alert{ - Labels: ruleAlert.Labels, - Annotations: ruleAlert.Annotations, - State: ruleAlert.State.String(), - ActiveAt: &ruleAlert.ActiveAt, - Value: ruleAlert.Value, + PartialResponseStrategy: s.String(), + Labels: ruleAlert.Labels, + Annotations: ruleAlert.Annotations, + State: ruleAlert.State.String(), + ActiveAt: &ruleAlert.ActiveAt, + Value: ruleAlert.Value, } } @@ -165,8 +169,9 @@ type RuleGroup struct { // In order to preserve rule ordering, while exposing type (alerting or recording) // specific properties, both alerting and recording rules are exposed in the // same array. 
- Rules []rule `json:"rules"` - Interval float64 `json:"interval"` + Rules []rule `json:"rules"` + Interval float64 `json:"interval"` + PartialResponseStrategy string `json:"partial_response_strategy"` } type rule interface{} diff --git a/pkg/rule/api/v1_test.go b/pkg/rule/api/v1_test.go index af4034bd8dc..eba90887def 100644 --- a/pkg/rule/api/v1_test.go +++ b/pkg/rule/api/v1_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/kit/log" qapi "github.com/improbable-eng/thanos/pkg/query/api" + thanosrule "github.com/improbable-eng/thanos/pkg/rule" "github.com/prometheus/common/route" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" @@ -23,7 +24,7 @@ type rulesRetrieverMock struct { testing *testing.T } -func (m rulesRetrieverMock) RuleGroups() []*rules.Group { +func (m rulesRetrieverMock) RuleGroups() []thanosrule.Group { var ar rulesRetrieverMock arules := ar.AlertingRules() storage := testutil.NewStorage(m.testing) @@ -59,10 +60,10 @@ func (m rulesRetrieverMock) RuleGroups() []*rules.Group { r = append(r, recordingRule) group := rules.NewGroup("grp", "/path/to/file", time.Second, r, false, opts) - return []*rules.Group{group} + return []thanosrule.Group{{Group: group}} } -func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule { +func (m rulesRetrieverMock) AlertingRules() []thanosrule.AlertingRule { expr1, err := promql.ParseExpr(`absent(test_metric3) != 1`) if err != nil { m.testing.Fatalf("unable to parse alert expression: %s", err) @@ -90,9 +91,9 @@ func (m rulesRetrieverMock) AlertingRules() []*rules.AlertingRule { true, log.NewNopLogger(), ) - var r []*rules.AlertingRule - r = append(r, rule1) - r = append(r, rule2) + var r []thanosrule.AlertingRule + r = append(r, thanosrule.AlertingRule{AlertingRule: rule1}) + r = append(r, thanosrule.AlertingRule{AlertingRule: rule2}) return r } @@ -122,7 +123,10 @@ func TestEndpoints(t *testing.T) { algr.testing = t algr.AlertingRules() algr.RuleGroups() - api := 
NewAPI(nil, algr) + api := NewAPI( + nil, + algr, + ) testEndpoints(t, api) }) } diff --git a/pkg/rule/rule.go b/pkg/rule/rule.go new file mode 100644 index 00000000000..c9abd6471eb --- /dev/null +++ b/pkg/rule/rule.go @@ -0,0 +1,170 @@ +package thanosrule + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "strings" + "time" + + "github.com/prometheus/prometheus/pkg/rulefmt" + + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/rules" + + "github.com/prometheus/tsdb" + yaml "gopkg.in/yaml.v2" +) + +const tmpRuleDir = ".tmp-rules" + +type Group struct { + *rules.Group + + PartialResponseStrategy storepb.PartialResponseStrategy +} + +type AlertingRule struct { + *rules.AlertingRule + + PartialResponseStrategy storepb.PartialResponseStrategy +} + +type RuleGroups struct { + Groups []RuleGroup `yaml:"groups"` +} + +type RuleGroup struct { + rulefmt.RuleGroup + + PartialResponseStrategy storepb.PartialResponseStrategy `yaml:"partial_response_strategy"` +} + +type Managers map[storepb.PartialResponseStrategy]*rules.Manager + +func (m Managers) RuleGroups() []Group { + var res []Group + for s, r := range m { + for _, group := range r.RuleGroups() { + res = append(res, Group{Group: group, PartialResponseStrategy: s}) + } + } + return res +} + +func (m Managers) AlertingRules() []AlertingRule { + var res []AlertingRule + for s, r := range m { + for _, r := range r.AlertingRules() { + res = append(res, AlertingRule{AlertingRule: r, PartialResponseStrategy: s}) + } + } + return res +} + +func (r *RuleGroup) UnmarshalYAML(unmarshal func(interface{}) error) error { + rs := struct { + String string `yaml:"partial_response_strategy"` + }{} + + errMsg := fmt.Sprintf("failed to unmarshal 'partial_response_strategy'. 
Possible values are %s", strings.Join(storepb.PartialResponseStrategyValues, ",")) + if err := unmarshal(&rs); err != nil { + return errors.Wrapf(err, errMsg) + } + + rg := rulefmt.RuleGroup{} + if err := unmarshal(&rg); err != nil { + return errors.Wrapf(err, errMsg) + } + + p, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(rs.String)] + if !ok { + if rs.String != "" { + return errors.Errorf("%s. Got: %s", errMsg, rs.String) + } + // NOTE: For Rule default is abort as this is recommended for alerting. + p = storepb.PartialResponseStrategy_value[storepb.PartialResponseStrategy_ABORT.String()] + } + + r.RuleGroup = rg + r.PartialResponseStrategy = storepb.PartialResponseStrategy(p) + return nil +} + +// Update updates rules from given files to all managers we hold. We decide which groups should go where, based on +// special field in RuleGroup file. +func (m *Managers) Update(dataDir string, evalInterval time.Duration, files []string) error { + var ( + errs tsdb.MultiError + filesMap = map[storepb.PartialResponseStrategy][]string{} + ) + + if err := os.RemoveAll(path.Join(dataDir, tmpRuleDir)); err != nil { + return errors.Wrapf(err, "rm %s", path.Join(dataDir, tmpRuleDir)) + } + if err := os.MkdirAll(path.Join(dataDir, tmpRuleDir), os.ModePerm); err != nil { + return errors.Wrapf(err, "mkdir %s", path.Join(dataDir, tmpRuleDir)) + } + + for _, fn := range files { + b, err := ioutil.ReadFile(fn) + if err != nil { + errs = append(errs, err) + continue + } + + var rg RuleGroups + if err := yaml.Unmarshal(b, &rg); err != nil { + errs = append(errs, err) + continue + } + + // NOTE: This is very ugly, but we need to reparse it into tmp dir without the field to have to reuse + // rules.Manager. The problem is that it uses yaml.UnmarshalStrict for some reasons. 
+ mapped := map[storepb.PartialResponseStrategy]*rulefmt.RuleGroups{} + for _, rg := range rg.Groups { + if _, ok := mapped[rg.PartialResponseStrategy]; !ok { + mapped[rg.PartialResponseStrategy] = &rulefmt.RuleGroups{} + } + + mapped[rg.PartialResponseStrategy].Groups = append( + mapped[rg.PartialResponseStrategy].Groups, + rg.RuleGroup, + ) + } + + for s, rg := range mapped { + b, err := yaml.Marshal(rg) + if err != nil { + errs = append(errs, err) + continue + } + + newFn := path.Join(dataDir, tmpRuleDir, filepath.Base(fn)+"."+s.String()) + if err := ioutil.WriteFile(newFn, b, os.ModePerm); err != nil { + errs = append(errs, err) + continue + } + + filesMap[s] = append(filesMap[s], newFn) + } + + } + + for s, fs := range filesMap { + updater, ok := (*m)[s] + if !ok { + errs = append(errs, errors.Errorf("no updater found for %v", s)) + continue + } + if err := updater.Update(evalInterval, fs); err != nil { + errs = append(errs, err) + continue + } + } + + return errs.Err() +} diff --git a/pkg/rule/rule_test.go b/pkg/rule/rule_test.go new file mode 100644 index 00000000000..4c8926d22c0 --- /dev/null +++ b/pkg/rule/rule_test.go @@ -0,0 +1,125 @@ +package thanosrule + +import ( + "io/ioutil" + "os" + "path" + "sort" + "strings" + "testing" + "time" + + "github.com/go-kit/kit/log" + + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/improbable-eng/thanos/pkg/testutil" + "github.com/prometheus/prometheus/rules" +) + +func TestUpdate(t *testing.T) { + dir, err := ioutil.TempDir("", "test_rule_rule_groups") + testutil.Ok(t, err) + defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() + + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "no_strategy.yaml"), []byte(` +groups: +- name: "something1" + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "abort.yaml"), []byte(` +groups: +- name: "something2" + partial_response_strategy: "abort" + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + 
testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "warn.yaml"), []byte(` +groups: +- name: "something3" + partial_response_strategy: "warn" + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "wrong.yaml"), []byte(` +groups: +- name: "something4" + partial_response_strategy: "afafsdgsdgs" # Err 1 + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "combined.yaml"), []byte(` +groups: +- name: "something5" + partial_response_strategy: "warn" + rules: + - alert: "some" + expr: "up" +- name: "something6" + partial_response_strategy: "abort" + rules: + - alert: "some" + expr: "up" +- name: "something7" + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + testutil.Ok(t, ioutil.WriteFile(path.Join(dir, "combined-wrong.yaml"), []byte(` +groups: +- name: "something8" + partial_response_strategy: "warn" + rules: + - alert: "some" + expr: "up" +- name: "something9" + partial_response_strategy: "adad" # Err 2 + rules: + - alert: "some" + expr: "up" +`), os.ModePerm)) + + opts := rules.ManagerOptions{ + Logger: log.NewLogfmtLogger(os.Stderr), + } + m := Managers{ + storepb.PartialResponseStrategy_ABORT: rules.NewManager(&opts), + storepb.PartialResponseStrategy_WARN: rules.NewManager(&opts), + } + + err = m.Update(dir, 10*time.Second, []string{ + path.Join(dir, "no_strategy.yaml"), + path.Join(dir, "abort.yaml"), + path.Join(dir, "warn.yaml"), + path.Join(dir, "wrong.yaml"), + path.Join(dir, "combined.yaml"), + path.Join(dir, "combined_wrong.yaml"), + }) + + testutil.NotOk(t, err) + testutil.Assert(t, strings.HasPrefix(err.Error(), "2 errors: failed to unmarshal 'partial_response_strategy'"), err.Error()) + + g := m[storepb.PartialResponseStrategy_WARN].RuleGroups() + testutil.Equals(t, 2, len(g)) + + sort.Slice(g, func(i, j int) bool { + return g[i].Name() < g[j].Name() + }) + testutil.Equals(t, "something3", g[0].Name()) + testutil.Equals(t, "something5", 
g[1].Name()) + + g = m[storepb.PartialResponseStrategy_ABORT].RuleGroups() + testutil.Equals(t, 4, len(g)) + + sort.Slice(g, func(i, j int) bool { + return g[i].Name() < g[j].Name() + }) + testutil.Equals(t, "something1", g[0].Name()) + testutil.Equals(t, "something2", g[1].Name()) + testutil.Equals(t, "something6", g[2].Name()) + testutil.Equals(t, "something7", g[3].Name()) +} diff --git a/pkg/store/prompb/remote.pb.go b/pkg/store/prompb/remote.pb.go index 06a0952c98e..8da7ac6b6dc 100644 --- a/pkg/store/prompb/remote.pb.go +++ b/pkg/store/prompb/remote.pb.go @@ -53,7 +53,7 @@ func (LabelMatcher_Type) EnumDescriptor() ([]byte, []int) { } type WriteRequest struct { - Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries"` + Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -93,7 +93,7 @@ func (m *WriteRequest) XXX_DiscardUnknown() { var xxx_messageInfo_WriteRequest proto.InternalMessageInfo type ReadRequest struct { - Queries []Query `protobuf:"bytes,1,rep,name=queries" json:"queries"` + Queries []Query `protobuf:"bytes,1,rep,name=queries,proto3" json:"queries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -134,7 +134,7 @@ var xxx_messageInfo_ReadRequest proto.InternalMessageInfo type ReadResponse struct { // In same order as the request's queries. 
- Results []QueryResult `protobuf:"bytes,1,rep,name=results" json:"results"` + Results []QueryResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -176,8 +176,8 @@ var xxx_messageInfo_ReadResponse proto.InternalMessageInfo type Query struct { StartTimestampMs int64 `protobuf:"varint,1,opt,name=start_timestamp_ms,json=startTimestampMs,proto3" json:"start_timestamp_ms,omitempty"` EndTimestampMs int64 `protobuf:"varint,2,opt,name=end_timestamp_ms,json=endTimestampMs,proto3" json:"end_timestamp_ms,omitempty"` - Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers"` - Hints *ReadHints `protobuf:"bytes,4,opt,name=hints" json:"hints,omitempty"` + Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers,proto3" json:"matchers"` + Hints *ReadHints `protobuf:"bytes,4,opt,name=hints,proto3" json:"hints,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -217,7 +217,7 @@ func (m *Query) XXX_DiscardUnknown() { var xxx_messageInfo_Query proto.InternalMessageInfo type QueryResult struct { - Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries" json:"timeseries"` + Timeseries []TimeSeries `protobuf:"bytes,1,rep,name=timeseries,proto3" json:"timeseries"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -298,8 +298,8 @@ func (m *Sample) XXX_DiscardUnknown() { var xxx_messageInfo_Sample proto.InternalMessageInfo type TimeSeries struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples" json:"samples"` + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Samples []Sample `protobuf:"bytes,2,rep,name=samples,proto3" json:"samples"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte 
`json:"-"` XXX_sizecache int32 `json:"-"` @@ -863,6 +863,9 @@ func encodeVarintRemote(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *WriteRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Timeseries) > 0 { @@ -878,6 +881,9 @@ func (m *WriteRequest) Size() (n int) { } func (m *ReadRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Queries) > 0 { @@ -893,6 +899,9 @@ func (m *ReadRequest) Size() (n int) { } func (m *ReadResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Results) > 0 { @@ -908,6 +917,9 @@ func (m *ReadResponse) Size() (n int) { } func (m *Query) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.StartTimestampMs != 0 { @@ -933,6 +945,9 @@ func (m *Query) Size() (n int) { } func (m *QueryResult) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Timeseries) > 0 { @@ -948,6 +963,9 @@ func (m *QueryResult) Size() (n int) { } func (m *Sample) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Value != 0 { @@ -963,6 +981,9 @@ func (m *Sample) Size() (n int) { } func (m *TimeSeries) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Labels) > 0 { @@ -984,6 +1005,9 @@ func (m *TimeSeries) Size() (n int) { } func (m *Label) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Name) @@ -1001,6 +1025,9 @@ func (m *Label) Size() (n int) { } func (m *LabelMatcher) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Type != 0 { @@ -1021,6 +1048,9 @@ func (m *LabelMatcher) Size() (n int) { } func (m *ReadHints) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.StepMs != 0 { diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index 18e2035e69d..2097282408e 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -6,6 +6,14 @@ import ( 
"github.com/prometheus/prometheus/pkg/labels" ) +var PartialResponseStrategyValues = func() []string { + var s []string + for k := range PartialResponseStrategy_value { + s = append(s, k) + } + return s +}() + func NewWarnSeriesResponse(err error) *SeriesResponse { return &SeriesResponse{ Result: &SeriesResponse_Warning{ diff --git a/pkg/store/storepb/rpc.pb.go b/pkg/store/storepb/rpc.pb.go index e9ddda8711c..a176361ddb3 100644 --- a/pkg/store/storepb/rpc.pb.go +++ b/pkg/store/storepb/rpc.pb.go @@ -8,8 +8,10 @@ import fmt "fmt" import math "math" import _ "github.com/gogo/protobuf/gogoproto" -import context "golang.org/x/net/context" -import grpc "google.golang.org/grpc" +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) import io "io" @@ -56,7 +58,30 @@ func (x StoreType) String() string { return proto.EnumName(StoreType_name, int32(x)) } func (StoreType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{0} + return fileDescriptor_rpc_f4f04914f1106c76, []int{0} +} + +type PartialResponseStrategy int32 + +const ( + PartialResponseStrategy_WARN PartialResponseStrategy = 0 + PartialResponseStrategy_ABORT PartialResponseStrategy = 1 +) + +var PartialResponseStrategy_name = map[int32]string{ + 0: "WARN", + 1: "ABORT", +} +var PartialResponseStrategy_value = map[string]int32{ + "WARN": 0, + "ABORT": 1, +} + +func (x PartialResponseStrategy) String() string { + return proto.EnumName(PartialResponseStrategy_name, int32(x)) +} +func (PartialResponseStrategy) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_rpc_f4f04914f1106c76, []int{1} } type Aggr int32 @@ -91,7 +116,7 @@ func (x Aggr) String() string { return proto.EnumName(Aggr_name, int32(x)) } func (Aggr) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{1} + return fileDescriptor_rpc_f4f04914f1106c76, []int{2} } type InfoRequest struct { @@ -104,7 +129,7 @@ func (m *InfoRequest) Reset() { *m = 
InfoRequest{} } func (m *InfoRequest) String() string { return proto.CompactTextString(m) } func (*InfoRequest) ProtoMessage() {} func (*InfoRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{0} + return fileDescriptor_rpc_f4f04914f1106c76, []int{0} } func (m *InfoRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -134,7 +159,7 @@ func (m *InfoRequest) XXX_DiscardUnknown() { var xxx_messageInfo_InfoRequest proto.InternalMessageInfo type InfoResponse struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` MinTime int64 `protobuf:"varint,2,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,3,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` StoreType StoreType `protobuf:"varint,4,opt,name=storeType,proto3,enum=thanos.StoreType" json:"storeType,omitempty"` @@ -147,7 +172,7 @@ func (m *InfoResponse) Reset() { *m = InfoResponse{} } func (m *InfoResponse) String() string { return proto.CompactTextString(m) } func (*InfoResponse) ProtoMessage() {} func (*InfoResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{1} + return fileDescriptor_rpc_f4f04914f1106c76, []int{1} } func (m *InfoResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -177,22 +202,24 @@ func (m *InfoResponse) XXX_DiscardUnknown() { var xxx_messageInfo_InfoResponse proto.InternalMessageInfo type SeriesRequest struct { - MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` - MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` - Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers" json:"matchers"` - MaxResolutionWindow int64 `protobuf:"varint,4,opt,name=max_resolution_window,json=maxResolutionWindow,proto3" 
json:"max_resolution_window,omitempty"` - Aggregates []Aggr `protobuf:"varint,5,rep,packed,name=aggregates,enum=thanos.Aggr" json:"aggregates,omitempty"` - PartialResponseDisabled bool `protobuf:"varint,6,opt,name=partial_response_disabled,json=partialResponseDisabled,proto3" json:"partial_response_disabled,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` + MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` + Matchers []LabelMatcher `protobuf:"bytes,3,rep,name=matchers,proto3" json:"matchers"` + MaxResolutionWindow int64 `protobuf:"varint,4,opt,name=max_resolution_window,json=maxResolutionWindow,proto3" json:"max_resolution_window,omitempty"` + Aggregates []Aggr `protobuf:"varint,5,rep,packed,name=aggregates,proto3,enum=thanos.Aggr" json:"aggregates,omitempty"` + // Deprecated. Use partial_response_strategy instead. 
+ PartialResponseDisabled bool `protobuf:"varint,6,opt,name=partial_response_disabled,json=partialResponseDisabled,proto3" json:"partial_response_disabled,omitempty"` + PartialResponseStrategy PartialResponseStrategy `protobuf:"varint,7,opt,name=partial_response_strategy,json=partialResponseStrategy,proto3,enum=thanos.PartialResponseStrategy" json:"partial_response_strategy,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *SeriesRequest) Reset() { *m = SeriesRequest{} } func (m *SeriesRequest) String() string { return proto.CompactTextString(m) } func (*SeriesRequest) ProtoMessage() {} func (*SeriesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{2} + return fileDescriptor_rpc_f4f04914f1106c76, []int{2} } func (m *SeriesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -235,7 +262,7 @@ func (m *SeriesResponse) Reset() { *m = SeriesResponse{} } func (m *SeriesResponse) String() string { return proto.CompactTextString(m) } func (*SeriesResponse) ProtoMessage() {} func (*SeriesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{3} + return fileDescriptor_rpc_f4f04914f1106c76, []int{3} } func (m *SeriesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -271,7 +298,7 @@ type isSeriesResponse_Result interface { } type SeriesResponse_Series struct { - Series *Series `protobuf:"bytes,1,opt,name=series,oneof"` + Series *Series `protobuf:"bytes,1,opt,name=series,proto3,oneof"` } type SeriesResponse_Warning struct { Warning string `protobuf:"bytes,2,opt,name=warning,proto3,oneof"` @@ -382,7 +409,7 @@ func (m *LabelNamesRequest) Reset() { *m = LabelNamesRequest{} } func (m *LabelNamesRequest) String() string { return proto.CompactTextString(m) } func (*LabelNamesRequest) ProtoMessage() {} func (*LabelNamesRequest) Descriptor() ([]byte, []int) { - return 
fileDescriptor_rpc_b2f04ff11750c7dd, []int{4} + return fileDescriptor_rpc_f4f04914f1106c76, []int{4} } func (m *LabelNamesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -412,8 +439,8 @@ func (m *LabelNamesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_LabelNamesRequest proto.InternalMessageInfo type LabelNamesResponse struct { - Names []string `protobuf:"bytes,1,rep,name=names" json:"names,omitempty"` - Warnings []string `protobuf:"bytes,2,rep,name=warnings" json:"warnings,omitempty"` + Names []string `protobuf:"bytes,1,rep,name=names,proto3" json:"names,omitempty"` + Warnings []string `protobuf:"bytes,2,rep,name=warnings,proto3" json:"warnings,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -423,7 +450,7 @@ func (m *LabelNamesResponse) Reset() { *m = LabelNamesResponse{} } func (m *LabelNamesResponse) String() string { return proto.CompactTextString(m) } func (*LabelNamesResponse) ProtoMessage() {} func (*LabelNamesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{5} + return fileDescriptor_rpc_f4f04914f1106c76, []int{5} } func (m *LabelNamesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -464,7 +491,7 @@ func (m *LabelValuesRequest) Reset() { *m = LabelValuesRequest{} } func (m *LabelValuesRequest) String() string { return proto.CompactTextString(m) } func (*LabelValuesRequest) ProtoMessage() {} func (*LabelValuesRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{6} + return fileDescriptor_rpc_f4f04914f1106c76, []int{6} } func (m *LabelValuesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -494,8 +521,8 @@ func (m *LabelValuesRequest) XXX_DiscardUnknown() { var xxx_messageInfo_LabelValuesRequest proto.InternalMessageInfo type LabelValuesResponse struct { - Values []string `protobuf:"bytes,1,rep,name=values" json:"values,omitempty"` - Warnings 
[]string `protobuf:"bytes,2,rep,name=warnings" json:"warnings,omitempty"` + Values []string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty"` + Warnings []string `protobuf:"bytes,2,rep,name=warnings,proto3" json:"warnings,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -505,7 +532,7 @@ func (m *LabelValuesResponse) Reset() { *m = LabelValuesResponse{} } func (m *LabelValuesResponse) String() string { return proto.CompactTextString(m) } func (*LabelValuesResponse) ProtoMessage() {} func (*LabelValuesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_rpc_b2f04ff11750c7dd, []int{7} + return fileDescriptor_rpc_f4f04914f1106c76, []int{7} } func (m *LabelValuesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -544,6 +571,7 @@ func init() { proto.RegisterType((*LabelValuesRequest)(nil), "thanos.LabelValuesRequest") proto.RegisterType((*LabelValuesResponse)(nil), "thanos.LabelValuesResponse") proto.RegisterEnum("thanos.StoreType", StoreType_name, StoreType_value) + proto.RegisterEnum("thanos.PartialResponseStrategy", PartialResponseStrategy_name, PartialResponseStrategy_value) proto.RegisterEnum("thanos.Aggr", Aggr_name, Aggr_value) } @@ -555,8 +583,9 @@ var _ grpc.ClientConn // is compatible with the grpc package it is being compiled against. const _ = grpc.SupportPackageIsVersion4 -// Client API for Store service - +// StoreClient is the client API for Store service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type StoreClient interface { // / Info returns meta information about a store e.g labels that makes that store unique as well as time range that is // / available. 
@@ -637,8 +666,7 @@ func (c *storeClient) LabelValues(ctx context.Context, in *LabelValuesRequest, o return out, nil } -// Server API for Store service - +// StoreServer is the server API for Store service. type StoreServer interface { // / Info returns meta information about a store e.g labels that makes that store unique as well as time range that is // / available. @@ -896,6 +924,11 @@ func (m *SeriesRequest) MarshalTo(dAtA []byte) (int, error) { } i++ } + if m.PartialResponseStrategy != 0 { + dAtA[i] = 0x38 + i++ + i = encodeVarintRpc(dAtA, i, uint64(m.PartialResponseStrategy)) + } if m.XXX_unrecognized != nil { i += copy(dAtA[i:], m.XXX_unrecognized) } @@ -1132,6 +1165,9 @@ func encodeVarintRpc(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *InfoRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.XXX_unrecognized != nil { @@ -1141,6 +1177,9 @@ func (m *InfoRequest) Size() (n int) { } func (m *InfoResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Labels) > 0 { @@ -1165,6 +1204,9 @@ func (m *InfoResponse) Size() (n int) { } func (m *SeriesRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.MinTime != 0 { @@ -1192,6 +1234,9 @@ func (m *SeriesRequest) Size() (n int) { if m.PartialResponseDisabled { n += 2 } + if m.PartialResponseStrategy != 0 { + n += 1 + sovRpc(uint64(m.PartialResponseStrategy)) + } if m.XXX_unrecognized != nil { n += len(m.XXX_unrecognized) } @@ -1199,6 +1244,9 @@ func (m *SeriesRequest) Size() (n int) { } func (m *SeriesResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Result != nil { @@ -1211,6 +1259,9 @@ func (m *SeriesResponse) Size() (n int) { } func (m *SeriesResponse_Series) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Series != nil { @@ -1220,6 +1271,9 @@ func (m *SeriesResponse_Series) Size() (n int) { return n } func (m *SeriesResponse_Warning) Size() (n int) { + if m == nil { + 
return 0 + } var l int _ = l l = len(m.Warning) @@ -1227,6 +1281,9 @@ func (m *SeriesResponse_Warning) Size() (n int) { return n } func (m *LabelNamesRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.PartialResponseDisabled { @@ -1239,6 +1296,9 @@ func (m *LabelNamesRequest) Size() (n int) { } func (m *LabelNamesResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Names) > 0 { @@ -1260,6 +1320,9 @@ func (m *LabelNamesResponse) Size() (n int) { } func (m *LabelValuesRequest) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Label) @@ -1276,6 +1339,9 @@ func (m *LabelValuesRequest) Size() (n int) { } func (m *LabelValuesResponse) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Values) > 0 { @@ -1657,6 +1723,10 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } + var elementCount int + if elementCount != 0 && len(m.Aggregates) == 0 { + m.Aggregates = make([]Aggr, 0, elementCount) + } for iNdEx < postIndex { var v Aggr for shift := uint(0); ; shift += 7 { @@ -1698,6 +1768,25 @@ func (m *SeriesRequest) Unmarshal(dAtA []byte) error { } } m.PartialResponseDisabled = bool(v != 0) + case 7: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field PartialResponseStrategy", wireType) + } + m.PartialResponseStrategy = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowRpc + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.PartialResponseStrategy |= (PartialResponseStrategy(b) & 0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipRpc(dAtA[iNdEx:]) @@ -2326,51 +2415,54 @@ var ( ErrIntOverflowRpc = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_b2f04ff11750c7dd) } - -var fileDescriptor_rpc_b2f04ff11750c7dd = []byte{ - // 683 bytes of 
a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xc1, 0x6e, 0xda, 0x4c, - 0x10, 0xc6, 0x36, 0x36, 0x78, 0x48, 0x90, 0xb3, 0x21, 0xf9, 0x8d, 0x7f, 0x89, 0x22, 0x4e, 0x28, - 0xad, 0x92, 0x96, 0x4a, 0x95, 0xda, 0x1b, 0x10, 0x47, 0x41, 0x4d, 0x40, 0x5d, 0x20, 0x69, 0x7b, - 0x49, 0x4d, 0xb2, 0x71, 0x2c, 0x19, 0xdb, 0xf5, 0x9a, 0x26, 0xb9, 0xf6, 0x35, 0x7a, 0xeb, 0xd3, - 0xe4, 0xd8, 0x27, 0xa8, 0x5a, 0x9e, 0xa4, 0xf2, 0x7a, 0x0d, 0xb8, 0x4a, 0xb9, 0xed, 0x7c, 0xdf, - 0x78, 0xe6, 0xdb, 0x99, 0xcf, 0x0b, 0x6a, 0x18, 0x5c, 0xee, 0x07, 0xa1, 0x1f, 0xf9, 0x48, 0x89, - 0x6e, 0x2c, 0xcf, 0xa7, 0x46, 0x29, 0xba, 0x0f, 0x08, 0x4d, 0x40, 0xa3, 0x62, 0xfb, 0xb6, 0xcf, - 0x8e, 0x07, 0xf1, 0x29, 0x41, 0x1b, 0x9b, 0x50, 0xea, 0x79, 0xd7, 0x3e, 0x26, 0x9f, 0x67, 0x84, - 0x46, 0x8d, 0xef, 0x02, 0x6c, 0x24, 0x31, 0x0d, 0x7c, 0x8f, 0x12, 0xf4, 0x14, 0x14, 0xd7, 0x9a, - 0x10, 0x97, 0xea, 0x42, 0x5d, 0x6a, 0x96, 0x5a, 0x9b, 0xfb, 0x49, 0xed, 0xfd, 0x93, 0x18, 0xed, - 0xe4, 0x1f, 0x7e, 0x3e, 0xc9, 0x61, 0x9e, 0x82, 0xaa, 0x50, 0x9c, 0x3a, 0xde, 0x45, 0xe4, 0x4c, - 0x89, 0x2e, 0xd6, 0x85, 0xa6, 0x84, 0x0b, 0x53, 0xc7, 0x1b, 0x39, 0x53, 0xc2, 0x28, 0xeb, 0x2e, - 0xa1, 0x24, 0x4e, 0x59, 0x77, 0x8c, 0x3a, 0x00, 0x95, 0x46, 0x7e, 0x48, 0x46, 0xf7, 0x01, 0xd1, - 0xf3, 0x75, 0xa1, 0x59, 0x6e, 0x6d, 0xa5, 0x5d, 0x86, 0x29, 0x81, 0x97, 0x39, 0x8d, 0x6f, 0x22, - 0x6c, 0x0e, 0x49, 0xe8, 0x10, 0xca, 0x65, 0x67, 0x1a, 0x0b, 0xff, 0x6e, 0x2c, 0x66, 0x1b, 0xbf, - 0x8a, 0xa9, 0xe8, 0xf2, 0x86, 0x84, 0x54, 0x97, 0xd8, 0xed, 0x2a, 0x99, 0xdb, 0x9d, 0x26, 0x24, - 0xbf, 0xe4, 0x22, 0x17, 0xb5, 0x60, 0x27, 0x2e, 0x19, 0x12, 0xea, 0xbb, 0xb3, 0xc8, 0xf1, 0xbd, - 0x8b, 0x5b, 0xc7, 0xbb, 0xf2, 0x6f, 0x99, 0x78, 0x09, 0x6f, 0x4f, 0xad, 0x3b, 0xbc, 0xe0, 0xce, - 0x19, 0x85, 0x9e, 0x01, 0x58, 0xb6, 0x1d, 0x12, 0xdb, 0x8a, 0x08, 0xd5, 0xe5, 0xba, 0xd4, 0x2c, - 0xb7, 0x36, 0xd2, 0x6e, 0x6d, 0xdb, 0x0e, 0xf1, 0x0a, 0x8f, 0xde, 0x40, 0x35, 0xb0, 0xc2, 0xc8, - 0xb1, 
0xdc, 0xb8, 0x0b, 0xdb, 0xc4, 0xc5, 0x95, 0x43, 0xad, 0x89, 0x4b, 0xae, 0x74, 0xa5, 0x2e, - 0x34, 0x8b, 0xf8, 0x3f, 0x9e, 0x90, 0x6e, 0xea, 0x90, 0xd3, 0x8d, 0x4f, 0x50, 0x4e, 0x87, 0xc3, - 0x77, 0xd8, 0x04, 0x85, 0x32, 0x84, 0xcd, 0xa6, 0xd4, 0x2a, 0x2f, 0xa6, 0xcb, 0xd0, 0xe3, 0x1c, - 0xe6, 0x3c, 0x32, 0xa0, 0x70, 0x6b, 0x85, 0x9e, 0xe3, 0xd9, 0x6c, 0x56, 0xea, 0x71, 0x0e, 0xa7, - 0x40, 0xa7, 0x08, 0x4a, 0x48, 0xe8, 0xcc, 0x8d, 0x1a, 0x03, 0xd8, 0x62, 0xf3, 0xe9, 0x5b, 0xd3, - 0xe5, 0x0a, 0xd6, 0x4a, 0x16, 0xd6, 0x4b, 0x3e, 0x02, 0xb4, 0x5a, 0x90, 0xcb, 0xae, 0x80, 0xec, - 0xc5, 0x00, 0x73, 0x9e, 0x8a, 0x93, 0x00, 0x19, 0x50, 0xe4, 0x8a, 0xa8, 0x2e, 0x32, 0x62, 0x11, - 0x37, 0xae, 0x79, 0x9d, 0x33, 0xcb, 0x9d, 0x2d, 0x95, 0x55, 0x40, 0x66, 0xfe, 0x64, 0x2a, 0x54, - 0x9c, 0x04, 0xeb, 0xf5, 0x8a, 0xeb, 0xf5, 0xf6, 0x60, 0x3b, 0xd3, 0x87, 0x0b, 0xde, 0x05, 0xe5, - 0x0b, 0x43, 0xb8, 0x62, 0x1e, 0xad, 0x93, 0xbc, 0x87, 0x41, 0x5d, 0x78, 0x1c, 0x95, 0xa0, 0x30, - 0xee, 0xbf, 0xed, 0x0f, 0xce, 0xfb, 0x5a, 0x0e, 0xa9, 0x20, 0xbf, 0x1b, 0x9b, 0xf8, 0x83, 0x26, - 0xa0, 0x22, 0xe4, 0xf1, 0xf8, 0xc4, 0xd4, 0xc4, 0x38, 0x63, 0xd8, 0x3b, 0x34, 0xbb, 0x6d, 0xac, - 0x49, 0x71, 0xc6, 0x70, 0x34, 0xc0, 0xa6, 0x96, 0x8f, 0x71, 0x6c, 0x76, 0xcd, 0xde, 0x99, 0xa9, - 0xc9, 0x7b, 0x1d, 0xc8, 0xc7, 0x8e, 0x42, 0x05, 0x90, 0x70, 0xfb, 0x3c, 0x29, 0xd5, 0x1d, 0x8c, - 0xfb, 0x23, 0x4d, 0x88, 0xb1, 0xe1, 0xf8, 0x54, 0x13, 0xe3, 0xc3, 0x69, 0xaf, 0xaf, 0x49, 0xec, - 0xd0, 0x7e, 0x9f, 0xd4, 0x60, 0x59, 0x26, 0xd6, 0xe4, 0xd6, 0x57, 0x11, 0x64, 0x26, 0x0c, 0xbd, - 0x80, 0x7c, 0xfc, 0x22, 0xa0, 0xed, 0xd4, 0x35, 0x2b, 0xef, 0x85, 0x51, 0xc9, 0x82, 0x7c, 0x10, - 0xaf, 0x41, 0x49, 0xac, 0x85, 0x76, 0xb2, 0x56, 0x4b, 0x3f, 0xdb, 0xfd, 0x1b, 0x4e, 0x3e, 0x7c, - 0x2e, 0xa0, 0x2e, 0xc0, 0xd2, 0x0a, 0xa8, 0x9a, 0xf9, 0x1f, 0x57, 0xfd, 0x66, 0x18, 0x8f, 0x51, - 0xbc, 0xff, 0x11, 0x94, 0x56, 0xf6, 0x83, 0xb2, 0xa9, 0x19, 0x73, 0x18, 0xff, 0x3f, 0xca, 0x25, - 0x75, 0x3a, 0xd5, 0x87, 0xdf, 0xb5, 0xdc, 
0xc3, 0xbc, 0x26, 0xfc, 0x98, 0xd7, 0x84, 0x5f, 0xf3, - 0x9a, 0xf0, 0xb1, 0xc0, 0x5e, 0xa1, 0x60, 0x32, 0x51, 0xd8, 0xf3, 0xf9, 0xf2, 0x4f, 0x00, 0x00, - 0x00, 0xff, 0xff, 0x33, 0x33, 0x9b, 0x3a, 0x76, 0x05, 0x00, 0x00, +func init() { proto.RegisterFile("rpc.proto", fileDescriptor_rpc_f4f04914f1106c76) } + +var fileDescriptor_rpc_f4f04914f1106c76 = []byte{ + // 729 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x54, 0xcf, 0x6e, 0xda, 0x4e, + 0x10, 0xc6, 0x36, 0x18, 0x18, 0x12, 0xe4, 0x6c, 0x48, 0x62, 0xfc, 0x93, 0x08, 0xe2, 0x84, 0xf2, + 0xab, 0x48, 0x4b, 0xa5, 0x4a, 0xed, 0x0d, 0x88, 0xa3, 0xa0, 0x26, 0xd0, 0x2e, 0x10, 0xfa, 0xe7, + 0x90, 0x9a, 0x64, 0xe3, 0x58, 0x02, 0x9b, 0x7a, 0x4d, 0x93, 0x5c, 0xfb, 0x28, 0x7d, 0x9a, 0x1c, + 0xfb, 0x04, 0x55, 0x9b, 0xa7, 0xe8, 0xb1, 0xda, 0xf5, 0x1a, 0x70, 0x9b, 0x70, 0xdb, 0xfd, 0xbe, + 0xf1, 0xcc, 0xb7, 0x33, 0x9f, 0x07, 0xb2, 0xfe, 0xf4, 0xbc, 0x36, 0xf5, 0xbd, 0xc0, 0x43, 0x6a, + 0x70, 0x65, 0xb9, 0x1e, 0x35, 0x72, 0xc1, 0xed, 0x94, 0xd0, 0x10, 0x34, 0x0a, 0xb6, 0x67, 0x7b, + 0xfc, 0xb8, 0xcf, 0x4e, 0x21, 0x5a, 0x59, 0x87, 0x5c, 0xdb, 0xbd, 0xf4, 0x30, 0xf9, 0x3c, 0x23, + 0x34, 0xa8, 0x7c, 0x93, 0x60, 0x2d, 0xbc, 0xd3, 0xa9, 0xe7, 0x52, 0x82, 0xfe, 0x07, 0x75, 0x6c, + 0x8d, 0xc8, 0x98, 0xea, 0x52, 0x59, 0xa9, 0xe6, 0xea, 0xeb, 0xb5, 0x30, 0x77, 0xed, 0x98, 0xa1, + 0xcd, 0xe4, 0xdd, 0x8f, 0xdd, 0x04, 0x16, 0x21, 0xa8, 0x08, 0x99, 0x89, 0xe3, 0x9e, 0x05, 0xce, + 0x84, 0xe8, 0x72, 0x59, 0xaa, 0x2a, 0x38, 0x3d, 0x71, 0xdc, 0xbe, 0x33, 0x21, 0x9c, 0xb2, 0x6e, + 0x42, 0x4a, 0x11, 0x94, 0x75, 0xc3, 0xa9, 0x7d, 0xc8, 0xd2, 0xc0, 0xf3, 0x49, 0xff, 0x76, 0x4a, + 0xf4, 0x64, 0x59, 0xaa, 0xe6, 0xeb, 0x1b, 0x51, 0x95, 0x5e, 0x44, 0xe0, 0x45, 0x4c, 0xe5, 0xb7, + 0x0c, 0xeb, 0x3d, 0xe2, 0x3b, 0x84, 0x0a, 0xd9, 0xb1, 0xc2, 0xd2, 0xe3, 0x85, 0xe5, 0x78, 0xe1, + 0x17, 0x8c, 0x0a, 0xce, 0xaf, 0x88, 0x4f, 0x75, 0x85, 0xbf, 0xae, 0x10, 0x7b, 0xdd, 0x49, 0x48, + 0x8a, 0x47, 0xce, 
0x63, 0x51, 0x1d, 0xb6, 0x58, 0x4a, 0x9f, 0x50, 0x6f, 0x3c, 0x0b, 0x1c, 0xcf, + 0x3d, 0xbb, 0x76, 0xdc, 0x0b, 0xef, 0x9a, 0x8b, 0x57, 0xf0, 0xe6, 0xc4, 0xba, 0xc1, 0x73, 0x6e, + 0xc8, 0x29, 0xf4, 0x04, 0xc0, 0xb2, 0x6d, 0x9f, 0xd8, 0x56, 0x40, 0xa8, 0x9e, 0x2a, 0x2b, 0xd5, + 0x7c, 0x7d, 0x2d, 0xaa, 0xd6, 0xb0, 0x6d, 0x1f, 0x2f, 0xf1, 0xe8, 0x15, 0x14, 0xa7, 0x96, 0x1f, + 0x38, 0xd6, 0x98, 0x55, 0xe1, 0x93, 0x38, 0xbb, 0x70, 0xa8, 0x35, 0x1a, 0x93, 0x0b, 0x5d, 0x2d, + 0x4b, 0xd5, 0x0c, 0xde, 0x11, 0x01, 0xd1, 0xa4, 0x0e, 0x04, 0x8d, 0x3e, 0x3e, 0xf0, 0x2d, 0x0d, + 0x7c, 0x2b, 0x20, 0xf6, 0xad, 0x9e, 0xe6, 0xed, 0xdd, 0x8d, 0x0a, 0xbf, 0x89, 0xe7, 0xe8, 0x89, + 0xb0, 0x7f, 0x92, 0x47, 0x44, 0xe5, 0x13, 0xe4, 0xa3, 0xce, 0x0b, 0x83, 0x54, 0x41, 0xa5, 0x1c, + 0xe1, 0x8d, 0xcf, 0xd5, 0xf3, 0xf3, 0xd1, 0x71, 0xf4, 0x28, 0x81, 0x05, 0x8f, 0x0c, 0x48, 0x5f, + 0x5b, 0xbe, 0xeb, 0xb8, 0x36, 0x1f, 0x44, 0xf6, 0x28, 0x81, 0x23, 0xa0, 0x99, 0x01, 0xd5, 0x27, + 0x74, 0x36, 0x0e, 0x2a, 0x5d, 0xd8, 0xe0, 0xcd, 0xef, 0x58, 0x93, 0xc5, 0x7c, 0x57, 0xf6, 0x43, + 0x5a, 0xd9, 0x8f, 0xca, 0x21, 0xa0, 0xe5, 0x84, 0x42, 0x76, 0x01, 0x52, 0x2e, 0x03, 0xb8, 0xad, + 0xb3, 0x38, 0xbc, 0x20, 0x03, 0x32, 0x42, 0x11, 0xd5, 0x65, 0x4e, 0xcc, 0xef, 0x95, 0x4b, 0x91, + 0xe7, 0xd4, 0x1a, 0xcf, 0x16, 0xca, 0x0a, 0x90, 0xe2, 0xe6, 0xe7, 0x2a, 0xb2, 0x38, 0xbc, 0xac, + 0xd6, 0x2b, 0xaf, 0xd6, 0xdb, 0x86, 0xcd, 0x58, 0x1d, 0x21, 0x78, 0x1b, 0xd4, 0x2f, 0x1c, 0x11, + 0x8a, 0xc5, 0x6d, 0x95, 0xe4, 0x3d, 0x0c, 0xd9, 0xf9, 0x0f, 0x84, 0x72, 0x90, 0x1e, 0x74, 0x5e, + 0x77, 0xba, 0xc3, 0x8e, 0x96, 0x40, 0x59, 0x48, 0xbd, 0x1d, 0x98, 0xf8, 0xbd, 0x26, 0xa1, 0x0c, + 0x24, 0xf1, 0xe0, 0xd8, 0xd4, 0x64, 0x16, 0xd1, 0x6b, 0x1f, 0x98, 0xad, 0x06, 0xd6, 0x14, 0x16, + 0xd1, 0xeb, 0x77, 0xb1, 0xa9, 0x25, 0x19, 0x8e, 0xcd, 0x96, 0xd9, 0x3e, 0x35, 0xb5, 0xd4, 0x5e, + 0x0d, 0x76, 0x1e, 0x71, 0x0d, 0xcb, 0x34, 0x6c, 0x60, 0x91, 0xbe, 0xd1, 0xec, 0xe2, 0xbe, 0x26, + 0xed, 0x35, 0x21, 0xc9, 0xec, 0x8d, 0xd2, 0xa0, 0xe0, 
0xc6, 0x30, 0xe4, 0x5a, 0xdd, 0x41, 0xa7, + 0xaf, 0x49, 0x0c, 0xeb, 0x0d, 0x4e, 0x34, 0x99, 0x1d, 0x4e, 0xda, 0x1d, 0x4d, 0xe1, 0x87, 0xc6, + 0xbb, 0xb0, 0x26, 0x8f, 0x32, 0xb1, 0x96, 0xaa, 0x7f, 0x95, 0x21, 0xc5, 0x1f, 0x82, 0x9e, 0x41, + 0x92, 0xad, 0x27, 0xb4, 0x19, 0xb9, 0x6c, 0x69, 0x79, 0x19, 0x85, 0x38, 0x28, 0x1a, 0xf7, 0x12, + 0xd4, 0xd0, 0x8a, 0x68, 0x2b, 0x6e, 0xcd, 0xe8, 0xb3, 0xed, 0xbf, 0xe1, 0xf0, 0xc3, 0xa7, 0x12, + 0x6a, 0x01, 0x2c, 0xac, 0x83, 0x8a, 0xb1, 0xe5, 0xb0, 0xec, 0x4f, 0xc3, 0x78, 0x88, 0x12, 0xf5, + 0x0f, 0x21, 0xb7, 0x34, 0x4f, 0x14, 0x0f, 0x8d, 0x99, 0xc9, 0xf8, 0xef, 0x41, 0x2e, 0xcc, 0xd3, + 0x2c, 0xde, 0xfd, 0x2a, 0x25, 0xee, 0xee, 0x4b, 0xd2, 0xf7, 0xfb, 0x92, 0xf4, 0xf3, 0xbe, 0x24, + 0x7d, 0x48, 0xf3, 0x95, 0x38, 0x1d, 0x8d, 0x54, 0xbe, 0xcb, 0x9f, 0xff, 0x09, 0x00, 0x00, 0xff, + 0xff, 0x92, 0x5a, 0x97, 0xd8, 0x03, 0x06, 0x00, 0x00, } diff --git a/pkg/store/storepb/rpc.proto b/pkg/store/storepb/rpc.proto index 2c264f1c15f..95a9d59e4d4 100644 --- a/pkg/store/storepb/rpc.proto +++ b/pkg/store/storepb/rpc.proto @@ -12,10 +12,6 @@ option (gogoproto.unmarshaler_all) = true; option (gogoproto.goproto_getters_all) = false; /// Store reprents API against instance that stores XOR encoded values with label set metadata (e.g Prometheus metrics). -/// -/// Partial Response is supported unless `partial_response_disabled` is true. When disabled any error that will result -/// in partial data returned (e.g missing chunk series because of underlying storeAPI is temporarily not available) is -/// failing the request. service Store { /// Info returns meta information about a store e.g labels that makes that store unique as well as time range that is /// available. @@ -51,6 +47,20 @@ message InfoResponse { StoreType storeType = 4; } +/// PartialResponseStrategy controls partial response handling. 
+enum PartialResponseStrategy {
+  /// WARN strategy tells server to treat any error that is related to a single StoreAPI (e.g missing chunk series because of underlying
+  /// storeAPI is temporarily not available) as warning which will not fail the whole query (still OK response).
+  /// Server should produce those as a warnings field in response.
+  WARN = 0;
+  /// ABORT strategy tells server to treat any error that is related to a single StoreAPI (e.g missing chunk series because of underlying
+  /// storeAPI is temporarily not available) as the gRPC error that aborts the query.
+  ///
+  /// This is especially useful for any rule/alert evaluations on top of StoreAPI which usually does not tolerate partial
+  /// errors.
+  ABORT = 1;
+}
+
 message SeriesRequest {
   int64 min_time = 1;
   int64 max_time = 2;
@@ -59,7 +69,11 @@ message SeriesRequest {
   int64 max_resolution_window = 4;
   repeated Aggr aggregates = 5;
 
+  // Deprecated. Use partial_response_strategy instead.
   bool partial_response_disabled = 6;
+
+  // TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI.
+  PartialResponseStrategy partial_response_strategy = 7;
 }
 
 enum Aggr {
@@ -83,6 +97,9 @@ message SeriesResponse {
 
 message LabelNamesRequest {
   bool partial_response_disabled = 1;
+
+  // TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI.
+  PartialResponseStrategy partial_response_strategy = 2;
 }
 
 message LabelNamesResponse {
@@ -94,6 +111,9 @@ message LabelValuesRequest {
   string label = 1;
 
   bool partial_response_disabled = 2;
+
+  // TODO(bwplotka): Move Thanos components to use strategy instead. 
+ PartialResponseStrategy partial_response_strategy = 3; } message LabelValuesResponse { diff --git a/pkg/store/storepb/types.pb.go b/pkg/store/storepb/types.pb.go index 0f6767b97be..9344fbc9d0d 100644 --- a/pkg/store/storepb/types.pb.go +++ b/pkg/store/storepb/types.pb.go @@ -153,8 +153,8 @@ func (m *Chunk) XXX_DiscardUnknown() { var xxx_messageInfo_Chunk proto.InternalMessageInfo type Series struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` - Chunks []AggrChunk `protobuf:"bytes,2,rep,name=chunks" json:"chunks"` + Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"` + Chunks []AggrChunk `protobuf:"bytes,2,rep,name=chunks,proto3" json:"chunks"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -196,12 +196,12 @@ var xxx_messageInfo_Series proto.InternalMessageInfo type AggrChunk struct { MinTime int64 `protobuf:"varint,1,opt,name=min_time,json=minTime,proto3" json:"min_time,omitempty"` MaxTime int64 `protobuf:"varint,2,opt,name=max_time,json=maxTime,proto3" json:"max_time,omitempty"` - Raw *Chunk `protobuf:"bytes,3,opt,name=raw" json:"raw,omitempty"` - Count *Chunk `protobuf:"bytes,4,opt,name=count" json:"count,omitempty"` - Sum *Chunk `protobuf:"bytes,5,opt,name=sum" json:"sum,omitempty"` - Min *Chunk `protobuf:"bytes,6,opt,name=min" json:"min,omitempty"` - Max *Chunk `protobuf:"bytes,7,opt,name=max" json:"max,omitempty"` - Counter *Chunk `protobuf:"bytes,8,opt,name=counter" json:"counter,omitempty"` + Raw *Chunk `protobuf:"bytes,3,opt,name=raw,proto3" json:"raw,omitempty"` + Count *Chunk `protobuf:"bytes,4,opt,name=count,proto3" json:"count,omitempty"` + Sum *Chunk `protobuf:"bytes,5,opt,name=sum,proto3" json:"sum,omitempty"` + Min *Chunk `protobuf:"bytes,6,opt,name=min,proto3" json:"min,omitempty"` + Max *Chunk `protobuf:"bytes,7,opt,name=max,proto3" json:"max,omitempty"` + Counter *Chunk `protobuf:"bytes,8,opt,name=counter,proto3" json:"counter,omitempty"` 
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -541,6 +541,9 @@ func encodeVarintTypes(dAtA []byte, offset int, v uint64) int { return offset + 1 } func (m *Label) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l l = len(m.Name) @@ -558,6 +561,9 @@ func (m *Label) Size() (n int) { } func (m *Chunk) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Type != 0 { @@ -574,6 +580,9 @@ func (m *Chunk) Size() (n int) { } func (m *Series) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if len(m.Labels) > 0 { @@ -595,6 +604,9 @@ func (m *Series) Size() (n int) { } func (m *AggrChunk) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.MinTime != 0 { @@ -634,6 +646,9 @@ func (m *AggrChunk) Size() (n int) { } func (m *LabelMatcher) Size() (n int) { + if m == nil { + return 0 + } var l int _ = l if m.Type != 0 { diff --git a/pkg/ui/rule.go b/pkg/ui/rule.go index ff4b9412ca7..af489f4eba8 100644 --- a/pkg/ui/rule.go +++ b/pkg/ui/rule.go @@ -9,6 +9,10 @@ import ( "regexp" "sort" + thanosrule "github.com/improbable-eng/thanos/pkg/rule" + + "github.com/improbable-eng/thanos/pkg/store/storepb" + "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/route" @@ -20,16 +24,16 @@ type Rule struct { flagsMap map[string]string - ruleManager *rules.Manager - queryURL string + ruleManagers thanosrule.Managers + queryURL string } -func NewRuleUI(logger log.Logger, ruleManager *rules.Manager, queryURL string, flagsMap map[string]string) *Rule { +func NewRuleUI(logger log.Logger, ruleManagers map[storepb.PartialResponseStrategy]*rules.Manager, queryURL string, flagsMap map[string]string) *Rule { return &Rule{ - BaseUI: NewBaseUI(logger, "rule_menu.html", ruleTmplFuncs(queryURL)), - flagsMap: flagsMap, - ruleManager: ruleManager, - queryURL: queryURL, + BaseUI: NewBaseUI(logger, "rule_menu.html", 
ruleTmplFuncs(queryURL)), + flagsMap: flagsMap, + ruleManagers: ruleManagers, + queryURL: queryURL, } } @@ -96,7 +100,7 @@ func ruleTmplFuncs(queryURL string) template.FuncMap { } func (ru *Rule) alerts(w http.ResponseWriter, r *http.Request) { - alerts := ru.ruleManager.AlertingRules() + alerts := ru.ruleManagers.AlertingRules() alertsSorter := byAlertStateAndNameSorter{alerts: alerts} sort.Sort(alertsSorter) @@ -111,13 +115,15 @@ func (ru *Rule) alerts(w http.ResponseWriter, r *http.Request) { prefix := GetWebPrefix(ru.logger, ru.flagsMap, r) + // TODO(bwplotka): Update HTML to include partial response. ru.executeTemplate(w, "alerts.html", prefix, alertStatus) } func (ru *Rule) rules(w http.ResponseWriter, r *http.Request) { prefix := GetWebPrefix(ru.logger, ru.flagsMap, r) - ru.executeTemplate(w, "rules.html", prefix, ru.ruleManager) + // TODO(bwplotka): Update HTML to include partial response. + ru.executeTemplate(w, "rules.html", prefix, ru.ruleManagers) } // root redirects / requests to /graph, taking into account the path prefix value @@ -139,12 +145,12 @@ func (ru *Rule) Register(r *route.Router) { // AlertStatus bundles alerting rules and the mapping of alert states to row classes. type AlertStatus struct { - AlertingRules []*rules.AlertingRule + AlertingRules []thanosrule.AlertingRule AlertStateToRowClass map[rules.AlertState]string } type byAlertStateAndNameSorter struct { - alerts []*rules.AlertingRule + alerts []thanosrule.AlertingRule } func (s byAlertStateAndNameSorter) Len() int { diff --git a/scripts/genproto.sh b/scripts/genproto.sh index fa2933831f5..5749e699eab 100755 --- a/scripts/genproto.sh +++ b/scripts/genproto.sh @@ -18,7 +18,7 @@ if ! 
[[ $(${PROTOC_BIN} --version) =~ "3.4.0" ]]; then exit 255 fi -THANOS_ROOT="${GOPATH}/src/github.com/improbable-eng/thanos" +THANOS_ROOT=$(pwd) PROM_PATH="${THANOS_ROOT}/pkg/store/storepb" GOGOPROTO_ROOT="${THANOS_ROOT}/vendor/github.com/gogo/protobuf" GOGOPROTO_PATH="${GOGOPROTO_ROOT}:${GOGOPROTO_ROOT}/protobuf" diff --git a/test/e2e/rule_test.go b/test/e2e/rule_test.go index 4af72bd6488..df7ef156090 100644 --- a/test/e2e/rule_test.go +++ b/test/e2e/rule_test.go @@ -16,7 +16,8 @@ import ( "github.com/prometheus/prometheus/pkg/timestamp" ) -const alwaysFireRule = ` +const ( + alwaysFireRule = ` groups: - name: example rules: @@ -27,18 +28,34 @@ groups: annotations: summary: "I always complain" ` + alwaysFireStrictRule = ` +partial_response_disabled: true +groups: +- name: example + rules: + - alert: AlwaysStrictFiring + expr: vector(1) + labels: + severity: page + annotations: + summary: "I always complain, but I don't allow partial response in query.'" +` + ) var ( + alertsToTest = []string{alwaysFireRule, alwaysFireStrictRule} + + // TODO(bwplotka): Test partial response more by injecting erroring stores at some point. ruleStaticFlagsSuite = newSpinupSuite(). Add(querierWithStoreFlags(1, "", rulerGRPC(1), rulerGRPC(2))). - Add(rulerWithQueryFlags(1, alwaysFireRule, queryHTTP(1))). - Add(rulerWithQueryFlags(2, alwaysFireRule, queryHTTP(1))). + Add(rulerWithQueryFlags(1, alertsToTest, queryHTTP(1))). + Add(rulerWithQueryFlags(2, alertsToTest, queryHTTP(1))). Add(alertManager(1)) ruleFileSDSuite = newSpinupSuite(). Add(querierWithFileSD(1, "", rulerGRPC(1), rulerGRPC(2))). - Add(rulerWithFileSD(1, alwaysFireRule, queryHTTP(1))). - Add(rulerWithFileSD(2, alwaysFireRule, queryHTTP(1))). + Add(rulerWithFileSD(1, alertsToTest, queryHTTP(1))). + Add(rulerWithFileSD(2, alertsToTest, queryHTTP(1))). 
Add(alertManager(1)) ) @@ -93,6 +110,20 @@ func testRuleComponent(t *testing.T, conf testConfig) { "alertstate": "firing", "replica": "2", }, + { + "__name__": "ALERTS", + "severity": "page", + "alertname": "AlwaysStrictFiring", + "alertstate": "firing", + "replica": "1", + }, + { + "__name__": "ALERTS", + "severity": "page", + "alertname": "AlwaysStrictFiring", + "alertstate": "firing", + "replica": "2", + }, } expAlertLabels := []model.LabelSet{ { @@ -105,6 +136,16 @@ func testRuleComponent(t *testing.T, conf testConfig) { "alertname": "AlwaysFiring", "replica": "2", }, + { + "severity": "page", + "alertname": "AlwaysStrictFiring", + "replica": "1", + }, + { + "severity": "page", + "alertname": "AlwaysStrictFiring", + "replica": "2", + }, } testutil.Ok(t, runutil.Retry(5*time.Second, ctx.Done(), func() error { @@ -136,12 +177,13 @@ func testRuleComponent(t *testing.T, conf testConfig) { return errors.Errorf("unexpected value %f", r.Value) } } + // A notification must be sent to Alertmanager. 
alrts, err := queryAlertmanagerAlerts(ctx, "http://localhost:29093") if err != nil { return err } - if len(alrts) != 2 { + if len(alrts) != len(expAlertLabels) { return errors.Errorf("unexpected alerts length %d", len(alrts)) } for i, a := range alrts { diff --git a/test/e2e/spinup_test.go b/test/e2e/spinup_test.go index 592c1e7e693..6cf4929ce1e 100644 --- a/test/e2e/spinup_test.go +++ b/test/e2e/spinup_test.go @@ -213,16 +213,18 @@ receivers: } } -func ruler(i int, rules string) cmdScheduleFunc { +func ruler(i int, rules []string) cmdScheduleFunc { return func(workDir string) ([]*exec.Cmd, error) { dbDir := fmt.Sprintf("%s/data/rule%d", workDir, i) if err := os.MkdirAll(dbDir, 0777); err != nil { - return nil, errors.Wrap(err, "creating ruler dir failed") + return nil, errors.Wrap(err, "creating ruler dir") } - err := ioutil.WriteFile(dbDir+"/rules.yaml", []byte(rules), 0666) - if err != nil { - return nil, errors.Wrap(err, "creating ruler file failed") + + for i, rule := range rules { + if err := ioutil.WriteFile(path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i)), []byte(rule), 0666); err != nil { + return nil, errors.Wrapf(err, "writing rule %s", path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i))) + } } args := append(defaultRulerFlags(i, dbDir), @@ -232,16 +234,17 @@ func ruler(i int, rules string) cmdScheduleFunc { } } -func rulerWithQueryFlags(i int, rules string, queryAddresses ...string) cmdScheduleFunc { +func rulerWithQueryFlags(i int, rules []string, queryAddresses ...string) cmdScheduleFunc { return func(workDir string) ([]*exec.Cmd, error) { dbDir := fmt.Sprintf("%s/data/rule%d", workDir, i) if err := os.MkdirAll(dbDir, 0777); err != nil { - return nil, errors.Wrap(err, "creating ruler dir failed") + return nil, errors.Wrap(err, "creating ruler dir") } - err := ioutil.WriteFile(dbDir+"/rules.yaml", []byte(rules), 0666) - if err != nil { - return nil, errors.Wrap(err, "creating ruler file failed") + for i, rule := range rules { + if err := 
ioutil.WriteFile(path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i)), []byte(rule), 0666); err != nil { + return nil, errors.Wrapf(err, "writing rule %s", path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i))) + } } args := defaultRulerFlags(i, dbDir) @@ -253,25 +256,26 @@ func rulerWithQueryFlags(i int, rules string, queryAddresses ...string) cmdSched } } -func rulerWithFileSD(i int, rules string, queryAddresses ...string) cmdScheduleFunc { +func rulerWithFileSD(i int, rules []string, queryAddresses ...string) cmdScheduleFunc { return func(workDir string) ([]*exec.Cmd, error) { dbDir := fmt.Sprintf("%s/data/rule%d", workDir, i) if err := os.MkdirAll(dbDir, 0777); err != nil { - return nil, errors.Wrap(err, "creating ruler dir failed") + return nil, errors.Wrap(err, "creating ruler dir") } - err := ioutil.WriteFile(dbDir+"/rules.yaml", []byte(rules), 0666) - if err != nil { - return nil, errors.Wrap(err, "creating ruler file failed") + for i, rule := range rules { + if err := ioutil.WriteFile(path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i)), []byte(rule), 0666); err != nil { + return nil, errors.Wrapf(err, "writing rule %s", path.Join(dbDir, fmt.Sprintf("/rules-%d.yaml", i))) + } } ruleFileSDDir := fmt.Sprintf("%s/data/ruleFileSd%d", workDir, i) if err := os.MkdirAll(ruleFileSDDir, 0777); err != nil { - return nil, errors.Wrap(err, "create ruler filesd dir failed") + return nil, errors.Wrap(err, "create ruler filesd dir") } if err := ioutil.WriteFile(ruleFileSDDir+"/filesd.json", []byte(generateFileSD(queryAddresses)), 0666); err != nil { - return nil, errors.Wrap(err, "creating ruler filesd config failed") + return nil, errors.Wrap(err, "creating ruler filesd config") } args := append(defaultRulerFlags(i, dbDir),