-
Notifications
You must be signed in to change notification settings - Fork 1
/
reader.go
84 lines (68 loc) · 1.95 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main
import (
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"os"
)
// readStdin checks that standard input is not a terminal and, if so, hands
// off to readAndParseStdin. The process is terminated through the log package
// when stdin cannot be inspected or when it is attached to a terminal.
//
// Background on detecting piped input:
//   - https://stackoverflow.com/a/49704981
//   - https://flaviocopes.com/go-shell-pipes/
func readStdin(ctx context.Context, config Config, logEntryCh chan<- *LogEntry) {
	info, statErr := os.Stdin.Stat()
	if statErr != nil {
		log.Fatal(statErr)
	}

	// The character-device bit is set when stdin is a terminal; in that case
	// nothing was piped in and there is nothing to read.
	attachedToTerminal := info.Mode()&os.ModeCharDevice != 0
	if attachedToTerminal {
		log.Fatalf("Expected to find content from stdin piped from another command. Example usage: kubectl logs <pod> | plr")
	}

	readAndParseStdin(ctx, config, logEntryCh)
}
// readAndParseStdin reads os.Stdin line by line, converts each line with
// parseLogLine, and forwards the result through sendToPrinter. Lines are
// numbered starting at 1 and are handed on with their trailing '\n' intact.
//
// The last line is processed even when it is not newline-terminated, and
// empty input is treated as a normal end of stream. Reading stops once ctx is
// cancelled. logEntryCh is closed when the function returns.
//
// A read error other than io.EOF is fatal when it occurs before any line has
// been processed; when it occurs later it is logged and ends the stream.
func readAndParseStdin(ctx context.Context, config Config, logEntryCh chan<- *LogEntry) {
	defer close(logEntryCh)

	reader := bufio.NewReader(os.Stdin)
	lineCount := 0
	for ctx.Err() == nil {
		line, readErr := reader.ReadBytes('\n')

		if readErr != nil && !errors.Is(readErr, io.EOF) {
			if lineCount == 0 {
				log.Fatalf("failed to read from stdin: %v", readErr)
			}
			// Log instead of exiting so entries already handed to the
			// receiver are not cut off by an immediate process exit.
			log.Printf("failed to read from stdin: %v", readErr)
			return
		}

		// ReadBytes returns any data read before io.EOF, so an unterminated
		// final line arrives here together with the EOF error.
		if len(line) > 0 {
			lineCount++
			sendToPrinter(ctx, logEntryCh, parseLogLine(line, lineCount, config))
		}

		if readErr != nil {
			// io.EOF: input is exhausted.
			return
		}
	}
}
// parseLogLine builds a LogEntry for a single line of input.
//
// line holds the raw bytes of the line and lineCount its line number; both
// are stored on the returned entry. When line decodes as a JSON object the
// entry is populated through setFromJsonMap with config.Keywords; otherwise
// setOriginalLogLine is called with the raw line. config.Keywords is
// dereferenced on the JSON path and must be non-nil there.
//
// When isDebug() reports true, the raw input and the resulting entry are
// written to stdout.
func parseLogLine(line []byte, lineCount int, config Config) *LogEntry {
	logEntry := &LogEntry{
		LineNumber:      lineCount,
		OriginalLogLine: line,
		Fields:          make(map[string]string),
	}

	parsedLogLine := make(map[string]interface{})
	if err := json.Unmarshal(line, &parsedLogLine); err != nil {
		// The line did not decode as a JSON object; fall back to the raw text.
		logEntry.setOriginalLogLine(line)
	} else {
		logEntry.setFromJsonMap(parsedLogLine, *config.Keywords)
	}

	if isDebug() {
		fmt.Printf("==== BEGIN DEBUG LINE %d ====\n", lineCount)
		fmt.Printf("[RAW INPUT]: %q\n", string(line))
		// Report a marshalling failure instead of printing an empty entry.
		if j, err := json.MarshalIndent(logEntry, "", " "); err != nil {
			fmt.Printf("[PARSED LOG ENTRY]: <failed to marshal: %v>\n", err)
		} else {
			fmt.Printf("[PARSED LOG ENTRY]: %s\n", string(j))
		}
		fmt.Printf("==== END DEBUG LINE %d ====\n", lineCount)
	}

	return logEntry
}
// sendToPrinter delivers logEntry on logEntryCh, blocking until either the
// send succeeds or ctx is done. If ctx finishes before the send can proceed,
// the entry is dropped and the function returns.
func sendToPrinter(ctx context.Context, logEntryCh chan<- *LogEntry, logEntry *LogEntry) {
	done := ctx.Done()
	select {
	case logEntryCh <- logEntry:
		// Entry handed over to the receiver.
	case <-done:
		// Context finished; give up on this entry.
	}
}