Skip to content

Commit

Permalink
flesh out otlp export handler
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeGoldsmith committed Jan 8, 2021
1 parent 321b43d commit 3b92a6a
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 1 deletion.
220 changes: 219 additions & 1 deletion route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/binary"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand All @@ -21,6 +23,7 @@ import (
"github.com/klauspost/compress/zstd"
"github.com/vmihailenco/msgpack/v4"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"github.com/honeycombio/refinery/collect"
"github.com/honeycombio/refinery/config"
Expand All @@ -29,13 +32,19 @@ import (
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"

collectortrace "github.com/honeycombio/refinery/internal/opentelemetry-proto-gen/collector/trace/v1"
common "github.com/honeycombio/refinery/internal/opentelemetry-proto-gen/common/v1"
trace "github.com/honeycombio/refinery/internal/opentelemetry-proto-gen/trace/v1"
)

const (
// numZstdDecoders is set statically here - we may make it into a config option
// A normal practice might be to use some multiple of the CPUs, but that goes south
// in kubernetes
numZstdDecoders = 4
numZstdDecoders = 4
// traceIDShortLength is the length in bytes of a 64-bit trace ID.
traceIDShortLength = 8
// traceIDLongLength is the length in bytes of a 128-bit trace ID.
traceIDLongLength = 16
)

type Router struct {
Expand Down Expand Up @@ -355,6 +364,124 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) {
w.Write(response)
}

// Export handles an OTLP trace export request: every span in the request is
// converted into a types.Event and handed to processEvent.
//
// The Honeycomb API key and dataset are read from the incoming gRPC metadata.
// If the metadata, the API key, the dataset or the configured API host is
// unavailable, the problem is logged and the whole request is dropped; an
// empty response with a nil error is still returned. Failures for individual
// spans are logged but are not reported back to the sender.
func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		r.Logger.Error().Logf("Unable to retrieve metadata from OTLP request.")
		return &collectortrace.ExportTraceServiceResponse{}, nil
	}

	apiKey := getFirstValueFromMetadata(types.APIKeyHeader, md)
	if apiKey == "" {
		apiKey = getFirstValueFromMetadata(types.APIKeyHeaderShort, md)
	}
	if apiKey == "" {
		r.Logger.Error().Logf("Received OTLP request without Honeycomb APIKey header")
		return &collectortrace.ExportTraceServiceResponse{}, nil
	}

	dataset := getFirstValueFromMetadata(types.DatasetHeader, md)
	if dataset == "" {
		r.Logger.Error().Logf("Received OTLP request without Honeycomb dataset header")
		return &collectortrace.ExportTraceServiceResponse{}, nil
	}

	apiHost, err := r.Config.GetHoneycombAPI()
	if err != nil {
		r.Logger.Error().Logf("Unable to retrieve APIHost from config while processing OTLP batch")
		return &collectortrace.ExportTraceServiceResponse{}, nil
	}

	// No request ID is derived for OTLP requests; the zero value is passed on.
	var requestID types.RequestIDContextKey

	for _, resourceSpan := range req.ResourceSpans {
		resourceAttrs := make(map[string]interface{})
		if resourceSpan.Resource != nil {
			addAttributesToMap(resourceAttrs, resourceSpan.Resource.Attributes)
		}

		for _, librarySpan := range resourceSpan.InstrumentationLibrarySpans {
			// The instrumentation library is optional in OTLP and may be nil.
			library := librarySpan.InstrumentationLibrary

			for _, span := range librarySpan.GetSpans() {
				// Guard against end < start, which would wrap the unsigned
				// subtraction into an enormous duration.
				var durationMs float64
				if span.EndTimeUnixNano >= span.StartTimeUnixNano {
					durationMs = float64(span.EndTimeUnixNano-span.StartTimeUnixNano) / float64(time.Millisecond)
				}

				// The status is optional; treat a missing one as unset.
				statusCode := trace.Status_STATUS_CODE_UNSET
				if span.Status != nil {
					statusCode = span.Status.Code
				}

				eventAttrs := map[string]interface{}{
					"trace.trace_id": bytesToTraceID(span.TraceId),
					"trace.span_id":  hex.EncodeToString(span.SpanId),
					"type":           getSpanKind(span.Kind),
					"name":           span.Name,
					"duration_ms":    durationMs,
					"status_code":    int32(statusCode),
				}
				// An empty (but non-nil) parent ID still denotes a root span.
				if len(span.ParentSpanId) > 0 {
					eventAttrs["trace.parent_id"] = hex.EncodeToString(span.ParentSpanId)
				}
				if span.Status != nil {
					if r.getSpanStatusCode(span.Status) == trace.Status_STATUS_CODE_ERROR {
						eventAttrs["error"] = true
					}
					if len(span.Status.Message) > 0 {
						eventAttrs["status_message"] = span.Status.Message
					}
				}
				if span.Attributes != nil {
					addAttributesToMap(eventAttrs, span.Attributes)
				}

				// Pull the sample rate out of the span attributes before the
				// resource attributes are merged in, so only a span-level
				// "sampleRate" is consumed.
				sampleRate, err := sampleRateFromOTLPAttributes(eventAttrs)
				if err != nil {
					r.Logger.Info().
						WithField("error", err.Error()).
						Logf("Unable to parse sampleRate in OTLP span attribute")
				}

				// Copy resource attributes to event attributes; they take
				// precedence over span attributes with the same key.
				for k, v := range resourceAttrs {
					eventAttrs[k] = v
				}
				// Library fields are set per span rather than stored in
				// resourceAttrs so they cannot leak into the spans of a
				// following instrumentation library.
				if library != nil {
					if library.Name != "" {
						eventAttrs["library.name"] = library.Name
					}
					if library.Version != "" {
						eventAttrs["library.version"] = library.Version
					}
				}

				event := &types.Event{
					Context:    ctx,
					APIHost:    apiHost,
					APIKey:     apiKey,
					Dataset:    dataset,
					SampleRate: sampleRate,
					Timestamp:  time.Unix(0, int64(span.StartTimeUnixNano)).UTC(),
					Data:       eventAttrs,
				}

				// We don't send per-event responses back to the sender for
				// OTLP, so a failure is only logged.
				if err := r.processEvent(event, requestID); err != nil {
					r.Logger.Error().Logf("Error processing OTLP span: %s", err.Error())
				}
			}
		}
	}

	return &collectortrace.ExportTraceServiceResponse{}, nil
}

