Skip to content

Commit

Permalink
Begin comparing traces
Browse files Browse the repository at this point in the history
  • Loading branch information
zalegrala committed Aug 27, 2021
1 parent 6e55642 commit 362569d
Show file tree
Hide file tree
Showing 15 changed files with 1,023 additions and 48 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* [ENHANCEMENT] Add support to tempo workloads to `overrides` from single configmap in microservice mode. [#896](https://github.com/grafana/tempo/pull/896) (@kavirajk)
* [ENHANCEMENT] Make `overrides_config` block name consistent with Loki and Cortex in microservice mode. [#906](https://github.com/grafana/tempo/pull/906) (@kavirajk)
* [ENHANCEMENT] Make `overrides_config` mount name static `tempo-overrides` in the tempo workloads in microservice mode. [#914](https://github.com/grafana/tempo/pull/914) (@kavirajk)

* [ENHANCEMENT] Implement trace comparison in Vulture [#904](https://github.com/grafana/tempo/pull/904) (@zalegrala)

## v1.1.0-rc.0 / 2021-08-11

Expand Down
158 changes: 142 additions & 16 deletions cmd/tempo-vulture/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,26 @@ package main
import (
"bytes"
"context"
"encoding/hex"
"flag"
"fmt"
"log"
"math/rand"
"net/http"
"net/url"
"os"
"reflect"
"time"

"github.com/go-test/deep"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempopb"
v1common "github.com/grafana/tempo/pkg/tempopb/common/v1"
v1resource "github.com/grafana/tempo/pkg/tempopb/resource/v1"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util"
jaeger_grpc "github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
zaplogfmt "github.com/jsternberg/zap-logfmt"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -39,10 +47,11 @@ var (
)

// traceMetrics tallies the outcome of querying Tempo for a single trace.
// The failure counters (requestFailed, notFound, missingSpans,
// incorrectResult) are added to the matching metricTracesErrors label by the
// caller.
type traceMetrics struct {
requested int
requestFailed int
notFound int
missingSpans int
requested int
requestFailed int
notFound int
missingSpans int
// incorrectResult counts traces whose response differed from the trace
// re-generated from the seed.
incorrectResult int
}

func init() {
Expand Down Expand Up @@ -89,7 +98,7 @@ func main() {

log := logger.With(
zap.String("org_id", tempoOrgID),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", traceIDLow, traceIDHigh)),
zap.String("write_trace_id", fmt.Sprintf("%016x%016x", traceIDHigh, traceIDLow)),
zap.Int64("seed", now.Unix()),
)
log.Info("sending trace")
Expand All @@ -102,7 +111,7 @@ func main() {
metricErrorTotal.Inc()
continue
}
err = c.EmitBatch(ctx, makeThriftBatch(traceIDHigh, traceIDLow, r))
err = c.EmitBatch(ctx, makeThriftBatch(traceIDHigh, traceIDLow, r, now))
if err != nil {
log.Error("error pushing batch to Tempo", zap.Error(err))
metricErrorTotal.Inc()
Expand All @@ -118,6 +127,7 @@ func main() {

intervals := intervalsBetween(startTime, now, interval)
intervals = trimOutdatedIntervals(intervals, tempoRetentionDuration)
startTime = intervals[0]

// pick past interval and re-generate trace
pick := generateRandomInt(0, int64(len(intervals)), newRand(now))
Expand All @@ -136,6 +146,7 @@ func main() {
metricTracesErrors.WithLabelValues("requestfailed").Add(float64(metrics.requestFailed))
metricTracesErrors.WithLabelValues("notfound").Add(float64(metrics.notFound))
metricTracesErrors.WithLabelValues("missingspans").Add(float64(metrics.missingSpans))
metricTracesErrors.WithLabelValues("incorrectresult").Add(float64(metrics.incorrectResult))
}
}()

Expand All @@ -159,12 +170,12 @@ func intervalsBetween(start, stop time.Time, interval time.Duration) []time.Time
return intervals
}

func trimOutdatedIntervals(intervals []time.Time, lookBehind time.Duration) []time.Time {
func trimOutdatedIntervals(intervals []time.Time, retention time.Duration) []time.Time {
if len(intervals) == 0 {
return nil
}

oldest := intervals[len(intervals)-1].Add(-lookBehind)
oldest := intervals[len(intervals)-1].Add(-retention)

for i, t := range intervals {
if t.Before(oldest) {
Expand Down Expand Up @@ -207,7 +218,7 @@ func generateRandomString(r *rand.Rand) string {

s := make([]rune, generateRandomInt(5, 20, r))
for i := range s {
s[i] = letters[rand.Intn(len(letters))]
s[i] = letters[r.Intn(len(letters))]
}
return string(s)
}
Expand All @@ -225,39 +236,116 @@ func generateRandomTags(r *rand.Rand) []*thrift.Tag {
return tags
}

// generateRandomLogs returns a slice of thrift logs whose length is drawn from
// generateRandomInt(1, 5, r). Each log carries fields from
// generateRandomTags(r) and is stamped with now in Unix seconds instead of the
// wall clock — presumably so that the same r and now reproduce the same logs
// when a trace is re-generated for comparison.
func generateRandomLogs(r *rand.Rand) []*thrift.Log {
func generateRandomLogs(r *rand.Rand, now time.Time) []*thrift.Log {
var logs []*thrift.Log
count := generateRandomInt(1, 5, r)
for i := int64(0); i < count; i++ {
logs = append(logs, &thrift.Log{
Timestamp: time.Now().Unix(),
Timestamp: now.Unix(),
Fields: generateRandomTags(r),
})
}
return logs
}

// makeThriftBatch builds a thrift.Batch whose spans all carry the given trace
// ID halves. The span count, span IDs, operation names, durations, tags and
// logs are all drawn from r, and StartTime is taken from now (Unix seconds),
// so no value depends on the global random source or the wall clock.
func makeThriftBatch(TraceIDHigh int64, TraceIDLow int64, r *rand.Rand) *thrift.Batch {
func makeThriftBatch(TraceIDHigh int64, TraceIDLow int64, r *rand.Rand, now time.Time) *thrift.Batch {
var spans []*thrift.Span
count := generateRandomInt(1, 5, r)
for i := int64(0); i < count; i++ {
spans = append(spans, &thrift.Span{
TraceIdLow: TraceIDLow,
TraceIdHigh: TraceIDHigh,
SpanId: rand.Int63(),
SpanId: r.Int63(),
ParentSpanId: 0,
OperationName: generateRandomString(r),
References: nil,
Flags: 0,
StartTime: time.Now().Unix(),
Duration: rand.Int63(),
StartTime: now.Unix(),
// Bounded by generateRandomInt(0, 100, r) rather than a full Int63.
Duration: generateRandomInt(0, 100, r),
Tags: generateRandomTags(r),
Logs: generateRandomLogs(r),
Logs: generateRandomLogs(r, now),
})
}

return &thrift.Batch{Spans: spans}
}

// jaegerBatchToPbTrace converts a Jaeger thrift batch into the tempopb.Trace
// that vulture compares against the query response.
//
// Every span of the batch is placed in a single InstrumentationLibrarySpans,
// which is wrapped in a single ResourceSpans with an empty Resource. ID
// conversion errors are logged and the span is still emitted with whatever the
// decoder returned, so a bad ID shows up as a trace mismatch rather than
// aborting the conversion.
func jaegerBatchToPbTrace(batch *jaeger.Batch) *tempopb.Trace {
	trace := &tempopb.Trace{
		Batches: []*v1.ResourceSpans{},
	}
	libs := []*v1.InstrumentationLibrarySpans{
		{
			InstrumentationLibrary: &v1common.InstrumentationLibrary{},
		},
	}

	for _, s := range batch.Spans {
		// Format the IDs as unsigned: %x on a negative int64 emits a leading
		// "-", which is not valid hex and would make the decode below fail.
		traceIDHex := fmt.Sprintf("%016x%016x", uint64(s.TraceIdHigh), uint64(s.TraceIdLow))
		traceID, err := util.HexStringToTraceID(traceIDHex)
		if err != nil {
			logger.Error(err.Error())
		}

		spanIDHex := fmt.Sprintf("%016x", uint64(s.SpanId))
		spanID, err := hex.DecodeString(spanIDHex)
		if err != nil {
			logger.Error(err.Error())
		}

		startTime := time.Unix(s.StartTime, 0)
		stopTime := startTime.Add(time.Duration(s.Duration) * time.Second)

		span := &v1.Span{
			TraceId: traceID,
			SpanId:  spanID,
			Name:    s.OperationName,
			// The two timestamps below are Unix milliseconds even though the
			// fields are named ...UnixNano.
			// NOTE(review): presumably this mirrors what Tempo returns for the
			// second-granularity values vulture writes — confirm before
			// "fixing" the unit.
			// TODO use time.UnixMilli() when upgraded to Go 1.17
			StartTimeUnixNano: uint64(startTime.UnixNano() / int64(time.Millisecond)),
			EndTimeUnixNano:   uint64(stopTime.UnixNano() / int64(time.Millisecond)),
			Status:            &v1.Status{},
		}

		// Attributes stays nil when the span has no tags; the comparison uses
		// reflect.DeepEqual, which distinguishes nil from empty slices.
		span.Attributes = appendTagAttributes(span.Attributes, s.Tags)

		for _, event := range s.Logs {
			// Event attributes start as an empty, non-nil slice.
			attrs := appendTagAttributes([]*v1common.KeyValue{}, event.Fields)

			t := time.Unix(event.Timestamp, 0)

			span.Events = append(span.Events, &v1.Span_Event{
				// Same millisecond convention as the span timestamps above.
				// TODO use time.UnixMilli() when upgraded to Go 1.17
				TimeUnixNano: uint64(t.UnixNano() / int64(time.Millisecond)),
				Attributes:   attrs,
			})
		}
		libs[0].Spans = append(libs[0].Spans, span)
	}

	trace.Batches = append(trace.Batches, &v1.ResourceSpans{
		Resource:                    &v1resource.Resource{},
		InstrumentationLibrarySpans: libs,
	})

	return trace
}

// appendTagAttributes appends one string-valued KeyValue per tag to dst and
// returns the extended slice. A tag that carries no string value maps to the
// empty string instead of panicking on a nil dereference.
func appendTagAttributes(dst []*v1common.KeyValue, tags []*jaeger.Tag) []*v1common.KeyValue {
	for _, tag := range tags {
		var value string
		if tag.VStr != nil {
			value = *tag.VStr
		}
		dst = append(dst, &v1common.KeyValue{
			Key: tag.Key,
			Value: &v1common.AnyValue{
				Value: &v1common.AnyValue_StringValue{StringValue: value},
			},
		})
	}
	return dst
}

func generateRandomInt(min int64, max int64, r *rand.Rand) int64 {
number := min + r.Int63n(max-min)
if number == min {
Expand Down Expand Up @@ -301,9 +389,31 @@ func queryTempoAndAnalyze(baseURL string, seed time.Time, traceID string) (trace
tm.missingSpans++
}

// Get the expected
expected := constructTraceFromEpoch(seed)

match := equalTraces(expected, trace)
if !match {
tm.incorrectResult++
if diff := deep.Equal(expected, trace); diff != nil {
for _, d := range diff {
logger.Error("incorrect result",
zap.String("expected -> response", d),
)
}
}
}

return tm, nil
}

// equalTraces reports whether a and b are deeply equal after each has been
// passed through model.SortTrace. Both traces are handed to SortTrace by
// pointer, so callers should presumably expect them to come back reordered.
func equalTraces(a, b *tempopb.Trace) bool {
	for _, t := range []*tempopb.Trace{a, b} {
		model.SortTrace(t)
	}

	same := reflect.DeepEqual(a, b)
	return same
}

func hasMissingSpans(t *tempopb.Trace) bool {
// collect all parent span IDs
linkedSpanIDs := make([][]byte, 0)
Expand Down Expand Up @@ -340,3 +450,19 @@ func hasMissingSpans(t *tempopb.Trace) bool {

return false
}

// constructTraceFromEpoch re-generates the trace for the given epoch. It seeds
// a fresh random source from epoch, draws the two trace ID halves from it, and
// then converts each generated thrift batch to its tempopb form, collecting
// all resulting batches into one trace.
func constructTraceFromEpoch(epoch time.Time) *tempopb.Trace {
r := newRand(epoch)
// Draw order matters: high first, then low.
traceIDHigh := r.Int63()
traceIDLow := r.Int63()

trace := &tempopb.Trace{}

// The loop bound is re-drawn from r on every iteration, so the condition
// itself consumes random values between batches.
// NOTE(review): presumably this must mirror the write loop draw for draw so
// the same seed yields the same trace — confirm against the writer before
// hoisting the bound out of the condition.
for i := int64(0); i < generateRandomInt(1, 100, r); i++ {
batch := makeThriftBatch(traceIDHigh, traceIDLow, r, epoch)
result := jaegerBatchToPbTrace(batch)
trace.Batches = append(trace.Batches, result.Batches...)
}

return trace
}
Loading

0 comments on commit 362569d

Please sign in to comment.