Skip to content

Commit

Permalink
Merge pull request elastic#82 from urso/feature/lumberjack-json
Browse files Browse the repository at this point in the history
lumberjack: use json encoded data frame for output
  • Loading branch information
ruflin committed Sep 18, 2015
2 parents a80c692 + 13f105d commit 213a7cb
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 79 deletions.
66 changes: 12 additions & 54 deletions outputs/lumberjack/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ var (
)

var (
codeWindowSize = []byte("1W")
codeDataFrame = []byte("1D")
codeCompressed = []byte("1C")
codeWindowSize = []byte("1W")
codeJSONDataFrame = []byte("1J")
codeCompressed = []byte("1C")
)

func newLumberjackClient(conn TransportClient, timeout time.Duration) *lumberjackClient {
Expand Down Expand Up @@ -217,69 +217,27 @@ func (l *lumberjackClient) writeDataFrame(
seq uint32,
out io.Writer,
) error {
// This makes me sad: marshal -> unmarshal -> flatten into map[string]string -> encode
// Write JSON Data Frame:
// version: uint8 = '1'
// code: uint8 = 'J'
// seq: uint32
// payloadLen (bytes): uint32
// payload: JSON document

jsonEvent, err := json.Marshal(event)
if err != nil {
logp.Err("Fail to convert the event to JSON: %s", err)
return err
}

var root interface{}
json.Unmarshal(jsonEvent, &root)

fields := make(map[string]string)
flattenInto(fields, "", root.(map[string]interface{}))

// Write Data Frame:
// version: uint8
// code: uint8 = 'D'
// seq: uint32
// numFields: uint32
// fields: [numFields]{
// lenKey: uint32
// key: [lenKey]byte
// lenValue: uint32
// value: [lenValue]byte
// }
out.Write(codeDataFrame) // version + code
out.Write(codeJSONDataFrame) // version + code
writeUint32(out, seq)
writeUint32(out, uint32(len(fields)))

for k, v := range fields {
writeKV(out, k, v)
}
writeUint32(out, uint32(len(jsonEvent)))
out.Write(jsonEvent)

return nil
}

func flattenInto(to map[string]string, baseKey string, event map[string]interface{}) {
for k, v := range event {
var key string
if baseKey != "" {
key = baseKey + "." + k
} else {
key = k
}

switch t := v.(type) {
case map[string]interface{}:
flattenInto(to, key, t)
default:
bytes, _ := json.Marshal(t)
to[key] = string(bytes)
}
}
}

func writeUint32(out io.Writer, v uint32) error {
return binary.Write(out, binary.BigEndian, v)
}

func writeKV(out io.Writer, k string, v string) error {
writeUint32(out, uint32(len(k)))
out.Write([]byte(k))
writeUint32(out, uint32(len(v)))
out.Write([]byte(v))
return nil
}
54 changes: 34 additions & 20 deletions outputs/lumberjack/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ package lumberjack

import (
"compress/zlib"
"encoding/json"
"errors"
"io"
"net"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -180,7 +182,18 @@ type message struct {
size uint32
seq uint32
events []*message
kv map[string]string
doc document
}

type document map[string]interface{}

func (d document) get(path string) interface{} {
doc := d
elems := strings.Split(path, ".")
for i := 0; i < len(elems)-1; i++ {
doc = doc[elems[i]].(map[string]interface{})
}
return doc[elems[len(elems)-1]]
}

func readMessage(buf *streambuf.Buffer) (*message, error) {
Expand Down Expand Up @@ -236,22 +249,25 @@ func readMessage(buf *streambuf.Buffer) (*message, error) {
}

code, _ := dataBuf.ReadNetUint8()
if code != 'D' {
return nil, errors.New("expected data frame")
if code != 'J' {
return nil, errors.New("expected json data frame")
}

seq, _ := dataBuf.ReadNetUint32()
pairCount, _ := dataBuf.ReadNetUint32()
kv := make(map[string]string)
for i := 0; i < int(pairCount); i++ {
keyLen, _ := dataBuf.ReadNetUint32()
keyRaw, _ := dataBuf.Collect(int(keyLen))
valLen, _ := dataBuf.ReadNetUint32()
valRaw, _ := dataBuf.Collect(int(valLen))
kv[string(keyRaw)] = string(valRaw)
payloadLen, _ := dataBuf.ReadNetUint32()
jsonRaw, _ := dataBuf.Collect(int(payloadLen))

var doc interface{}
err = json.Unmarshal(jsonRaw, &doc)
if err != nil {
return nil, err
}

events = append(events, &message{code: code, seq: seq, kv: kv})
events = append(events, &message{
code: code,
seq: seq,
doc: doc.(map[string]interface{}),
})
}
return &message{code: 'C', events: events}, nil
default:
Expand Down Expand Up @@ -320,8 +336,8 @@ func TestSimpleEvent(t *testing.T) {
assert.NotNil(t, msg)
assert.Equal(t, 1, len(msg.events))
msg = msg.events[0]
assert.Equal(t, "\"me\"", msg.kv["name"])
assert.Equal(t, "10", msg.kv["line"])
assert.Equal(t, "me", msg.doc["name"])
assert.Equal(t, 10.0, msg.doc["line"])
}

func TestStructuredEvent(t *testing.T) {
Expand Down Expand Up @@ -364,10 +380,8 @@ func TestStructuredEvent(t *testing.T) {
assert.NotNil(t, msg)
assert.Equal(t, 1, len(msg.events))
msg = msg.events[0]
assert.Equal(t, "\"test\"", msg.kv["name"])
assert.Equal(t, "1", msg.kv["struct.field1"])
assert.Equal(t, "true", msg.kv["struct.field2"])
assert.Equal(t, "[1,2,3]", msg.kv["struct.field3"])
assert.Equal(t, "[1,\"test\",{\"sub\":\"field\"}]", msg.kv["struct.field4"])
assert.Equal(t, "2", msg.kv["struct.field5.sub1"])
assert.Equal(t, "test", msg.doc["name"])
assert.Equal(t, 1.0, msg.doc.get("struct.field1"))
assert.Equal(t, true, msg.doc.get("struct.field2"))
assert.Equal(t, 2.0, msg.doc.get("struct.field5.sub1"))
}
9 changes: 4 additions & 5 deletions outputs/lumberjack/lumberjack_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,8 @@ func TestLumberjackTCP(t *testing.T) {
assert.NotNil(t, data)
assert.Equal(t, 1, len(data.events))
data = data.events[0]
assert.Equal(t, "\"me\"", data.kv["name"])
assert.Equal(t, "10", data.kv["line"])
assert.Equal(t, "me", data.doc["name"])
assert.Equal(t, 10.0, data.doc["line"])
}

func TestLumberjackTLS(t *testing.T) {
Expand Down Expand Up @@ -329,8 +329,7 @@ func TestLumberjackTLS(t *testing.T) {
if data != nil {
assert.Equal(t, 1, len(data.events))
data = data.events[0]
assert.Equal(t, "\"me\"", data.kv["name"])
assert.Equal(t, "10", data.kv["line"])
assert.Equal(t, "me", data.doc["name"])
assert.Equal(t, 10.0, data.doc["line"])
}

}

0 comments on commit 213a7cb

Please sign in to comment.