Skip to content

Commit

Permalink
Remove WAL contention between ingest and searches (#1076)
Browse files Browse the repository at this point in the history
* Create SearchDataMap2, benchmarks, relocate code

* Renames, update tests to avoid direct usage, swap out types

* Remove block-level write lock from ingest path, add necessary mutexes around appender records and block header

* cleanup

* cleanup, fix tests

* changelog

* lint

* Restore benchmark code block
  • Loading branch information
mdisibio authored Oct 26, 2021
1 parent b539171 commit 3407b35
Show file tree
Hide file tree
Showing 14 changed files with 393 additions and 162 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
* [ENHANCEMENT] Performance: More efficient distributor batching [#1075](https://github.com/grafana/tempo/pull/1075) (@joe-elliott)
* [ENHANCEMENT] Allow search disablement in vulture [#1069](https://github.com/grafana/tempo/pull/1069) (@zalegrala)
* [ENHANCEMENT] Jsonnet: add `$._config.search_enabled`, correctly set `http_api_prefix` in config [#1072](https://github.com/grafana/tempo/pull/1072) (@kvrhdn)
* [ENHANCEMENT] Performance: Remove WAL contention between ingest and searches [#1076](https://github.com/grafana/tempo/pull/1076) (@mdisibio)
* [BUGFIX] Update port spec for GCS docker-compose example [#869](https://github.com/grafana/tempo/pull/869) (@zalegrala)
* [BUGFIX] Fix "magic number" errors and other block mishandling when an ingester forcefully shuts down [#937](https://github.com/grafana/tempo/issues/937) (@mdisibio)
* [BUGFIX] Fix compactor memory leak [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
Expand Down
20 changes: 10 additions & 10 deletions modules/distributor/search_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,13 @@ func TestExtractSearchData(t *testing.T) {
id: traceIDA,
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.SearchDataMap{
"foo": []string{"bar"},
search.RootSpanNameTag: []string{"firstSpan"},
search.SpanNameTag: []string{"firstSpan"},
search.RootServiceNameTag: []string{"baz"},
search.ServiceNameTag: []string{"baz"},
},
Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
"foo": {"bar"},
search.RootSpanNameTag: {"firstSpan"},
search.SpanNameTag: {"firstSpan"},
search.RootServiceNameTag: {"baz"},
search.ServiceNameTag: {"baz"},
}),
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
Expand Down Expand Up @@ -104,9 +104,9 @@ func TestExtractSearchData(t *testing.T) {
id: traceIDA,
searchData: &tempofb.SearchEntryMutable{
TraceID: traceIDA,
Tags: tempofb.SearchDataMap{
"bar": []string{"baz"},
},
Tags: tempofb.NewSearchDataMapWithData(map[string][]string{
"bar": {"baz"},
}),
StartTimeUnixNano: 0,
EndTimeUnixNano: 0,
},
Expand Down
8 changes: 6 additions & 2 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error {
if completingBlock != nil {
entry := i.searchAppendBlocks[completingBlock]
if entry != nil {
// Take write lock to ensure no searches are reading.
entry.mtx.Lock()
defer entry.mtx.Unlock()
_ = entry.b.Clear()
Expand Down Expand Up @@ -544,8 +545,11 @@ func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, searchData [][]

entry := i.searchHeadBlock
if entry != nil {
entry.mtx.Lock()
defer entry.mtx.Unlock()
// Don't take a write lock on the block here. It is safe
// for the appender to write to its file while a search
// is reading it. This prevents stalling the write path
// while a search is happening. There are mutexes internally
// for the parts that aren't.
err := entry.b.Append(context.TODO(), id, searchData)
return err
}
Expand Down
89 changes: 58 additions & 31 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/tempo/tempodb/search"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
)

func checkEqual(t *testing.T, ids [][]byte, sr *tempopb.SearchResponse) {
Expand Down Expand Up @@ -400,7 +401,6 @@ func TestInstanceSearchMetrics(t *testing.T) {

func BenchmarkInstanceSearchUnderLoad(b *testing.B) {
ctx := context.TODO()
//n := 1_000_000

i := defaultInstance(b, b.TempDir())

Expand All @@ -417,53 +417,80 @@ func BenchmarkInstanceSearchUnderLoad(b *testing.B) {
}
}

go concurrent(func() {
id := make([]byte, 16)
rand.Read(id)

trace := test.MakeTrace(10, id)
traceBytes, err := trace.Marshal()
require.NoError(b, err)

searchData := &tempofb.SearchEntryMutable{}
searchData.TraceID = id
searchData.AddTag("foo", "bar")
searchBytes := searchData.ToBytes()

// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes, searchBytes)
require.NoError(b, err)
})
// Push data
var tracesPushed atomic.Int32
for j := 0; j < 2; j++ {
go concurrent(func() {
id := make([]byte, 16)
rand.Read(id)

trace := test.MakeTrace(10, id)
traceBytes, err := trace.Marshal()
require.NoError(b, err)

searchData := &tempofb.SearchEntryMutable{}
searchData.TraceID = id
searchData.AddTag("foo", "bar")
searchData.AddTag("foo", "baz")
searchData.AddTag("bar", "bar")
searchData.AddTag("bar", "baz")
searchBytes := searchData.ToBytes()

// searchData will be nil if not
err = i.PushBytes(context.Background(), id, traceBytes, searchBytes)
require.NoError(b, err)

tracesPushed.Inc()
})
}

cuts := 0
go concurrent(func() {
time.Sleep(250 * time.Millisecond)
err := i.CutCompleteTraces(0, true)
require.NoError(b, err, "error cutting complete traces")
cuts++
})

go concurrent(func() {
// Slow this down to prevent "too many open files" error
time.Sleep(10 * time.Millisecond)
time.Sleep(100 * time.Millisecond)
_, err := i.CutBlockIfReady(0, 0, true)
require.NoError(b, err)
})

var searches atomic.Int32
var bytesInspected atomic.Uint64
var tracesInspected atomic.Uint32

for j := 0; j < 2; j++ {
go concurrent(func() {
//time.Sleep(1 * time.Millisecond)
var req = &tempopb.SearchRequest{
Tags: map[string]string{search.SecretExhaustiveSearchTag: "!"},
}
resp, err := i.Search(ctx, req)
require.NoError(b, err)
searches.Inc()
bytesInspected.Add(resp.Metrics.InspectedBytes)
tracesInspected.Add(resp.Metrics.InspectedTraces)
})
}

b.ResetTimer()
start := time.Now()
bytesInspected := uint64(0)
for j := 0; j < b.N; j++ {
var req = &tempopb.SearchRequest{
Tags: map[string]string{"nomatch": "nomatch"},
}
resp, err := i.Search(ctx, req)
require.NoError(b, err)
bytesInspected += resp.Metrics.InspectedBytes
}
time.Sleep(time.Duration(b.N) * time.Millisecond)
elapsed := time.Since(start)

fmt.Printf("Instance search throughput under load: %v elapsed %.2f MB = %.2f MiB/s throughput \n",
fmt.Printf("Instance search throughput under load: %v elapsed %.2f MB = %.2f MiB/s throughput inspected %.2f traces/s pushed %.2f traces/s %.2f searches/s %.2f cuts/s\n",
elapsed,
float64(bytesInspected)/(1024*1024),
float64(bytesInspected)/(elapsed.Seconds())/(1024*1024))
float64(bytesInspected.Load())/(1024*1024),
float64(bytesInspected.Load())/(elapsed.Seconds())/(1024*1024),
float64(tracesInspected.Load())/(elapsed.Seconds()),
float64(tracesPushed.Load())/(elapsed.Seconds()),
float64(searches.Load())/(elapsed.Seconds()),
float64(cuts)/(elapsed.Seconds()),
)

b.StopTimer()
close(end)
Expand Down
3 changes: 2 additions & 1 deletion modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,8 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance {
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: tmpDir,
Filepath: tmpDir,
SearchEncoding: backend.EncNone,
},
},
}, log.NewNopLogger())
Expand Down
19 changes: 4 additions & 15 deletions pkg/tempofb/SearchBlockHeader_util.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package tempofb

import (
"strings"

flatbuffers "github.com/google/flatbuffers/go"
)

Expand All @@ -18,7 +16,7 @@ type SearchBlockHeaderMutable struct {

func NewSearchBlockHeaderMutable() *SearchBlockHeaderMutable {
return &SearchBlockHeaderMutable{
Tags: SearchDataMap{},
Tags: NewSearchDataMap(),
}
}

Expand All @@ -29,8 +27,9 @@ func (s *SearchBlockHeaderMutable) AddEntry(e *SearchEntry) {
// Record all unique keyvalues
for i, ii := 0, e.TagsLength(); i < ii; i++ {
e.Tags(kv, i)
key := string(kv.Key())
for j, jj := 0, kv.ValueLength(); j < jj; j++ {
s.AddTag(string(kv.Key()), string(kv.Value(j)))
s.AddTag(key, string(kv.Value(j)))
}
}

Expand Down Expand Up @@ -58,17 +57,7 @@ func (s *SearchBlockHeaderMutable) MaxDurationNanos() uint64 {
}

func (s *SearchBlockHeaderMutable) Contains(k []byte, v []byte, _ *KeyValues) bool {
e := s.Tags[string(k)]
if e != nil {
vv := string(v)
for _, s := range e {
if strings.Contains(s, vv) {
return true
}
}
}

return false
return s.Tags.Contains(string(k), string(v))
}

func (s *SearchBlockHeaderMutable) ToBytes() []byte {
Expand Down
85 changes: 9 additions & 76 deletions pkg/tempofb/searchdata_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,81 +3,11 @@ package tempofb
import (
"bytes"
"sort"
"strings"

flatbuffers "github.com/google/flatbuffers/go"
"github.com/grafana/tempo/tempodb/encoding/common"
)

type SearchDataMap map[string][]string

// Add records value v under key k, keeping each key's value list free of
// duplicates. Adding a key/value pair that is already present is a no-op.
func (s SearchDataMap) Add(k, v string) {
	existing, present := s[k]
	if !present {
		// Nothing recorded for this key yet; start its value list.
		s[k] = []string{v}
		return
	}

	// Key exists — bail out if the value was already recorded.
	for _, seen := range existing {
		if seen == v {
			return
		}
	}

	// New value for an existing key.
	s[k] = append(existing, v)
}

// WriteToBuilder serializes the map into b as a vector of KeyValues entries
// and returns the offset of that vector. Keys and values are emitted sorted
// and lowercased so the serialized form is deterministic regardless of map
// iteration order. Keys with no values are omitted.
func (s SearchDataMap) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT {
	offsets := make([]flatbuffers.UOffsetT, 0, len(s))

	// Sort keys for a deterministic output ordering.
	keys := make([]string, 0, len(s))
	for k := range s {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	for _, k := range keys {
		// Skip keys that have no values at all.
		if len(s[k]) <= 0 {
			continue
		}

		// Shared strings deduplicate identical keys/values in the buffer.
		ko := b.CreateSharedString(strings.ToLower(k))

		// Sort values. NOTE(review): this sorts the map's backing slice
		// in place, so the caller's map is mutated as a side effect.
		v := s[k]
		sort.Strings(v)

		valueStrings := make([]flatbuffers.UOffsetT, len(v))
		for i := range v {
			valueStrings[i] = b.CreateSharedString(strings.ToLower(v[i]))
		}

		// Flatbuffer vectors are built back-to-front by prepending.
		KeyValuesStartValueVector(b, len(valueStrings))
		for _, vs := range valueStrings {
			b.PrependUOffsetT(vs)
		}
		valueVector := b.EndVector(len(valueStrings))

		// Child offsets (key, value vector) must be created before the
		// KeyValues table itself is started.
		KeyValuesStart(b)
		KeyValuesAddKey(b, ko)
		KeyValuesAddValue(b, valueVector)
		offsets = append(offsets, KeyValuesEnd(b))
	}

	// Wrap all KeyValues entries into the Tags vector and return its offset.
	SearchEntryStartTagsVector(b, len(offsets))
	for _, kvo := range offsets {
		b.PrependUOffsetT(kvo)
	}
	keyValueVector := b.EndVector((len(offsets)))
	return keyValueVector
}

// SearchEntryMutable is a mutable form of the flatbuffer-compiled SearchEntry struct to make building and transporting easier.
type SearchEntryMutable struct {
TraceID common.ID
Expand All @@ -89,7 +19,7 @@ type SearchEntryMutable struct {
// AddTag adds the unique tag name and value to the search data. No effect if the pair is already present.
func (s *SearchEntryMutable) AddTag(k string, v string) {
if s.Tags == nil {
s.Tags = SearchDataMap{}
s.Tags = NewSearchDataMap()
}
s.Tags.Add(k, v)
}
Expand All @@ -116,6 +46,9 @@ func (s *SearchEntryMutable) ToBytes() []byte {
}

func (s *SearchEntryMutable) WriteToBuilder(b *flatbuffers.Builder) flatbuffers.UOffsetT {
if s.Tags == nil {
s.Tags = NewSearchDataMap()
}

idOffset := b.CreateByteString(s.TraceID)

Expand All @@ -138,15 +71,15 @@ type SearchPageBuilder struct {
func NewSearchPageBuilder() *SearchPageBuilder {
return &SearchPageBuilder{
builder: flatbuffers.NewBuilder(1024),
allTags: SearchDataMap{},
allTags: NewSearchDataMap(),
}
}

func (b *SearchPageBuilder) AddData(data *SearchEntryMutable) int {
for k, vv := range data.Tags {
for _, v := range vv {
if data.Tags != nil {
data.Tags.Range(func(k, v string) {
b.allTags.Add(k, v)
}
})
}

oldOffset := b.builder.Offset()
Expand Down Expand Up @@ -186,7 +119,7 @@ func (b *SearchPageBuilder) Finish() []byte {
func (b *SearchPageBuilder) Reset() {
b.builder.Reset()
b.pageEntries = b.pageEntries[:0]
b.allTags = SearchDataMap{}
b.allTags = NewSearchDataMap()
}

// Get searches the entry and returns the first value found for the given key.
Expand Down
Loading

0 comments on commit 3407b35

Please sign in to comment.