Skip to content

Commit

Permalink
Fix kvlist and byte arrays marshalling (#145)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

This is step 1 of fixing a problem where some logs being sent to
honeycomb contained some very large nested maps being sent as "Body"
elements.

Logs allow byte arrays to be sent, but we were just converting them to
nil. This PR adds byte array support: byte arrays are now passed through
JSON marshalling, which emits them as base64-encoded strings.

OTel kvlist is a special type that OTel invented to allow a map to be
indexed and traversed in-order. But the defined semantics are that of a
map. Our previous marshalling had two problems:
- It was sending the kvlist as a stringified array of objects, each of
which held a single key-value pair, e.g. `[{"a":1},{"b":2}]`. That
wrapping is unnecessary because keys are guaranteed to be unique, so this
PR converts it to a simple map: `{"a":1, "b":2}`.
- A recursive routine was rendering each level of maps and arrays as
strings, so unmarshalling nested objects would not result in an
equivalent data structure. This PR creates a two-level recursion, where
the first level converts objects to a "safe-for-honeycomb" data
structure, and only then marshals the result with a single call to JSON
marshal.

A followup PR will limit the marshalling length to something Honeycomb
will consume.
  • Loading branch information
kentquirk authored Nov 7, 2022
1 parent baf6ae3 commit 5cb3ccf
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 32 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.18

require (
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3
github.com/json-iterator/go v1.1.12
github.com/klauspost/compress v1.15.11
github.com/stretchr/testify v1.8.0
google.golang.org/grpc v1.50.0
Expand All @@ -14,6 +15,8 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/net v0.0.0-20220802222814-0bcc04d9c69b // indirect
golang.org/x/sys v0.0.0-20220731174439-a90be440212d // indirect
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,25 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OHLH3mGKHDcjJRFFRrJa6eAM5H+CtDdOsPc=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down
101 changes: 86 additions & 15 deletions otlp/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"io"
"math"
"net/http"
"regexp"
"strings"
"time"

common "github.com/honeycombio/husky/proto/otlp/common/v1"
resource "github.com/honeycombio/husky/proto/otlp/resource/v1"
jsoniter "github.com/json-iterator/go"
"github.com/klauspost/compress/zstd"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/encoding/protojson"
Expand All @@ -33,6 +34,10 @@ const (
unknownLogSource = "unknown_log_source"
)

// fieldSizeMax is the maximum size of a field that will be accepted by honeycomb.
// The limit is enforced in retriever (in private honeycomb code), in varstring.go.
const fieldSizeMax = math.MaxUint16

var (
legacyApiKeyPattern = regexp.MustCompile("^[0-9a-f]{32}$")
// Incoming OpenTelemetry HTTP Content-Types (e.g. "application/protobuf") we support
Expand All @@ -43,6 +48,9 @@ var (
}
// Incoming Content-Encodings we support. "" included as a stand in for "not given, assume uncompressed"
supportedContentEncodings = []string{"", "gzip", "zstd"}

// Use json-iterator for better performance
json = jsoniter.ConfigCompatibleWithStandardLibrary
)

// List of HTTP Content Types supported for OTLP ingest.
Expand Down Expand Up @@ -188,8 +196,16 @@ func addAttributesToMap(attrs map[string]interface{}, attributes []*common.KeyVa
if attr.Key == "" || attr.Value == nil {
continue
}
if val := getValue(attr.Value); val != nil {
if val, truncatedBytes := getValue(attr.Value); val != nil {
attrs[attr.Key] = val
if truncatedBytes != 0 {
	// If we trim a field, add telemetry about it. Because we trim at 64K and
	// a whole span can't be more than 100K, this can't happen more than once
	// for a single span. If we ever change those limits, this will need to
	// become additive.
	//
	// Record the number of bytes that were dropped (not the attribute value
	// itself), alongside the name of the field that was trimmed.
	attrs["meta.truncated_bytes"] = truncatedBytes
	attrs["meta.truncated_field"] = attr.Key
}
}
}
}
Expand Down Expand Up @@ -248,7 +264,39 @@ func getLogsDataset(ri RequestInfo, attrs map[string]interface{}) string {
return dataset
}

func getValue(value *common.AnyValue) interface{} {
// limitedWriter is an io.Writer that retains at most max bytes and silently
// discards everything after that, while still reporting every Write as fully
// successful so that an encoder writing to it never sees a short write.
// It wraps strings.Builder so that the final String() call does not copy.
type limitedWriter struct {
	max            int             // maximum number of bytes to retain
	w              strings.Builder // retained output
	truncatedBytes int             // total number of bytes discarded so far
	full           bool            // set once any byte has been discarded
}

// newLimitedWriter returns a limitedWriter that retains at most n bytes.
// A negative n behaves like zero: nothing is retained.
func newLimitedWriter(n int) *limitedWriter {
	return &limitedWriter{max: n}
}

// Write appends b to the retained output until the limit is reached.
// It always reports len(b) bytes written, even when some or all of them were
// discarded; the number of discarded bytes is accumulated in truncatedBytes.
//
// When a write has to be cut, the cut point is moved back to the start of a
// UTF-8 sequence so that the retained output does not end in a partial rune.
// Once anything has been discarded, all later writes are discarded in full so
// that no later data is appended after the gap.
func (l *limitedWriter) Write(b []byte) (int, error) {
	n := len(b)
	if l.full {
		l.truncatedBytes += n
		return n, nil
	}

	room := l.max - l.w.Len()
	if room < 0 {
		// Only possible with a negative max; treat it as "no room" rather
		// than panicking on a negative slice bound.
		room = 0
	}

	if n > room {
		cut := room
		// b[cut] is the first byte that would be dropped. If it is a UTF-8
		// continuation byte (10xxxxxx), the rune it belongs to would be
		// split, so back up to the byte that starts that rune.
		for cut > 0 && b[cut]&0xC0 == 0x80 {
			cut--
		}
		b = b[:cut]
		l.truncatedBytes += n - cut
		l.full = true
	}

	_, err := l.w.Write(b)
	// Return the length the caller sent us so they think we wrote it all.
	return n, err
}

// String returns everything that has been retained so far.
func (l *limitedWriter) String() string {
	return l.w.String()
}

// getMarshallableValue returns a value that can be marshalled by JSON -- aggregate
// data structures are returned as native Go aggregates (maps and slices), rather
// than marshalled strings (we expect the caller to do the marshalling).
// Arrays and kvlists are converted recursively; a kvlist becomes a single map
// keyed by each entry's key. Value types not listed in the switch yield nil.
//
// NOTE(review): value is dereferenced without a nil check, and nested kvlist
// entries pass items[i].Value straight through, so an entry with no Value set
// would presumably panic here -- confirm and add a guard.
func getMarshallableValue(value *common.AnyValue) interface{} {
switch value.Value.(type) {
case *common.AnyValue_StringValue:
return value.GetStringValue()
Expand All @@ -258,30 +306,53 @@ func getValue(value *common.AnyValue) interface{} {
return value.GetDoubleValue()
case *common.AnyValue_IntValue:
return value.GetIntValue()
case *common.AnyValue_BytesValue:
return value.GetBytesValue()
case *common.AnyValue_ArrayValue:
items := value.GetArrayValue().Values
arr := make([]interface{}, len(items))
for i := 0; i < len(items); i++ {
arr[i] = getValue(items[i])
}
bytes, err := json.Marshal(arr)
if err == nil {
return string(bytes)
arr[i] = getMarshallableValue(items[i])
}
return arr
case *common.AnyValue_KvlistValue:
items := value.GetKvlistValue().Values
arr := make([]map[string]interface{}, len(items))
m := make(map[string]interface{}, len(items))
for i := 0; i < len(items); i++ {
arr[i] = map[string]interface{}{
items[i].Key: getValue(items[i].Value),
}
m[items[i].GetKey()] = getMarshallableValue(items[i].Value)
}
bytes, err := json.Marshal(arr)
return m
}
return nil
}

// getValue returns a value that can be handled by Honeycomb -- it must be one of:
// string, int, bool, float. All other values are converted to strings containing JSON.
//
// truncatedBytes is the number of bytes the limitedWriter discarded while
// encoding an aggregate or bytes value; it is always 0 for the scalar cases.
// Unrecognized value types, or an encoding error, yield (nil, 0).
func getValue(value *common.AnyValue) (result interface{}, truncatedBytes int) {
switch value.Value.(type) {
case *common.AnyValue_StringValue:
return value.GetStringValue(), 0
case *common.AnyValue_BoolValue:
return value.GetBoolValue(), 0
case *common.AnyValue_DoubleValue:
return value.GetDoubleValue(), 0
case *common.AnyValue_IntValue:
return value.GetIntValue(), 0
// These types are all marshalled to a string after conversion to Honeycomb-safe values.
// We use our limitedWriter to ensure that the string can't be bigger than the allowable
// maximum (fieldSizeMax), and it also minimizes allocations.
// Note that an Encoder emits JSON with a trailing newline because it's intended for use
// in streaming. This is correct but sometimes surprising, and the tests need to expect it.
case *common.AnyValue_ArrayValue, *common.AnyValue_KvlistValue, *common.AnyValue_BytesValue:
arr := getMarshallableValue(value)
w := newLimitedWriter(fieldSizeMax)
enc := json.NewEncoder(w)
err := enc.Encode(arr)
if err == nil {
return string(bytes)
return w.String(), w.truncatedBytes
}
}
return nil
return nil, 0
}

func parseOtlpRequestBody(body io.ReadCloser, contentType string, contentEncoding string, request protoreflect.ProtoMessage) error {
Expand Down
124 changes: 110 additions & 14 deletions otlp/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package otlp
import (
"context"
"net/http"
"reflect"
"strings"
"testing"

common "github.com/honeycombio/husky/proto/otlp/common/v1"
Expand Down Expand Up @@ -86,8 +88,8 @@ func TestAddAttributesToMap(t *testing.T) {
},
},
{
key: "array-attr", // not supported
expected: "[\"one\",true,3]",
key: "array-attr",
expected: "[\"one\",true,3]\n",
attribute: &common.KeyValue{
Key: "array-attr", Value: &common.AnyValue{Value: &common.AnyValue_ArrayValue{ArrayValue: &common.ArrayValue{
Values: []*common.AnyValue{
Expand All @@ -97,18 +99,9 @@ func TestAddAttributesToMap(t *testing.T) {
}}}},
},
},
{
key: "kvlist-attr",
expected: "[{\"kv-attr-str\":\"kv-attr-str-value\"},{\"kv-attr-int\":1}]",
attribute: &common.KeyValue{
Key: "kvlist-attr", Value: &common.AnyValue{
Value: &common.AnyValue_KvlistValue{KvlistValue: &common.KeyValueList{
Values: []*common.KeyValue{
{Key: "kv-attr-str", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "kv-attr-str-value"}}},
{Key: "kv-attr-int", Value: &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 1}}},
},
}}}},
},
// Testing single-layer maps is valid but may fail due to map iteration order differences, and
// that functionality is more completely tested by Test_getValue(). The case of a nested map will fail
// badly in the way this test is structured, so we don't do maps at all here.
{
key: "nil-value-attr",
expected: nil,
Expand Down Expand Up @@ -334,3 +327,106 @@ func TestGetRequestInfoFromHttpHeadersIsCaseInsensitive(t *testing.T) {
})
}
}

