diff --git a/cmd/loki-canary/main.go b/cmd/loki-canary/main.go
index cf9d67c1d1a2..0937e93867f4 100644
--- a/cmd/loki-canary/main.go
+++ b/cmd/loki-canary/main.go
@@ -82,7 +82,7 @@ func main() {
 		defer c.lock.Unlock()
 
 		c.writer = writer.NewWriter(os.Stdout, sentChan, *interval, *size)
-		c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue)
+		c.reader = reader.NewReader(os.Stderr, receivedChan, *tls, *addr, *user, *pass, *queryTimeout, *lName, *lVal, *sName, *sValue, *interval)
 		c.comparator = comparator.NewComparator(os.Stderr, *wait, *pruneInterval, *spotCheckInterval, *spotCheckMax, *spotCheckQueryRate, *metricTestInterval, *metricTestQueryRange, *interval, *buckets, sentChan, receivedChan, c.reader, true)
 	}
 
diff --git a/docs/operations/upgrade.md b/docs/operations/upgrade.md
index 2e4ee23efa70..aa49b8cddaf7 100644
--- a/docs/operations/upgrade.md
+++ b/docs/operations/upgrade.md
@@ -14,6 +14,15 @@ provided.
 
 S3 config now supports exapnded config. Example can be found here [s3_expanded_config](../configuration/examples.md#s3-expanded-config)
 
+### New Ingester GRPC API special rollout procedure in microservices mode
+
+A new ingester GRPC API has been added to speed up metric queries. To ensure a rollout without query errors, make sure you upgrade all ingesters first.
+Once this is done you can proceed with the rest of the deployment; this ensures that queriers won't look for an API that is not yet available.
+
+If you roll out everything at once, queriers running the new code will attempt to query ingesters which may not have the new API method yet, and those queries will fail.
+
+This only affects reads (queries), not writes, and only for the duration of the rollout.
+
 ### Breaking CLI flags changes
 
 ```diff
@@ -30,11 +39,19 @@ S3 config now supports exapnded config. Example can be found here [s3_expanded_c
 + ingester.concurrent-flushes
 ```
 
-## 1.6.0
+### Loki Canary metric name changes
 
-A new ingester GRPC API has been added allowing to speed up metric queries, to ensure a rollout without query errors make sure you upgrade all ingesters first.
-Once this is done you can then proceed with the rest of the deployment, this is to ensure that queriers won't look for an API not yet available.
+While adding new features to the canary we realized the existing metrics were not compliant with the naming standard for counters, so the following metrics have been renamed:
+```nohighlight
+loki_canary_total_entries -> loki_canary_entries_total
+loki_canary_out_of_order_entries -> loki_canary_out_of_order_entries_total
+loki_canary_websocket_missing_entries -> loki_canary_websocket_missing_entries_total
+loki_canary_missing_entries -> loki_canary_missing_entries_total
+loki_canary_unexpected_entries -> loki_canary_unexpected_entries_total
+loki_canary_duplicate_entries -> loki_canary_duplicate_entries_total
+loki_canary_ws_reconnects -> loki_canary_ws_reconnects_total
+```
 
 ## 1.5.0
 
diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go
index 6f6e93d5e2b0..504a0eebb417 100644
--- a/pkg/canary/comparator/comparator.go
+++ b/pkg/canary/comparator/comparator.go
@@ -64,16 +64,22 @@ var (
 		Name:      "duplicate_entries_total",
 		Help:      "counts a log entry received more than one time",
 	})
-	metricTestDeviation = promauto.NewGauge(prometheus.GaugeOpts{
+	metricTestExpected = promauto.NewGauge(prometheus.GaugeOpts{
 		Namespace: "loki_canary",
-		Name:      "metric_test_deviation",
-		Help:      "How many counts was the actual query result from the expected based on the canary log write rate",
+		Name:      "metric_test_expected",
+		Help:      "How many counts were expected by the metric test query",
+	})
+	metricTestActual = promauto.NewGauge(prometheus.GaugeOpts{
+		Namespace: "loki_canary",
+		Name:      "metric_test_actual",
+		Help:      "How many counts were actually received by the metric test query",
 	})
 	responseLatency prometheus.Histogram
 )
 
 type Comparator struct {
 	entMtx              sync.Mutex
+	spotEntMtx          sync.Mutex
 	spotMtx             sync.Mutex
 	metTestMtx          sync.Mutex
 	pruneMtx            sync.Mutex
@@ -161,15 +167,16 @@ func (c *Comparator) Stop() {
 
 func (c *Comparator) entrySent(time time.Time) {
 	c.entMtx.Lock()
-	defer c.entMtx.Unlock()
 	c.entries = append(c.entries, &time)
+	totalEntries.Inc()
+	c.entMtx.Unlock()
 	//If this entry equals or exceeds the spot check interval from the last entry in the spot check array, add it.
+	c.spotEntMtx.Lock()
 	if len(c.spotCheck) == 0 || time.Sub(*c.spotCheck[len(c.spotCheck)-1]) >= c.spotCheckInterval {
-		c.spotMtx.Lock()
 		c.spotCheck = append(c.spotCheck, &time)
-		c.spotMtx.Unlock()
 	}
-	totalEntries.Inc()
+	c.spotEntMtx.Unlock()
+
 }
 
 // entryReceived removes the received entry from the buffer if it exists, reports on out of order entries received
@@ -296,14 +303,8 @@ func (c *Comparator) metricTest(currTime time.Time) {
 		return
 	}
 	expectedCount := float64(adjustedRange.Milliseconds()) / float64(c.writeInterval.Milliseconds())
-	deviation := expectedCount - actualCount
-	// There is nothing special about the number 10 here, it's fairly common for the deviation to be 2-4
-	// based on how expected is calculated vs the actual query data, more than 10 would be unlikely
-	// unless there is a problem.
-	if deviation > 10 {
-		fmt.Fprintf(c.w, "large metric deviation: expected %v, actual %v\n", expectedCount, actualCount)
-	}
-	metricTestDeviation.Set(deviation)
+	metricTestExpected.Set(expectedCount)
+	metricTestActual.Set(actualCount)
 }
 
 func (c *Comparator) spotCheckEntries(currTime time.Time) {
@@ -313,7 +314,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
 		c.spotCheckRunning = false
 		c.spotMtx.Unlock()
 	}()
-	c.spotMtx.Lock()
+	c.spotEntMtx.Lock()
 	k := 0
 	for i, e := range c.spotCheck {
 		if e.Before(currTime.Add(-c.spotCheckMax)) {
@@ -333,7 +334,7 @@ func (c *Comparator) spotCheckEntries(currTime time.Time) {
 	cpy := make([]*time.Time, len(c.spotCheck))
 	//Make a copy so we don't have to hold the lock to verify entries
 	copy(cpy, c.spotCheck)
-	c.spotMtx.Unlock()
+	c.spotEntMtx.Unlock()
 
 	for _, sce := range cpy {
 		spotCheckEntries.Inc()
diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go
index 5ed9beb65423..8db08b4b310c 100644
--- a/pkg/canary/comparator/comparator_test.go
+++ b/pkg/canary/comparator/comparator_test.go
@@ -307,7 +307,8 @@ func TestSpotCheck(t *testing.T) {
 }
 
 func TestMetricTest(t *testing.T) {
-	metricTestDeviation = &mockGauge{}
+	metricTestActual = &mockGauge{}
+	metricTestExpected = &mockGauge{}
 
 	actual := &bytes.Buffer{}
 
@@ -327,7 +328,8 @@ func TestMetricTest(t *testing.T) {
 	// We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s
 	assert.Equal(t, "10s", mr.queryRange)
 	// Should be no deviation we set countOverTime to the runtime/writeinterval which should be what metrictTest expected
-	assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
+	assert.Equal(t, float64(20), metricTestExpected.(*mockGauge).val)
+	assert.Equal(t, float64(20), metricTestActual.(*mockGauge).val)
 
 	// Run test at time 30s which is 20s after start
 	mr.countOverTime = float64((20 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds())
@@ -335,7 +337,8 @@ func TestMetricTest(t *testing.T) {
 	// We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s
 	assert.Equal(t, "20s", mr.queryRange)
 	// Gauge should be equal to the countOverTime value
-	assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
+	assert.Equal(t, float64(40), metricTestExpected.(*mockGauge).val)
+	assert.Equal(t, float64(40), metricTestActual.(*mockGauge).val)
 
 	// Run test 60s after start, we should now be capping the query range to 30s and expecting only 30s of counts
 	mr.countOverTime = float64((30 * time.Second).Milliseconds()) / float64(writeInterval.Milliseconds())
@@ -343,7 +346,8 @@ func TestMetricTest(t *testing.T) {
 	// We want to look back 30s but have only been running from time 10s to time 20s so the query range should be adjusted to 10s
 	assert.Equal(t, "30s", mr.queryRange)
 	// Gauge should be equal to the countOverTime value
-	assert.Equal(t, float64(0), metricTestDeviation.(*mockGauge).val)
+	assert.Equal(t, float64(60), metricTestExpected.(*mockGauge).val)
+	assert.Equal(t, float64(60), metricTestActual.(*mockGauge).val)
 
 	prometheus.Unregister(responseLatency)
 }
diff --git a/pkg/canary/reader/reader.go b/pkg/canary/reader/reader.go
index 9ee6de6dd32f..7a40e82601ce 100644
--- a/pkg/canary/reader/reader.go
+++ b/pkg/canary/reader/reader.go
@@ -7,6 +7,7 @@ import (
 	"io"
 	"io/ioutil"
 	"log"
+	"net"
 	"net/http"
 	"net/url"
 	"strconv"
@@ -28,9 +29,14 @@ import (
 
 var (
 	reconnects = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: "loki_canary",
-		Name:      "ws_reconnects",
+		Name:      "ws_reconnects_total",
 		Help:      "counts every time the websocket connection has to reconnect",
 	})
+	websocketPings = promauto.NewCounter(prometheus.CounterOpts{
+		Namespace: "loki_canary",
+		Name:      "ws_pings_total",
+		Help:      "counts every time the websocket receives a ping message",
+	})
 	userAgent = fmt.Sprintf("loki-canary/%s", build.Version)
 )
@@ -50,6 +56,7 @@ type Reader struct {
 	sValue   string
 	lName    string
 	lVal     string
+	interval time.Duration
 	conn     *websocket.Conn
 	w        io.Writer
 	recv     chan time.Time
@@ -68,7 +75,8 @@ func NewReader(writer io.Writer,
 	labelName string,
 	labelVal string,
 	streamName string,
-	streamValue string) *Reader {
+	streamValue string,
+	interval time.Duration) *Reader {
 	h := http.Header{}
 	if user != "" {
 		h = http.Header{"Authorization": {"Basic " + base64.StdEncoding.EncodeToString([]byte(user+":"+pass))}}
@@ -85,6 +93,7 @@ func NewReader(writer io.Writer,
 		sValue:   streamValue,
 		lName:    labelName,
 		lVal:     labelVal,
+		interval: interval,
 		w:        writer,
 		recv:     receivedChan,
 		quit:     make(chan struct{}),
@@ -251,14 +260,23 @@ func (r *Reader) run() {
 	r.closeAndReconnect()
 
 	tailResponse := &loghttp.TailResponse{}
+	lastMessage := time.Now()
 
 	for {
+		if r.shuttingDown {
+			if r.conn != nil {
+				_ = r.conn.Close()
+				r.conn = nil
+			}
+			close(r.done)
+			return
+		}
+
+		// Set a read timeout of 10x the interval we expect to see messages.
+		// Ignore the error as it will get caught when we call ReadJSON.
+		_ = r.conn.SetReadDeadline(time.Now().Add(10 * r.interval))
 		err := r.conn.ReadJSON(tailResponse)
 		if err != nil {
-			if r.shuttingDown {
-				close(r.done)
-				return
-			}
 			fmt.Fprintf(r.w, "error reading websocket, will retry in 10 seconds: %s\n", err)
 			// Even though we sleep between connection retries, we found it's possible to DOS Loki if the connection
 			// succeeds but some other error is returned, so also sleep here before retrying.
@@ -266,6 +284,10 @@ func (r *Reader) run() {
 			r.closeAndReconnect()
 			continue
 		}
+		// If there were streams, update the last message timestamp.
+		if len(tailResponse.Streams) > 0 {
+			lastMessage = time.Now()
+		}
 		for _, stream := range tailResponse.Streams {
 			for _, entry := range stream.Entries {
 				ts, err := parseResponse(&entry)
@@ -276,10 +298,22 @@ func (r *Reader) run() {
 				r.recv <- *ts
 			}
 		}
+		// Ping messages can reset the read deadline, so also make sure we are receiving regular messages.
+		// We use the same 10x interval to make sure we are getting recent messages.
+		if time.Since(lastMessage).Milliseconds() > 10*r.interval.Milliseconds() {
+			fmt.Fprintf(r.w, "Have not received a canary message from loki on the websocket in %vms, "+
+				"sleeping 10s and reconnecting\n", 10*r.interval.Milliseconds())
+			<-time.After(10 * time.Second)
+			r.closeAndReconnect()
+			continue
+		}
 	}
 }
 
 func (r *Reader) closeAndReconnect() {
+	if r.shuttingDown {
+		return
+	}
 	if r.conn != nil {
 		_ = r.conn.Close()
 		r.conn = nil
 	}
@@ -307,6 +341,17 @@ func (r *Reader) closeAndReconnect() {
 			<-time.After(10 * time.Second)
 			continue
 		}
+		// Use a custom ping handler so we can increment a metric; this is copied from the default ping handler.
+		c.SetPingHandler(func(message string) error {
+			websocketPings.Inc()
+			err := c.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
+			if err == websocket.ErrCloseSent {
+				return nil
+			} else if e, ok := err.(net.Error); ok && e.Temporary() {
+				return nil
+			}
+			return err
+		})
 		r.conn = c
 	}
 }
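Reviewer note (not part of the patch): the reader changes combine three things — a read deadline of 10x the write interval, a custom ping handler, and a staleness check on real messages before reconnecting. The standalone Go sketch below illustrates that watchdog pattern with gorilla/websocket, which is the library the canary reader uses; the URL, response shape, and `expectedInterval` value are hypothetical stand-ins rather than loki-canary's actual wiring.

```go
// Standalone sketch of the websocket watchdog pattern; values are hypothetical.
package main

import (
	"fmt"
	"net"
	"os"
	"time"

	"github.com/gorilla/websocket"
)

func main() {
	expectedInterval := time.Second // assumed write interval of the producer

	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:3100/loki/api/v1/tail?query={stream=\"stdout\"}", nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, "dial failed:", err)
		return
	}
	defer conn.Close()

	// Answer pings like the default handler does, but hook in so they could be counted.
	conn.SetPingHandler(func(message string) error {
		// a real implementation would increment a counter metric here
		err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Second))
		if err == websocket.ErrCloseSent {
			return nil
		} else if e, ok := err.(net.Error); ok && e.Temporary() {
			return nil
		}
		return err
	})

	lastMessage := time.Now()
	for {
		// Fail the next read if nothing arrives within 10x the expected interval.
		_ = conn.SetReadDeadline(time.Now().Add(10 * expectedInterval))

		var resp struct {
			Streams []interface{} `json:"streams"`
		}
		if err := conn.ReadJSON(&resp); err != nil {
			fmt.Fprintln(os.Stderr, "read failed, a real reader would reconnect here:", err)
			return
		}

		// Pings keep the connection alive, so track real messages separately
		// and reconnect if none have been seen for too long.
		if len(resp.Streams) > 0 {
			lastMessage = time.Now()
		}
		if time.Since(lastMessage) > 10*expectedInterval {
			fmt.Fprintln(os.Stderr, "no recent messages, a real reader would reconnect here")
			return
		}
	}
}
```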
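Also not part of the patch: with `metric_test_deviation` replaced by the `metric_test_expected` and `metric_test_actual` gauges, the deviation is no longer judged in-process and can instead be computed and alerted on from the exported values (for example as expected minus actual). A minimal sketch of that expected-count calculation and gauge export follows; the namespace, variable names, and sample values are illustrative only, not the comparator's actual code.

```go
// Illustrative only: export an expected count next to an actual count so the
// deviation can be computed outside the process.
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// Hypothetical gauges mirroring the metric_test_expected / metric_test_actual pair.
	testExpected = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "example",
		Name:      "metric_test_expected",
		Help:      "How many counts were expected by the metric test query",
	})
	testActual = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "example",
		Name:      "metric_test_actual",
		Help:      "How many counts were actually received by the metric test query",
	})
)

func main() {
	writeInterval := time.Second      // one entry is written per interval
	adjustedRange := 30 * time.Second // query range, capped to how long the canary has run
	actualCount := 28.0               // stand-in for a count_over_time query result

	// With one entry per write interval, the expected count is range / interval.
	expectedCount := float64(adjustedRange.Milliseconds()) / float64(writeInterval.Milliseconds())

	testExpected.Set(expectedCount)
	testActual.Set(actualCount)
	fmt.Printf("expected=%v actual=%v deviation=%v\n", expectedCount, actualCount, expectedCount-actualCount)
}
```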