Add "warnings" from prometheus upstream #168

Closed
wants to merge 16 commits into from
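This PR threads the new `api.Warnings` return value from the upstream Prometheus client through promxy's `promclient` layer (see `promclient/api.go` and `promclient/debug.go` below), alongside the move to the `promql.EngineOpts` constructor and the 2.10-era `rules`/`notifier` APIs. Below is a minimal sketch, not part of the diff, of how a caller might consume the widened `Query` signature; the `queryAPI` interface name and the logrus logging are illustrative assumptions:

```go
package example

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/api"
	"github.com/prometheus/common/model"
	"github.com/sirupsen/logrus"
)

// queryAPI mirrors the Query signature introduced in promclient/api.go
// (hypothetical interface name; the PR defines the methods, not this interface).
type queryAPI interface {
	Query(ctx context.Context, query string, ts time.Time) (model.Value, api.Warnings, error)
}

// queryAndLogWarnings runs an instant query and surfaces any upstream
// warnings instead of silently dropping them.
func queryAndLogWarnings(ctx context.Context, c queryAPI, q string) (model.Value, error) {
	v, warnings, err := c.Query(ctx, q, time.Now())
	if len(warnings) > 0 {
		logrus.WithFields(logrus.Fields{
			"query":    q,
			"warnings": warnings,
		}).Warn("query returned warnings")
	}
	return v, err
}
```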
812 changes: 455 additions & 357 deletions Gopkg.lock

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions Gopkg.toml
@@ -12,12 +12,11 @@
version = "1.1.0"

[[constraint]]
branch = "tjackson_fork_23"
name = "github.com/prometheus/common"
source = "github.com/jacksontj/common"
branch = "master"

[[constraint]]
branch = "release-2.3_fork_promxy"
branch = "release-2.10_fork_promxy"
name = "github.com/prometheus/prometheus"
source = "github.com/jacksontj/prometheus"

@@ -49,3 +48,11 @@
[[override]]
name = "github.com/cockroachdb/cockroach"
revision = "84bc9597164f671c0130543778228928d6865c5c"

[[override]]
name = "k8s.io/apimachinery"
revision = "2b1284ed4c93a43499e781493253e2ac5959c4fd"

[[override]]
name = "k8s.io/client-go"
version = "v11.0.0"
25 changes: 18 additions & 7 deletions cmd/promxy/main.go
@@ -42,6 +42,7 @@ import (

proxyconfig "github.com/jacksontj/promxy/config"
"github.com/jacksontj/promxy/logging"
"github.com/jacksontj/promxy/noop"
"github.com/jacksontj/promxy/proxystorage"
)

@@ -66,6 +67,7 @@ type cliOpts struct {

QueryTimeout time.Duration `long:"query.timeout" description:"Maximum time a query may take before being aborted." default:"2m"`
QueryMaxConcurrency int `long:"query.max-concurrency" description:"Maximum number of queries executed concurrently." default:"1000"`
QueryMaxSamples int `long:"query.max-samples" description:"Maximum number of samples a single query can load into memory. Note that queries will fail if they would load more samples than this into memory, so this also limits the number of samples a query can return." default:"50000000"`

NotificationQueueCapacity int `long:"alertmanager.notification-queue-capacity" description:"The capacity of the queue for pending alert manager notifications." default:"10000"`
AccessLogDestination string `long:"access-log-destination" description:"where to log access logs, options (none, stderr, stdout)" default:"stdout"`
@@ -162,7 +164,12 @@ func main() {
reloadables = append(reloadables, ps)
proxyStorage = ps

engine := promql.NewEngine(nil, prometheus.DefaultRegisterer, opts.QueryMaxConcurrency, opts.QueryTimeout)
engine := promql.NewEngine(promql.EngineOpts{
Reg: prometheus.DefaultRegisterer,
MaxConcurrent: opts.QueryMaxConcurrency,
Timeout: opts.QueryTimeout,
MaxSamples: opts.QueryMaxSamples,
})
engine.NodeReplacer = ps.NodeReplacer

// TODO: rename
@@ -172,11 +179,15 @@ func main() {
}

// Alert notifier
lvl := promlog.AllowedLevel{}
if err := lvl.Set("info"); err != nil {
logCfg := &promlog.Config{
Level: &promlog.AllowedLevel{},
Format: &promlog.AllowedFormat{},
}
if err := logCfg.Level.Set("info"); err != nil {
panic(err)
}
logger := promlog.New(lvl)

logger := promlog.New(logCfg)

notifierManager := notifier.NewManager(
&notifier.Options{
@@ -221,6 +232,7 @@ func main() {
ExternalURL: externalUrl, // URL listed as URL for "who fired this alert"
QueryFunc: rules.EngineQueryFunc(engine, proxyStorage),
NotifyFunc: sendAlerts(notifierManager, externalUrl.String()),
TSDB: noop.NoopStorage(), // TODO: use remote_read?
Appendable: proxyStorage,
Logger: logger,
})
@@ -237,7 +249,7 @@ func main() {
}
files = append(files, fs...)
}
if err := ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), files); err != nil {
if err := ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), files, cfg.GlobalConfig.ExternalLabels); err != nil {
return err
}

@@ -420,7 +432,7 @@ func main() {
// sendAlerts implements the rules.NotifyFunc for a Notifier.
// It filters any non-firing alerts from the input.
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
var res []*notifier.Alert

for _, alert := range alerts {
@@ -443,7 +455,6 @@ func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
if len(alerts) > 0 {
n.Send(res...)
}
return nil
}
}

4 changes: 2 additions & 2 deletions cmd/remote_write_exporter/main.go
@@ -102,11 +102,11 @@ func main() {
var sample *prompb.Sample
for _, s := range ts.Samples {
if sample == nil {
sample = s
sample = &s
continue
}
if s.Timestamp > sample.Timestamp {
sample = s
sample = &s
}
}

3 changes: 3 additions & 0 deletions noop/README.md
@@ -0,0 +1,3 @@
# noop

A collection of no-op implementations of various interfaces
52 changes: 52 additions & 0 deletions noop/noop.go
@@ -0,0 +1,52 @@
package noop

import (
"context"

"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/storage"
)

type noopStorage struct{}

// NoopStorage returns a storage.Storage whose writes are discarded and whose reads return empty results.
func NoopStorage() storage.Storage {
return &noopStorage{}
}

// Querier returns a new Querier on the storage.
func (n *noopStorage) Querier(ctx context.Context, mint, maxt int64) (storage.Querier, error) {
return storage.NoopQuerier(), nil
}

// StartTime returns the oldest timestamp stored in the storage.
func (n *noopStorage) StartTime() (int64, error) {
return 0, nil
}

// Appender returns a new appender against the storage.
func (n *noopStorage) Appender() (storage.Appender, error) {
return NoopAppender(), nil
}

// Close closes the storage and all its underlying resources.
func (n *noopStorage) Close() error {
return nil
}

type noopAppender struct{}

// NoopAppender returns a storage.Appender that accepts and silently drops all samples.
func NoopAppender() storage.Appender {
return &noopAppender{}
}

func (a *noopAppender) Add(l labels.Labels, t int64, v float64) (uint64, error) {
return 0, nil
}

func (a *noopAppender) AddFast(l labels.Labels, ref uint64, t int64, v float64) error {
return nil
}

// Commit submits the collected samples and purges the batch.
func (a *noopAppender) Commit() error { return nil }
func (a *noopAppender) Rollback() error { return nil }
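
Not part of the diff: a minimal usage sketch of the package added above, assuming the canonical `github.com/jacksontj/promxy/noop` import path. Every write is accepted and discarded, and reads go through the upstream `storage.NoopQuerier`, which is the behavior the rules manager relies on when it is handed `noop.NoopStorage()` as its `TSDB` in `cmd/promxy/main.go`:

```go
package main

import (
	"log"

	"github.com/jacksontj/promxy/noop"
	"github.com/prometheus/prometheus/pkg/labels"
)

func main() {
	store := noop.NoopStorage()

	// The appender accepts samples but never persists them.
	app, err := store.Appender()
	if err != nil {
		log.Fatal(err)
	}
	if _, err := app.Add(labels.FromStrings("__name__", "up"), 0, 1); err != nil {
		log.Fatal(err)
	}
	if err := app.Commit(); err != nil {
		log.Fatal(err)
	}

	// Queriers return empty results, so nothing is ever read back.
	if err := store.Close(); err != nil {
		log.Fatal(err)
	}
}
```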
19 changes: 10 additions & 9 deletions promclient/api.go
@@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@@ -26,26 +27,26 @@ func (p *PromAPIV1) LabelValues(ctx context.Context, label string) (model.LabelV
}

// Query performs a query for the given time.
func (p *PromAPIV1) Query(ctx context.Context, query string, ts time.Time) (model.Value, error) {
func (p *PromAPIV1) Query(ctx context.Context, query string, ts time.Time) (model.Value, api.Warnings, error) {
return p.API.Query(ctx, query, ts)
}

// QueryRange performs a query for the given range.
func (p *PromAPIV1) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, error) {
func (p *PromAPIV1) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, api.Warnings, error) {
return p.API.QueryRange(ctx, query, r)
}

// Series finds series by label matchers.
func (p *PromAPIV1) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, error) {
func (p *PromAPIV1) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, api.Warnings, error) {
return p.API.Series(ctx, matches, startTime, endTime)
}

// GetValue loads the raw data for a given set of matchers in the time range
func (p *PromAPIV1) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, error) {
func (p *PromAPIV1) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, api.Warnings, error) {
// http://localhost:8080/api/v1/query?query=scrape_duration_seconds%7Bjob%3D%22prometheus%22%7D&time=1507412244.663&_=1507412096887
pql, err := promhttputil.MatcherToString(matchers)
if err != nil {
return nil, err
return nil, nil, err
}

// We want to grab only the raw datapoints, so we do that through the query interface
Expand All @@ -64,14 +65,14 @@ type PromAPIRemoteRead struct {
}

// GetValue loads the raw data for a given set of matchers in the time range
func (p *PromAPIRemoteRead) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, error) {
func (p *PromAPIRemoteRead) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, api.Warnings, error) {
query, err := remote.ToQuery(int64(timestamp.FromTime(start)), int64(timestamp.FromTime(end)), matchers, nil)
if err != nil {
return nil, err
return nil, nil, err
}
result, err := p.Client.Read(ctx, query)
if err != nil {
return nil, err
return nil, nil, err
}

// convert result (timeseries) to SampleStream
Expand All @@ -96,5 +97,5 @@ func (p *PromAPIRemoteRead) GetValue(ctx context.Context, start, end time.Time,
}
}

return matrix, nil
return matrix, nil, nil
}
51 changes: 27 additions & 24 deletions promclient/debug.go
@@ -2,9 +2,9 @@ package promclient

import (
"context"
"fmt"
"time"

"github.com/prometheus/client_golang/api"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@@ -37,90 +37,93 @@ func (d *DebugAPI) LabelValues(ctx context.Context, label string) (model.LabelVa
}

// Query performs a query for the given time.
func (d *DebugAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, error) {
func (d *DebugAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, api.Warnings, error) {
logrus.WithFields(logrus.Fields{
"api": "Query",
"query": query,
"ts": ts,
}).Debug(d.PrefixMessage)

v, err := d.API.Query(ctx, query, ts)
v, w, err := d.API.Query(ctx, query, ts)

logrus.WithFields(logrus.Fields{
"api": "Query",
"query": query,
"ts": ts,
"value": v,
"error": err,
"api": "Query",
"query": query,
"ts": ts,
"value": v,
"warnings": w,
"error": err,
}).Trace(d.PrefixMessage)

return v, err
return v, w, err
}

// QueryRange performs a query for the given range.
func (d *DebugAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, error) {
fmt.Println("what")
func (d *DebugAPI) QueryRange(ctx context.Context, query string, r v1.Range) (model.Value, api.Warnings, error) {
logrus.WithFields(logrus.Fields{
"api": "QueryRange",
"query": query,
"r": r,
}).Debug(d.PrefixMessage)

v, err := d.API.QueryRange(ctx, query, r)
v, w, err := d.API.QueryRange(ctx, query, r)

logrus.WithFields(logrus.Fields{
"api": "QueryRange",
"query": query,
"r": r,
"value": v,
"error": err,
"api": "QueryRange",
"query": query,
"r": r,
"value": v,
"warnings": w,
"error": err,
}).Trace(d.PrefixMessage)

return v, err
return v, w, err
}

// Series finds series by label matchers.
func (d *DebugAPI) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, error) {
func (d *DebugAPI) Series(ctx context.Context, matches []string, startTime time.Time, endTime time.Time) ([]model.LabelSet, api.Warnings, error) {
logrus.WithFields(logrus.Fields{
"api": "Series",
"matches": matches,
"startTime": startTime,
"endTime": endTime,
}).Debug(d.PrefixMessage)

v, err := d.API.Series(ctx, matches, startTime, endTime)
v, w, err := d.API.Series(ctx, matches, startTime, endTime)

logrus.WithFields(logrus.Fields{
"api": "Series",
"matches": matches,
"startTime": startTime,
"endTime": endTime,
"value": v,
"warnings": w,
"error": err,
}).Trace(d.PrefixMessage)

return v, err
return v, w, err
}

// GetValue loads the raw data for a given set of matchers in the time range
func (d *DebugAPI) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, error) {
func (d *DebugAPI) GetValue(ctx context.Context, start, end time.Time, matchers []*labels.Matcher) (model.Value, api.Warnings, error) {
logrus.WithFields(logrus.Fields{
"api": "GetValue",
"start": start,
"end": end,
"matchers": matchers,
}).Debug(d.PrefixMessage)

v, err := d.API.GetValue(ctx, start, end, matchers)
v, w, err := d.API.GetValue(ctx, start, end, matchers)

logrus.WithFields(logrus.Fields{
"api": "GetValue",
"start": start,
"end": end,
"matchers": matchers,
"value": v,
"warnings": w,
"error": err,
}).Trace(d.PrefixMessage)

return v, err
return v, w, err
}