Remove WAL contention between ingest and searches #1076

Merged · 8 commits · Oct 26, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -68,6 +68,7 @@
* [ENHANCEMENT] Adding metrics around ingester flush retries [#1049](https://github.com/grafana/tempo/pull/944) (@dannykopping)
* [ENHANCEMENT] Allow search disablement in vulture [#1069](https://github.com/grafana/tempo/pull/1069) (@zalegrala)
* [ENHANCEMENT] Jsonnet: add `$._config.search_enabled`, correctly set `http_api_prefix` in config [#1072](https://github.com/grafana/tempo/pull/1072) (@kvrhdn)
* [ENHANCEMENT] Performance: Remove WAL contention between ingest and searches [#1076](https://github.com/grafana/tempo/pull/1076) (@mdisibio)
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
20 changes: 10 additions & 10 deletions modules/distributor/search_data_test.go
@@ -63,13 +63,13 @@ func TestExtractSearchData(t *testing.T) {
id: traceIDA,
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.SearchDataMap{
"foo": []string{"bar"},
search.RootSpanNameTag: []string{"firstSpan"},
search.SpanNameTag: []string{"firstSpan"},
search.RootServiceNameTag: []string{"baz"},
search.ServiceNameTag: []string{"baz"},
},
Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
"foo": {"bar"},
search.RootSpanNameTag: {"firstSpan"},
search.SpanNameTag: {"firstSpan"},
search.RootServiceNameTag: {"baz"},
search.ServiceNameTag: {"baz"},
}),
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
@@ -104,9 +104,9 @@
id: traceIDA,
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.SearchDataMap{
"bar": []string{"baz"},
},
Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
"bar": {"baz"},
}),
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
8 changes: 6 additions & 2 deletions modules/ingester/instance.go
@@ -318,6 +318,7 @@ func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error {
if completingBlock != nil {
entry := i.searchAppendBlocks[completingBlock]
if entry != nil {
// Take write lock to ensure no searches are reading.
entry.mtx.Lock()
defer entry.mtx.Unlock()
_ = entry.b.Clear()
@@ -544,8 +545,11 @@ func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, searchData [][]

entry := i.searchHeadBlock
if entry != nil {
entry.mtx.Lock()
defer entry.mtx.Unlock()
// Don't take a write lock on the block here. It is safe
// for the appender to write to its file while a search
// is reading it. This prevents stalling the write path
// while a search is happening. There are internal mutexes
// for the parts that aren't safe.
err := entry.b.Append(context.TODO(), id, searchData)
return err
}
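
The comment above captures the heart of this PR: searches read a block under a shared lock, destructive operations such as Clear take the exclusive lock, and appends skip the block-level lock entirely. A minimal sketch of that pattern follows; searchBlockEntry and searchableBlock are hypothetical stand-ins for illustration, not the actual types in this module.

package main

import (
	"context"
	"sync"
)

// searchableBlock is a hypothetical stand-in for the ingester's search
// block type; only the methods relevant to the locking model are shown.
type searchableBlock interface {
	Append(ctx context.Context, id []byte, searchData [][]byte) error
	Clear() error
}

type searchBlockEntry struct {
	mtx sync.RWMutex // guards searches against a concurrent Clear
	b   searchableBlock
}

// Searches take only the read lock, so any number may run concurrently.
func (e *searchBlockEntry) search(read func(searchableBlock) error) error {
	e.mtx.RLock()
	defer e.mtx.RUnlock()
	return read(e.b)
}

// Clear takes the write lock because the backing file is about to be
// deleted; no search may be mid-read when that happens.
func (e *searchBlockEntry) clear() error {
	e.mtx.Lock()
	defer e.mtx.Unlock()
	return e.b.Clear()
}

// Append takes no block-level lock at all: the appender can safely extend
// its file while searches read it, synchronizing internally where needed,
// so the write path never queues behind a long-running search.
func (e *searchBlockEntry) append(ctx context.Context, id []byte, searchData [][]byte) error {
	return e.b.Append(ctx, id, searchData)
}

Under that reading, the only change on the hot write path (writeTraceToHeadBlock above) is dropping entry.mtx.Lock(), which is exactly what removes the contention named in the PR title.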
89 changes: 58 additions & 31 deletions modules/ingester/instance_search_test.go
@@ -19,6 +19,7 @@ import (
"github.com/grafana/tempo/tempodb/search"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
)

func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
@@ -400,7 +401,6 @@ func TestInstanceSearchMetrics(t *testing.T) {

func BenchmarkInstanceSearchUnderLoad(b *testing.B) {
ctx := context.TODO()
//n := 1_000_000

i := defaultInstance(b, b.TempDir())

@@ -417,53 +417,80 @@
}
}

go concurrent(func() {
id := make([]byte, 16)
rand.Read(id)

trace := test.MakeTrace(10, id)
traceBytes, err := trace.Marshal()
require.NoError(b, err)

searchData := &tempofb.SearchEntryMutable{}
searchData.TraceID = id
searchData.AddTag("foo", "bar")
searchBytes := searchData.ToBytes()

// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes, searchBytes)
require.NoError(b, err)
})
// Push data
var tracesPushed atomic.Int32
for j := 0; j < 2; j++ {
go concurrent(func() {
id := make([]byte, 16)
rand.Read(id)

trace := test.MakeTrace(10, id)
traceBytes, err := trace.Marshal()
require.NoError(b, err)

searchData := &tempofb.SearchEntryMutable{}
searchData.TraceID = id
searchData.AddTag("foo", "bar")
searchData.AddTag("foo", "baz")
searchData.AddTag("bar", "bar")
searchData.AddTag("bar", "baz")
searchBytes := searchData.ToBytes()

// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes, searchBytes)
require.NoError(b, err)

tracesPushed.Inc()
})
}

cuts := 0
go concurrent(func() {
time.Sleep(250 * time.Millisecond)
err := i.CutCompleteTraces(0, true)
require.NoError(b, err, "error cutting complete traces")
cuts++
})

go concurrent(func() {
// Slow this down to prevent "too many open files" error
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
_, err := i.CutBlockIfReady(0, 0, true)
require.NoError(b, err)
})

var searches atomic.Int32
var bytesInspected atomic.Uint64
var tracesInspected atomic.Uint32

for j := 0; j < 2; j++ {
go concurrent(func() {
//time.Sleep(1 * time.Millisecond)
var req = &tempopb.SearchRequest{
Tags: map[string]string{search.SecretExhaustiveSearchTag: "!"},
}
resp, err := i.Search(ctx, req)
require.NoError(b, err)
searches.Inc()
bytesInspected.Add(resp.Metrics.InspectedBytes)
tracesInspected.Add(resp.Metrics.InspectedTraces)
})
}

b.ResetTimer()
start := time.Now()
bytesInspected := uint64(0)
for j := 0; j < b.N; j++ {
var req = &tempopb.SearchRequest{
Tags: map[string]string{"nomatch": "nomatch"},
}
resp, err := i.Search(ctx, req)
require.NoError(b, err)
bytesInspected += resp.Metrics.InspectedBytes
}
time.Sleep(time.Duration(b.N) * time.Millisecond)
elapsed := time.Since(start)

fmt.Printf("Instance search throughput under load: %v elapsed %.2f MB = %.2f MiB/s throughput \n",
fmt.Printf("Instance search throughput under load: %v elapsed %.2f MB = %.2f MiB/s throughput inspected %.2f traces/s pushed %.2f traces/s %.2f searches/s %.2f cuts/s\n",
elapsed,
float64(bytesInspected)/(1024*1024),
float64(bytesInspected)/(elapsed.Seconds())/(1024*1024))
float64(bytesInspected.Load())/(1024*1024),
float64(bytesInspected.Load())/(elapsed.Seconds())/(1024*1024),
float64(tracesInspected.Load())/(elapsed.Seconds()),
float64(tracesPushed.Load())/(elapsed.Seconds()),
float64(searches.Load())/(elapsed.Seconds()),
float64(cuts)/(elapsed.Seconds()),
)

b.StopTimer()
close(end)
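
All of the goroutines above run through a concurrent helper that is defined elsewhere in this test file and not shown in this diff. Its actual body is not visible here; a plausible shape, assuming it simply repeats the given function until the shared end channel (closed via close(end) at the bottom of the benchmark) is signaled, would be:

end := make(chan struct{})

// concurrent is a hypothetical reconstruction: loop f until the
// benchmark closes the end channel.
concurrent := func(f func()) {
	for {
		select {
		case <-end:
			return // benchmark window is over
		default:
			f()
		}
	}
}

Under that assumption, the benchmark measures steady-state throughput: pushes, cuts, and searches all loop flat-out for roughly b.N milliseconds, and the final Printf divides each atomic counter by the elapsed wall time.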
3 changes: 2 additions & 1 deletion modules/ingester/instance_test.go
@@ -524,7 +524,8 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance {
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: tmpDir,
Filepath: tmpDir,
SearchEncoding: backend.EncNone,
},
},
}, log.NewNopLogger())
19 changes: 4 additions & 15 deletions pkg/tempofb/SearchBlockHeader_util.go
@@ -1,8 +1,6 @@
package tempofb

import (
"strings"

flatbuffers "github.com/google/flatbuffers/go"
)

@@ -18,7 +16,7 @@ type SearchBlockHeaderMutable struct {

func NewSearchBlockHeaderMutable() *SearchBlockHeaderMutable {
return &SearchBlockHeaderMutable{
Tags: SearchDataMap{},
Tags: NewSearchDataMap(),
}
}

@@ -29,8 +27,9 @@ func (s *SearchBlockHeaderMutable) AddEntry(e *SearchEntry) {
// Record all unique keyvalues
for i, ii := 0, e.TagsLength(); i < ii; i++ {
e.Tags(kv, i)
key := string(kv.Key())
for j, jj := 0, kv.ValueLength(); j < jj; j++ {
s.AddTag(string(kv.Key()), string(kv.Value(j)))
s.AddTag(key, string(kv.Value(j)))
}
}

@@ -58,17 +57,7 @@ func (s *SearchBlockHeaderMutable) MaxDurationNanos() uint64 {
}

func (s *SearchBlockHeaderMutable) Contains(k []byte, v []byte, _ *KeyValues) bool {
e := s.Tags[string(k)]
if e != nil {
vv := string(v)
for _, s := range e {
if strings.Contains(s, vv) {
return true
}
}
}

return false
return s.Tags.Contains(string(k), string(v))
}

func (s *SearchBlockHeaderMutable) ToBytes() []byte {
85 changes: 9 additions & 76 deletions pkg/tempofb/searchdata_util.go
@@ -3,81 +3,11 @@ package tempofb
import (
"bytes"
"sort"
"strings"

flatbuffers "github.com/google/flatbuffers/go"
"github.com/grafana/tempo/tempodb/encoding/common"
)

type SearchDataMap map[string][]string

func (s SearchDataMap) Add(k, v string) {
vs, ok := s[k]
if !ok {
// First entry for key
s[k] = []string{v}
return
}

// Key already present, now check for value
for i := range vs {
if vs[i] == v {
// Already present, nothing to do
return
}
}

// Not found, append
s[k] = append(vs, v)
}

func (s SearchDataMap) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT {
offsets := make([]flatbuffers.UOffsetT, 0, len(s))

// Sort keys
keys := make([]string, 0, len(s))
for k := range s {
keys = append(keys, k)
}
sort.Strings(keys)

for _, k := range keys {
// Skip empty keys
if len(s[k]) <= 0 {
continue
}

ko := b.CreateSharedString(strings.ToLower(k))

// Sort values
v := s[k]
sort.Strings(v)

valueStrings := make([]flatbuffers.UOffsetT, len(v))
for i := range v {
valueStrings[i] = b.CreateSharedString(strings.ToLower(v[i]))
}

KeyValuesStartValueVector(b, len(valueStrings))
for _, vs := range valueStrings {
b.PrependUOffsetT(vs)
}
valueVector := b.EndVector(len(valueStrings))

KeyValuesStart(b)
KeyValuesAddKey(b, ko)
KeyValuesAddValue(b, valueVector)
offsets = append(offsets, KeyValuesEnd(b))
}

SearchEntryStartTagsVector(b, len(offsets))
for _, kvo := range offsets {
b.PrependUOffsetT(kvo)
}
keyValueVector := b.EndVector((len(offsets)))
return keyValueVector
}

// SearchEntryMutable is a mutable form of the flatbuffer-compiled SearchEntry struct to make building and transporting easier.
type SearchEntryMutable struct {
TraceID common.ID
@@ -89,7 +19,7 @@ type SearchEntryMutable struct {
// AddTag adds the unique tag name and value to the search data. No effect if the pair is already present.
func (s *SearchEntryMutable) AddTag(k string, v string) {
if s.Tags == nil {
s.Tags = SearchDataMap{}
s.Tags = NewSearchDataMap()
}
s.Tags.Add(k, v)
}
@@ -116,6 +46,9 @@ func (s *SearchEntryMutable) ToBytes() []byte {
}

func (s *SearchEntryMutable) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT {
if s.Tags == nil {
s.Tags = NewSearchDataMap()
}

idOffset := b.CreateByteString(s.TraceID)

@@ -138,15 +71,15 @@ type SearchPageBuilder struct {
func NewSearchPageBuilder() *SearchPageBuilder {
return &SearchPageBuilder{
builder: flatbuffers.NewBuilder(1024),
allTags: SearchDataMap{},
allTags: NewSearchDataMap(),
}
}

func (b *SearchPageBuilder) AddData(data *SearchEntryMutable) int {
for k, vv := range data.Tags {
for _, v := range vv {
b.allTags.Add(k, v)
}
}
if data.Tags != nil {
data.Tags.Range(func(k, v string) {
b.allTags.Add(k, v)
})
}

oldOffset := b.builder.Offset()
@@ -186,7 +119,7 @@ func (b *SearchPageBuilder) Finish() []byte {
func (b *SearchPageBuilder) Reset() {
b.builder.Reset()
b.pageEntries = b.pageEntries[:0]
b.allTags = SearchDataMap{}
b.allTags = NewSearchDataMap()
}

// Get searches the entry and returns the first value found for the given key.
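
The removed concrete SearchDataMap map type now sits behind constructors (NewSearchDataMap, NewSearchDataMapWithData) and methods (Add, Contains, Range) defined in a file that is not part of this diff. Inferred purely from the call sites above, a minimal map-backed version could look like the sketch below; everything beyond the names visible in the diff is an assumption, and the real change also keeps the flatbuffer serialization (WriteToBuilder) behind this abstraction.

package tempofb

import "strings"

// SearchDataMap is the assumed interface; only the methods used in this
// diff are shown.
type SearchDataMap interface {
	Add(k, v string)
	Contains(k, v string) bool
	Range(f func(k, v string))
}

// searchDataMapSmall is a hypothetical map-backed implementation.
type searchDataMapSmall map[string][]string

func NewSearchDataMap() SearchDataMap { return searchDataMapSmall{} }

func NewSearchDataMapWithData(m map[string][]string) SearchDataMap {
	s := searchDataMapSmall{}
	for k, vs := range m {
		for _, v := range vs {
			s.Add(k, v)
		}
	}
	return s
}

// Add keeps values unique per key, matching the removed SearchDataMap.Add.
func (s searchDataMapSmall) Add(k, v string) {
	for _, existing := range s[k] {
		if existing == v {
			return // already present, nothing to do
		}
	}
	s[k] = append(s[k], v)
}

// Contains mirrors the substring semantics of the removed
// SearchBlockHeaderMutable.Contains.
func (s searchDataMapSmall) Contains(k, v string) bool {
	for _, existing := range s[k] {
		if strings.Contains(existing, v) {
			return true
		}
	}
	return false
}

// Range visits every key/value pair, as SearchPageBuilder.AddData expects.
func (s searchDataMapSmall) Range(f func(k, v string)) {
	for k, vs := range s {
		for _, v := range vs {
			f(k, v)
		}
	}
}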