Skip to content

Commit

Permalink
Support for Zipkin: CombineTraces (#688)
Browse files Browse the repository at this point in the history
* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* fix combine traces

Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed May 5, 2021
1 parent 0555733 commit c0c834f
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
* [ENHANCEMENT] Improve WAL Replay by not rebuilding the WAL. [#668](https://github.com/grafana/tempo/pull/668)
* [ENHANCEMENT] Add config option to disable write extension to the ingesters. [#677](https://github.com/grafana/tempo/pull/677)
* [ENHANCEMENT] Preallocate byte slices on ingester request unmarshal. [#679](https://github.com/grafana/tempo/pull/679)
* [ENHANCEMENT] Zipkin Support - CombineTraces. [#688](https://github.com/grafana/tempo/pull/688)

## v0.7.0

Expand Down
15 changes: 12 additions & 3 deletions pkg/util/trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"bytes"
"encoding/binary"
"hash"
"hash/fnv"
"sort"
Expand Down Expand Up @@ -68,12 +69,13 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int
spanCountTotal := 0

h := fnv.New32()
buffer := make([]byte, 4)

spansInA := make(map[uint32]struct{})
for _, batchA := range traceA.Batches {
for _, ilsA := range batchA.InstrumentationLibrarySpans {
for _, spanA := range ilsA.Spans {
spansInA[tokenForID(h, spanA.SpanId)] = struct{}{}
spansInA[tokenForID(h, buffer, int32(spanA.Kind), spanA.SpanId)] = struct{}{}
}
spanCountA += len(ilsA.Spans)
spanCountTotal += len(ilsA.Spans)
Expand All @@ -88,7 +90,7 @@ func CombineTraceProtos(traceA, traceB *tempopb.Trace) (*tempopb.Trace, int, int
notFoundSpans := ilsB.Spans[:0]
for _, spanB := range ilsB.Spans {
// if found in A, remove from the batch
_, ok := spansInA[tokenForID(h, spanB.SpanId)]
_, ok := spansInA[tokenForID(h, buffer, int32(spanB.Kind), spanB.SpanId)]
if !ok {
notFoundSpans = append(notFoundSpans, spanB)
}
Expand Down Expand Up @@ -155,8 +157,15 @@ func compareSpans(a *v1.Span, b *v1.Span) bool {
return a.StartTimeUnixNano < b.StartTimeUnixNano
}

func tokenForID(h hash.Hash32, b []byte) uint32 {
// tokenForID returns a uint32 token for use in a hash map given a span id and span kind
// buffer must be a 4 byte slice and is reused for writing the span kind to the hashing function
// kind is used along with the actual id b/c in zipkin traces span id is not guaranteed to be unique
// as it is shared between client and server spans.
func tokenForID(h hash.Hash32, buffer []byte, kind int32, b []byte) uint32 {
binary.LittleEndian.PutUint32(buffer, uint32(kind))

h.Reset()
_, _ = h.Write(b)
_, _ = h.Write(buffer)
return h.Sum32()
}
12 changes: 12 additions & 0 deletions pkg/util/trace_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package util
import (
"bytes"
"fmt"
"hash/fnv"
"math/rand"
"testing"

Expand Down Expand Up @@ -274,3 +275,14 @@ func TestSortTrace(t *testing.T) {
assert.Equal(t, tt.expected, tt.input)
}
}

func BenchmarkTokenForID(b *testing.B) {
h := fnv.New32()
id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}
buffer := make([]byte, 4)

b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = tokenForID(h, buffer, 0, id)
}
}

0 comments on commit c0c834f

Please sign in to comment.