
Commit

ruler: Added support for strict rule groups that do not allow partial_response

Signed-off-by: Bartek Plotka <bwplotka@gmail.com>
bwplotka committed Mar 27, 2019
1 parent d65ef8d commit d88a0d0
Showing 18 changed files with 1,009 additions and 278 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -31,6 +31,10 @@ New tracing span:

:warning: **WARNING** :warning: #798 adds a new default limit to Thanos Store: `--store.grpc.series-max-concurrency`. Most likely you will want to make it the same as `--query.max-concurrent` on Thanos Query.

- [#970](https://github.com/improbable-eng/thanos/pull/970) Added `PartialResponseStrategy` field for `RuleGroups` in `Ruler`.
### Changed
- [#970](https://github.com/improbable-eng/thanos/pull/970) Deprecated the `partial_response_disabled` proto field; added `partial_response_strategy` instead, in both the gRPC and Query APIs.

### Fixed
- [#921](https://github.com/improbable-eng/thanos/pull/921) `thanos_objstore_bucket_last_successful_upload_time` no longer appears when no blocks have been uploaded so far
- [#966](https://github.com/improbable-eng/thanos/pull/966) Bucket: verify no longer warns about overlapping blocks that overlap `0s`
177 changes: 115 additions & 62 deletions cmd/thanos/rule.go
@@ -19,6 +19,8 @@ import (
"syscall"
"time"

thanosrule "github.com/improbable-eng/thanos/pkg/rule"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/improbable-eng/thanos/pkg/alert"
@@ -227,13 +229,20 @@ func runRule(
Name: "thanos_rule_loaded_rules",
Help: "Loaded rules partitioned by file and group",
},
[]string{"file", "group"},
[]string{"part_resp_strategy", "file", "group"},
)
ruleEvalWarnings := prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "thanos_rule_evaluation_with_warnings_total",
Help: "The total number of rule evaluation that were successful but had warnings which can indicate partial error.",
}, []string{"strategy"},
)
reg.MustRegister(configSuccess)
reg.MustRegister(configSuccessTime)
reg.MustRegister(duplicatedQuery)
reg.MustRegister(alertMngrAddrResolutionErrors)
reg.MustRegister(rulesLoaded)
reg.MustRegister(ruleEvalWarnings)

for _, addr := range queryAddrs {
if addr == "" {
@@ -263,54 +272,13 @@ func runRule(
extprom.WrapRegistererWithPrefix("thanos_ruler_query_apis", reg),
)

// Hit the HTTP query API of query peers in randomized order until we get a result
// back or the context get canceled.
queryFn := func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string

// Add addresses from gossip.
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
ids = append(ids, id)
}
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}

span, ctx := tracing.StartSpan(ctx, "/rule_instant_query HTTP[client]")
v, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, true)
span.Finish()
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}

// Run rule evaluation and alert notifications.
var (
alertmgrs = newAlertmanagerSet(alertmgrURLs)
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
mgr *rules.Manager
ruleMgrs = thanosrule.Managers{}
)
{
ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
res := make([]*alert.Alert, 0, len(alerts))
for _, alrt := range alerts {
@@ -331,26 +299,38 @@ func runRule(
}
alertQ.Push(res)
}

st := tsdb.Adapter(db, 0)
mgr = rules.NewManager(&rules.ManagerOptions{
Context: ctx,
QueryFunc: queryFn,

opts := rules.ManagerOptions{
NotifyFunc: notify,
Logger: log.With(logger, "component", "rules"),
Appendable: st,
Registerer: reg,
ExternalURL: nil,
TSDB: st,
})
g.Add(func() error {
mgr.Run()
<-ctx.Done()
mgr.Stop()
return nil
}, func(error) {
cancel()
})
}

for _, strategy := range storepb.PartialResponseStrategy_value {
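// One rule manager per partial response strategy: rule groups with different strategies are
// evaluated independently, and `opts := opts` below shallow-copies the shared options so each
// manager gets its own registerer labels, context and query function.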
s := storepb.PartialResponseStrategy(strategy)

ctx, cancel := context.WithCancel(context.Background())
ctx = tracing.ContextWithTracer(ctx, tracer)

opts := opts
opts.Registerer = extprom.WrapRegistererWith(prometheus.Labels{"strategy": strings.ToLower(s.String())}, reg)
opts.Context = ctx
opts.QueryFunc = queryFunc(logger, peer, dnsProvider, duplicatedQuery, ruleEvalWarnings, s)

ruleMgrs[s] = rules.NewManager(&opts)
g.Add(func() error {
ruleMgrs[s].Run()
<-ctx.Done()

return nil
}, func(error) {
cancel()
ruleMgrs[s].Stop()
})
}
}
{
var storeLset []storepb.Label
@@ -469,11 +449,13 @@ func runRule(
level.Error(logger).Log("msg", "retrieving rule files failed. Ignoring file.", "pattern", pat, "err", err)
continue
}

files = append(files, fs...)
}

level.Info(logger).Log("msg", "reload rule files", "numFiles", len(files))
if err := mgr.Update(evalInterval, files); err != nil {

if err := ruleMgrs.Update(dataDir, evalInterval, files); err != nil {
configSuccess.Set(0)
level.Error(logger).Log("msg", "reloading rules failed", "err", err)
continue
Expand All @@ -483,9 +465,12 @@ func runRule(
configSuccessTime.Set(float64(time.Now().UnixNano()) / 1e9)

rulesLoaded.Reset()
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(group.File(), group.Name()).Set(float64(len(group.Rules())))
for s, mgr := range ruleMgrs {
for _, group := range mgr.RuleGroups() {
rulesLoaded.WithLabelValues(s.String(), group.File(), group.Name()).Set(float64(len(group.Rules())))
}
}

}
}, func(error) {
close(cancel)
@@ -569,9 +554,9 @@ func runRule(
"web.prefix-header": webPrefixHeaderName,
}

ui.NewRuleUI(logger, mgr, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))
ui.NewRuleUI(logger, ruleMgrs, alertQueryURL.String(), flagsMap).Register(router.WithPrefix(webRoutePrefix))

api := v1.NewAPI(logger, mgr)
api := v1.NewAPI(logger, ruleMgrs)
api.Register(router.WithPrefix(path.Join(webRoutePrefix, "/api/v1")), tracer, logger)

mux := http.NewServeMux()
@@ -767,3 +752,71 @@ func removeDuplicateQueryAddrs(logger log.Logger, duplicatedQueriers prometheus.
}
return deduplicated
}

// queryFunc returns a query function that hits the HTTP query API of query peers in randomized order until we get a result
// back or the context gets canceled.
func queryFunc(
logger log.Logger,
peer cluster.Peer,
dnsProvider *dns.Provider,
duplicatedQuery prometheus.Counter,
ruleEvalWarnings *prometheus.CounterVec,
partialResponseStrategy storepb.PartialResponseStrategy,
) rules.QueryFunc {
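// Pick a distinct tracing span name per partial response strategy so instant-query traces can be told apart.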
var spanID string

switch partialResponseStrategy {
case storepb.PartialResponseStrategy_WARN:
spanID = "/rule_instant_query HTTP[client]"
case storepb.PartialResponseStrategy_ABORT:
spanID = "/rule_instant_query_part_resp_abort HTTP[client]"
default:
// Programming error will be caught by tests.
panic(errors.Errorf("unknown partial response strategy %v", partialResponseStrategy).Error())
}

return func(ctx context.Context, q string, t time.Time) (promql.Vector, error) {
var addrs []string

// Add addresses from gossip.
peers := peer.PeerStates(cluster.PeerTypeQuery)
var ids []string
for id := range peers {
ids = append(ids, id)
}
sort.Slice(ids, func(i int, j int) bool {
return strings.Compare(ids[i], ids[j]) < 0
})
for _, id := range ids {
addrs = append(addrs, peers[id].QueryAPIAddr)
}

// Add DNS resolved addresses from static flags and file SD.
// TODO(bwplotka): Consider generating addresses in *url.URL
addrs = append(addrs, dnsProvider.Addresses()...)

removeDuplicateQueryAddrs(logger, duplicatedQuery, addrs)

for _, i := range rand.Perm(len(addrs)) {
u, err := url.Parse(fmt.Sprintf("http://%s", addrs[i]))
if err != nil {
return nil, errors.Wrapf(err, "url parse %s", addrs[i])
}

span, ctx := tracing.StartSpan(ctx, spanID)
v, warns, err := promclient.PromqlQueryInstant(ctx, logger, u, q, t, promclient.ThanosQueryParams{
Deduplicate: true,
PartialResponseStrategy: partialResponseStrategy,
})
span.Finish()

if err == nil && len(warns) > 0 {
ruleEvalWarnings.WithLabelValues(strings.ToLower(partialResponseStrategy.String())).Inc()
// TODO(bwplotka): Propagate those to UI, probably requires changing rule manager code ):
level.Warn(logger).Log("warnings", strings.Join(warns, ", "), "query", q)
}
return v, err
}
return nil, errors.Errorf("no query peer reachable")
}
}
22 changes: 19 additions & 3 deletions docs/components/query.md
@@ -53,8 +53,22 @@ This logic can also be controlled via parameter on QueryAPI. More details below.

Overall QueryAPI exposed by Thanos is guaranteed to be compatible with Prometheus 2.x.

However, for additional Thanos features, Thanos, on top of Prometheus adds several
additional parameters listed below as well as custom response fields.
However, for additional Thanos features, Thanos adds the following on top of Prometheus:
* partial response behaviour
* several additional parameters listed below
* custom response fields

### Partial Response

QueryAPI and StoreAPI have additional behaviour controlled via a query parameter called [PartialResponseStrategy](../../pkg/store/storepb/rpc.pb.go).

A partial response is a potentially missing result within a query against the QueryAPI or StoreAPI. This can happen if one
of the StoreAPIs returns an error or times out while the others succeed. It does not necessarily mean you are missing data;
you might be lucky enough to still get the correct data, because the broken StoreAPI did not have anything for your query.

If a partial response happens, the QueryAPI returns human-readable warnings, as explained [here](query.md#CustomResponseFields).

See [this section](query.md#PartialResponseStrategy) for how to control this behaviour.
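
For illustration only (not part of this commit's docs change): a minimal Go sketch of picking a partial response strategy per query, based on the `promclient.PromqlQueryInstant` helper and `ThanosQueryParams` struct used by the ruler in this diff. The query endpoint and the expression `up == 0` are placeholder assumptions, and the helper's exact signature may differ from what is shown here.

```go
package main

import (
	"context"
	"net/url"
	"os"
	"strings"
	"time"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	"github.com/improbable-eng/thanos/pkg/promclient"
	"github.com/improbable-eng/thanos/pkg/store/storepb"
)

func main() {
	logger := log.NewLogfmtLogger(os.Stderr)

	// Placeholder address of a Thanos Query instance; adjust for your deployment.
	u, err := url.Parse("http://thanos-query.example.com:9090")
	if err != nil {
		level.Error(logger).Log("err", err)
		os.Exit(1)
	}

	// ABORT fails the whole query if any StoreAPI could not answer;
	// WARN (used by default in the ruler) returns data plus warnings instead.
	v, warns, err := promclient.PromqlQueryInstant(context.Background(), logger, u, `up == 0`, time.Now(),
		promclient.ThanosQueryParams{
			Deduplicate:             true,
			PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
		})
	if err != nil {
		level.Error(logger).Log("msg", "query failed (with ABORT this includes partial responses)", "err", err)
		os.Exit(1)
	}
	if len(warns) > 0 {
		// With WARN these are the human readable partial response warnings mentioned above.
		level.Warn(logger).Log("warnings", strings.Join(warns, ", "))
	}
	level.Info(logger).Log("msg", "query succeeded", "samples", len(v))
}
```

The trade-off is availability versus strictness: `WARN` keeps rules evaluating on partial data (counted by the new `thanos_rule_evaluation_with_warnings_total` metric), while `ABORT` is meant to turn a partial response into a hard query error.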

### Deduplication Enabled

@@ -77,7 +91,9 @@ Max source resolution is max resolution in seconds we want to use for data we qu
* 5m -> we will use max 5m downsampling.
* 1h -> we will use max 1h downsampling.

### Partial Response / Error Enabled
### Partial Response Strategy

// TODO(bwplotka): Update. This will soon change to "strategy", as per the [PartialResponseStrategy enum here](../../pkg/store/storepb/rpc.proto).

| HTTP URL/FORM parameter | Type | Default | Example |
|----|----|----|----|
