Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Loki-Canary: Backoff retries on query failures, add histograms for query performance. #2413

Merged
merged 3 commits into from
Jul 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/loki-canary/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func main() {
"e.g. 15min default one entry every 15 min will be saved and then queried again every 15min until spot-check-max is reached")
spotCheckMax := flag.Duration("spot-check-max", 4*time.Hour, "How far back to check a spot check entry before dropping it")
spotCheckQueryRate := flag.Duration("spot-check-query-rate", 1*time.Minute, "Interval that the canary will query Loki for the current list of all spot check entries")
spotCheckWait := flag.Duration("spot-check-initial-wait", 10*time.Second, "How long should the spot check query wait before starting to check for entries")

printVersion := flag.Bool("version", false, "Print this builds version information")

Expand Down Expand Up @@ -84,7 +85,7 @@ func main() {

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *maxWait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *spotCheckWait, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

startCanary()
Expand Down
1 change: 1 addition & 0 deletions docs/sources/operations/upgrade.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ loki_canary_missing_entries -> loki_canary_missing_entries_tota
loki_canary_unexpected_entries -> loki_canary_unexpected_entries_total
loki_canary_duplicate_entries -> loki_canary_duplicate_entries_total
loki_canary_ws_reconnects -> loki_canary_ws_reconnects_total
loki_canary_response_latency -> loki_canary_response_latency_seconds
```

## 1.5.0
Expand Down
29 changes: 26 additions & 3 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/instrument"

"github.com/grafana/loki/pkg/canary/reader"
)
Expand Down Expand Up @@ -75,7 +76,19 @@ var (
Name: "metric_test_actual",
Help: "How many counts were actually received by the metric test query",
})
responseLatency prometheus.Histogram
responseLatency prometheus.Histogram
metricTestLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "metric_test_request_duration_seconds",
Help: "how long the metric test query execution took in seconds.",
Buckets: instrument.DefBuckets,
})
spotTestLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "spot_check_request_duration_seconds",
Help: "how long the spot check test query execution took in seconds.",
Buckets: instrument.DefBuckets,
})
)

type Comparator struct {
Expand All @@ -97,6 +110,7 @@ type Comparator struct {
spotCheckInterval time.Duration
spotCheckMax time.Duration
spotCheckQueryRate time.Duration
spotCheckWait time.Duration
spotCheckRunning bool
metricTestInterval time.Duration
metricTestRange time.Duration
Expand All @@ -115,7 +129,7 @@ func NewComparator(writer io.Writer,
wait time.Duration,
maxWait time.Duration,
pruneInterval time.Duration,
spotCheckInterval, spotCheckMax, spotCheckQueryRate time.Duration,
spotCheckInterval, spotCheckMax, spotCheckQueryRate, spotCheckWait time.Duration,
metricTestInterval time.Duration,
metricTestRange time.Duration,
writeInterval time.Duration,
Expand All @@ -135,6 +149,7 @@ func NewComparator(writer io.Writer,
spotCheckInterval: spotCheckInterval,
spotCheckMax: spotCheckMax,
spotCheckQueryRate: spotCheckQueryRate,
spotCheckWait: spotCheckWait,
spotCheckRunning: false,
metricTestInterval: metricTestInterval,
metricTestRange: metricTestRange,
Expand All @@ -152,7 +167,7 @@ func NewComparator(writer io.Writer,
if responseLatency == nil {
responseLatency = promauto.NewHistogram(prometheus.HistogramOpts{
Namespace: "loki_canary",
Name: "response_latency",
Name: "response_latency_seconds",
Help: "is how long it takes for log lines to be returned from Loki in seconds.",
Buckets: prometheus.ExponentialBuckets(0.5, 2, buckets),
})
Expand Down Expand Up @@ -292,7 +307,9 @@ func (c *Comparator) metricTest(currTime time.Time) {
if currTime.Add(-c.metricTestRange).Before(c.startTime) {
adjustedRange = currTime.Sub(c.startTime)
}
begin := time.Now()
actualCount, err := c.rdr.QueryCountOverTime(fmt.Sprintf("%.0fs", adjustedRange.Seconds()))
metricTestLatency.Observe(time.Since(begin).Seconds())
if err != nil {
fmt.Fprintf(c.w, "error running metric query test: %s\n", err.Error())
return
Expand Down Expand Up @@ -326,13 +343,19 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
c.spotEntMtx.Unlock()

for _, sce := range cpy {
// Make sure enough time has passed to start checking for this entry
if currTime.Sub(*sce) < c.spotCheckWait {
continue
}
spotCheckEntries.Inc()
// Because we are querying loki timestamps vs the timestamp in the log,
// make the range +/- 10 seconds to allow for clock inaccuracies
start := *sce
adjustedStart := start.Add(-10 * time.Second)
adjustedEnd := start.Add(10 * time.Second)
begin := time.Now()
recvd, err := c.rdr.Query(adjustedStart, adjustedEnd)
spotTestLatency.Observe(time.Since(begin).Seconds())
if err != nil {
fmt.Fprintf(c.w, "error querying loki: %s\n", err)
return
Expand Down
33 changes: 21 additions & 12 deletions pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestComparatorEntryReceivedOutOfOrder(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -60,7 +60,7 @@ func TestComparatorEntryReceivedNotExpected(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Now()
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestComparatorEntryReceivedDuplicate(t *testing.T) {
duplicateEntries = &mockCounter{}

actual := &bytes.Buffer{}
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 1*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Second)
Expand Down Expand Up @@ -159,7 +159,7 @@ func TestEntryNeverReceived(t *testing.T) {
wait := 60 * time.Second
maxWait := 300 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

c.entrySent(t1)
c.entrySent(t2)
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestPruneAckdEntires(t *testing.T) {
wait := 30 * time.Millisecond
maxWait := 30 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call pruneEntries manually below
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)
c := NewComparator(actual, wait, maxWait, 50*time.Hour, 15*time.Minute, 4*time.Hour, 4*time.Hour, 0, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), nil, false)

t1 := time.Unix(0, 0)
t2 := t1.Add(1 * time.Millisecond)
Expand Down Expand Up @@ -278,25 +278,34 @@ func TestSpotCheck(t *testing.T) {

mr := &mockReader{resp: found}
spotCheck := 10 * time.Millisecond
spotCheckMax := 10 * time.Millisecond
spotCheckMax := 20 * time.Millisecond
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, spotCheck, spotCheckMax, 4*time.Hour, 3*time.Millisecond, 1*time.Minute, 0, 0, 1, make(chan time.Time), make(chan time.Time), mr, false)

// Send all the entries
for i := range entries {
c.entrySent(entries[i])
}

// Should include the following entries
assert.Equal(t, 3, len(c.spotCheck))
assert.Equal(t, time.Unix(0, 0), *c.spotCheck[0])
assert.Equal(t, time.Unix(0, 10*time.Millisecond.Nanoseconds()), *c.spotCheck[1])
assert.Equal(t, time.Unix(0, 20*time.Millisecond.Nanoseconds()), *c.spotCheck[2])

// Run with "current time" 11ms after start which will prune the first entry which is no "before" the 10ms spot check max
c.spotCheckEntries(time.Unix(0, 11*time.Millisecond.Nanoseconds()))
// Run with "current time" 1ms after start which is less than spotCheckWait so nothing should be checked
c.spotCheckEntries(time.Unix(0, 2*time.Millisecond.Nanoseconds()))
assert.Equal(t, 3, len(c.spotCheck))
assert.Equal(t, 0, spotCheckEntries.(*mockCounter).count)

// Run with "current time" at 25ms, the first entry should be pruned, the second entry should be found, and the last entry should come back as missing
c.spotCheckEntries(time.Unix(0, 25*time.Millisecond.Nanoseconds()))

// First entry should have been pruned
// First entry should have been pruned, second and third entries have not expired yet
assert.Equal(t, 2, len(c.spotCheck))

expected := fmt.Sprintf(ErrSpotCheckEntryNotReceived, // List entry not received from Loki
entries[20].UnixNano(), "-9ms")
entries[20].UnixNano(), "5ms")

// We didn't send the last entry and our initial counter did not start at 0 so we should get back entries 1-19
for i := 1; i < 20; i++ {
Expand All @@ -322,7 +331,7 @@ func TestMetricTest(t *testing.T) {
mr := &mockReader{}
metricTestRange := 30 * time.Second
//We set the prune interval timer to a huge value here so that it never runs, instead we call spotCheckEntries manually below
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
c := NewComparator(actual, 1*time.Hour, 1*time.Hour, 50*time.Hour, 0, 0, 4*time.Hour, 0, 10*time.Minute, metricTestRange, writeInterval, 1, make(chan time.Time), make(chan time.Time), mr, false)
// Force the start time to a known value
c.startTime = time.Unix(10, 0)

Expand Down
69 changes: 69 additions & 0 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"github.com/cortexproject/cortex/pkg/util"
"github.com/gorilla/websocket"
json "github.com/json-iterator/go"
"github.com/pkg/errors"
Expand Down Expand Up @@ -56,6 +58,9 @@ type Reader struct {
sValue string
lName string
lVal string
backoff *util.Backoff
nextQuery time.Time
backoffMtx sync.RWMutex
interval time.Duration
conn *websocket.Conn
w io.Writer
Expand All @@ -82,6 +87,14 @@ func NewReader(writer io.Writer,
h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
}

next := time.Now()
bkcfg := util.BackoffConfig{
MinBackoff: 1 * time.Second,
MaxBackoff: 10 * time.Minute,
MaxRetries: 0,
}
bkoff := util.NewBackoff(context.Background(), bkcfg)

rd := Reader{
header: h,
tls: tls,
Expand All @@ -93,6 +106,8 @@ func NewReader(writer io.Writer,
sValue: streamValue,
lName: labelName,
lVal: labelVal,
nextQuery: next,
backoff: bkoff,
interval: interval,
w: writer,
recv: receivedChan,
Expand Down Expand Up @@ -123,7 +138,20 @@ func (r *Reader) Stop() {
}
}

// QueryCountOverTime will ask Loki for a count of logs over the provided range e.g. 5m
// QueryCountOverTime blocks if a previous query has failed until the appropriate backoff time has been reached.
func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
r.backoffMtx.RLock()
next := r.nextQuery
r.backoffMtx.RUnlock()
for time.Now().Before(next) {
time.Sleep(50 * time.Millisecond)
// Update next in case other queries have tried and failed
r.backoffMtx.RLock()
next = r.nextQuery
r.backoffMtx.RUnlock()
}

scheme := "http"
if r.tls {
scheme = "https"
Expand Down Expand Up @@ -159,9 +187,17 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
}()

if resp.StatusCode/100 != 2 {
r.backoffMtx.Lock()
r.nextQuery = nextBackoff(r.w, resp.StatusCode, r.backoff)
r.backoffMtx.Unlock()
buf, _ := ioutil.ReadAll(resp.Body)
return 0, fmt.Errorf("error response from server: %s (%v)", string(buf), err)
}
// No Errors, reset backoff
r.backoffMtx.Lock()
r.backoff.Reset()
r.backoffMtx.Unlock()

var decoded loghttp.QueryResponse
err = json.NewDecoder(resp.Body).Decode(&decoded)
if err != nil {
Expand All @@ -187,7 +223,20 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
return ret, nil
}

// Query will ask Loki for all canary timestamps in the requested timerange.
// Query blocks if a previous query has failed until the appropriate backoff time has been reached.
func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
r.backoffMtx.RLock()
next := r.nextQuery
r.backoffMtx.RUnlock()
for time.Now().Before(next) {
time.Sleep(50 * time.Millisecond)
// Update next in case other queries have tried and failed moving it even farther in the future
r.backoffMtx.RLock()
next = r.nextQuery
r.backoffMtx.RUnlock()
}

scheme := "http"
if r.tls {
scheme = "https"
Expand Down Expand Up @@ -224,9 +273,17 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}()

if resp.StatusCode/100 != 2 {
r.backoffMtx.Lock()
r.nextQuery = nextBackoff(r.w, resp.StatusCode, r.backoff)
r.backoffMtx.Unlock()
buf, _ := ioutil.ReadAll(resp.Body)
return nil, fmt.Errorf("error response from server: %s (%v)", string(buf), err)
}
// No Errors, reset backoff
r.backoffMtx.Lock()
r.backoff.Reset()
r.backoffMtx.Unlock()

var decoded loghttp.QueryResponse
err = json.NewDecoder(resp.Body).Decode(&decoded)
if err != nil {
Expand Down Expand Up @@ -368,3 +425,15 @@ func parseResponse(entry *loghttp.Entry) (*time.Time, error) {
t := time.Unix(0, ts)
return &t, nil
}

func nextBackoff(w io.Writer, statusCode int, backoff *util.Backoff) time.Time {
// Be way more conservative with an http 429 and wait 5 minutes before trying again.
var next time.Time
if statusCode == http.StatusTooManyRequests {
next = time.Now().Add(5 * time.Minute)
} else {
next = time.Now().Add(backoff.NextDelay())
}
fmt.Fprintf(w, "Loki returned an error code: %v, waiting %v before next query.", statusCode, next.Sub(time.Now()))
return next
}