// sampleRateFromOTLPAttributes removes the "sampleRate" entry from attrs, if
// present, and returns it as a sample rate. Strings holding a base-10 integer
// and integer values are accepted. A missing entry, a value of any other
// type, or a value below 1 yields the default rate of 1. A string that cannot
// be parsed yields 1 together with a non-nil error.
func sampleRateFromOTLPAttributes(attrs map[string]interface{}) (uint, error) {
	raw, found := attrs["sampleRate"]
	if !found || raw == nil {
		return 1, nil
	}
	// The sample rate is event metadata, not an event field.
	delete(attrs, "sampleRate")

	var rate int64
	switch v := raw.(type) {
	case string:
		parsed, err := strconv.Atoi(v)
		if err != nil {
			return 1, fmt.Errorf("parsing sampleRate %q: %w", v, err)
		}
		rate = int64(parsed)
	case int:
		rate = int64(v)
	case int32:
		rate = int64(v)
	case int64:
		// addAttributesToMap stores OTLP integer attributes as int64.
		rate = v
	default:
		return 1, nil
	}

	// Zero or negative rates are meaningless and would wrap when converted
	// to an unsigned value.
	if rate < 1 {
		return 1, nil
	}
	return uint(rate), nil
}

func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
debugLog := r.iopLogger.Debug().
WithField("request_id", reqID).
Expand Down Expand Up @@ -571,3 +698,94 @@ func unmarshal(r *http.Request, data io.Reader, v interface{}) error {
return jsoniter.NewDecoder(data).Decode(v)
}
}

// getFirstValueFromMetadata looks up key in md and returns the first value
// stored for it, or the empty string when md holds no value for key.
func getFirstValueFromMetadata(key string, md metadata.MD) string {
	found := md.Get(key)
	if len(found) == 0 {
		return ""
	}
	return found[0]
}

