Canary tweaks #2350

Merged · 7 commits · Jul 15, 2020

2 changes: 1 addition & 1 deletion cmd/loki-canary/main.go
@@ -82,7 +82,7 @@ func main() {
defer c.lock.Unlock()

c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue)
c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
}
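The only functional change in main.go is threading the canary's write interval through to the reader, which uses it later to size websocket read deadlines. A hedged sketch of the updated call with literal values in place of the dereferenced flags (the argument types are inferred from this call, and the address and label/stream values are only examples):

```go
package main

import (
	"os"
	"time"

	"github.com/grafana/loki/pkg/canary/reader"
)

func main() {
	received := make(chan time.Time)

	r := reader.NewReader(
		os.Stderr,        // writer for diagnostic output
		received,         // channel the reader pushes entry timestamps into
		false,            // tls
		"localhost:3100", // addr
		"",               // user
		"",               // pass
		10*time.Second,   // queryTimeout
		"name",           // label name
		"loki-canary",    // label value
		"stream",         // stream name
		"stdout",         // stream value
		time.Second,      // interval: new, the expected gap between canary writes
	)
	_ = r // in the canary, timestamps are consumed from the received channel elsewhere
}
```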

14 changes: 14 additions & 0 deletions docs/operations/upgrade.md
@@ -11,6 +11,20 @@ On this page we will document any upgrade issues/gotchas/considerations we are aware of
A new ingester gRPC API has been added to speed up metric queries. To ensure a rollout without query errors, make sure you upgrade all ingesters first.
Once this is done you can proceed with the rest of the deployment; this ensures that queriers won't look for an API that is not yet available.

### Loki Canary metric name changes

While adding new features to the canary we realized the existing metrics were not compliant with the Prometheus naming conventions for counters, so the following metrics have been renamed:

```nohighlight
loki_canary_total_entries -> loki_canary_entries_total
loki_canary_out_of_order_entries -> loki_canary_out_of_order_entries_total
loki_canary_websocket_missing_entries -> loki_canary_websocket_missing_entries_total
loki_canary_missing_entries -> loki_canary_missing_entries_total
loki_canary_unexpected_entries -> loki_canary_unexpected_entries_total
loki_canary_duplicate_entries -> loki_canary_duplicate_entries_total
loki_canary_ws_reconnects -> loki_canary_ws_reconnects_total
```
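The new names follow the Prometheus convention that counter names carry a `_total` suffix, so any dashboards, alerts, or recording rules that reference the old names will need to be updated. As a minimal sketch of a compliant declaration, using the same client library the canary already uses (the variable name and help text below are illustrative, not taken from the canary source):

```go
package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Counters get a "_total" suffix; gauges and histograms do not.
var entriesTotal = promauto.NewCounter(prometheus.CounterOpts{
	Namespace: "loki_canary",
	Name:      "entries_total", // was "total_entries"
	Help:      "counts log entries written by the canary",
})

func main() {
	entriesTotal.Inc()
}
```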

## 1.5.0

Note: The required upgrade path outlined for version 1.4.0 below is still true for moving to 1.5.0 from any release older than 1.4.0 (e.g. 1.3.0->1.5.0 needs to also look at the 1.4.0 upgrade requirements).
35 changes: 18 additions & 17 deletions pkg/canary/comparator/comparator.go
@@ -64,16 +64,22 @@ var (
Name: "duplicate_entries_total",
Help: "counts a log entry received more than one time",
})
metricTestDeviation = promauto.NewGauge(prometheus.GaugeOpts{
metricTestExpected = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "loki_canary",
Name: "metric_test_deviation",
Help: "How many counts was the actual query result from the expected based on the canary log write rate",
Name: "metric_test_expected",
Help: "How many counts were expected by the metric test query",
})
metricTestActual = promauto.NewGauge(prometheus.GaugeOpts{
Namespace: "loki_canary",
Name: "metric_test_actual",
Help: "How many counts were actually recevied by the metric test query",
})
responseLatency prometheus.Histogram
)

type Comparator struct {
entMtx sync.Mutex
spotEntMtx sync.Mutex
spotMtx sync.Mutex
metTestMtx sync.Mutex
pruneMtx sync.Mutex
@@ -161,15 +167,16 @@ func (c *Comparator) Stop() {

func (c *Comparator) entrySent(time time.Time) {
c.entMtx.Lock()
defer c.entMtx.Unlock()
c.entries = append(c.entries, &time)
totalEntries.Inc()
c.entMtx.Unlock()
//If this entry equals or exceeds the spot check interval from the last entry in the spot check array, add it.
c.spotEntMtx.Lock()
Reviewer comment (Member): "very confused on the double lock/unlock instead of the single lock for this function, but I don't see it being a problem either."

if len(c.spotCheck) == 0 || time.Sub(*c.spotCheck[len(c.spotCheck)-1]) >= c.spotCheckInterval {
c.spotMtx.Lock()
c.spotCheck = append(c.spotCheck, &time)
c.spotMtx.Unlock()
}
totalEntries.Inc()
c.spotEntMtx.Unlock()

}
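On the reviewer's point about the double lock/unlock: the rewritten function releases `entMtx` before taking the spot-check lock, so consumers of one list never block on the other, and the locks are never nested. A stripped-down sketch of that pattern (simplified types, not the canary's actual struct):

```go
package main

import (
	"sync"
	"time"
)

type sampler struct {
	entMtx    sync.Mutex
	entries   []time.Time
	spotMtx   sync.Mutex
	spotCheck []time.Time
	every     time.Duration
}

func (s *sampler) record(t time.Time) {
	// First critical section: the full entry list.
	s.entMtx.Lock()
	s.entries = append(s.entries, t)
	s.entMtx.Unlock()

	// Second, independent critical section: the sampled spot-check list.
	// Holding the locks sequentially (never nested) also rules out deadlock.
	s.spotMtx.Lock()
	if len(s.spotCheck) == 0 || t.Sub(s.spotCheck[len(s.spotCheck)-1]) >= s.every {
		s.spotCheck = append(s.spotCheck, t)
	}
	s.spotMtx.Unlock()
}

func main() {
	s := &sampler{every: 30 * time.Second}
	s.record(time.Now())
}
```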

// entryReceived removes the received entry from the buffer if it exists, reports on out of order entries received
@@ -296,14 +303,8 @@ func (c *Comparator) metricTest(currTime time.Time) {
return
}
expectedCount := float64(adjustedRange.Milliseconds()) / float64(c.writeInterval.Milliseconds())
deviation := expectedCount - actualCount
// There is nothing special about the number 10 here, it's fairly common for the deviation to be 2-4
// based on how expected is calculated vs the actual query data, more than 10 would be unlikely
// unless there is a problem.
if deviation > 10 {
fmt.Fprintf(c.w, "large metric deviation: expected %v, actual %v\n", expectedCount, actualCount)
}
metricTestDeviation.Set(deviation)
metricTestExpected.Set(expectedCount)
metricTestActual.Set(actualCount)
}
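Instead of collapsing the comparison into a single deviation gauge with a hard-coded threshold of 10, the test now exports expected and actual counts separately, leaving the deviation (and any alert threshold) to a PromQL expression. A minimal sketch of the expected-count arithmetic, assuming only what the hunk above shows:

```go
package main

import (
	"fmt"
	"time"
)

// One canary entry is written every writeInterval, so over a query
// range the expected count is simply range/interval.
func expectedEntries(queryRange, writeInterval time.Duration) float64 {
	return float64(queryRange.Milliseconds()) / float64(writeInterval.Milliseconds())
}

func main() {
	// A 30s count_over_time query with a 1s write interval
	// should return roughly 30 entries.
	fmt.Println(expectedEntries(30*time.Second, time.Second)) // 30
}
```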

func (c *Comparator) spotCheckEntries(currTime time.Time) {
Expand All @@ -313,7 +314,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
c.spotCheckRunning = false
c.spotMtx.Unlock()
}()
c.spotMtx.Lock()
c.spotEntMtx.Lock()
k := 0
for i, e := range c.spotCheck {
if e.Before(currTime.Add(-c.spotCheckMax)) {
@@ -333,7 +334,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
cpy := make([]*time.Time, len(c.spotCheck))
//Make a copy so we don't have to hold the lock to verify entries
copy(cpy, c.spotCheck)
c.spotMtx.Unlock()
c.spotEntMtx.Unlock()

for _, sce := range cpy {
spotCheckEntries.Inc()
12 changes: 8 additions & 4 deletions pkg/canary/comparator/comparator_test.go
@@ -307,7 +308,8 @@ func TestSpotCheck(t *testing.T) {
}

func TestMetricTest(t *testing.T) {
metricTestDeviation = &mockGauge{}
metricTestActual = &mockGauge{}
metricTestExpected = &mockGauge{}

actual := &bytes.Buffer{}

@@ -327,23 +328,26 @@
// We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s
assert.Equal(t, "10s", mr.queryRange)
// Should be no deviation; we set countOverTime to the runtime/writeInterval, which is what metricTest expects
assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
assert.Equal(t, float64(20), metricTestExpected.(*mockGauge).val)
assert.Equal(t, float64(20), metricTestActual.(*mockGauge).val)

// Run test at time 30s which is 20s after start
mr.countOverTime = float64((20 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds())
c.metricTest(time.Unix(0, 30*time.Second.Nanoseconds()))
// We want to look back 30s and have been running from time 10s to time 30s, so the query range should be adjusted to 20s
assert.Equal(t, "20s", mr.queryRange)
// Gauge should be equal to the countOverTime value
assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
assert.Equal(t, float64(40), metricTestExpected.(*mockGauge).val)
assert.Equal(t, float64(40), metricTestActual.(*mockGauge).val)

// Run test 60s after start; we should now be capping the query range to 30s and expecting only 30s of counts
mr.countOverTime = float64((30 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds())
c.metricTest(time.Unix(0, 60*time.Second.Nanoseconds()))
// We want to look back 30s and have now been running longer than that, so the query range should be capped at 30s
assert.Equal(t, "30s", mr.queryRange)
// Gauge should be equal to the countOverTime value
assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
assert.Equal(t, float64(60), metricTestExpected.(*mockGauge).val)
assert.Equal(t, float64(60), metricTestActual.(*mockGauge).val)

prometheus.Unregister(responseLatency)
}
57 changes: 51 additions & 6 deletions pkg/canary/reader/reader.go
@@ -7,6 +7,7 @@ import (
"io"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"strconv"
@@ -28,9 +29,14 @@
var (
reconnects = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "ws_reconnects",
Name: "ws_reconnects_total",
Help: "counts every time the websocket connection has to reconnect",
})
websocketPings = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "loki_canary",
Name: "ws_pings_total",
Help: "counts every time the websocket receives a ping message",
})
userAgent = fmt.Sprintf("loki-canary/%s", build.Version)
)

@@ -50,6 +56,7 @@ type Reader struct {
sValue string
lName string
lVal string
interval time.Duration
conn *websocket.Conn
w io.Writer
recv chan time.Time
@@ -68,7 +75,8 @@ func NewReader(writer io.Writer,
labelName string,
labelVal string,
streamName string,
streamValue string) *Reader {
streamValue string,
interval time.Duration) *Reader {
h := http.Header{}
if user != "" {
h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
@@ -85,6 +93,7 @@
sValue: streamValue,
lName: labelName,
lVal: labelVal,
interval: interval,
w: writer,
recv: receivedChan,
quit: make(chan struct{}),
@@ -251,21 +260,34 @@ func (r *Reader) run() {
r.closeAndReconnect()

tailResponse := &loghttp.TailResponse{}
lastMessage := time.Now()

for {
if r.shuttingDown {
if r.conn != nil {
_ = r.conn.Close()
r.conn = nil
}
close(r.done)
return
}

// Set a read timeout of 10x the interval we expect to see messages
// Ignore the error as it will get caught when we call ReadJSON
_ = r.conn.SetReadDeadline(time.Now().Add(10 * r.interval))
err := r.conn.ReadJSON(tailResponse)
if err != nil {
if r.shuttingDown {
close(r.done)
return
}
fmt.Fprintf(r.w, "error reading websocket, will retry in 10 seconds: %s\n", err)
// Even though we sleep between connection retries, we found it's possible to DOS Loki if the connection
// succeeds but some other error is returned, so also sleep here before retrying.
<-time.After(10 * time.Second)
r.closeAndReconnect()
continue
}
// If there were streams, update the last message timestamp
if len(tailResponse.Streams) > 0 {
lastMessage = time.Now()
}
for _, stream := range tailResponse.Streams {
for _, entry := range stream.Entries {
ts, err := parseResponse(&entry)
@@ -276,10 +298,22 @@
r.recv <- *ts
}
}
// Ping messages can reset the read deadline so also make sure we are receiving regular messages.
// We use the same 10x interval to make sure we are getting recent messages
if time.Since(lastMessage).Milliseconds() > 10*r.interval.Milliseconds() {
fmt.Fprintf(r.w, "Have not received a canary message from loki on the websocket in %vms, "+
"sleeping 10s and reconnecting\n", 10*r.interval.Milliseconds())
<-time.After(10 * time.Second)
r.closeAndReconnect()
continue
}
}
}
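Taken together, run() now has two staleness guards: a per-read deadline of 10x the write interval, and a check that actual stream data arrived within that same window (the comments above note that pings can keep the connection looking alive, so the deadline alone is not sufficient). A condensed, hypothetical sketch of just that watchdog pattern, with connection setup and reconnection left to the caller:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/gorilla/websocket"
)

// readLoop is a hypothetical, condensed version of the watchdog logic above.
func readLoop(conn *websocket.Conn, interval time.Duration) error {
	lastMessage := time.Now()
	for {
		// Bound each blocking read to 10x the expected write interval.
		_ = conn.SetReadDeadline(time.Now().Add(10 * interval))

		var resp struct {
			Streams []json.RawMessage `json:"streams"`
		}
		if err := conn.ReadJSON(&resp); err != nil {
			return err // caller sleeps and reconnects
		}
		if len(resp.Streams) > 0 {
			lastMessage = time.Now()
		}
		// Control frames can keep reads succeeding without delivering data,
		// so separately require real stream messages within the same window.
		if time.Since(lastMessage) > 10*interval {
			return fmt.Errorf("no canary message in %v", 10*interval)
		}
	}
}

func main() {} // placeholder so the sketch compiles as a file
```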

func (r *Reader) closeAndReconnect() {
if r.shuttingDown {
return
}
if r.conn != nil {
_ = r.conn.Close()
r.conn = nil
@@ -307,6 +341,17 @@ func (r *Reader) closeAndReconnect() {
<-time.After(10 * time.Second)
continue
}
// Use a custom ping handler so we can increment a metric, this is copied from the default ping handler
c.SetPingHandler(func(message string) error {
websocketPings.Inc()
err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
if err == websocket.ErrCloseSent {
return nil
} else if e, ok := err.(net.Error); ok && e.Temporary() {
return nil
}
return err
})
r.conn = c
}
}