Skip to content

Commit

Permalink
working example
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Jul 22, 2021
1 parent 0ca3bf1 commit 6f26a11
Show file tree
Hide file tree
Showing 8 changed files with 894 additions and 103 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ require (
github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.7.0
github.com/testcontainers/testcontainers-go v0.11.1
github.com/uber/jaeger-client-go v2.29.1+incompatible
github.com/weaveworks/common v0.0.0-20210419092856-009d1eebd624
github.com/wrouesnel/postgres_exporter v0.0.0-00010101000000-000000000000
Expand Down
234 changes: 228 additions & 6 deletions go.sum

Large diffs are not rendered by default.

278 changes: 181 additions & 97 deletions pkg/crow/crow.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,96 +3,71 @@
package crow

import (
"context"
"errors"
"flag"
"fmt"
"math"
"math/rand"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/api"
promapi "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
)

// metrics groups the Prometheus collectors that describe Crow's own activity.
type metrics struct {
// totalScrapes counts generated test sample sets.
totalScrapes prometheus.Counter
// totalSamples counts generated test samples.
totalSamples prometheus.Counter
// totalResults counts validation results, partitioned by a "result" label.
totalResults *prometheus.CounterVec
// pendingSets reports the number of validations still waiting to be performed.
pendingSets prometheus.Gauge

// cachedCollectors is built on the first call to collectors() and reused
// afterwards.
cachedCollectors []prometheus.Collector
}

func newMetrics() *metrics {
var m metrics

m.totalScrapes = prometheus.NewCounter(prometheus.CounterOpts{
Name: "crow_test_scrapes_total",
Help: "Total number of generated test sample sets",
})

m.totalSamples = prometheus.NewCounter(prometheus.CounterOpts{
Name: "crow_test_samples_total",
Help: "Total number of generated test samples",
})

m.totalResults = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "crow_test_sample_results_total",
Help: "Total validation results of test samples",
}, []string{"result"}) // result is either "success", "missing", or "mismatch"

m.pendingSets = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "crow_test_pending_validations",
Help: "Total number of pending validations to perform",
})
// Config for the Crow metrics checker.
type Config struct {
PrometheusAddr string // Base URL of Prometheus server
GenerateSamples int // Number of samples to generate
UserID string // User ID to use when querying.
ExtraSelectors string // Extra selectors for queries, i.e., cluster="prod"

return &m
}
// Querying Params

// collectors returns every collector owned by m. The slice is assembled on
// the first call and cached, so later calls return the same slice.
func (m *metrics) collectors() []prometheus.Collector {
	if m.cachedCollectors != nil {
		return m.cachedCollectors
	}

	m.cachedCollectors = []prometheus.Collector{
		m.totalScrapes,
		m.totalSamples,
		m.totalResults,
		m.pendingSets,
	}
	return m.cachedCollectors
}

// Describe forwards ch to the Describe method of each collector owned by m,
// in the order returned by collectors().
func (m *metrics) Describe(ch chan<- *prometheus.Desc) {
	owned := m.collectors()
	for i := range owned {
		owned[i].Describe(ch)
	}
}
QueryTimeout time.Duration // Timeout for querying
QueryDuration time.Duration // Time before and after sample to search
QueryStep time.Duration // Step between samples in search

// Collect forwards ch to the Collect method of each collector owned by m,
// in the order returned by collectors().
func (m *metrics) Collect(ch chan<- prometheus.Metric) {
	owned := m.collectors()
	for i := range owned {
		owned[i].Collect(ch)
	}
}
// Validation Params

// Config for the Crow metrics checker.
type Config struct {
// Base URL of the Prometheus server.
PrometheusAddr string
// UserID to use when querying.
UserID string
// Extra selectors to be included in queries, e.g., cluster="prod".
ExtraSelectors string
// Maximum number of times a validation can be attempted.
MaximumValidations int
MaxValidations int // Maximum number of times to search for a sample.
MaxTimestampDelta time.Duration // Maximum timestamp delta to use for validating.
ValueEpsilon float64 // Maximum epsilon to use for validating.
}

