Search stability improvements #1033

Merged 7 commits on Oct 18, 2021
6 changes: 4 additions & 2 deletions modules/ingester/ingester_test.go
@@ -213,15 +213,17 @@ func TestWal(t *testing.T) {
}

func TestSearchWAL(t *testing.T) {
tmpDir := t.TempDir()
tmpDir, err := os.MkdirTemp("/tmp", "")
require.NoError(t, err, "unexpected error getting tempdir")
defer os.RemoveAll(tmpDir)

i := defaultIngesterModule(t, tmpDir)
inst, _ := i.getOrCreateInstance("test")
assert.NotNil(t, inst)

// create some search data
id := make([]byte, 16)
_, err := rand.Read(id)
_, err = rand.Read(id)
require.NoError(t, err)
trace := test.MakeTrace(10, id)
traceBytes, err := trace.Marshal()
14 changes: 7 additions & 7 deletions modules/ingester/instance.go
@@ -277,9 +277,9 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error {
}

// Search data (optional)
i.blocksMtx.Lock()
i.blocksMtx.RLock()
oldSearch := i.searchAppendBlocks[completingBlock]
i.blocksMtx.Unlock()
i.blocksMtx.RUnlock()

var newSearch search.SearchableBlock
if oldSearch != nil {
@@ -332,8 +332,8 @@ func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error {

// GetBlockToBeFlushed gets a list of blocks that can be flushed to the backend
func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *wal.LocalBlock {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

for _, c := range i.completeBlocks {
if c.BlockMeta().BlockID == blockID && c.FlushedTime().IsZero() {
@@ -392,8 +392,8 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace
}
i.tracesMtx.Unlock()

i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()
i.blocksMtx.RLock()
defer i.blocksMtx.RUnlock()

// headBlock
foundBytes, err := i.headBlock.Find(id, model.ObjectCombiner)
@@ -546,8 +546,8 @@ func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, searchData [][]
entry := i.searchHeadBlock
if entry != nil {
entry.mtx.Lock()
defer entry.mtx.Unlock()
err := entry.b.Append(context.TODO(), id, searchData)
entry.mtx.Unlock()
return err
}

17 changes: 11 additions & 6 deletions modules/ingester/instance_search.go
@@ -14,6 +14,9 @@ import (

func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tempopb.SearchResponse, error) {

ctx, cancel := context.WithCancel(ctx)
defer cancel()

maxResults := int(req.Limit)
// if limit is not set, use a safe default
if maxResults == 0 {
@@ -26,8 +29,14 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem
defer sr.Close()

i.searchLiveTraces(ctx, p, sr)

// Lock blocks mutex until all search tasks have been created. This avoids
// deadlocking with other activity (ingest, flushing), caused by releasing
// and then attempting to retake the lock.
i.blocksMtx.RLock()
i.searchWAL(ctx, p, sr)
i.searchLocalBlocks(ctx, p, sr)
i.blocksMtx.RUnlock()

sr.AllWorkersStarted()

@@ -109,6 +118,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr *
}()
}

// searchWAL starts a search task for every WAL block. Must be called under lock.
func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search.Results) {
searchFunc := func(k *wal.AppendBlock, e *searchStreamingBlockEntry) {
defer sr.FinishWorker()
@@ -122,9 +132,6 @@ func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search.
}
}

i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

// head block
sr.StartWorker()
go searchFunc(i.headBlock, i.searchHeadBlock)
@@ -136,10 +143,8 @@ func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search.
}
}

// searchLocalBlocks starts a search task for every local block. Must be called under lock.
func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr *search.Results) {
i.blocksMtx.Lock()
defer i.blocksMtx.Unlock()

for b, e := range i.searchCompleteBlocks {
sr.StartWorker()
go func(b *wal.LocalBlock, e *searchLocalBlockEntry) {
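The comment added to Search above is the core of the deadlock fix: blocksMtx is read-locked once and held while the WAL and local-block workers are created, instead of each helper taking and releasing the lock itself. A minimal standalone sketch of that pattern follows; the store type, its fields, and the output channel are hypothetical illustrations, not Tempo's actual code.

```go
package main

import (
	"fmt"
	"sync"
)

// store stands in for the ingester instance: a read-write mutex guarding a
// mutable list of blocks that searches read and flushes/ingest mutate.
type store struct {
	mtx    sync.RWMutex
	blocks []string
}

// search takes the read lock once and holds it while every worker is
// started. A writer (e.g. a flush that swaps blocks) therefore cannot take
// the write lock halfway through task creation. The workers themselves never
// touch s.blocks, so they need no lock and cannot deadlock against a writer.
func (s *store) search(wg *sync.WaitGroup, out chan<- string) {
	s.mtx.RLock()
	defer s.mtx.RUnlock()

	for _, b := range s.blocks {
		b := b // capture loop variable for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			out <- "searched " + b
		}()
	}
}

func main() {
	s := &store{blocks: []string{"head", "wal-0001", "complete-0002"}}
	out := make(chan string, len(s.blocks)) // buffered so senders never block

	var wg sync.WaitGroup
	s.search(&wg, out)
	wg.Wait()
	close(out)

	for r := range out {
		fmt.Println(r)
	}
}
```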
7 changes: 6 additions & 1 deletion tempodb/search/backend_search_block.go
@@ -147,8 +147,13 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results

meta, err := ReadSearchBlockMeta(ctx, s.r, s.id, s.tenantID)
if err != nil {
// we create BackendSearchBlocks even if search files are missing, return nil here if meta does not exist
if err == backend.ErrDoesNotExist {
// This means one of the following:
// * This block predates search and doesn't actually have
// search data (we create the block entry regardless)
// * This block is deleted between when the search was
// initiated and when we got here.
// In either case it is not an error.
return nil
}
return err
48 changes: 26 additions & 22 deletions tempodb/search/results.go
@@ -2,6 +2,7 @@ package search

import (
"context"
"sync"

"github.com/grafana/tempo/pkg/tempopb"
"go.uber.org/atomic"
@@ -11,11 +12,11 @@ import (
// channel that is easy to consume, signaling workers to quit early as needed, and collecting
// metrics.
type Results struct {
resultsCh chan *tempopb.TraceSearchMetadata
doneCh chan struct{}
quit atomic.Bool
started atomic.Bool
workerCount atomic.Int32
resultsCh chan *tempopb.TraceSearchMetadata
doneCh chan struct{}
quit atomic.Bool
started atomic.Bool
wg sync.WaitGroup

tracesInspected atomic.Uint32
bytesInspected atomic.Uint64
@@ -35,6 +36,10 @@ func NewResults() *Results {
// is buffer space in the results channel or if the task should stop searching because the
// receiver went away or the given context is done. In this case true is returned.
func (sr *Results) AddResult(ctx context.Context, r *tempopb.TraceSearchMetadata) (quit bool) {
if sr.quit.Load() {
return true
}

select {
case sr.resultsCh <- r:
return false
@@ -64,10 +69,12 @@ func (sr *Results) Results() <-chan *tempopb.TraceSearchMetadata {
// sr := NewSearchResults()
// defer sr.Close()
func (sr *Results) Close() {
// Closing done channel makes all subsequent and blocked calls to AddResult return
// quit immediately.
close(sr.doneCh)
sr.quit.Store(true)
// Only once
if sr.quit.CAS(false, true) {
// Closing done channel makes all subsequent and blocked calls to AddResult return
// quit immediately.
close(sr.doneCh)
}
}

// StartWorker indicates another sender will be using the results channel. Must be followed
@@ -76,31 +83,28 @@ func (sr *Results) Close() {
// go func() {
// defer sr.FinishWorker()
func (sr *Results) StartWorker() {
sr.workerCount.Inc()
sr.wg.Add(1)
}

// AllWorkersStarted indicates that no more workers (senders) will be launched, and the
// results channel can be closed once the number of workers reaches zero. This function
// call occurs after all calls to StartWorker.
func (sr *Results) AllWorkersStarted() {
sr.started.Store(true)
sr.checkCleanup(sr.workerCount.Load())
// Only once
if sr.started.CAS(false, true) {
// Close results when all workers finished.
go func() {
sr.wg.Wait()
close(sr.resultsCh)
}()
}
}

// FinishWorker indicates a sender (goroutine) is done searching and will not
// send any more search results. When the last sender is finished, the results
// channel is closed.
func (sr *Results) FinishWorker() {
newCount := sr.workerCount.Dec()
sr.checkCleanup(newCount)
}

func (sr *Results) checkCleanup(workerCount int32) {
if sr.started.Load() && workerCount == 0 {
// No more senders. This ends the receiver that is iterating
// the results channel.
close(sr.resultsCh)
}
sr.wg.Add(-1)
}

func (sr *Results) TracesInspected() uint32 {
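The results.go change above replaces the hand-rolled worker counter with a sync.WaitGroup and makes Close and AllWorkersStarted single-shot. The sketch below restates that lifecycle as one small program so the send/receive contract is visible in one place; it is illustrative only, and it uses the standard library's atomic.Bool (Go 1.19+) where the real code uses go.uber.org/atomic.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type results struct {
	resultsCh chan string
	doneCh    chan struct{}
	quit      atomic.Bool
	wg        sync.WaitGroup
}

func newResults() *results {
	return &results{resultsCh: make(chan string), doneCh: make(chan struct{})}
}

func (r *results) StartWorker()  { r.wg.Add(1) }
func (r *results) FinishWorker() { r.wg.Done() }

// AllWorkersStarted hands the close of resultsCh to a single goroutine that
// runs only after every worker has called FinishWorker, so the channel is
// closed exactly once and never while a sender is still active.
func (r *results) AllWorkersStarted() {
	go func() {
		r.wg.Wait()
		close(r.resultsCh)
	}()
}

// Close is safe to call more than once: the CompareAndSwap protects doneCh
// from a double close, and the flag makes later AddResult calls quit early.
func (r *results) Close() {
	if r.quit.CompareAndSwap(false, true) {
		close(r.doneCh)
	}
}

// AddResult returns true when the worker should stop sending, either because
// the receiver closed the results or because it was closed beforehand.
func (r *results) AddResult(v string) (quit bool) {
	if r.quit.Load() {
		return true
	}
	select {
	case r.resultsCh <- v:
		return false
	case <-r.doneCh:
		return true
	}
}

func main() {
	r := newResults()
	defer r.Close()

	for i := 0; i < 3; i++ {
		i := i
		r.StartWorker()
		go func() {
			defer r.FinishWorker()
			r.AddResult(fmt.Sprintf("result-%d", i))
		}()
	}
	r.AllWorkersStarted()

	for v := range r.resultsCh {
		fmt.Println(v)
	}
}
```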
49 changes: 49 additions & 0 deletions tempodb/search/results_test.go
@@ -0,0 +1,49 @@
package search

import (
"context"
"testing"

"github.com/grafana/tempo/pkg/tempopb"
)

func TestResultsDoesNotRace(t *testing.T) {

testCases := []struct {
name string
consumeResults bool
}{
{"default", true},
{"exit early", false},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sr := NewResults()
defer sr.Close()

for i := 0; i < 100; i++ {
sr.StartWorker()
go func() {
defer sr.FinishWorker()

for j := 0; j < 10_000; j++ {
if sr.AddResult(ctx, &tempopb.TraceSearchMetadata{}) {
break
}
}
}()
}

sr.AllWorkersStarted()

if tc.consumeResults {
for range sr.Results() {
}
}
})
}
}
16 changes: 16 additions & 0 deletions tempodb/search/streaming_search_block.go
@@ -6,9 +6,11 @@ import (
"context"
"io"
"os"
"sync"

"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/atomic"

"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/tempodb/backend"
@@ -23,19 +25,23 @@ type StreamingSearchBlock struct {
BlockID uuid.UUID // todo: add the full meta?
appender encoding.Appender
file *os.File
closed atomic.Bool
bufferedWriter *bufio.Writer
flushMtx sync.Mutex
header *tempofb.SearchBlockHeaderMutable
v encoding.VersionedEncoding
enc backend.Encoding
}

// Close closes the WAL file. Used in tests
func (s *StreamingSearchBlock) Close() error {
s.closed.Store(true)
return s.file.Close()
}

// Clear deletes the files for this block.
func (s *StreamingSearchBlock) Clear() error {
s.closed.Store(true)
s.file.Close()
return os.Remove(s.file.Name())
}
@@ -90,11 +96,21 @@ func (s *StreamingSearchBlock) FlushBuffer() error {
if s.bufferedWriter == nil {
return nil
}

// Lock required to handle concurrent searches/readers flushing.
s.flushMtx.Lock()
defer s.flushMtx.Unlock()

return s.bufferedWriter.Flush()
}

// Search the streaming block.
func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error {
if s.closed.Load() {
// Generally this means block has already been deleted
return nil
}

if !p.MatchesBlock(s.header) {
sr.AddBlockSkipped()
return nil
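streaming_search_block.go now tracks whether the block has been closed or cleared and serializes reader-triggered flushes of the buffered writer, since bufio.Writer is not safe for concurrent use. Below is a rough sketch of that guard under simplified assumptions: the names are hypothetical, and a single mutex covers both appends and flushes where the real code keeps a dedicated flushMtx.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"sync"
	"sync/atomic"
)

// block is a stand-in for a streaming WAL search block: records are appended
// through a buffered writer and may be searched concurrently.
type block struct {
	mu     sync.Mutex  // serializes writes and reader-triggered flushes
	closed atomic.Bool // set by Close/Clear; searches of a deleted block are skipped
	file   *os.File
	w      *bufio.Writer
}

func (b *block) appendRecord(p []byte) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	_, err := b.w.Write(p)
	return err
}

func (b *block) clear() error {
	b.closed.Store(true)
	b.file.Close()
	return os.Remove(b.file.Name())
}

// search flushes buffered data first so the reader sees every appended
// record; a block that was already cleared is silently skipped, mirroring
// the "not an error" handling in the diff above.
func (b *block) search() error {
	if b.closed.Load() {
		return nil
	}

	b.mu.Lock()
	err := b.w.Flush()
	b.mu.Unlock()
	if err != nil {
		return err
	}

	// ... read b.file and run the actual matching here ...
	return nil
}

func main() {
	f, err := os.CreateTemp("", "search-block")
	if err != nil {
		panic(err)
	}
	b := &block{file: f, w: bufio.NewWriter(f)}

	_ = b.appendRecord([]byte("record"))
	fmt.Println(b.search()) // <nil>
	fmt.Println(b.clear())  // <nil>
	fmt.Println(b.search()) // <nil>: closed blocks are skipped, not an error
}
```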