// Test_getValue checks that getValue returns scalars unchanged and renders
// bytes, arrays and (nested) kvlists as JSON strings without truncation.
func Test_getValue(t *testing.T) {
	tests := []struct {
		name  string
		value *common.AnyValue
		want  interface{}
	}{
		{"int64", &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}}, int64(123)},
		{"bool", &common.AnyValue{Value: &common.AnyValue_BoolValue{BoolValue: true}}, true},
		{"float64", &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 123}}, float64(123)},
		{"bytes as b64", &common.AnyValue{Value: &common.AnyValue_BytesValue{BytesValue: []byte{10, 20, 30}}}, `"ChQe"` + "\n"},
		{"array as mixed-type string", &common.AnyValue{Value: &common.AnyValue_ArrayValue{
			ArrayValue: &common.ArrayValue{Values: []*common.AnyValue{
				{Value: &common.AnyValue_IntValue{IntValue: 123}},
				{Value: &common.AnyValue_DoubleValue{DoubleValue: 45.6}},
				{Value: &common.AnyValue_StringValue{StringValue: "hi mom"}},
			}},
		}}, `[123,45.6,"hi mom"]` + "\n"},
		{"map as mixed-type string", &common.AnyValue{
			Value: &common.AnyValue_KvlistValue{KvlistValue: &common.KeyValueList{
				Values: []*common.KeyValue{
					{Key: "foo", Value: &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}}},
					{Key: "bar", Value: &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 45.6}}},
					{Key: "mom", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "hi mom"}}},
				},
			}}}, `{"foo":123,"bar":45.6,"mom":"hi mom"}` + "\n"},
		{"nested map as mixed-type string", &common.AnyValue{
			Value: &common.AnyValue_KvlistValue{KvlistValue: &common.KeyValueList{
				Values: []*common.KeyValue{
					{Key: "foo", Value: &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}}},
					{Key: "bar", Value: &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 45.6}}},
					{Key: "nest", Value: &common.AnyValue{
						Value: &common.AnyValue_KvlistValue{KvlistValue: &common.KeyValueList{
							Values: []*common.KeyValue{
								{Key: "foo", Value: &common.AnyValue{Value: &common.AnyValue_IntValue{IntValue: 123}}},
								{Key: "bar", Value: &common.AnyValue{Value: &common.AnyValue_DoubleValue{DoubleValue: 45.6}}},
								{Key: "mom", Value: &common.AnyValue{Value: &common.AnyValue_StringValue{StringValue: "hi mom"}}},
							},
						}}}},
				},
			}}}, `{"bar":45.6,"foo":123,"nest":{"bar":45.6,"foo":123,"mom":"hi mom"}}`},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			got, truncated := getValue(tt.value)
			if truncated != 0 {
				t.Errorf("getValue() returned %v for truncatedBytes, should be 0", truncated)
			}

			s, ok := got.(string)
			if !ok || !strings.HasPrefix(s, "{") {
				// Scalars, arrays and bytes have a deterministic rendering, so
				// compare them directly.
				if !reflect.DeepEqual(got, tt.want) {
					t.Errorf("getValue() = %v (%T), want %v (%T)", got, got, tt.want, tt.want)
				}
				return
			}

			// It's a string wrapping an object, and the keys might be out of
			// order, so convert both sides to objects and compare those.
			wantStr, ok := tt.want.(string)
			if !ok {
				t.Fatalf("getValue() = %v (%T), want %v (%T)", got, got, tt.want, tt.want)
			}
			var g, w map[string]any
			// Check both decodes: if they were ignored, two failed decodes
			// would leave g and w nil and the comparison would pass vacuously.
			if err := json.Unmarshal([]byte(s), &g); err != nil {
				t.Fatalf("getValue() returned invalid JSON %q: %v", s, err)
			}
			if err := json.Unmarshal([]byte(wantStr), &w); err != nil {
				t.Fatalf("test case has invalid expected JSON %q: %v", wantStr, err)
			}
			if !reflect.DeepEqual(g, w) {
				t.Errorf("getValue() unmarshalled = %#v, want %#v", g, w)
				t.Errorf("getValue() marshalled = %v (%T), want %v (%T)", got, got, tt.want, tt.want)
			}
		})
	}
}

