ruler: Added support for strict rule groups that do not allow partial_response #970

Merged · 1 commit · Apr 2, 2019
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -31,6 +31,12 @@ New tracing span:

:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.

- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` in `Ruler`.

### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated the `partial_response_disabled` proto field; added `partial_response_strategy` instead, in both the gRPC and Query APIs.
- [#970](https://github.com/improbable-eng/thanos/pull/970) Omitting the `PartialResponseStrategy` field for `RuleGroups` now defaults to the `abort` strategy (the old partial response disabled behaviour), as this is the recommended option for rules and alerts.
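
For illustration only, a minimal Go sketch of how a rule group's optional strategy value might be resolved to the new enum, with an omitted value falling back to `abort` as described above; the `strategyFor` helper and its string input are assumptions, only `storepb.PartialResponseStrategy` comes from this PR:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/improbable-eng/thanos/pkg/store/storepb"
)

// strategyFor resolves a rule group's partial response strategy. An empty value
// (field omitted) falls back to ABORT, matching the default described in the
// CHANGELOG entry above.
func strategyFor(raw string) (storepb.PartialResponseStrategy, error) {
	if raw == "" {
		return storepb.PartialResponseStrategy_ABORT, nil
	}
	v, ok := storepb.PartialResponseStrategy_value[strings.ToUpper(raw)]
	if !ok {
		return storepb.PartialResponseStrategy_ABORT, fmt.Errorf("unknown partial response strategy %q", raw)
	}
	return storepb.PartialResponseStrategy(v), nil
}

func main() {
	s, err := strategyFor("warn")
	if err != nil {
		panic(err)
	}
	fmt.Println(s.String()) // WARN
}
```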

### Fixed
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` no longer appears when no blocks have been uploaded so far
- [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about blocks that overlap by `0s`
2 changes: 1 addition & 1 deletion cmd/thanos/query.go
@@ -282,7 +282,7 @@ func runQuery(
fileSDCache := cache.New()
dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis", reg),
extprom.WrapRegistererWithPrefix("thanos_querier_store_apis_", reg),
)

var (
185 changes: 122 additions & 63 deletions cmd/thanos/rule.go
@@ -30,6 +30,7 @@ import (
"github.com/improbable-eng/thanos/pkg/extprom"
"github.com/improbable-eng/thanos/pkg/objstore/client"
"github.com/improbable-eng/thanos/pkg/promclient"
thanosrule "github.com/improbable-eng/thanos/pkg/rule"
v1 "github.com/improbable-eng/thanos/pkg/rule/api"
"github.com/improbable-eng/thanos/pkg/runutil"
"github.com/improbable-eng/thanos/pkg/shipper"
@@ -227,13 +228,23 @@ func runRule(
Name: "thanos_rule_loaded_rules",
Help: "Loaded rules partitioned by file and group",
},
[]string{"file", "group"},
[]string{"part_resp_strategy", "file", "group"},
Member Author: not consistent ):

)
ruleEvalWarnings := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_rule_evaluation_with_warnings_total",
Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.",
}, []string{"strategy"},
)
ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_ABORT.String()))
ruleEvalWarnings.WithLabelValues(strings.ToLower(storepb.PartialResponseStrategy_WARN.String()))

reg.MustRegister(configSuccess)
reg.MustRegister(configSuccessTime)
reg.MustRegister(duplicatedQuery)
reg.MustRegister(alertMngrAddrResolutionErrors)
reg.MustRegister(rulesLoaded)
reg.MustRegister(ruleEvalWarnings)

for _, addr := range queryAddrs {
if addr == "" {
@@ -260,57 +271,16 @@

dnsProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis", reg),
extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis_", reg),
)

// Hit the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string

// Add addresses from gossip.
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
ids = append(ids, id)
}
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}

span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]")
v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true)
span.Finish()
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(alertmgrURLs)
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
mgr *rules.Manager
ruleMgrs = thanosrule.Managers{}
)
{
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
@@ -331,26 +301,38 @@ func runRule(
}
alertQ.Push(res)
}

st := tsdb.Adapter(db, 0)
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
QueryFunc: queryFn,

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
Registerer: reg,
ExternalURL: nil,
TSDB: st,
})
g.Add(func() error {
mgr.Run()
<-ctx.Done()
mgr.Stop()
return nil
}, func(error) {
cancel()
})
}

for _, strategy := range storepb.PartialResponseStrategy_value {
s := storepb.PartialResponseStrategy(strategy)

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, peer, dnsProvider, duplicatedQuery, ruleEvalWarnings, s)

ruleMgrs[s] = rules.NewManager(&opts)
g.Add(func() error {
ruleMgrs[s].Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
ruleMgrs[s].Stop()
})
}
}
{
var storeLset []storepb.Label
@@ -469,11 +451,13 @@ func runRule(
level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err)
continue
}

files = append(files, fs...)
}

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))
if err := mgr.Update(evalInterval, files); err != nil {

if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil {
configSuccess.Set(0)
level.Error(logger).Log("msg", "reloading rules failed", "err", err)
continue
@@ -483,9 +467,12 @@
configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)

rulesLoaded.Reset()
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(group.File(), group.Name()).Set(float64(len(group.Rules())))
for s, mgr := range ruleMgrs {
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
}
}

}
}, func(error) {
close(cancel)
@@ -569,9 +556,9 @@ func runRule(
"web.prefix-header": webPrefixHeaderName,
}

ui.NewRuleUI(logger, mgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))
ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))

api := v1.NewAPI(logger, mgr)
api := v1.NewAPI(logger, ruleMgrs)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger)

mux := http.NewServeMux()
@@ -767,3 +754,75 @@ func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus.
}
return deduplicated
}

// queryFunc returns a query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context gets canceled.
func queryFunc(
logger log.Logger,
peer cluster.Peer,
dnsProvider *dns.Provider,
duplicatedQuery prometheus.Counter,
ruleEvalWarnings *prometheus.CounterVec,
partialResponseStrategy storepb.PartialResponseStrategy,
) rules.QueryFunc {
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string

// Add addresses from gossip.
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
ids = append(ids, id)
}
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}

span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, promclient.QueryOptions{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err != nil {
level.Error(logger).Log("err", err, "query", q)
}

if err == nil && len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}
}
34 changes: 31 additions & 3 deletions docs/components/query.md
@@ -53,8 +53,33 @@ This logic can also be controlled via parameter on QueryAPI. More details below.

Overall QueryAPI exposed by Thanos is guaranteed to be compatible with Prometheus 2.x.

However, for additional Thanos features, Thanos, on top of Prometheus adds several
additional parameters listed below as well as custom response fields.
However, on top of the Prometheus API, Thanos adds the following for its additional features:
* partial response behaviour
* several additional parameters, listed below
* custom response fields.

### Partial Response

QueryAPI and StoreAPI have additional behaviour controlled via a query parameter called [PartialResponseStrategy](../../pkg/store/storepb/rpc.pb.go).

This parameter controls the trade-off between accuracy and availability.

A partial response is a potentially missing result within a query against QueryAPI or StoreAPI. It can happen if one
of the StoreAPIs returns an error or times out while the others succeed. It does not necessarily mean you are missing data;
you might be lucky enough to still get the correct data, because the broken StoreAPI did not have anything relevant to your query.

If a partial response happens, QueryAPI returns human-readable warnings, explained [here](query.md#CustomResponseFields).

NOTE that a warning does not necessarily mean a partial response (e.g. the "no store matched query" warning).

See [this](query.md#PartialResponseStrategy) for how to control this behaviour.

Querier also allows configuring different timeouts:
* `--query.timeout`
* `--store.response-timeout`

If you prefer availability over accuracy, you can set a tighter timeout on the underlying StoreAPIs than the overall query timeout. If the partial response
strategy is NOT `abort`, this will "ignore" slower StoreAPIs, producing just a warning with a 200 status code response.
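
For illustration, a minimal client sketch; the Querier address (`localhost:10902`) and the `dedup`/`partial_response` form parameters are assumptions taken from the parameter tables in this document rather than from this diff:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

func main() {
	// Build an instant query against the Querier's Prometheus-compatible HTTP API.
	params := url.Values{}
	params.Set("query", "up")
	params.Set("dedup", "true")
	// Tolerate unavailable StoreAPIs: a partial result is returned together with
	// warnings instead of failing the whole query.
	params.Set("partial_response", "true")

	resp, err := http.Get("http://localhost:10902/api/v1/query?" + params.Encode())
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// Partial responses are signalled via the "warnings" field of the JSON payload
	// (see "Custom Response Fields" below).
	fmt.Println(string(body))
}
```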

### Deduplication Enabled

@@ -77,7 +102,9 @@ Max source resolution is max resolution in seconds we want to use for data we qu
* 5m -> we will use max 5m downsampling.
* 1h -> we will use max 1h downsampling.

### Partial Response / Error Enabled
### Partial Response Strategy

// TODO(bwplotka): Update. This will soon change to "strategy", as per the [PartialResponseStrategy enum here](../../pkg/store/storepb/rpc.proto)

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
@@ -92,6 +92,7 @@ return warning.
Any additional field does not break compatibility; however, there is no guarantee that Grafana or any other client will understand it.

Currently, the Thanos UI understands:

```go
type queryData struct {
ResultType promql.ValueType `json:"resultType"`