From e120828e84ff543426b2843f81380651d5dbd095 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 9 Oct 2019 11:24:54 -0400 Subject: [PATCH 1/3] log what was missing from the websocket connection and what we received from a query to Loki to troubleshoot missing logs --- pkg/canary/comparator/comparator.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index 4a668766f290..0ea6bf4f4fa3 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -249,9 +249,18 @@ func (c *Comparator) confirmMissing(missing []*time.Time) { end = end.Add(10 * time.Second) recvd, err := c.rdr.Query(start, end) if err != nil { - _, _ = fmt.Fprintf(c.w, "error querying loki: %s", err) + _, _ = fmt.Fprintf(c.w, "error querying loki: %s\n", err) return } + // This is to help debug some missing log entries when queried, + // let's print exactly what we are missing and what Loki sent back + for _, r := range missing { + _, _ = fmt.Fprintf(c.w, "Websocket missing entry: %v\n", r.UnixNano()) + } + for _, r := range recvd { + _, _ = fmt.Fprintf(c.w, "Confirmation query result: %v\n", r.UnixNano()) + } + k := 0 for i, m := range missing { found := false From 4d7a6d4e22a6d77f76d198951277f6d37910e788 Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 9 Oct 2019 11:53:12 -0400 Subject: [PATCH 2/3] removing unnecessary unused return variables from print statements --- pkg/canary/comparator/comparator.go | 16 ++++++++-------- pkg/canary/reader/reader.go | 14 +++++++------- pkg/canary/writer/writer.go | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index 0ea6bf4f4fa3..fdfd3f05e61e 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -125,7 +125,7 @@ func (c *Comparator) entryReceived(ts time.Time) { // If this isn't the first item in the list we received it out of order if i != 0 { outOfOrderEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i]) + fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i]) } responseLatency.Observe(time.Since(ts).Seconds()) // Put this element in the acknowledged entries list so we can use it to check for duplicates @@ -145,12 +145,12 @@ func (c *Comparator) entryReceived(ts time.Time) { if ts.Equal(*e) { duplicate = true duplicateEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano()) + fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano()) break } } if !duplicate { - _, _ = fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano()) + fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano()) unexpectedEntries.Inc() } } @@ -199,7 +199,7 @@ func (c *Comparator) pruneEntries() { if e.Before(time.Now().Add(-c.maxWait)) { missing = append(missing, e) wsMissingEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds()) + fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds()) } else { if i != k { c.entries[k] = c.entries[i] @@ -249,16 +249,16 @@ func (c *Comparator) confirmMissing(missing []*time.Time) { end = end.Add(10 * time.Second) recvd, err := c.rdr.Query(start, end) if err != nil { - _, _ = fmt.Fprintf(c.w, "error querying loki: %s\n", err) + fmt.Fprintf(c.w, "error querying loki: %s\n", err) return } // This is to help debug some missing log entries when queried, // let's print exactly what we are missing and what Loki sent back for _, r := range missing { - _, _ = fmt.Fprintf(c.w, "Websocket missing entry: %v\n", r.UnixNano()) + fmt.Fprintf(c.w, "Websocket missing entry: %v\n", r.UnixNano()) } for _, r := range recvd { - _, _ = fmt.Fprintf(c.w, "Confirmation query result: %v\n", r.UnixNano()) + fmt.Fprintf(c.w, "Confirmation query result: %v\n", r.UnixNano()) } k := 0 @@ -286,6 +286,6 @@ func (c *Comparator) confirmMissing(missing []*time.Time) { missing = missing[:k] for _, e := range missing { missingEntries.Inc() - _, _ = fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds()) + fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds()) } } diff --git a/pkg/canary/reader/reader.go b/pkg/canary/reader/reader.go index 1b1824c4281f..f139bb2d2d0d 100644 --- a/pkg/canary/reader/reader.go +++ b/pkg/canary/reader/reader.go @@ -77,7 +77,7 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool, go func() { <-rd.quit if rd.conn != nil { - _, _ = fmt.Fprintf(rd.w, "shutting down reader\n") + fmt.Fprintf(rd.w, "shutting down reader\n") rd.shuttingDown = true _ = rd.conn.Close() } @@ -107,7 +107,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { "&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) + "&limit=1000", } - _, _ = fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String()) + fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String()) req, err := http.NewRequest("GET", u.String(), nil) if err != nil { @@ -142,7 +142,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) { for _, entry := range stream.Entries { ts, err := parseResponse(&entry) if err != nil { - _, _ = fmt.Fprint(r.w, err) + fmt.Fprint(r.w, err) continue } tss = append(tss, *ts) @@ -166,7 +166,7 @@ func (r *Reader) run() { close(r.done) return } - _, _ = fmt.Fprintf(r.w, "error reading websocket: %s\n", err) + fmt.Fprintf(r.w, "error reading websocket: %s\n", err) r.closeAndReconnect() continue } @@ -174,7 +174,7 @@ func (r *Reader) run() { for _, entry := range stream.Entries { ts, err := parseResponse(&entry) if err != nil { - _, _ = fmt.Fprint(r.w, err) + fmt.Fprint(r.w, err) continue } r.recv <- *ts @@ -203,11 +203,11 @@ func (r *Reader) closeAndReconnect() { RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)), } - _, _ = fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal) + fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal) c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header) if err != nil { - _, _ = fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err) + fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err) <-time.After(5 * time.Second) continue } diff --git a/pkg/canary/writer/writer.go b/pkg/canary/writer/writer.go index 8d9b163a808b..2bf918b03f69 100644 --- a/pkg/canary/writer/writer.go +++ b/pkg/canary/writer/writer.go @@ -72,7 +72,7 @@ func (w *Writer) run() { w.prevTsLen = tsLen } - _, _ = fmt.Fprintf(w.w, LogEntry, ts, w.pad) + fmt.Fprintf(w.w, LogEntry, ts, w.pad) w.sent <- t case <-w.quit: return From b0a4fac357f6552932a577e0da8172ef75003aae Mon Sep 17 00:00:00 2001 From: Edward Welch Date: Wed, 9 Oct 2019 12:26:24 -0400 Subject: [PATCH 3/3] update test to handle the new debug output --- pkg/canary/comparator/comparator.go | 16 +++++++++------- pkg/canary/comparator/comparator_test.go | 12 +++++++++++- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/pkg/canary/comparator/comparator.go b/pkg/canary/comparator/comparator.go index fdfd3f05e61e..3d118eb0c7b7 100644 --- a/pkg/canary/comparator/comparator.go +++ b/pkg/canary/comparator/comparator.go @@ -13,11 +13,13 @@ import ( ) const ( - ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n" - ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n" - ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n" - ErrDuplicateEntry = "received a duplicate entry for ts %v\n" - ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" + ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n" + ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n" + ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n" + ErrDuplicateEntry = "received a duplicate entry for ts %v\n" + ErrUnexpectedEntry = "received an unexpected entry with ts %v\n" + DebugWebsocketMissingEntry = "websocket missing entry: %v\n" + DebugQueryResult = "confirmation query result: %v\n" ) var ( @@ -255,10 +257,10 @@ func (c *Comparator) confirmMissing(missing []*time.Time) { // This is to help debug some missing log entries when queried, // let's print exactly what we are missing and what Loki sent back for _, r := range missing { - fmt.Fprintf(c.w, "Websocket missing entry: %v\n", r.UnixNano()) + fmt.Fprintf(c.w, DebugWebsocketMissingEntry, r.UnixNano()) } for _, r := range recvd { - fmt.Fprintf(c.w, "Confirmation query result: %v\n", r.UnixNano()) + fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano()) } k := 0 diff --git a/pkg/canary/comparator/comparator_test.go b/pkg/canary/comparator/comparator_test.go index 80d9c7d31fad..0af065664410 100644 --- a/pkg/canary/comparator/comparator_test.go +++ b/pkg/canary/comparator/comparator_test.go @@ -179,11 +179,21 @@ func TestEntryNeverReceived(t *testing.T) { c.pruneEntries() - expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ErrEntryNotReceived, + expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ // Out of order because we missed entries + ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ // Complain about missed entries + DebugWebsocketMissingEntry+DebugWebsocketMissingEntry+ // List entries we are missing + DebugQueryResult+DebugQueryResult+DebugQueryResult+DebugQueryResult+ // List entries we got back from Loki + ErrEntryNotReceived, // List entry not received from Loki t3, []time.Time{t2}, t5, []time.Time{t2, t4}, t2.UnixNano(), maxWait.Seconds(), t4.UnixNano(), maxWait.Seconds(), + t2.UnixNano(), + t4.UnixNano(), + t1.UnixNano(), + t3.UnixNano(), + t4.UnixNano(), + t5.UnixNano(), t2.UnixNano(), maxWait.Seconds()) assert.Equal(t, expected, actual.String())