Skip to content

Commit

Permalink
Fix an inconsistency in wal search (#1548)
Browse files Browse the repository at this point in the history
* Move flatbuffer search extract methods from distributor to reuseable location

* Move trace test suite to reusable location

* Run trace search tests against wal and backend search blocks.  Fix wal header tag check to be substring

* lint

* changelog

* review feedback
  • Loading branch information
mdisibio authored Jul 6, 2022
1 parent 910b33c commit 6f98ff7
Show file tree
Hide file tree
Showing 10 changed files with 427 additions and 329 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Additionally, default label `span_status` is renamed to `status_code`.
* [BUGFIX] Update tempo microservices Helm values example which missed the 'enabled' key for thriftHttp. [#1472](https://github.com/grafana/tempo/pull/1472) (@hajowieland)
* [BUGFIX] Fix race condition in forwarder overrides loop. [1468](https://github.com/grafana/tempo/pull/1468) (@mapno)
* [BUGFIX] Fix v2 backend check on span name to be substring [#1538](https://github.com/grafana/tempo/pull/1538) (@mdisibio)
* [BUGFIX] Fix wal check on span name to be substring [#1548](https://github.com/grafana/tempo/pull/1548) (@mdisibio)
* [ENHANCEMENT] Add a config to query single ingester instance based on trace id hash for Trace By ID API. (1484)[https://github.com/grafana/tempo/pull/1484] (@sagarwala, @bikashmishra100, @ashwinidulams)
* [ENHANCEMENT] Add blocklist metrics for total backend objects and total backend bytes [#1519](https://github.com/grafana/tempo/pull/1519) (@ie-pham)

Expand Down
105 changes: 3 additions & 102 deletions modules/distributor/search_data.go
Original file line number Diff line number Diff line change
@@ -1,113 +1,14 @@
package distributor

import (
"strconv"

"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
)

type extractTagFunc func(tag string) bool
import "github.com/grafana/tempo/pkg/model/trace"

// extractSearchDataAll returns flatbuffer search data for every trace.
func extractSearchDataAll(traces []*rebatchedTrace, extractTag extractTagFunc) [][]byte {
func extractSearchDataAll(traces []*rebatchedTrace, extractTag trace.ExtractTagFunc) [][]byte {
headers := make([][]byte, len(traces))

for i, t := range traces {
headers[i] = extractSearchData(t.trace, t.id, extractTag)
headers[i] = trace.ExtractSearchData(t.trace, t.id, extractTag)
}

return headers
}

// extractSearchData returns the flatbuffer search data for the given trace. It is extracted here
// in the distributor because this is the only place on the ingest path where the trace is available
// in object form.
func extractSearchData(tr *tempopb.Trace, id []byte, extractTag extractTagFunc) []byte {
data := &tempofb.SearchEntryMutable{}

data.TraceID = id

for _, b := range tr.Batches {
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}

for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {

// Root span
if len(s.ParentSpanId) == 0 {

// Collect root.name
data.AddTag(trace.RootSpanNameTag, s.Name)

// Collect root.service.name
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if a.Key == trace.ServiceNameTag {
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(trace.RootServiceNameTag, s)
}
}
}
}
}

// Collect for any spans
data.AddTag(trace.SpanNameTag, s.Name)
if s.Status != nil {
data.AddTag(trace.StatusCodeTag, strconv.Itoa(int(s.Status.Code)))
}
data.SetStartTimeUnixNano(s.StartTimeUnixNano)
data.SetEndTimeUnixNano(s.EndTimeUnixNano)

for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}
}
}

return data.ToBytes()
}

func extractValueAsString(v *common_v1.AnyValue) (s string, ok bool) {
vv := v.GetValue()
if vv == nil {
return "", false
}

if s, ok := vv.(*common_v1.AnyValue_StringValue); ok {
return s.StringValue, true
}

if b, ok := vv.(*common_v1.AnyValue_BoolValue); ok {
return strconv.FormatBool(b.BoolValue), true
}

if i, ok := vv.(*common_v1.AnyValue_IntValue); ok {
return strconv.FormatInt(i.IntValue, 10), true
}

if d, ok := vv.(*common_v1.AnyValue_DoubleValue); ok {
return strconv.FormatFloat(d.DoubleValue, 'g', -1, 64), true
}

return "", false
}
101 changes: 101 additions & 0 deletions pkg/model/trace/search_data.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package trace

import (
"strconv"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
common_v1 "github.com/grafana/tempo/pkg/tempopb/common/v1"
)

type ExtractTagFunc func(tag string) bool

// ExtractSearchData returns the flatbuffer search data for the given trace. It is extracted here
// in the distributor because this is the only place on the ingest path where the trace is available
// in object form.
func ExtractSearchData(tr *tempopb.Trace, id []byte, extractTag ExtractTagFunc) []byte {
data := &tempofb.SearchEntryMutable{}

data.TraceID = id

for _, b := range tr.Batches {
// Batch attrs
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}

for _, ils := range b.InstrumentationLibrarySpans {
for _, s := range ils.Spans {

// Root span
if len(s.ParentSpanId) == 0 {

// Collect root.name
data.AddTag(RootSpanNameTag, s.Name)

// Collect root.service.name
if b.Resource != nil {
for _, a := range b.Resource.Attributes {
if a.Key == ServiceNameTag {
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(RootServiceNameTag, s)
}
}
}
}
}

// Collect for any spans
data.AddTag(SpanNameTag, s.Name)
if s.Status != nil {
data.AddTag(StatusCodeTag, strconv.Itoa(int(s.Status.Code)))
}
data.SetStartTimeUnixNano(s.StartTimeUnixNano)
data.SetEndTimeUnixNano(s.EndTimeUnixNano)

for _, a := range s.Attributes {
if !extractTag(a.Key) {
continue
}
if s, ok := extractValueAsString(a.Value); ok {
data.AddTag(a.Key, s)
}
}
}
}
}

return data.ToBytes()
}

func extractValueAsString(v *common_v1.AnyValue) (s string, ok bool) {
vv := v.GetValue()
if vv == nil {
return "", false
}

if s, ok := vv.(*common_v1.AnyValue_StringValue); ok {
return s.StringValue, true
}

if b, ok := vv.(*common_v1.AnyValue_BoolValue); ok {
return strconv.FormatBool(b.BoolValue), true
}

if i, ok := vv.(*common_v1.AnyValue_IntValue); ok {
return strconv.FormatInt(i.IntValue, 10), true
}

if d, ok := vv.(*common_v1.AnyValue_DoubleValue); ok {
return strconv.FormatFloat(d.DoubleValue, 'g', -1, 64), true
}

return "", false
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package distributor
package trace

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/pkg/model/trace"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
v1_common "github.com/grafana/tempo/pkg/tempopb/common/v1"
Expand All @@ -20,7 +19,7 @@ func TestExtractSearchData(t *testing.T) {
name string
trace *tempopb.Trace
id []byte
extractTag extractTagFunc
extractTag ExtractTagFunc
searchData *tempofb.SearchEntryMutable
}{
{
Expand Down Expand Up @@ -64,11 +63,11 @@ func TestExtractSearchData(t *testing.T) {
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
"foo": {"bar"},
trace.RootSpanNameTag: {"firstSpan"},
trace.SpanNameTag: {"firstSpan"},
trace.RootServiceNameTag: {"baz"},
trace.ServiceNameTag: {"baz"},
"foo": {"bar"},
RootSpanNameTag: {"firstSpan"},
SpanNameTag: {"firstSpan"},
RootServiceNameTag: {"baz"},
ServiceNameTag: {"baz"},
}),
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestExtractSearchData(t *testing.T) {

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.searchData.ToBytes(), extractSearchData(tc.trace, tc.id, tc.extractTag))
assert.Equal(t, tc.searchData.ToBytes(), ExtractSearchData(tc.trace, tc.id, tc.extractTag))
})
}
}
Loading

0 comments on commit 6f98ff7

Please sign in to comment.