Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some additional logging to the canary on queries #1137

Merged
merged 3 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions pkg/canary/comparator/comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ import (
)

const (
ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n"
ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n"
ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n"
ErrDuplicateEntry = "received a duplicate entry for ts %v\n"
ErrUnexpectedEntry = "received an unexpected entry with ts %v\n"
ErrOutOfOrderEntry = "out of order entry %s was received before entries: %v\n"
ErrEntryNotReceivedWs = "websocket failed to receive entry %v within %f seconds\n"
ErrEntryNotReceived = "failed to receive entry %v within %f seconds\n"
ErrDuplicateEntry = "received a duplicate entry for ts %v\n"
ErrUnexpectedEntry = "received an unexpected entry with ts %v\n"
DebugWebsocketMissingEntry = "websocket missing entry: %v\n"
DebugQueryResult = "confirmation query result: %v\n"
)

var (
Expand Down Expand Up @@ -125,7 +127,7 @@ func (c *Comparator) entryReceived(ts time.Time) {
// If this isn't the first item in the list we received it out of order
if i != 0 {
outOfOrderEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i])
fmt.Fprintf(c.w, ErrOutOfOrderEntry, e, c.entries[:i])
}
responseLatency.Observe(time.Since(ts).Seconds())
// Put this element in the acknowledged entries list so we can use it to check for duplicates
Expand All @@ -145,12 +147,12 @@ func (c *Comparator) entryReceived(ts time.Time) {
if ts.Equal(*e) {
duplicate = true
duplicateEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano())
fmt.Fprintf(c.w, ErrDuplicateEntry, ts.UnixNano())
break
}
}
if !duplicate {
_, _ = fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano())
fmt.Fprintf(c.w, ErrUnexpectedEntry, ts.UnixNano())
unexpectedEntries.Inc()
}
}
Expand Down Expand Up @@ -199,7 +201,7 @@ func (c *Comparator) pruneEntries() {
if e.Before(time.Now().Add(-c.maxWait)) {
missing = append(missing, e)
wsMissingEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds())
fmt.Fprintf(c.w, ErrEntryNotReceivedWs, e.UnixNano(), c.maxWait.Seconds())
} else {
if i != k {
c.entries[k] = c.entries[i]
Expand Down Expand Up @@ -249,9 +251,18 @@ func (c *Comparator) confirmMissing(missing []*time.Time) {
end = end.Add(10 * time.Second)
recvd, err := c.rdr.Query(start, end)
if err != nil {
_, _ = fmt.Fprintf(c.w, "error querying loki: %s", err)
fmt.Fprintf(c.w, "error querying loki: %s\n", err)
return
}
// To help debug log entries that appear to be missing when queried,
// print exactly which entries we are missing and what Loki sent back.
for _, r := range missing {
fmt.Fprintf(c.w, DebugWebsocketMissingEntry, r.UnixNano())
}
for _, r := range recvd {
fmt.Fprintf(c.w, DebugQueryResult, r.UnixNano())
}

k := 0
for i, m := range missing {
found := false
Expand All @@ -277,6 +288,6 @@ func (c *Comparator) confirmMissing(missing []*time.Time) {
missing = missing[:k]
for _, e := range missing {
missingEntries.Inc()
_, _ = fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds())
fmt.Fprintf(c.w, ErrEntryNotReceived, e.UnixNano(), c.maxWait.Seconds())
}
}
12 changes: 11 additions & 1 deletion pkg/canary/comparator/comparator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,21 @@ func TestEntryNeverReceived(t *testing.T) {

c.pruneEntries()

expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ErrEntryNotReceived,
expected := fmt.Sprintf(ErrOutOfOrderEntry+ErrOutOfOrderEntry+ // Out of order because we missed entries
ErrEntryNotReceivedWs+ErrEntryNotReceivedWs+ // Complain about missed entries
DebugWebsocketMissingEntry+DebugWebsocketMissingEntry+ // List entries we are missing
DebugQueryResult+DebugQueryResult+DebugQueryResult+DebugQueryResult+ // List entries we got back from Loki
ErrEntryNotReceived, // List entry not received from Loki
t3, []time.Time{t2},
t5, []time.Time{t2, t4},
t2.UnixNano(), maxWait.Seconds(),
t4.UnixNano(), maxWait.Seconds(),
t2.UnixNano(),
t4.UnixNano(),
t1.UnixNano(),
t3.UnixNano(),
t4.UnixNano(),
t5.UnixNano(),
t2.UnixNano(), maxWait.Seconds())

assert.Equal(t, expected, actual.String())
Expand Down
14 changes: 7 additions & 7 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func NewReader(writer io.Writer, receivedChan chan time.Time, tls bool,
go func() {
<-rd.quit
if rd.conn != nil {
_, _ = fmt.Fprintf(rd.w, "shutting down reader\n")
fmt.Fprintf(rd.w, "shutting down reader\n")
rd.shuttingDown = true
_ = rd.conn.Close()
}
Expand Down Expand Up @@ -107,7 +107,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
"&query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)) +
"&limit=1000",
}
_, _ = fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String())
fmt.Fprintf(r.w, "Querying loki for missing values with query: %v\n", u.String())

req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
Expand Down Expand Up @@ -142,7 +142,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
for _, entry := range stream.Entries {
ts, err := parseResponse(&entry)
if err != nil {
_, _ = fmt.Fprint(r.w, err)
fmt.Fprint(r.w, err)
continue
}
tss = append(tss, *ts)
Expand All @@ -166,15 +166,15 @@ func (r *Reader) run() {
close(r.done)
return
}
_, _ = fmt.Fprintf(r.w, "error reading websocket: %s\n", err)
fmt.Fprintf(r.w, "error reading websocket: %s\n", err)
r.closeAndReconnect()
continue
}
for _, stream := range tailResponse.Streams {
for _, entry := range stream.Entries {
ts, err := parseResponse(&entry)
if err != nil {
_, _ = fmt.Fprint(r.w, err)
fmt.Fprint(r.w, err)
continue
}
r.recv <- *ts
Expand Down Expand Up @@ -203,11 +203,11 @@ func (r *Reader) closeAndReconnect() {
RawQuery: "query=" + url.QueryEscape(fmt.Sprintf("{stream=\"stdout\",%v=\"%v\"}", r.lName, r.lVal)),
}

_, _ = fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)
fmt.Fprintf(r.w, "Connecting to loki at %v, querying for label '%v' with value '%v'\n", u.String(), r.lName, r.lVal)

c, _, err := websocket.DefaultDialer.Dial(u.String(), r.header)
if err != nil {
_, _ = fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err)
fmt.Fprintf(r.w, "failed to connect to %s with err %s\n", u.String(), err)
<-time.After(5 * time.Second)
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/canary/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (w *Writer) run() {
w.prevTsLen = tsLen
}

_, _ = fmt.Fprintf(w.w, LogEntry, ts, w.pad)
fmt.Fprintf(w.w, LogEntry, ts, w.pad)
w.sent <- t
case <-w.quit:
return
Expand Down