// addAttributesToMap copies OTLP key/value attributes into attrs, keyed by
// attribute key. String, bool, double and int values are copied (ints are
// stored as whatever GetIntValue returns); values of any other kind are
// skipped, as are nil entries, entries with an empty key and entries without
// a value. An existing entry in attrs with the same key is overwritten.
func addAttributesToMap(attrs map[string]interface{}, attributes []*common.KeyValue) {
	for _, attr := range attributes {
		// Attributes come from the wire; a missing value must not panic.
		if attr == nil || attr.Key == "" || attr.Value == nil {
			continue
		}
		switch attr.Value.Value.(type) {
		case *common.AnyValue_StringValue:
			attrs[attr.Key] = attr.Value.GetStringValue()
		case *common.AnyValue_BoolValue:
			attrs[attr.Key] = attr.Value.GetBoolValue()
		case *common.AnyValue_DoubleValue:
			attrs[attr.Key] = attr.Value.GetDoubleValue()
		case *common.AnyValue_IntValue:
			attrs[attr.Key] = attr.Value.GetIntValue()
		}
	}
}

// getSpanKind maps an OTLP span kind to the lower-case label stored in the
// event's "type" field. The unspecified kind and any kind not listed here
// map to "unspecified".
func getSpanKind(kind trace.Span_SpanKind) string {
	label := "unspecified"
	switch kind {
	case trace.Span_SPAN_KIND_INTERNAL:
		label = "internal"
	case trace.Span_SPAN_KIND_SERVER:
		label = "server"
	case trace.Span_SPAN_KIND_CLIENT:
		label = "client"
	case trace.Span_SPAN_KIND_PRODUCER:
		label = "producer"
	case trace.Span_SPAN_KIND_CONSUMER:
		label = "consumer"
	}
	return label
}

// bytesToTraceID hex-encodes a raw trace ID.
//
// 128-bit IDs whose upper half is all zeros (for example
// 0000000000000000f798a1e7f33c8af6) are collapsed to the 64-bit form, a
// strategy borrowed from Jaeger [1]: the ID is split into a high and a low
// 64-bit word and the high word is only emitted when it is non-zero.
//
//   - fewer than traceIDShortLength bytes: the bytes are hex-encoded as-is
//   - exactly traceIDLongLength bytes: 32 hex digits, or 16 when the high
//     word is zero
//   - any other length: the first 8 bytes as 16 hex digits
//
// [1]: https://github.com/jaegertracing/jaeger/blob/cd19b64413eca0f06b61d92fe29bebce1321d0b0/model/ids.go#L81
func bytesToTraceID(traceID []byte) string {
	switch n := len(traceID); {
	case n < traceIDShortLength:
		// Too short for binary.BigEndian.Uint64, which would panic on its
		// bounds check; there is no high word to inspect anyway.
		return fmt.Sprintf("%x", traceID)
	case n == traceIDLongLength:
		upper := binary.BigEndian.Uint64(traceID[:traceIDShortLength])
		lower := binary.BigEndian.Uint64(traceID[traceIDShortLength:])
		if upper == 0 {
			return fmt.Sprintf("%016x", lower)
		}
		return fmt.Sprintf("%016x%016x", upper, lower)
	default:
		return fmt.Sprintf("%016x", binary.BigEndian.Uint64(traceID))
	}
}

// getSpanStatusCode resolves the effective status code of a span by checking
// both the code and the deprecated code fields, following the backward
// compatibility notes in the protobuf definitions. See:
//
// https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L230
//
// When code is set it is returned as-is. When code is unset, the deprecated
// code decides: OK yields unset, anything else yields error. A nil status is
// reported as unset.
func (r *Router) getSpanStatusCode(status *trace.Status) trace.Status_StatusCode {
	// The status is optional on a span; a missing one carries no error.
	if status == nil {
		return trace.Status_STATUS_CODE_UNSET
	}
	if status.Code != trace.Status_STATUS_CODE_UNSET {
		return status.Code
	}
	if status.DeprecatedCode == trace.Status_DEPRECATED_STATUS_CODE_OK {
		return trace.Status_STATUS_CODE_UNSET
	}
	return trace.Status_STATUS_CODE_ERROR
}
1 change: 1 addition & 0 deletions types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
APIKeyHeader = "X-Honeycomb-Team"
// libhoney-js uses this
APIKeyHeaderShort = "X-Hny-Team"
DatasetHeader = "X-Honeycomb-Dataset"
SampleRateHeader = "X-Honeycomb-Samplerate"
TimestampHeader = "X-Honeycomb-Event-Time"
)
Expand Down

0 comments on commit 3b92a6a

Please sign in to comment.