A few updates
- Prevent panics from killing the process: recover them and print the line as-is.
- Started work on being more streaming-aware and on handling duplicated keys in the JSON line better (a condensed sketch follows this list).
- Improved tests and benchmark a bit.
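
Both items above are implemented in the main.go diff further down. As a quick orientation, here is a condensed, self-contained sketch of the same approach: recover panics so a bad line never kills the process, and walk the JSON object token by token so duplicated keys can be noticed. The names (processLineSketch, seen) and the duplicate-key report are illustrative only and not part of this commit, which still keeps the duplicate-key handling commented out.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "os"
    "strings"
)

// processLineSketch pretty-processes a single log line without ever crashing:
// a panic falls back to printing the line untouched, and the JSON object is
// decoded token by token so duplicated keys are visible.
func processLineSketch(line string, output io.Writer) {
    defer func() {
        if err := recover(); err != nil {
            // On panic, print the line as-is instead of killing the process.
            fmt.Fprintln(output, line)
        }
    }()

    decoder := json.NewDecoder(strings.NewReader(line))
    if token, err := decoder.Token(); err != nil || token != json.Delim('{') {
        // Does not look like a JSON object, print it as-is.
        fmt.Fprintln(output, line)
        return
    }

    lineData := map[string]interface{}{}
    seen := map[string]bool{}
    for decoder.More() {
        keyToken, err := decoder.Token()
        if err != nil {
            fmt.Fprintln(output, line)
            return
        }
        key := keyToken.(string) // object keys are always strings

        if seen[key] {
            // Duplicated key in the JSON line; the commit leaves the real
            // handling commented out, so this sketch only reports it.
            fmt.Fprintf(output, "duplicated key %q in line\n", key)
        }
        seen[key] = true

        var value interface{}
        if err := decoder.Decode(&value); err != nil {
            fmt.Fprintln(output, line)
            return
        }
        lineData[key] = value
    }

    // The real code hands lineData to the pretty-printing logic at this point.
    fmt.Fprintf(output, "parsed %d field(s) from a JSON line\n", len(lineData))
}

func main() {
    processLineSketch(`{"severity":"DEBUG","msg":"hello","msg":"world"}`, os.Stdout)
    processLineSketch(`not json at all`, os.Stdout)
}

Running it reports the duplicated "msg" key for the first line and echoes the second, non-JSON line untouched.
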
Matthieu Vachon committed Dec 19, 2020
1 parent 4391465 commit 9d64744
Showing 4 changed files with 116 additions and 22 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/develop.yaml
@@ -0,0 +1,17 @@
jobs:
build:
runs-on: ubuntu-16.04
strategy:
matrix:
go: [ '1.15', '1.14' ]
name: Go ${{ matrix.go }} sample
steps:
- uses: actions/checkout@v2

- name: Setup go
uses: actions/setup-go@v1
with:
go-version: ${{ matrix.go }}

- name: Tests
run: go test ./...
38 changes: 33 additions & 5 deletions benchmarks_test.go
@@ -1,23 +1,51 @@
package main

import (
"bufio"
"bytes"
"io"
"io/ioutil"
"log"
"os"
"strings"
"testing"
)

func BenchmarkZapdriver(b *testing.B) {
debugBackup := debug
debug = log.New(ioutil.Discard, "", 0)
defer func(debugBackup *log.Logger) { debug = debugBackup }(debugBackup)
processor, byteCount, reset, cleanup := prepareBenchmark(ioutil.Discard)
defer cleanup()

lines := benchmarkZapdriverLines()
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(byteCount)

for n := 0; n < b.N; n++ {
executeProcessorTest(lines)
processor.process()
reset()
}
}

func TestBenchmarkCode(t *testing.T) {
processor, _, reset, cleanup := prepareBenchmark(os.Stdout)
defer cleanup()

// This test can be run in verbose mode to ensure the actual benchmark code works as expected
processor.process()
reset()
processor.process()
reset()
}

func prepareBenchmark(output io.Writer) (proc *processor, byteCount int64, reset func(), cleanup func()) {
debugBackup := debug
debug = log.New(ioutil.Discard, "", 0)

reader := bytes.NewReader([]byte(strings.Join(benchmarkZapdriverLines(), "\n")))
proc = &processor{scanner: bufio.NewScanner(reader), output: output}

return proc, reader.Size(), func() { reader.Seek(0, io.SeekStart) }, func() { debug = debugBackup }
}

func benchmarkZapdriverLines() []string {
return []string{
`{"severity":"DEBUG","time":"2018-12-24T06:38:03.75065493Z","caller":"first/second.go:87","message":"pre-processing message","message_id":"0202ae78b2dfee57a28ee22d83fda3e70d3913d343ea5c62c27cb03721ddc3b7","message_num":33730168,"labels":{},"logging.googleapis.com/sourceLocation":{"file":"/root/go/src/github.com/acme/project/first/second.go","line":"87","function":"main.(*MessagePipeline).PreprocessMessage"}}`,
70 changes: 53 additions & 17 deletions main.go
@@ -115,18 +115,55 @@ func (p *processor) process() {
}

func (p *processor) processLine(line string) {
defer func() {
if err := recover(); err != nil {
p.unformattedPrintLine(line, "Panic occurred while processing line '%s', ending processing (%s)", line, err)
}
}()

debugPrintln("Processing line: %s", line)
if !p.mightBeJSON(line) {
debugPrintln("Does not look like a JSON line, ending processing")
fmt.Fprint(p.output, line)
reader := bytes.NewReader([]byte(line))
decoder := json.NewDecoder(reader)

token, err := decoder.Token()
if err != nil {
p.unformattedPrintLine(line, "Does not look like a JSON line, ending processing (%s)", err)
return
}

var lineData map[string]interface{}
err := json.Unmarshal([]byte(line), &lineData)
if err != nil {
debugPrintln("unable to unmarshal line as JSON: %s", err)
fmt.Fprint(p.output, line)
delim, ok := token.(json.Delim)
if !ok || delim != '{' {
p.unformattedPrintLine(line, "Expecting a JSON object delimited, ending processing")
return
}

lineData := map[string]interface{}{}
for decoder.More() {
token, err := decoder.Token()
if err != nil {
p.unformattedPrintLine(line, "Invalid JSON key in line, ending processing (%s)", err)
return
}

key := token.(string)

// if keys[key] {
// // Key duplicated here ...
// }
// keys[key] = true

var value interface{}
if err := decoder.Decode(&value); err != nil {
p.unformattedPrintLine(line, "Invalid JSON value in line, ending processing (%s)", err)
return
}

lineData[key] = value
}

// Read the ending delimiter of the JSON object
if _, err := decoder.Token(); err != nil {
p.unformattedPrintLine(line, "Invalid JSON, misssing object end delimiter in line, ending processing (%s)", err)
return
}

@@ -142,16 +142,10 @@ func (p *processor) processLine(line string) {
debugPrintln("Not printing line due to error: %s", err)
}
} else {
debugPrintln("Printing!")
fmt.Fprint(p.output, prettyLine)
}
}

func (p *processor) mightBeJSON(line string) bool {
// TODO: Improve optimization when some benchmarks are available
return strings.Contains(line, "{")
}

func (p *processor) maybePrettyPrintLine(line string, lineData map[string]interface{}) (string, error) {
if lineData["level"] != nil && lineData["ts"] != nil && lineData["caller"] != nil && lineData["msg"] != nil {
return p.maybePrettyPrintZapLine(line, lineData)
@@ -212,7 +243,7 @@ func tsFieldToTimestamp(input interface{}) (*time.Time, error) {
return &timestamp, err
}

return &zeroTime, fmt.Errorf("don't know how to turn %t (value %s) into a time.Time object", input, input)
return &zeroTime, fmt.Errorf("don't know how to turn %T (value %s) into a time.Time object", input, input)
}

func (p *processor) maybePrettyPrintZapdriverLine(line string, lineData map[string]interface{}) (string, error) {
@@ -293,7 +324,7 @@ func (p *processor) writeErrorDetails(buffer *bytes.Buffer, errorVerbose string,
}

// The `errorVerbose` seems to contain a stack trace for each error captured. This behavior
// comes from `derr.Wrap` that create a stack of errors, each of the item having an associate
// comes from `github.com/pkg/errors`, which creates a stack of errors, each item having an associated
// stacktrace.
if errorVerbose != "" {
writeErrorVerbose(buffer, errorVerbose)
@@ -409,8 +440,13 @@ func (p *processor) colorizeSeverity(severity string) aurora.Value {
return Colorize(severity, color)
}

func (p *processor) unformattedPrintLine(line string, message string, args ...interface{}) {
debugPrintln(message, args...)
fmt.Fprint(p.output, line)
}

func debugPrintln(msg string, args ...interface{}) {
if debugEnabled {
debug.Println(fmt.Sprintf(msg, args...))
debug.Printf(msg+"\n", args...)
}
}
13 changes: 13 additions & 0 deletions main_test.go
@@ -18,6 +18,7 @@ type logTest struct {

func init() {
debug = log.New(os.Stdout, "[pretty-test] ", 0)
debugEnabled = os.Getenv("DEBUG") != ""
}

func TestStandardNonJSON(t *testing.T) {
@@ -208,6 +209,18 @@ func TestZapDriverNewProduction(t *testing.T) {
},
options: []processorOption{withAllFields()},
},

// Skipped for now: requires transforming the json.Marshal call into streaming code that scans the input instead (a possible direction is sketched after this file's diff).
// {
// name: "panic_error_stackdriver",
// lines: []string{
// `{"severity":"DEBUG","time":"2020-11-18T10:47:54.507381105Z","caller":"counter.go:1","message":"msg","time":0.000182456}`,
// },
// expectedLines: []string{
// "[2018-12-21 23:06:49.435 EST] \x1b[32mINFO\x1b[0m \x1b[38;5;244m(c:0)\x1b[0m \x1b[34mm\x1b[0m {\"folder\":\"f\",\"labels\":{},\"logging.googleapis.com/sourceLocation\":{\"file\":\"f\",\"function\":\"fn\",\"line\":\"1\"}}",
// },
// options: []processorOption{withAllFields()},
// },
})
}

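For the test case skipped above, one possible direction for the streaming re-marshal it mentions is sketched below. This is only an assumption about where the work might go, not code from this commit; marshalRemainingFields and the consumed map are hypothetical names. The idea is to copy the untouched key/value pairs straight from the original line via json.RawMessage instead of going through a map and json.Marshal, so duplicated keys and the original value formatting survive.

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "strings"
)

// marshalRemainingFields re-emits every field of the original JSON line that
// was not already consumed by the pretty printer, scanning the input instead
// of re-marshaling a map, so duplicated keys are preserved as-is.
func marshalRemainingFields(line string, consumed map[string]bool) (string, error) {
    decoder := json.NewDecoder(strings.NewReader(line))

    // Skip the opening '{' of the object.
    if _, err := decoder.Token(); err != nil {
        return "", err
    }

    var out bytes.Buffer
    out.WriteByte('{')
    first := true
    for decoder.More() {
        keyToken, err := decoder.Token()
        if err != nil {
            return "", err
        }
        key := keyToken.(string)

        // Capture the value bytes exactly as they appear in the input.
        var raw json.RawMessage
        if err := decoder.Decode(&raw); err != nil {
            return "", err
        }

        if consumed[key] {
            continue // already rendered by the pretty printer
        }
        if !first {
            out.WriteByte(',')
        }
        first = false

        keyBytes, err := json.Marshal(key)
        if err != nil {
            return "", err
        }
        out.Write(keyBytes)
        out.WriteByte(':')
        out.Write(raw)
    }
    out.WriteByte('}')

    return out.String(), nil
}

func main() {
    line := `{"severity":"DEBUG","message":"msg","time":"2020-11-18T10:47:54Z","time":0.000182456}`
    rest, err := marshalRemainingFields(line, map[string]bool{"severity": true, "message": true})
    fmt.Println(rest, err) // both "time" fields survive
}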