// RegisterFlags registers flags for the config to the given FlagSet. Every
// flag writes into the matching field of c and takes its default from the
// same field of DefaultConfig.
func (c *Config) RegisterFlags(f *flag.FlagSet) {
	f.StringVar(&c.PrometheusAddr, "prometheus-addr", DefaultConfig.PrometheusAddr, "Root URL of the Prometheus API to query against")
	f.IntVar(&c.GenerateSamples, "generate-samples", DefaultConfig.GenerateSamples, "Number of samples to generate when being scraped")
	f.StringVar(&c.UserID, "user-id", DefaultConfig.UserID, "UserID to attach to query. Useful for querying multi-tenated Cortex.")
	f.StringVar(&c.ExtraSelectors, "extra-selectors", DefaultConfig.ExtraSelectors, "Extra selectors to include in queries, useful for identifying different instances of this job.")
	// NOTE(review): maximum-validations overlaps with max-validations below;
	// it is kept so existing command lines keep parsing — confirm whether it
	// can be dropped.
	f.IntVar(&c.MaximumValidations, "maximum-validations", DefaultConfig.MaximumValidations, "Maximum number of times to try validating a sample")

	// Each query flag must point at its own field. DurationVar stores the
	// default into the destination at registration time, so sharing one
	// destination leaves the other fields at zero and makes the last
	// registered default win.
	f.DurationVar(&c.QueryTimeout, "query-timeout", DefaultConfig.QueryTimeout, "timeout for querying")
	f.DurationVar(&c.QueryDuration, "query-duration", DefaultConfig.QueryDuration, "time before and after sample to search")
	f.DurationVar(&c.QueryStep, "query-step", DefaultConfig.QueryStep, "step between samples when searching")

	f.IntVar(&c.MaxValidations, "max-validations", DefaultConfig.MaxValidations, "Maximum number of times to try validating a sample")
	f.DurationVar(&c.MaxTimestampDelta, "max-timestamp-delta", DefaultConfig.MaxTimestampDelta, "maximum difference from the stored timestamp from the validating sample to allow")
	f.Float64Var(&c.ValueEpsilon, "sample-epsilon", DefaultConfig.ValueEpsilon, "maximum difference from the stored value from the validating sample to allow")
}

// DefaultConfig holds defaults for Crow settings. Fields that are not listed
// in the literal (such as PrometheusAddr, UserID and ExtraSelectors) default
// to their zero values.
var DefaultConfig = Config{
// Number of validation attempts allowed per sample.
MaximumValidations: 5,
MaxValidations: 5,
// Number of samples to generate.
GenerateSamples: 10,

// Querying defaults: per-query timeout, search window on each side of the
// sample, and the step between points in the range query.
QueryTimeout: 150 * time.Millisecond,
QueryDuration: 2 * time.Second,
QueryStep: 100 * time.Millisecond,

// MaxTimestampDelta is set to 750ms to allow some buffer for a slow network
// before the scrape goes through.
MaxTimestampDelta: 750 * time.Millisecond,
// Largest absolute difference between stored and expected value that still
// counts as a match.
ValueEpsilon: 0.0001,
}

// Crow is a correctness checker that validates scraped metrics reach a
Expand Down Expand Up @@ -120,25 +95,48 @@ type Crow struct {
cfg Config
m *metrics

promClient promapi.API

wg sync.WaitGroup
quit chan struct{}

pendingMtx sync.Mutex
pending []*sample
sampleCh chan []*sample
}

