From 3b92a6a4fb34fae264f84c29276523cd28dee2a6 Mon Sep 17 00:00:00 2001 From: Mike Goldsmth Date: Fri, 8 Jan 2021 13:28:18 +0000 Subject: [PATCH] flesh out otlp export handler --- route/route.go | 220 ++++++++++++++++++++++++++++++++++++++++++++++++- types/event.go | 1 + 2 files changed, 220 insertions(+), 1 deletion(-) diff --git a/route/route.go b/route/route.go index 2f08bca1fb..b17e5fbb81 100644 --- a/route/route.go +++ b/route/route.go @@ -4,6 +4,8 @@ import ( "bytes" "compress/gzip" "context" + "encoding/binary" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -21,6 +23,7 @@ import ( "github.com/klauspost/compress/zstd" "github.com/vmihailenco/msgpack/v4" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "github.com/honeycombio/refinery/collect" "github.com/honeycombio/refinery/config" @@ -29,13 +32,19 @@ import ( "github.com/honeycombio/refinery/sharder" "github.com/honeycombio/refinery/transmit" "github.com/honeycombio/refinery/types" + + collectortrace "github.com/honeycombio/refinery/internal/opentelemetry-proto-gen/collector/trace/v1" + common "github.com/honeycombio/refinery/internal/opentelemetry-proto-gen/common/v1" + trace "github.com/honeycombio/refinery/internal/opentelemetry-proto-gen/trace/v1" ) const ( // numZstdDecoders is set statically here - we may make it into a config option // A normal practice might be to use some multiple of the CPUs, but that goes south // in kubernetes - numZstdDecoders = 4 + numZstdDecoders = 4 + traceIDShortLength = 8 + traceIDLongLength = 16 ) type Router struct { @@ -355,6 +364,124 @@ func (r *Router) batch(w http.ResponseWriter, req *http.Request) { w.Write(response) } +func (r *Router) Export(ctx context.Context, req *collectortrace.ExportTraceServiceRequest) (*collectortrace.ExportTraceServiceResponse, error) { + + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + r.Logger.Error().Logf("Unable to retreive metadata from OTLP request.") + return &collectortrace.ExportTraceServiceResponse{}, nil + } + + var ( + apiKey string + dataset string + requestId types.RequestIDContextKey + ) + + apiKey = getFirstValueFromMetadata(types.APIKeyHeader, md) + if apiKey == "" { + apiKey = getFirstValueFromMetadata(types.APIKeyHeaderShort, md) + } + dataset = getFirstValueFromMetadata(types.DatasetHeader, md) + + if apiKey == "" { + r.Logger.Error().Logf("Received OTLP request without Honeycomb APIKey header") + return &collectortrace.ExportTraceServiceResponse{}, nil + } + if dataset == "" { + r.Logger.Error().Logf("Received OTLP request without Honeycomb dataset header") + return &collectortrace.ExportTraceServiceResponse{}, nil + } + + apiHost, err := r.Config.GetHoneycombAPI() + if err != nil { + r.Logger.Error().Logf("Unable to retrieve APIHost from config while processing OTLP batch") + return &collectortrace.ExportTraceServiceResponse{}, nil + } + + for _, resourceSpan := range req.ResourceSpans { + resourceAttrs := make(map[string]interface{}) + + if resourceSpan.Resource != nil { + addAttributesToMap(resourceAttrs, resourceSpan.Resource.Attributes) + } + + for _, librarySpan := range resourceSpan.InstrumentationLibrarySpans { + library := librarySpan.InstrumentationLibrary + if library.Name != "" { + resourceAttrs["library.name"] = library.Name + } + if library.Version != "" { + resourceAttrs["library.version"] = library.Version + } + + for _, span := range librarySpan.GetSpans() { + traceID := bytesToTraceID(span.TraceId) + spanID := hex.EncodeToString(span.SpanId) + timestamp := time.Unix(0, int64(span.StartTimeUnixNano)).UTC() + + eventAttrs := map[string]interface{}{ + "trace.trace_id": traceID, + "trace.span_id": spanID, + "type": getSpanKind(span.Kind), + "name": span.Name, + "duration_ms": float64(span.EndTimeUnixNano-span.StartTimeUnixNano) / float64(time.Millisecond), + "status_code": int32(span.Status.Code), + } + if span.ParentSpanId != nil { + eventAttrs["trace.parent_id"] = hex.EncodeToString(span.ParentSpanId) + } + if r.getSpanStatusCode(span.Status) == trace.Status_STATUS_CODE_ERROR { + eventAttrs["error"] = true + } + if len(span.Status.Message) > 0 { + eventAttrs["status_message"] = span.Status.Message + } + if span.Attributes != nil { + addAttributesToMap(eventAttrs, span.Attributes) + } + + sampleRate := 1 + if eventAttrs["sampleRate"] != nil { + switch eventAttrs["sampleRate"].(type) { + case string: + sampleRate, err = strconv.Atoi(eventAttrs["sampleRate"].(string)) + if err != nil { + r.Logger.Info().WithField("Unable to parse sampleRate in OTLP span attribute", eventAttrs["sampleRate"]) + } + case int: + sampleRate = eventAttrs["sampleRate"].(int) + } + // remove sampleRate from event fields + delete(eventAttrs, "sampleRate") + } + + // copy resource attributes to event attributes + for k, v := range resourceAttrs { + eventAttrs[k] = v + } + + event := &types.Event{ + Context: ctx, + APIHost: apiHost, + APIKey: apiKey, + Dataset: dataset, + SampleRate: uint(sampleRate), + Timestamp: timestamp, + Data: eventAttrs, + } + + err := r.processEvent(event, requestId) + if err != nil { + // log failure? We don't send per-event responses back to sender for OTLP + } + } + } + } + + return &collectortrace.ExportTraceServiceResponse{}, nil +} + func (r *Router) processEvent(ev *types.Event, reqID interface{}) error { debugLog := r.iopLogger.Debug(). WithField("request_id", reqID). @@ -571,3 +698,94 @@ func unmarshal(r *http.Request, data io.Reader, v interface{}) error { return jsoniter.NewDecoder(data).Decode(v) } } + +// safeGetValueFromMeta returns the first value of a metadata entry using a +// case-insensitive key +func getFirstValueFromMetadata(key string, md metadata.MD) string { + if values := md.Get(key); len(values) > 0 { + return values[0] + } + return "" +} + +func addAttributesToMap(attrs map[string]interface{}, attributes []*common.KeyValue) { + for _, attr := range attributes { + if attr.Key == "" { + continue + } + switch attr.Value.Value.(type) { + case *common.AnyValue_StringValue: + attrs[attr.Key] = attr.Value.GetStringValue() + case *common.AnyValue_BoolValue: + attrs[attr.Key] = attr.Value.GetBoolValue() + case *common.AnyValue_DoubleValue: + attrs[attr.Key] = attr.Value.GetDoubleValue() + case *common.AnyValue_IntValue: + attrs[attr.Key] = attr.Value.GetIntValue() + } + } +} + +func getSpanKind(kind trace.Span_SpanKind) string { + switch kind { + case trace.Span_SPAN_KIND_CLIENT: + return "client" + case trace.Span_SPAN_KIND_SERVER: + return "server" + case trace.Span_SPAN_KIND_PRODUCER: + return "producer" + case trace.Span_SPAN_KIND_CONSUMER: + return "consumer" + case trace.Span_SPAN_KIND_INTERNAL: + return "internal" + case trace.Span_SPAN_KIND_UNSPECIFIED: + fallthrough + default: + return "unspecified" + } +} + +// bytesToTraceID returns an ID suitable for use for spans and traces. Before +// encoding the bytes as a hex string, we want to handle cases where we are +// given 128-bit IDs with zero padding, e.g. 0000000000000000f798a1e7f33c8af6. +// To do this, we borrow a strategy from Jaeger [1] wherein we split the byte +// sequence into two parts. The leftmost part could contain all zeros. We use +// that to determine whether to return a 64-bit hex encoded string or a 128-bit +// one. +// +// [1]: https://github.com/jaegertracing/jaeger/blob/cd19b64413eca0f06b61d92fe29bebce1321d0b0/model/ids.go#L81 +func bytesToTraceID(traceID []byte) string { + // binary.BigEndian.Uint64() does a bounds check on traceID which will + // cause a panic if traceID is fewer than 8 bytes. In this case, we don't + // need to check for zero padding on the high part anyway, so just return a + // hex string. + if len(traceID) < traceIDShortLength { + return fmt.Sprintf("%x", traceID) + } + var low uint64 + if len(traceID) == traceIDLongLength { + low = binary.BigEndian.Uint64(traceID[traceIDShortLength:]) + if high := binary.BigEndian.Uint64(traceID[:traceIDShortLength]); high != 0 { + return fmt.Sprintf("%016x%016x", high, low) + } + } else { + low = binary.BigEndian.Uint64(traceID) + } + + return fmt.Sprintf("%016x", low) +} + +// getSpanStatusCode checks the value of both the deprecated code and code fields +// on the span status and using the rules specified in the backward compatibility +// notes in the protobuf definitions. See: +// +// https://github.com/open-telemetry/opentelemetry-proto/blob/59c488bfb8fb6d0458ad6425758b70259ff4a2bd/opentelemetry/proto/trace/v1/trace.proto#L230 +func (r *Router) getSpanStatusCode(status *trace.Status) trace.Status_StatusCode { + if status.Code == trace.Status_STATUS_CODE_UNSET { + if status.DeprecatedCode == trace.Status_DEPRECATED_STATUS_CODE_OK { + return trace.Status_STATUS_CODE_UNSET + } + return trace.Status_STATUS_CODE_ERROR + } + return status.Code +} diff --git a/types/event.go b/types/event.go index 0ef2157889..0a1115155c 100644 --- a/types/event.go +++ b/types/event.go @@ -9,6 +9,7 @@ const ( APIKeyHeader = "X-Honeycomb-Team" // libhoney-js uses this APIKeyHeaderShort = "X-Hny-Team" + DatasetHeader = "X-Honeycomb-Dataset" SampleRateHeader = "X-Honeycomb-Samplerate" TimestampHeader = "X-Honeycomb-Event-Time" )