// Test_limitedWriter checks that limitedWriter reports every write as fully
// successful, retains only the first max bytes, and counts the bytes it drops.
func Test_limitedWriter(t *testing.T) {
	tests := []struct {
		name      string
		max       int
		input     []string
		total     int
		want      string
		wantTrunc int
	}{
		{"no limit", 100, []string{"abcde"}, 5, "abcde", 0},
		{"one write", 5, []string{"abcdefghij"}, 10, "abcde", 5},
		{"two writes", 12, []string{"abcdefghij", "abcdefghij"}, 20, "abcdefghijab", 8},
		{"exact overrun", 10, []string{"abcdefghij", "abcdefghij"}, 20, "abcdefghij", 10},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			l := newLimitedWriter(tt.max)
			total := 0
			for _, s := range tt.input {
				n, err := l.Write([]byte(s))
				if err != nil {
					t.Errorf("limitedWriter.Write() error = %v", err)
					return
				}
				total += n
			}
			if total != tt.total {
				t.Errorf("limitedWriter.Write() total was %v, want %v", total, tt.total)
			}
			s := l.String()
			if s != tt.want {
				t.Errorf("limitedWriter.String() = '%v', want '%v'", s, tt.want)
			}
			// The table carries the expected truncation count; assert it so
			// the telemetry value reported by getValue is actually covered.
			if l.truncatedBytes != tt.wantTrunc {
				t.Errorf("limitedWriter.truncatedBytes = %v, want %v", l.truncatedBytes, tt.wantTrunc)
			}
		})
	}
}
7 changes: 6 additions & 1 deletion otlp/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ func TranslateLogsRequest(request *collectorLogs.ExportLogsServiceRequest, ri Re
attrs["severity_text"] = log.SeverityText
}
if log.Body != nil {
if val := getValue(log.Body); val != nil {
if val, truncatedBytes := getValue(log.Body); val != nil {
attrs["body"] = val
if truncatedBytes != 0 {
// if we trim the body, add telemetry about it
attrs["meta.truncated_bytes"] = truncatedBytes
attrs["meta.truncated_field"] = "body"
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions otlp/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func TestCanExtractBody(t *testing.T) {
{Value: &common.AnyValue_BoolValue{BoolValue: true}},
},
}}},
expectedValue: "[\"one\",2,true]",
expectedValue: "[\"one\",2,true]\n",
},
{
name: "kvlist",
Expand All @@ -362,7 +362,7 @@ func TestCanExtractBody(t *testing.T) {
{Key: "key3", Value: &common.AnyValue{Value: &common.AnyValue_BoolValue{BoolValue: true}}},
},
}}},
expectedValue: "[{\"key1\":\"value1\"},{\"key2\":2},{\"key3\":true}]",
expectedValue: "{\"key1\":\"value1\",\"key2\":2,\"key3\":true}\n",
},
}
ri := RequestInfo{
Expand Down

0 comments on commit 5cb3ccf

Please sign in to comment.