// New creates a new Crow.
func New(cfg Config) (*Crow, error) {
c := &Crow{
cfg: cfg,
m: newMetrics(),
c, err := newCrow(cfg)
if err != nil {
return nil, err
}

c.wg.Add(1)
go c.runLoop()
return c, nil
}

// newCrow builds a Crow for cfg: it creates a Prometheus API client for
// cfg.PrometheusAddr and allocates the metrics, quit channel and sample
// channel. It does not start any goroutine itself. An error is returned when
// the Prometheus client cannot be created.
func newCrow(cfg Config) (*Crow, error) {
	client, err := api.NewClient(api.Config{Address: cfg.PrometheusAddr})
	if err != nil {
		return nil, fmt.Errorf("failed to create prometheus client: %w", err)
	}

	crow := new(Crow)
	crow.cfg = cfg
	crow.m = newMetrics()
	crow.promClient = promapi.NewAPI(client)
	crow.quit = make(chan struct{})
	// Unbuffered: a sender blocks until the receiver takes the batch.
	crow.sampleCh = make(chan []*sample)
	return crow, nil
}

func (c *Crow) runLoop() {
defer c.wg.Done()

Expand All @@ -149,20 +147,33 @@ func (c *Crow) runLoop() {
select {
case <-c.quit:
return
case samples := <-c.sampleCh:
c.m.totalScrapes.Inc()
c.m.totalSamples.Add(float64(len(samples)))

c.appendSamples(samples)
case <-ticker.C:
c.checkPending()
}
}
}

// appendSamples adds samples to the pending queue while holding pendingMtx
// and then publishes the new queue length through the pendingSets gauge.
func (c *Crow) appendSamples(samples []*sample) {
	c.pendingMtx.Lock()
	defer c.pendingMtx.Unlock()

	c.pending = append(c.pending, samples...)

	pendingCount := len(c.pending)
	c.m.pendingSets.Set(float64(pendingCount))
}

// checkPending iterates over all pending samples. Samples that are ready
// are immediately validated. Samples are requeued if they're not ready or
// not found during validation.
func (c *Crow) checkPending() {
c.pendingMtx.Lock()
defer c.pendingMtx.Unlock()

now := time.Now()
now := time.Now().UTC()

requeued := []*sample{}
for _, s := range c.pending {
Expand All @@ -171,31 +182,120 @@ func (c *Crow) checkPending() {
continue
}

requeue := c.validate(s)
if requeue {
err := c.validate(s)
if err == nil {
c.m.totalResults.WithLabelValues("success").Inc()
continue
}

s.ValidationAttempt++
if s.ValidationAttempt < c.cfg.MaxValidations {
requeued = append(requeued, s)
continue
}

var vf errValidationFailed
if errors.As(err, &vf) {
switch {
case vf.mismatch:
c.m.totalResults.WithLabelValues("mismatch").Inc()
case vf.missing:
c.m.totalResults.WithLabelValues("missing").Inc()
default:
c.m.totalResults.WithLabelValues("unknown").Inc()
}
}
}
c.pending = requeued
c.m.pendingSets.Set(float64(len(c.pending)))
}

// errValidationFailed is the error reported when a sample fails validation.
// The flags record why: missing means no sample was found, mismatch means a
// sample was found but did not match.
type errValidationFailed struct {
	missing  bool
	mismatch bool
}

// Error implements the error interface. When both flags are set, the
// "missing" message takes precedence; when neither is set, a generic message
// is returned.
func (e errValidationFailed) Error() string {
	if e.missing {
		return "validation failed: sample missing"
	}
	if e.mismatch {
		return "validation failed: sample does not match"
	}
	return "validation failed"
}

// validate queries Prometheus for the sample. It returns nil when a matching
// sample is found and an errValidationFailed describing the failure otherwise.
func (c *Crow) validate(b *sample) (requeue bool) {
// TODO(rfratto): do validation
func (c *Crow) validate(b *sample) error {
ctx, cancel := context.WithTimeout(context.Background(), c.cfg.QueryTimeout)
defer cancel()

// TODO(rfratto): only do this if the validation failed
b.ValidationAttempt++
if b.ValidationAttempt >= c.cfg.MaximumValidations {
return false
labels := make([]string, 0, len(b.Labels))
for k, v := range b.Labels {
labels = append(labels, fmt.Sprintf(`%s="%s"`, k, v))
}
return true
if c.cfg.ExtraSelectors != "" {
labels = append(labels, c.cfg.ExtraSelectors)
}

query := fmt.Sprintf("%s{%s}", validationSampleName, strings.Join(labels, ","))
val, _, err := c.promClient.QueryRange(ctx, query, promapi.Range{
Start: b.ScrapeTime.UTC().Add(-c.cfg.QueryDuration),
End: b.ScrapeTime.UTC().Add(+c.cfg.QueryDuration),
Step: c.cfg.QueryStep,
})

if err != nil {
fmt.Println(err)
} else if m, ok := val.(model.Matrix); ok {
return c.validateInMatrix(b, m)
}

return errValidationFailed{missing: true}
}

// validateInMatrix searches every series of m for a point that corresponds to
// the expected sample b.
//
// A point is a candidate when its timestamp is within cfg.MaxTimestampDelta of
// b.ScrapeTime. A candidate matches when its value differs from b.Value by at
// most cfg.ValueEpsilon.
//
// It returns nil as soon as a matching candidate is found. Otherwise it
// returns errValidationFailed with mismatch set when at least one candidate
// existed, or with missing set when there was none.
func (c *Crow) validateInMatrix(b *sample, m model.Matrix) error {
	var found bool

	for _, ss := range m {
		for _, sp := range ss.Values {
			ts := time.Unix(0, sp.Timestamp.UnixNano())
			dist := b.ScrapeTime.Sub(ts)
			if dist < 0 {
				dist = -dist
			}
			if dist > c.cfg.MaxTimestampDelta {
				continue
			}

			found = true
			if math.Abs(float64(sp.Value)-b.Value) <= c.cfg.ValueEpsilon {
				// Return immediately rather than breaking out of one loop:
				// continuing into other series would let a later in-window
				// point with a different value discard this match.
				return nil
			}
		}
	}

	if !found {
		return errValidationFailed{missing: true}
	}
	return errValidationFailed{mismatch: true}
}

// TestMetrics exposes a collector of test metrics. Each collection will
// schedule a validation job.
//
// Every call returns a new sampleGenerator that is given cfg.GenerateSamples
// as its sample count, the channel drained by runLoop as its send channel,
// and its own random source seeded from the current Unix time in seconds.
func (c *Crow) TestMetrics() prometheus.Collector {
	// The former panic("NYI") placeholder is removed: it ran before the
	// return and made the implementation below unreachable.
	return &sampleGenerator{
		numSamples: c.cfg.GenerateSamples,
		sendCh:     c.sampleCh,

		r: rand.New(rand.NewSource(time.Now().Unix())),
	}
}

// StateMetrics exposes metrics of Crow itself. These metrics are not validated
Expand All @@ -207,19 +307,3 @@ func (c *Crow) Stop() {
close(c.quit)
c.wg.Wait()
}

// sample is one generated test sample that is waiting to be validated.
type sample struct {
// ScrapeTime is the reference time for the sample: validation queries are
// centered on it and the retry backoff is measured from it.
// NOTE(review): presumably the time the sample was scraped — confirm
// against the generator.
ScrapeTime time.Time
// Labels are turned into label matchers when querying for the sample.
Labels prometheus.Labels
// Value is the value the stored sample is expected to have.
Value float64

// How many times this sample has attempted to be validated. Starts at 0.
ValidationAttempt int
}

// Ready reports whether now is strictly later than the sample's ScrapeTime
// plus its current backoff. The backoff starts at 500ms and doubles with each
// validation attempt (500ms * 2^ValidationAttempt).
func (s *sample) Ready(now time.Time) bool {
	const base = 500 * time.Millisecond

	multiplier := time.Duration(math.Pow(2, float64(s.ValidationAttempt)))
	readyAt := s.ScrapeTime.Add(base * multiplier)
	return readyAt.Before(now)
}
Loading

0 comments on commit 6f26a11

Please sign in to comment.