Commit

ruler: Added support for strict rule groups that do not allow partial_response (#970)

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka authored Apr 2, 2019
1 parent 9997bc8 commit 84cad51
Showing 22 changed files with 1,596 additions and 389 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -31,6 +31,12 @@ New tracing span:

:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.

- [#970](https://github.com/improbable-eng/thanos/pull/970) Added a `PartialResponseStrategy` field to `RuleGroups` for `Ruler`.

### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated the `partial_response_disabled` proto field; added `partial_response_strategy` instead, in both the gRPC and Query APIs.
- [#970](https://github.com/improbable-eng/thanos/pull/970) Not setting the `PartialResponseStrategy` field for `RuleGroups` defaults to the `abort` strategy (the old behaviour with partial response disabled), as this is the recommended option for rules and alerts.

### Fixed
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` is no longer exposed when no blocks have been uploaded so far
- [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about blocks that overlap by `0s`
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
@@ -282,7 +282,7 @@ func runQuery(
fileSDCache := cache.New()
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis", reg),
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg),
)

var (
185 changes: 122 additions & 63 deletions cmd/thanos/rule.go
@@ -30,6 +30,7 @@ import (
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/promclient"
thanosrule "github.com/improbable-eng/thanos/pkg/rule"
v1 "github.com/improbable-eng/thanos/pkg/rule/api"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
@@ -227,13 +228,23 @@ func runRule(
Name: "thanos_rule_loaded_rules",
Help: "Loaded rules partitioned by file and group",
},
[]string{"file", "group"},
[]string{"part_resp_strategy", "file", "group"},
)
ruleEvalWarnings := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_rule_evaluation_with_warnings_total",
Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.",
}, []string{"strategy"},
)
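// Pre-initialize the per-strategy counters so they are exported with a zero value before the first warning occurs.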
ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_ABORT.String()))
ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_WARN.String()))

reg.MustRegister(configSuccess)
reg.MustRegister(configSuccessTime)
reg.MustRegister(duplicatedQuery)
reg.MustRegister(alertMngrAddrResolutionErrors)
reg.MustRegister(rulesLoaded)
reg.MustRegister(ruleEvalWarnings)

for _, addr := range queryAddrs {
if addr == "" {
@@ -260,57 +271,16 @@

dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis", reg),
extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis_", reg),
)

// Hit the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string

// Add addresses from gossip.
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
ids = append(ids, id)
}
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}

span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]")
v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true)
span.Finish()
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(alertmgrURLs)
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
mgr *rules.Manager
ruleMgrs = thanosrule.Managers{}
)
{
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
@@ -331,26 +301,38 @@ func runRule(
}
alertQ.Push(res)
}

st := tsdb.Adapter(db, 0)
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
QueryFunc: queryFn,

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
Registerer: reg,
ExternalURL: nil,
TSDB: st,
})
g.Add(func() error {
mgr.Run()
<-ctx.Done()
mgr.Stop()
return nil
}, func(error) {
cancel()
})
}

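// Run one rule manager per partial response strategy; each rule group is evaluated by the manager matching its configured strategy.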
for _, strategy := range storepb.PartialResponseStrategy_value {
s := storepb.PartialResponseStrategy(strategy)

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

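// Copy the shared options so each manager gets its own registerer (labelled by strategy), context and query function.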
opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, peer, dnsProvider, duplicatedQuery, ruleEvalWarnings, s)

ruleMgrs[s] = rules.NewManager(&opts)
g.Add(func() error {
ruleMgrs[s].Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
ruleMgrs[s].Stop()
})
}
}
{
var storeLset []storepb.Label
@@ -469,11 +451,13 @@ func runRule(
level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err)
continue
}

files = append(files, fs...)
}

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))
if err := mgr.Update(evalInterval, files); err != nil {

if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil {
configSuccess.Set(0)
level.Error(logger).Log("msg", "reloading rules failed", "err", err)
continue
Expand All @@ -483,9 +467,12 @@ func runRule(
configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)

rulesLoaded.Reset()
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(group.File(), group.Name()).Set(float64(len(group.Rules())))
for s, mgr := range ruleMgrs {
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
}
}

}
}, func(error) {
close(cancel)
@@ -569,9 +556,9 @@ func runRule(
"web.prefix-header": webPrefixHeaderName,
}

ui.NewRuleUI(logger, mgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))
ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))

api := v1.NewAPI(logger, mgr)
api := v1.NewAPI(logger, ruleMgrs)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger)

mux := http.NewServeMux()
@@ -767,3 +754,75 @@ func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus.
}
return deduplicated
}

// queryFunc returns a query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context gets canceled.
func queryFunc(
logger log.Logger,
peer cluster.Peer,
dnsProvider *dns.Provider,
duplicatedQuery prometheus.Counter,
ruleEvalWarnings *prometheus.CounterVec,
partialResponseStrategy storepb.PartialResponseStrategy,
) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string

// Add addresses from gossip.
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
ids = append(ids, id)
}
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

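// Try the resolved query endpoints in random order; the first endpoint attempted decides the result (errors are logged, not retried).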
for _, i := range rand.Perm(len(addrs)) {
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}

span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
}

if err == nil && len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}
}
34 changes: 31 additions & 3 deletions docs/components/query.md
@@ -53,8 +53,33 @@ This logic can also be controlled via parameter on QueryAPI. More details below.

Overall QueryAPI exposed by Thanos is guaranteed to be compatible with Prometheus 2.x.

However, for additional Thanos features, Thanos, on top of Prometheus adds several
additional parameters listed below as well as custom response fields.
However, for additional Thanos features, Thanos adds the following on top of Prometheus:
* partial response behaviour
* several additional parameters listed below
* custom response fields.

### Partial Response

QueryAPI and StoreAPI have additional behaviour controlled via a query parameter called [PartialResponseStrategy](../../pkg/store/storepb/rpc.pb.go).

This parameter controls the trade-off between accuracy and availability.

A partial response is a potentially missing result within a query against QueryAPI or StoreAPI. This can happen when one
of the StoreAPIs returns an error or times out while the others succeed. It does not necessarily mean you are missing data;
you might be lucky enough to still get the correct data, because the broken StoreAPI had nothing for your query anyway.

If a partial response happens, QueryAPI returns human-readable warnings, explained [here](query.md#CustomResponseFields).

NOTE that having a warning does not necessarily mean a partial response (e.g. the "no store matched query" warning).

See [this](query.md#PartialResponseStrategy) for how to control this behaviour.

Querier also allows configuring different timeouts:
* `--query.timeout`
* `--store.response-timeout`

If you prefer availability over accuracy, you can set a tighter timeout for the underlying StoreAPIs than the overall query timeout. If the partial response
strategy is NOT `abort`, slower StoreAPIs will simply be "ignored", and the response will carry just a warning with a 200 status code.
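
As an illustration only (not part of this commit's docs), here is a minimal Go sketch of issuing an instant query with an explicit strategy, using the same `promclient.PromqlQueryInstant` helper the ruler uses in this commit; the querier address `http://localhost:10902` and the `"up"` query are assumptions:

```go
package main

import (
	"context"
	"fmt"
	"net/url"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/improbable-eng/thanos/pkg/promclient"
	"github.com/improbable-eng/thanos/pkg/store/storepb"
)

func main() {
	logger := log.NewNopLogger()

	// Address of a Thanos Querier exposing the QueryAPI (assumption for this sketch).
	u, err := url.Parse("http://localhost:10902")
	if err != nil {
		panic(err)
	}

	// Ask for the "warn" strategy: a failing StoreAPI yields warnings instead of a hard error.
	v, warns, err := promclient.PromqlQueryInstant(context.Background(), logger, u, "up", time.Now(), promclient.QueryOptions{
		Deduplicate:             true,
		PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
	})
	if err != nil {
		fmt.Println("query failed:", err)
		return
	}
	for _, w := range warns {
		fmt.Println("warning (possible partial response):", w)
	}
	fmt.Println("samples returned:", len(v))
}
```

With `storepb.PartialResponseStrategy_ABORT` instead, the same call returns an error as soon as any StoreAPI fails, which is why `abort` is the recommended strategy for rules and alerts.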

### Deduplication Enabled

Expand All @@ -77,7 +102,9 @@ Max source resolution is max resolution in seconds we want to use for data we qu
* 5m -> we will use max 5m downsampling.
* 1h -> we will use max 1h downsampling.

### Partial Response / Error Enabled
### Partial Response Strategy

// TODO(bwplotka): Update. This will change to "strategy" soon; see the [PartialResponseStrategy enum here](../../pkg/store/storepb/rpc.proto)

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
Expand All @@ -92,6 +119,7 @@ return warning.
Any additional field does not break compatibility; however, there is no guarantee that Grafana or any other client will understand them.

Currently, the UI exposed by Thanos understands

```go
type queryData struct {
ResultType promql.ValueType `json:"resultType"`