diff --git a/CHANGELOG.md b/CHANGELOG.md index 11ffe2568b3..6854faf8551 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,7 @@ * [BUGFIX] Fix an issue with WAL replay of zero-length search data when search is disabled. [#968](https://github.com/grafana/tempo/pull/968) (@annanay25) * [BUGFIX] Set span's tag `span.kind` to `client` in query-frontend [#975](https://github.com/grafana/tempo/pull/975) (@mapno) * [BUGFIX] Nil check overrides module in the `/status` handler [#994](https://github.com/grafana/tempo/pull/994) (@mapno) +* [BUGFIX] Several bug fixes for search contention and panics [#1033](https://github.com/grafana/tempo/pull/1033) (@mdisibio) ## v1.1.0 / 2021-08-26 * [CHANGE] Upgrade Cortex from v1.9.0 to v1.9.0-131-ga4bf10354 [#841](https://github.com/grafana/tempo/pull/841) (@aknuds1) diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 61d561e5359..b64cfdbd550 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -213,7 +213,9 @@ func TestWal(t *testing.T) { } func TestSearchWAL(t *testing.T) { - tmpDir := t.TempDir() + tmpDir, err := os.MkdirTemp("/tmp", "") + require.NoError(t, err, "unexpected error getting tempdir") + defer os.RemoveAll(tmpDir) i := defaultIngesterModule(t, tmpDir) inst, _ := i.getOrCreateInstance("test") @@ -221,7 +223,7 @@ func TestSearchWAL(t *testing.T) { // create some search data id := make([]byte, 16) - _, err := rand.Read(id) + _, err = rand.Read(id) require.NoError(t, err) trace := test.MakeTrace(10, id) traceBytes, err := trace.Marshal() diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 1bd3e970ea6..1b45818ad70 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -277,9 +277,9 @@ func (i *instance) CompleteBlock(blockID uuid.UUID) error { } // Search data (optional) - i.blocksMtx.Lock() + i.blocksMtx.RLock() oldSearch := i.searchAppendBlocks[completingBlock] - i.blocksMtx.Unlock() + i.blocksMtx.RUnlock() var newSearch search.SearchableBlock if oldSearch != nil { @@ -332,8 +332,8 @@ func (i *instance) ClearCompletingBlock(blockID uuid.UUID) error { // GetBlockToBeFlushed gets a list of blocks that can be flushed to the backend func (i *instance) GetBlockToBeFlushed(blockID uuid.UUID) *wal.LocalBlock { - i.blocksMtx.Lock() - defer i.blocksMtx.Unlock() + i.blocksMtx.RLock() + defer i.blocksMtx.RUnlock() for _, c := range i.completeBlocks { if c.BlockMeta().BlockID == blockID && c.FlushedTime().IsZero() { @@ -392,8 +392,8 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace } i.tracesMtx.Unlock() - i.blocksMtx.Lock() - defer i.blocksMtx.Unlock() + i.blocksMtx.RLock() + defer i.blocksMtx.RUnlock() // headBlock foundBytes, err := i.headBlock.Find(id, model.ObjectCombiner) @@ -546,8 +546,8 @@ func (i *instance) writeTraceToHeadBlock(id common.ID, b []byte, searchData [][] entry := i.searchHeadBlock if entry != nil { entry.mtx.Lock() + defer entry.mtx.Unlock() err := entry.b.Append(context.TODO(), id, searchData) - entry.mtx.Unlock() return err } diff --git a/modules/ingester/instance_search.go b/modules/ingester/instance_search.go index 63ec57e8ceb..fa18ada7d6a 100644 --- a/modules/ingester/instance_search.go +++ b/modules/ingester/instance_search.go @@ -14,6 +14,9 @@ import ( func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tempopb.SearchResponse, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + maxResults := int(req.Limit) // if limit is not set, use a safe default if maxResults == 0 { @@ -26,8 +29,14 @@ func (i *instance) Search(ctx context.Context, req *tempopb.SearchRequest) (*tem defer sr.Close() i.searchLiveTraces(ctx, p, sr) + + // Lock blocks mutex until all search tasks have been created. This avoids + // deadlocking with other activity (ingest, flushing), caused by releasing + // and then attempting to retake the lock. + i.blocksMtx.RLock() i.searchWAL(ctx, p, sr) i.searchLocalBlocks(ctx, p, sr) + i.blocksMtx.RUnlock() sr.AllWorkersStarted() @@ -109,6 +118,7 @@ func (i *instance) searchLiveTraces(ctx context.Context, p search.Pipeline, sr * }() } +// searchWAL starts a search task for every WAL block. Must be called under lock. func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search.Results) { searchFunc := func(k *wal.AppendBlock, e *searchStreamingBlockEntry) { defer sr.FinishWorker() @@ -122,9 +132,6 @@ func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search. } } - i.blocksMtx.Lock() - defer i.blocksMtx.Unlock() - // head block sr.StartWorker() go searchFunc(i.headBlock, i.searchHeadBlock) @@ -136,10 +143,8 @@ func (i *instance) searchWAL(ctx context.Context, p search.Pipeline, sr *search. } } +// searchLocalBlocks starts a search task for every local block. Must be called under lock. func (i *instance) searchLocalBlocks(ctx context.Context, p search.Pipeline, sr *search.Results) { - i.blocksMtx.Lock() - defer i.blocksMtx.Unlock() - for b, e := range i.searchCompleteBlocks { sr.StartWorker() go func(b *wal.LocalBlock, e *searchLocalBlockEntry) { diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index ba839be0166..09c6d2b7606 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -147,8 +147,13 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results meta, err := ReadSearchBlockMeta(ctx, s.r, s.id, s.tenantID) if err != nil { - // we create BackendSearchBlocks even if search files are missing, return nil here if meta does not exist if err == backend.ErrDoesNotExist { + // This means one of the following: + // * This block predates search and doesn't actually have + // search data (we create the block entry regardless) + // * This block is deleted between when the search was + // initiated and when we got here. + // In either case it is not an error. return nil } return err diff --git a/tempodb/search/results.go b/tempodb/search/results.go index 5d56aa79c87..c9d8119ceba 100644 --- a/tempodb/search/results.go +++ b/tempodb/search/results.go @@ -2,6 +2,7 @@ package search import ( "context" + "sync" "github.com/grafana/tempo/pkg/tempopb" "go.uber.org/atomic" @@ -11,11 +12,11 @@ import ( // channel that is easy to consume, signaling workers to quit early as needed, and collecting // metrics. type Results struct { - resultsCh chan *tempopb.TraceSearchMetadata - doneCh chan struct{} - quit atomic.Bool - started atomic.Bool - workerCount atomic.Int32 + resultsCh chan *tempopb.TraceSearchMetadata + doneCh chan struct{} + quit atomic.Bool + started atomic.Bool + wg sync.WaitGroup tracesInspected atomic.Uint32 bytesInspected atomic.Uint64 @@ -35,6 +36,10 @@ func NewResults() *Results { // is buffer space in the results channel or if the task should stop searching because the // receiver went away or the given context is done. In this case true is returned. func (sr *Results) AddResult(ctx context.Context, r *tempopb.TraceSearchMetadata) (quit bool) { + if sr.quit.Load() { + return true + } + select { case sr.resultsCh <- r: return false @@ -64,10 +69,12 @@ func (sr *Results) Results() <-chan *tempopb.TraceSearchMetadata { // sr := NewSearchResults() // defer sr.Close() func (sr *Results) Close() { - // Closing done channel makes all subsequent and blocked calls to AddResult return - // quit immediately. - close(sr.doneCh) - sr.quit.Store(true) + // Only once + if sr.quit.CAS(false, true) { + // Closing done channel makes all subsequent and blocked calls to AddResult return + // quit immediately. + close(sr.doneCh) + } } // StartWorker indicates another sender will be using the results channel. Must be followed @@ -76,31 +83,28 @@ func (sr *Results) Close() { // go func() { // defer sr.FinishWorker() func (sr *Results) StartWorker() { - sr.workerCount.Inc() + sr.wg.Add(1) } // AllWorkersStarted indicates that no more workers (senders) will be launched, and the // results channel can be closed once the number of workers reaches zero. This function // call occurs after all calls to StartWorker. func (sr *Results) AllWorkersStarted() { - sr.started.Store(true) - sr.checkCleanup(sr.workerCount.Load()) + // Only once + if sr.started.CAS(false, true) { + // Close results when all workers finished. + go func() { + sr.wg.Wait() + close(sr.resultsCh) + }() + } } // FinishWorker indicates a sender (goroutine) is done searching and will not // send any more search results. When the last sender is finished, the results // channel is closed. func (sr *Results) FinishWorker() { - newCount := sr.workerCount.Dec() - sr.checkCleanup(newCount) -} - -func (sr *Results) checkCleanup(workerCount int32) { - if sr.started.Load() && workerCount == 0 { - // No more senders. This ends the receiver that is iterating - // the results channel. - close(sr.resultsCh) - } + sr.wg.Add(-1) } func (sr *Results) TracesInspected() uint32 { diff --git a/tempodb/search/results_test.go b/tempodb/search/results_test.go new file mode 100644 index 00000000000..fba5130c516 --- /dev/null +++ b/tempodb/search/results_test.go @@ -0,0 +1,49 @@ +package search + +import ( + "context" + "testing" + + "github.com/grafana/tempo/pkg/tempopb" +) + +func TestResultsDoesNotRace(t *testing.T) { + + testCases := []struct { + name string + consumeResults bool + }{ + {"default", true}, + {"exit early", false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sr := NewResults() + defer sr.Close() + + for i := 0; i < 100; i++ { + sr.StartWorker() + go func() { + defer sr.FinishWorker() + + for j := 0; j < 10_000; j++ { + if sr.AddResult(ctx, &tempopb.TraceSearchMetadata{}) { + break + } + } + }() + } + + sr.AllWorkersStarted() + + if tc.consumeResults { + for range sr.Results() { + } + } + }) + } +} diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index e1664a3372f..1a97f0edd85 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -6,9 +6,11 @@ import ( "context" "io" "os" + "sync" "github.com/google/uuid" "github.com/pkg/errors" + "go.uber.org/atomic" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/tempodb/backend" @@ -23,7 +25,9 @@ type StreamingSearchBlock struct { BlockID uuid.UUID // todo: add the full meta? appender encoding.Appender file *os.File + closed atomic.Bool bufferedWriter *bufio.Writer + flushMtx sync.Mutex header *tempofb.SearchBlockHeaderMutable v encoding.VersionedEncoding enc backend.Encoding @@ -31,11 +35,13 @@ type StreamingSearchBlock struct { // Close closes the WAL file. Used in tests func (s *StreamingSearchBlock) Close() error { + s.closed.Store(true) return s.file.Close() } // Clear deletes the files for this block. func (s *StreamingSearchBlock) Clear() error { + s.closed.Store(true) s.file.Close() return os.Remove(s.file.Name()) } @@ -90,11 +96,21 @@ func (s *StreamingSearchBlock) FlushBuffer() error { if s.bufferedWriter == nil { return nil } + + // Lock required to handle concurrent searches/readers flushing. + s.flushMtx.Lock() + defer s.flushMtx.Unlock() + return s.bufferedWriter.Flush() } // Search the streaming block. func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error { + if s.closed.Load() { + // Generally this means block has already been deleted + return nil + } + if !p.MatchesBlock(s.header) { sr.AddBlockSkipped() return nil