From c0c834f078e422ed1d2761b81795966c7f9016a4 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 5 May 2021 09:21:12 -0400 Subject: [PATCH] Support for Zipkin: CombineTraces (#688) * changelog Signed-off-by: Joe Elliott * fix combine traces Signed-off-by: Joe Elliott --- CHANGELOG.md | 1 + pkg/util/trace.go | 15 ++++++++++++--- pkg/util/trace_test.go | 12 ++++++++++++ 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 963d07d624b..56db3044c58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ * [ENHANCEMENT] Improve WAL Replay by not rebuilding the WAL. [#668](https://github.com/grafana/tempo/pull/668) * [ENHANCEMENT] Add config option to disable write extension to the ingesters. [#677](https://github.com/grafana/tempo/pull/677) * [ENHANCEMENT] Preallocate byte slices on ingester request unmarshal. [#679](https://github.com/grafana/tempo/pull/679) +* [ENHANCEMENT] Zipkin Support - CombineTraces. [#688](https://github.com/grafana/tempo/pull/688) ## v0.7.0 diff --git a/pkg/util/trace.go b/pkg/util/trace.go index c05801a8fb0..a1d8305665d 100644 --- a/pkg/util/trace.go +++ b/pkg/util/trace.go @@ -2,6 +2,7 @@ package util import ( "bytes" + "encoding/binary" "hash" "hash/fnv" "sort" @@ -68,12 +69,13 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int spanCountTotal := 0 h := fnv.New32() + buffer := make([]byte, 4) spansInA := make(map[uint32]struct{}) for _, batchA := range traceA.Batches { for _, ilsA := range batchA.InstrumentationLibrarySpans { for _, spanA := range ilsA.Spans { - spansInA[tokenForID(h, spanA.SpanId)] = struct{}{} + spansInA[tokenForID(h, buffer, int32(spanA.Kind), spanA.SpanId)] = struct{}{} } spanCountA += len(ilsA.Spans) spanCountTotal += len(ilsA.Spans) @@ -88,7 +90,7 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int notFoundSpans := ilsB.Spans[:0] for _, spanB := range ilsB.Spans { // if found in A, remove from the batch - _, ok := spansInA[tokenForID(h, spanB.SpanId)] + _, ok := spansInA[tokenForID(h, buffer, int32(spanB.Kind), spanB.SpanId)] if !ok { notFoundSpans = append(notFoundSpans, spanB) } @@ -155,8 +157,15 @@ func compareSpans(a *v1.Span, b *v1.Span) bool { return a.StartTimeUnixNano < b.StartTimeUnixNano } -func tokenForID(h hash.Hash32, b []byte) uint32 { +// tokenForID returns a uint32 token for use in a hash map given a span id and span kind +// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function +// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique +// as it is shared between client and server spans. +func tokenForID(h hash.Hash32, buffer []byte, kind int32, b []byte) uint32 { + binary.LittleEndian.PutUint32(buffer, uint32(kind)) + h.Reset() _, _ = h.Write(b) + _, _ = h.Write(buffer) return h.Sum32() } diff --git a/pkg/util/trace_test.go b/pkg/util/trace_test.go index ef154714835..e9f20494328 100644 --- a/pkg/util/trace_test.go +++ b/pkg/util/trace_test.go @@ -3,6 +3,7 @@ package util import ( "bytes" "fmt" + "hash/fnv" "math/rand" "testing" @@ -274,3 +275,14 @@ func TestSortTrace(t *testing.T) { assert.Equal(t, tt.expected, tt.input) } } + +func BenchmarkTokenForID(b *testing.B) { + h := fnv.New32() + id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08} + buffer := make([]byte, 4) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = tokenForID(h, buffer, 0, id) + } +}