Canary tweaks (#2350)
* tweak the canary a bit, add a metric for websocket pings, fix some logging, improve the mutex locking a bit, add a read timeout on the websocket, add a last-message time checker to restart the websocket

* fix constructor

* fix counter name
change metric count reporting to two separate metrics, expected vs actual

* add note to upgrade guide for metric name changes

* fixed tests....

* Update docs/operations/upgrade.md

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>

slim-bean and owen-d authored Jul 15, 2020
1 parent 8809de6 commit 2b145da
Showing 5 changed files with 98 additions and 31 deletions.
2 changes: 1 addition & 1 deletion cmd/loki-canary/main.go
@@ -82,7 +82,7 @@ func main() {
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
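// Pass the write interval to the reader as well, so it can derive its websocket read timeout (10x the interval)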
- c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue)
+ c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}

23 changes: 20 additions & 3 deletions docs/operations/upgrade.md
@@ -14,6 +14,15 @@ provided.

S3 config now supports expanded config. An example can be found here: [s3_expanded_config](../configuration/examples.md#s3-expanded-config)

### New Ingester GRPC API special rollout procedure in microservices mode

A new ingester GRPC API has been added to speed up metric queries. To ensure a rollout without query errors, make sure you upgrade all ingesters first.
Once this is done, you can proceed with the rest of the deployment; this ensures that queriers won't look for an API that is not yet available.

If you roll out everything at once, queriers with the new code will attempt to query ingesters that may not yet have the new API method, and those queries will fail.

This only affects reads (queries), not writes, and only for the duration of the rollout.

### Breaking CLI flags changes

```diff
@@ -30,11 +39,19 @@ S3 config now supports expanded config. An example can be found here [s3_expanded_c
+ ingester.concurrent-flushes
```

## 1.6.0
### Loki Canary metric name changes

While adding some new features to the canary, we realized the existing metrics were not compliant with the Prometheus naming convention for counters. The following metrics have been renamed (a declaration sketch following the new convention appears after the list):

```nohighlight
loki_canary_total_entries -> loki_canary_entries_total
loki_canary_out_of_order_entries -> loki_canary_out_of_order_entries_total
loki_canary_websocket_missing_entries -> loki_canary_websocket_missing_entries_total
loki_canary_missing_entries -> loki_canary_missing_entries_total
loki_canary_unexpected_entries -> loki_canary_unexpected_entries_total
loki_canary_duplicate_entries -> loki_canary_duplicate_entries_total
loki_canary_ws_reconnects -> loki_canary_ws_reconnects_total
```
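For reference, here is a minimal sketch (not part of this commit) of how a counter following the Prometheus `_total` naming convention is declared with `promauto`; the package name and help text are illustrative only, with the metric name mirroring the renamed `loki_canary_entries_total`:

```go
package canarymetrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Counters are cumulative, so the convention is to end their names in
// `_total`. Combined with the namespace, this metric is exposed at
// /metrics as loki_canary_entries_total.
var totalEntries = promauto.NewCounter(prometheus.CounterOpts{
	Namespace: "loki_canary",
	Name:      "entries_total",
	Help:      "counts log entries written by the canary (illustrative help text)",
})
```

Any dashboards, alerts, or recording rules that reference the old names will need to be updated after upgrading.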

## 1.5.0

35 changes: 18 additions & 17 deletions pkg/canary/comparator/comparator.go
@@ -64,16 +64,22 @@ var (
Name: "duplicate_entries_total",
Help: "counts a log entry received more than one time",
})
- metricTestDeviation = promauto.NewGauge(prometheus.GaugeOpts{
+ metricTestExpected = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "loki_canary",
- Name: "metric_test_deviation",
- Help: "How many counts was the actual query result from the expected based on the canary log write rate",
+ Name: "metric_test_expected",
+ Help: "How many counts were expected by the metric test query",
})
metricTestActual = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "loki_canary",
Name: "metric_test_actual",
Help: "How many counts were actually recevied by the metric test query",
})
responseLatency prometheus.Histogram
)

type Comparator struct {
entMtx sync.Mutex
spotEntMtx sync.Mutex
spotMtx sync.Mutex
metTestMtx sync.Mutex
pruneMtx sync.Mutex
@@ -161,15 +167,16 @@ func (c *Comparator) Stop() {

func (c *Comparator) entrySent(time time.Time) {
c.entMtx.Lock()
- defer c.entMtx.Unlock()
c.entries = append(c.entries, &time)
+ totalEntries.Inc()
+ c.entMtx.Unlock()
// If this entry equals or exceeds the spot check interval from the last entry in the spot check array, add it.
+ c.spotEntMtx.Lock()
if len(c.spotCheck) == 0 || time.Sub(*c.spotCheck[len(c.spotCheck)-1]) >= c.spotCheckInterval {
- c.spotMtx.Lock()
c.spotCheck = append(c.spotCheck, &time)
- c.spotMtx.Unlock()
}
- totalEntries.Inc()
+ c.spotEntMtx.Unlock()

}

// entryReceived removes the received entry from the buffer if it exists, reports on out of order entries received
Expand Down Expand Up @@ -296,14 +303,8 @@ func (c *Comparator) metricTest(currTime time.Time) {
return
}
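// The expected count is the adjusted query range divided by the write
// interval: e.g. a 30s range at a 1s write interval expects 30000ms/1000ms = 30 entries.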
expectedCount := float64(adjustedRange.Milliseconds()) / float64(c.writeInterval.Milliseconds())
- deviation := expectedCount - actualCount
- // There is nothing special about the number 10 here, it's fairly common for the deviation to be 2-4
- // based on how expected is calculated vs the actual query data, more than 10 would be unlikely
- // unless there is a problem.
- if deviation > 10 {
- fmt.Fprintf(c.w, "large metric deviation: expected %v, actual %v\n", expectedCount, actualCount)
- }
- metricTestDeviation.Set(deviation)
+ metricTestExpected.Set(expectedCount)
+ metricTestActual.Set(actualCount)
}

func (c *Comparator) spotCheckEntries(currTime time.Time) {
@@ -313,7 +314,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
c.spotCheckRunning = false
c.spotMtx.Unlock()
}()
- c.spotMtx.Lock()
+ c.spotEntMtx.Lock()
k := 0
for i, e := range c.spotCheck {
if e.Before(currTime.Add(-c.spotCheckMax)) {
@@ -333,7 +334,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
cpy := make([]*time.Time, len(c.spotCheck))
// Make a copy so we don't have to hold the lock to verify entries
copy(cpy, c.spotCheck)
- c.spotMtx.Unlock()
+ c.spotEntMtx.Unlock()

for _, sce := range cpy {
spotCheckEntries.Inc()
12 changes: 8 additions & 4 deletions pkg/canary/comparator/comparator_test.go
@@ -307,7 +308,7 @@ func TestSpotCheck(t *testing.T) {
}

func TestMetricTest(t *testing.T) {
- metricTestDeviation = &mockGauge{}
+ metricTestActual = &mockGauge{}
+ metricTestExpected = &mockGauge{}

actual := &bytes.Buffer{}

@@ -327,23 +328,26 @@
// We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s
assert.Equal(t, "10s", mr.queryRange)
// countOverTime is set to runtime/writeInterval, so the expected and actual gauges should match exactly
- assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
+ assert.Equal(t, float64(20), metricTestExpected.(*mockGauge).val)
+ assert.Equal(t, float64(20), metricTestActual.(*mockGauge).val)

// Run test at time 30s which is 20s after start
mr.countOverTime = float64((20 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds())
c.metricTest(time.Unix(0, 30*time.Second.Nanoseconds()))
// We want to look back 30s but have only been running from time 10s to time 30s, so the query range should be adjusted to 20s
assert.Equal(t, "20s", mr.queryRange)
// Both gauges should equal the countOverTime value
- assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
+ assert.Equal(t, float64(40), metricTestExpected.(*mockGauge).val)
+ assert.Equal(t, float64(40), metricTestActual.(*mockGauge).val)

// Run test 60s after start, we should now be capping the query range to 30s and expecting only 30s of counts
mr.countOverTime = float64((30 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds())
c.metricTest(time.Unix(0, 60*time.Second.Nanoseconds()))
// We want to look back 30s and have now been running longer than that, so the query range should be capped at 30s
assert.Equal(t, "30s", mr.queryRange)
// Both gauges should equal the countOverTime value
- assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
+ assert.Equal(t, float64(60), metricTestExpected.(*mockGauge).val)
+ assert.Equal(t, float64(60), metricTestActual.(*mockGauge).val)

prometheus.Unregister(responseLatency)
}
57 changes: 51 additions & 6 deletions pkg/canary/reader/reader.go
@@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"strconv"
@@ -28,9 +29,14 @@
var (
reconnects = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "ws_reconnects",
Name: "ws_reconnects_total",
Help: "counts every time the websocket connection has to reconnect",
})
websocketPings = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "ws_pings_total",
Help: "counts every time the websocket receives a ping message",
})
userAgent = fmt.Sprintf("loki-canary/%s", build.Version)
)

@@ -50,6 +56,7 @@ type Reader struct {
sValue string
lName string
lVal string
interval time.Duration
conn *websocket.Conn
w io.Writer
recv chan time.Time
@@ -68,7 +75,8 @@ func NewReader(writer io.Writer,
labelName string,
labelVal string,
streamName string,
- streamValue string) *Reader {
+ streamValue string,
+ interval time.Duration) *Reader {
h := http.Header{}
if user != "" {
h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
@@ -85,6 +93,7 @@
sValue: streamValue,
lName: labelName,
lVal: labelVal,
interval: interval,
w: writer,
recv: receivedChan,
quit: make(chan struct{}),
@@ -251,21 +260,34 @@ func (r *Reader) run() {
r.closeAndReconnect()

tailResponse := &loghttp.TailResponse{}
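// Track when the last canary message arrived so the read loop can force a reconnect if the stream goes quiet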
lastMessage := time.Now()

for {
if r.shuttingDown {
if r.conn != nil {
_ = r.conn.Close()
r.conn = nil
}
close(r.done)
return
}

// Set a read timeout of 10x the interval we expect to see messages
// Ignore the error as it will get caught when we call ReadJSON
_ = r.conn.SetReadDeadline(time.Now().Add(10 * r.interval))
err := r.conn.ReadJSON(tailResponse)
if err != nil {
if r.shuttingDown {
close(r.done)
return
}
fmt.Fprintf(r.w, "error reading websocket, will retry in 10 seconds: %s\n", err)
// Even though we sleep between connection retries, we found it's possible to DOS Loki if the connection
// succeeds but some other error is returned, so also sleep here before retrying.
<-time.After(10 * time.Second)
r.closeAndReconnect()
continue
}
// If there were streams, update the last message timestamp
if len(tailResponse.Streams) > 0 {
lastMessage = time.Now()
}
for _, stream := range tailResponse.Streams {
for _, entry := range stream.Entries {
ts, err := parseResponse(&entry)
@@ -276,10 +298,22 @@
r.recv <- *ts
}
}
// Ping messages can reset the read deadline, so also make sure we are receiving regular data messages.
// We use the same 10x interval to make sure we are getting recent messages.
if time.Since(lastMessage).Milliseconds() > 10*r.interval.Milliseconds() {
fmt.Fprintf(r.w, "Have not received a canary message from loki on the websocket in %vms, "+
"sleeping 10s and reconnecting\n", 10*r.interval.Milliseconds())
<-time.After(10 * time.Second)
r.closeAndReconnect()
continue
}
}
}

func (r *Reader) closeAndReconnect() {
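// Nothing to do if we're shutting down; the run loop will close the connection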
if r.shuttingDown {
return
}
if r.conn != nil {
_ = r.conn.Close()
r.conn = nil
@@ -307,6 +341,17 @@ func (r *Reader) closeAndReconnect() {
<-time.After(10 * time.Second)
continue
}
// Use a custom ping handler so we can increment a metric; otherwise this is copied from the default ping handler
c.SetPingHandler(func(message string) error {
websocketPings.Inc()
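// Reply with a pong, allowing one second for the write; treat a close-already-sent
// error or a temporary network error as non-fatal, matching the default handler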
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
if err == websocket.ErrCloseSent {
return nil
} else if e, ok := err.(net.Error); ok && e.Temporary() {
return nil
}
return err
})
r.conn = c
}
}
