From cdec27af2bbe0e202bab1d9dd41700c73f8d295f Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 29 Sep 2021 17:11:40 +0530 Subject: [PATCH 01/22] Checkpoint: Initial implementation of v2 search WAL Signed-off-by: Annanay --- modules/ingester/ingester.go | 16 ++- modules/ingester/instance.go | 8 +- tempodb/search/streaming_search_block.go | 127 ++++++++++++------ tempodb/search/streaming_search_block_test.go | 1 + tempodb/wal/append_block.go | 44 +----- tempodb/wal/replay.go | 62 +++++++++ 6 files changed, 170 insertions(+), 88 deletions(-) create mode 100644 tempodb/wal/replay.go diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index 3e9b4fc81cc..71430e7f184 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -3,9 +3,12 @@ package ingester import ( "context" "fmt" + "os" "sync" "time" + "github.com/grafana/tempo/tempodb/search" + "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" @@ -354,7 +357,18 @@ func (i *Ingester) replayWal() error { return err } - instance.AddCompletingBlock(b) + // replay search WAL + filename := b.BlockID().String() + ":" + tenantID + ":searchdata" + file, err := os.OpenFile(filename, os.O_RDONLY, 0644) + if err != nil { + return err + } + searchWALBlock, err := search.NewStreamingSearchBlockFromWALReplay(file) + if err != nil { + return err + } + + instance.AddCompletingBlock(b, searchWALBlock) i.enqueue(&flushOp{ kind: opKindComplete, diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index a12c5e334e0..765fac7827a 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -447,11 +447,17 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace // AddCompletingBlock adds an AppendBlock directly to the slice of completing blocks. // This is used during wal replay. It is expected that calling code will add the appropriate // jobs to the queue to eventually flush these. -func (i *instance) AddCompletingBlock(b *wal.AppendBlock) { +func (i *instance) AddCompletingBlock(b *wal.AppendBlock, s *search.StreamingSearchBlock) { i.blocksMtx.Lock() defer i.blocksMtx.Unlock() i.completingBlocks = append(i.completingBlocks, b) + + // search WAL + if s == nil { + return + } + i.searchAppendBlocks[b] = &searchStreamingBlockEntry{b: s} } // getOrCreateTrace will return a new trace object for the given request diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 4f21f90808f..bc65e946df3 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -1,25 +1,29 @@ package search import ( + "bytes" "context" "io" "os" + cortex_util "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/log/level" + "github.com/grafana/tempo/pkg/tempofb" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - "github.com/pkg/errors" + "github.com/grafana/tempo/tempodb/wal" ) var _ SearchableBlock = (*StreamingSearchBlock)(nil) -var _ common.DataWriter = (*StreamingSearchBlock)(nil) // StreamingSearchBlock is search data that is read/write, i.e. for traces in the WAL. 
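//
// Illustrative usage sketch (placeholder names such as dir, traceID, pipeline
// and results are not part of this patch; error handling elided):
//
//	f, _ := os.OpenFile(path.Join(dir, "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
//	sb, _ := NewStreamingSearchBlockForFile(f)
//	_ = sb.Append(ctx, traceID, searchData) // searchData is [][]byte of flatbuffer search entries
//	_ = sb.Search(ctx, pipeline, results)   // entries are read back through the v2 DataReader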
type StreamingSearchBlock struct { - appender encoding.Appender - file *os.File - bytesWritten int - header *tempofb.SearchBlockHeaderMutable + appender encoding.Appender + file *os.File + header *tempofb.SearchBlockHeaderMutable + encoding encoding.VersionedEncoding } // Clear deletes the files for this block. @@ -28,46 +32,53 @@ func (s *StreamingSearchBlock) Clear() error { return os.Remove(s.file.Name()) } -func (*StreamingSearchBlock) Complete() error { - return nil -} - -// CutPage returns the number of bytes written previously so that the appender can build the index. -func (s *StreamingSearchBlock) CutPage() (int, error) { - b := s.bytesWritten - s.bytesWritten = 0 - return b, nil -} - -// Write the entry to the end of the file. The number of bytes written is saved and returned through CutPage. -func (s *StreamingSearchBlock) Write(id common.ID, obj []byte) (int, error) { - var err error - - _, err = s.file.Write(obj) - if err != nil { - return 0, err - } - - s.bytesWritten += len(obj) - - return len(obj), err -} - // NewStreamingSearchBlockForFile creates a new streaming block that will read/write the given file. // File must be opened for read/write permissions. func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) { + v, err := encoding.FromVersion("v2") + if err != nil { + return nil, err + } s := &StreamingSearchBlock{ - file: f, - header: tempofb.NewSearchBlockHeaderMutable(), + file: f, + header: tempofb.NewSearchBlockHeaderMutable(), + encoding: v, } - // Entries are not paged, use non paged appender. - a := encoding.NewAppender(s) + // Use versioned encoding to create paged entries + dataWriter, err := s.encoding.NewDataWriter(f, backend.EncNone) + if err != nil { + return nil, err + } + + a := encoding.NewAppender(dataWriter) s.appender = a return s, nil } +func NewStreamingSearchBlockFromWALReplay(f *os.File) (*StreamingSearchBlock, error) { + // version is pinned to v2 for now + v, err := encoding.FromVersion("v2") + if err != nil { + return nil, err + } + records, warning, err := wal.ReplayWALAndGetRecords(f, v, backend.EncNone) + if err != nil { + return nil, err + } + // todo: figure out the warning/err handling and delete file if necessary + if warning != nil { + level.Warn(cortex_util.Logger).Log("msg", "received warning while replaying block. partial replay likely.", "file", f.Name(), "warning", warning, "records", len(records)) + } + return &StreamingSearchBlock{ + file: f, + appender: encoding.NewRecordAppender(records), + header: tempofb.NewSearchBlockHeaderMutable(), + encoding: v, + }, nil +} + // Append the given search data to the streaming block. Multiple byte buffers of search data for // the same trace can be passed and are merged into one entry. 
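// For example, when fragments of the same trace arrive in separate batches,
// both can be passed in a single call and are stored as one record keyed by id
// (fragmentA/fragmentB are placeholder buffers, not names from this patch):
//
//	_ = sb.Append(ctx, id, [][]byte{fragmentA, fragmentB})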
func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchData [][]byte) error { @@ -94,8 +105,16 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul sr.AddBlockInspected() - rr := s.appender.Records() + dr, err := s.encoding.NewDataReader(backend.NewContextReaderWithAllReader(s.file), backend.EncNone) + if err != nil { + return err + } + + orw := s.encoding.NewObjectReaderWriter() + rr := s.appender.Records() + var pagesBuffer [][]byte + var buffer []byte for _, r := range rr { if sr.Quit() { @@ -112,7 +131,12 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul } buf = buf[:r.Length] - _, err := s.file.ReadAt(buf, int64(r.Start)) + pagesBuffer, buffer, err = dr.Read(ctx, []common.Record{r}, pagesBuffer, buffer) + if err != nil { + return err + } + + _, pagesBuffer[0], err = orw.UnmarshalObjectFromReader(bytes.NewReader(pagesBuffer[0])) if err != nil { return err } @@ -120,7 +144,7 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul sr.AddBytesInspected(uint64(r.Length)) sr.AddTraceInspected(1) - entry := tempofb.SearchEntryFromBytes(buf) + entry := tempofb.SearchEntryFromBytes(pagesBuffer[0]) if !p.Matches(entry) { continue @@ -143,6 +167,14 @@ func (s *StreamingSearchBlock) Iterator() (encoding.Iterator, error) { file: s.file, } + dr, err := s.encoding.NewDataReader(backend.NewContextReaderWithAllReader(s.file), backend.EncNone) + if err != nil { + return nil, err + } + iter.dataReader = dr + + iter.objectRW = s.encoding.NewObjectReaderWriter() + combiner := &DataCombiner{} // Streaming (wal) blocks have to be deduped. @@ -153,11 +185,13 @@ type streamingSearchBlockIterator struct { currentIndex int records []common.Record file *os.File + dataReader common.DataReader + objectRW common.ObjectReaderWriter } var _ encoding.Iterator = (*streamingSearchBlockIterator)(nil) -func (s *streamingSearchBlockIterator) Next(_ context.Context) (common.ID, []byte, error) { +func (s *streamingSearchBlockIterator) Next(ctx context.Context) (common.ID, []byte, error) { if s.currentIndex >= len(s.records) { return nil, nil, io.EOF } @@ -166,15 +200,22 @@ func (s *streamingSearchBlockIterator) Next(_ context.Context) (common.ID, []byt // Use unique buffer that can be returned to the caller. // This is primarily for DedupingIterator which uses 2 buffers at once. 
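	// (DedupingIterator still holds the previous entry while decoding the next
	// one, so handing back a shared scratch buffer would overwrite data the
	// caller has not finished with.)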
- buffer := make([]byte, currentRecord.Length) - _, err := s.file.ReadAt(buffer, int64(currentRecord.Start)) + var buffer []byte + pagesBuffer := make([][]byte, 1) + pagesBuffer[0] = make([]byte, currentRecord.Length) + pagesBuffer, _, err := s.dataReader.Read(ctx, []common.Record{currentRecord}, pagesBuffer, buffer) + if err != nil { + return nil, nil, err + } + + _, pagesBuffer[0], err = s.objectRW.UnmarshalObjectFromReader(bytes.NewReader(pagesBuffer[0])) if err != nil { - return nil, nil, errors.Wrap(err, "error reading search file") + return nil, nil, err } s.currentIndex++ - return currentRecord.ID, buffer, nil + return currentRecord.ID, pagesBuffer[0], nil } func (*streamingSearchBlockIterator) Close() { diff --git a/tempodb/search/streaming_search_block_test.go b/tempodb/search/streaming_search_block_test.go index b8e6031b829..f2b228af503 100644 --- a/tempodb/search/streaming_search_block_test.go +++ b/tempodb/search/streaming_search_block_test.go @@ -17,6 +17,7 @@ import ( func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) *StreamingSearchBlock { id := []byte{1, 2, 3, 4, 5, 6, 7, 8} searchData := [][]byte{(&tempofb.SearchEntryMutable{ + TraceID: id, Tags: tempofb.SearchDataMap{ "key1": {"value10", "value11"}, "key2": {"value20", "value21"}, diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 5c11775c33f..723012ae892 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -1,10 +1,8 @@ package wal import ( - "bytes" "context" "fmt" - "io" "os" "path/filepath" "strings" @@ -93,50 +91,10 @@ func newAppendBlockFromFile(filename string, path string) (*AppendBlock, error, return nil, nil, err } - dataReader, err := b.encoding.NewDataReader(backend.NewContextReaderWithAllReader(f), b.meta.Encoding) + records, warning, err := ReplayWALAndGetRecords(f, v, e) if err != nil { return nil, nil, err } - defer dataReader.Close() - - var buffer []byte - var records []common.Record - objectReader := b.encoding.NewObjectReaderWriter() - currentOffset := uint64(0) - for { - buffer, pageLen, err := dataReader.NextPage(buffer) - if err == io.EOF { - break - } - if err != nil { - warning = err - break - } - - reader := bytes.NewReader(buffer) - id, _, err := objectReader.UnmarshalObjectFromReader(reader) - if err != nil { - warning = err - break - } - // wal should only ever have one object per page, test that here - _, _, err = objectReader.UnmarshalObjectFromReader(reader) - if err != io.EOF { - warning = err - break - } - - // make a copy so we don't hold onto the iterator buffer - recordID := append([]byte(nil), id...) 
- records = append(records, common.Record{ - ID: recordID, - Start: currentOffset, - Length: pageLen, - }) - currentOffset += uint64(pageLen) - } - - common.SortRecords(records) b.appender = encoding.NewRecordAppender(records) b.meta.TotalObjects = b.appender.Length() diff --git a/tempodb/wal/replay.go b/tempodb/wal/replay.go new file mode 100644 index 00000000000..a36edf22a87 --- /dev/null +++ b/tempodb/wal/replay.go @@ -0,0 +1,62 @@ +package wal + +import ( + "bytes" + "io" + "os" + + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/encoding/common" +) + +// ReplayWALAndGetRecords replays a WAL file that could contain either traces or searchdata +//nolint:interfacer +func ReplayWALAndGetRecords(file *os.File, v encoding.VersionedEncoding, enc backend.Encoding) ([]common.Record, error, error) { + dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(file), enc) + if err != nil { + return nil, nil, err + } + + var buffer []byte + var records []common.Record + var warning error + objectReader := v.NewObjectReaderWriter() + currentOffset := uint64(0) + for { + buffer, pageLen, err := dataReader.NextPage(buffer) + if err == io.EOF { + break + } + if err != nil { + warning = err + break + } + + reader := bytes.NewReader(buffer) + id, _, err := objectReader.UnmarshalObjectFromReader(reader) + if err != nil { + warning = err + break + } + // wal should only ever have one object per page, test that here + _, _, err = objectReader.UnmarshalObjectFromReader(reader) + if err != io.EOF { + warning = err + break + } + + // make a copy so we don't hold onto the iterator buffer + recordID := append([]byte(nil), id...) + records = append(records, common.Record{ + ID: recordID, + Start: currentOffset, + Length: pageLen, + }) + currentOffset += uint64(pageLen) + } + + common.SortRecords(records) + + return records, warning, nil +} From 6738a729fde614d0ab9772e60a89c98e86ea0a9a Mon Sep 17 00:00:00 2001 From: Annanay Date: Wed, 29 Sep 2021 19:34:37 +0530 Subject: [PATCH 02/22] better handling of reload of blocks without search data Signed-off-by: Annanay --- modules/ingester/ingester.go | 13 +++---------- modules/ingester/ingester_search.go | 10 ---------- modules/ingester/instance.go | 2 +- tempodb/search/streaming_search_block.go | 10 +++++++++- tempodb/wal/wal.go | 4 ++++ 5 files changed, 17 insertions(+), 22 deletions(-) diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index 71430e7f184..70a77cc2ffc 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -3,7 +3,7 @@ package ingester import ( "context" "fmt" - "os" + "path/filepath" "sync" "time" @@ -109,9 +109,6 @@ func (i *Ingester) starting(ctx context.Context) error { return fmt.Errorf("failed to rediscover local blocks %w", err) } - // Search data is considered experimental and removed on every startup. - i.clearSearchData() - // Now that user states have been created, we can start the lifecycler. 
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context if err := i.lifecycler.StartAsync(context.Background()); err != nil { @@ -358,12 +355,8 @@ func (i *Ingester) replayWal() error { } // replay search WAL - filename := b.BlockID().String() + ":" + tenantID + ":searchdata" - file, err := os.OpenFile(filename, os.O_RDONLY, 0644) - if err != nil { - return err - } - searchWALBlock, err := search.NewStreamingSearchBlockFromWALReplay(file) + filename := filepath.Join(i.store.WAL().GetFilepath(), searchDir, fmt.Sprintf("%v:%v:%v", b.BlockID().String(), tenantID, "seachdata")) + searchWALBlock, err := search.NewStreamingSearchBlockFromWALReplay(filename) if err != nil { return err } diff --git a/modules/ingester/ingester_search.go b/modules/ingester/ingester_search.go index 67548ea9109..1a7e057ce0d 100644 --- a/modules/ingester/ingester_search.go +++ b/modules/ingester/ingester_search.go @@ -3,8 +3,6 @@ package ingester import ( "context" - "github.com/cortexproject/cortex/pkg/util/log" - "github.com/go-kit/kit/log/level" "github.com/grafana/tempo/pkg/tempopb" "github.com/weaveworks/common/user" ) @@ -66,11 +64,3 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa return resp, nil } - -func (i *Ingester) clearSearchData() { - // clear wal - err := i.store.WAL().ClearFolder(searchDir) - if err != nil { - level.Error(log.Logger).Log("msg", "error clearing search data from wal") - } -} diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 765fac7827a..5dc45847941 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -629,7 +629,7 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) error { i.blocksMtx.Lock() i.completeBlocks = append(i.completeBlocks, ib) - //i.searchCompleteBlocks[ib] = sb + //i.searchCompleteBlocks[ib] = &searchLocalBlockEntry{b: sb} i.blocksMtx.Unlock() level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime()) diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index bc65e946df3..cd41e4ac448 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -57,7 +57,15 @@ func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) { return s, nil } -func NewStreamingSearchBlockFromWALReplay(f *os.File) (*StreamingSearchBlock, error) { +func NewStreamingSearchBlockFromWALReplay(filename string) (*StreamingSearchBlock, error) { + f, err := os.OpenFile(filename, os.O_RDONLY, 0644) + if err != nil { + if os.IsNotExist(err) { // this is fine, don't return error + return nil, nil + } + return nil, err + } + // version is pinned to v2 for now v, err := encoding.FromVersion("v2") if err != nil { diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 9b3c37d9658..99d83341adf 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -138,6 +138,10 @@ func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string, name strin return os.OpenFile(filepath.Join(p, fmt.Sprintf("%v:%v:%v", blockid, tenantid, name)), os.O_CREATE|os.O_RDWR, 0644) } +func (w *WAL) GetFilepath() string { + return w.c.Filepath +} + func (w *WAL) ClearFolder(dir string) error { p := filepath.Join(w.c.Filepath, dir) return os.RemoveAll(p) From 52e5f61a03f4118f2dded2967e3c92526a52b610 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 30 Sep 2021 13:46:19 +0530 Subject: [PATCH 
03/22] Checkpoint Signed-off-by: Annanay --- modules/ingester/ingester.go | 5 ++ tempodb/search/streaming_search_block.go | 71 ++++++++++++++++++++---- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index 70a77cc2ffc..040c7bd9537 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -335,6 +335,11 @@ func (i *Ingester) replayWal() error { return fmt.Errorf("fatal error replaying wal %w", err) } + searchBlocks, err := search.RescanBlocks(i.store.WAL().GetFilepath()) + if err != nil { + return fmt.Errorf("fatal error replaying search wal %w", err) + } + for _, b := range blocks { tenantID := b.Meta().TenantID diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index cd41e4ac448..3b68405af36 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -4,7 +4,10 @@ import ( "bytes" "context" "io" + "io/ioutil" "os" + "path/filepath" + "time" cortex_util "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" @@ -57,34 +60,80 @@ func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) { return s, nil } -func NewStreamingSearchBlockFromWALReplay(filename string) (*StreamingSearchBlock, error) { +// todo: can we reduce duplication between this and wal.RescanBlocks() ? +func RescanBlocks(walPath string) ([]*StreamingSearchBlock, error) { + searchFilepath := filepath.Join(walPath, "search") + files, err := ioutil.ReadDir(searchFilepath) + if err != nil { + return nil, err + } + + blocks := make([]*StreamingSearchBlock, 0, len(files)) + for _, f := range files { + if f.IsDir() { + continue + } + start := time.Now() + level.Info(cortex_util.Logger).Log("msg", "beginning replay", "file", f.Name(), "size", f.Size()) + + b, warning, err := newStreamingSearchBlockFromWALReplay(f.Name()) + + remove := false + if err != nil { + // wal replay failed, clear and warn + level.Warn(cortex_util.Logger).Log("msg", "failed to replay block. removing.", "file", f.Name(), "err", err) + remove = true + } + + if b != nil && b.appender.Length() == 0 { + level.Warn(cortex_util.Logger).Log("msg", "empty wal file. ignoring.", "file", f.Name(), "err", err) + remove = true + } + + if warning != nil { + level.Warn(cortex_util.Logger).Log("msg", "received warning while replaying block. 
partial replay likely.", "file", f.Name(), "warning", warning, "records", b.appender.Length()) + } + + if remove { + err = os.Remove(filepath.Join(searchFilepath, f.Name())) + if err != nil { + return nil, err + } + continue + } + + level.Info(cortex_util.Logger).Log("msg", "replay complete", "file", f.Name(), "duration", time.Since(start)) + + blocks = append(blocks, b) + } + + return blocks, nil +} + +func newStreamingSearchBlockFromWALReplay(filename string) (*StreamingSearchBlock, error, error) { f, err := os.OpenFile(filename, os.O_RDONLY, 0644) if err != nil { - if os.IsNotExist(err) { // this is fine, don't return error - return nil, nil + if os.IsNotExist(err) { // this is fine, just issue a warning + return nil, err, nil } - return nil, err + return nil, nil, err } // version is pinned to v2 for now v, err := encoding.FromVersion("v2") if err != nil { - return nil, err + return nil, nil, err } records, warning, err := wal.ReplayWALAndGetRecords(f, v, backend.EncNone) if err != nil { - return nil, err - } - // todo: figure out the warning/err handling and delete file if necessary - if warning != nil { - level.Warn(cortex_util.Logger).Log("msg", "received warning while replaying block. partial replay likely.", "file", f.Name(), "warning", warning, "records", len(records)) + return nil, nil, err } return &StreamingSearchBlock{ file: f, appender: encoding.NewRecordAppender(records), header: tempofb.NewSearchBlockHeaderMutable(), encoding: v, - }, nil + }, warning, nil } // Append the given search data to the streaming block. Multiple byte buffers of search data for From 4e0580428560679da0cd4cc8b6cd933efe5b0f0c Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 30 Sep 2021 19:03:50 +0530 Subject: [PATCH 04/22] another commit another wal replay Signed-off-by: Annanay --- modules/ingester/ingester.go | 31 ++++- tempodb/search/rescan_blocks.go | 120 ++++++++++++++++++ tempodb/search/streaming_search_block.go | 90 +------------ tempodb/search/streaming_search_block_test.go | 37 ++++++ tempodb/wal/append_block.go | 4 +- tempodb/wal/replay.go | 15 ++- tempodb/wal/wal.go | 5 +- 7 files changed, 207 insertions(+), 95 deletions(-) create mode 100644 tempodb/search/rescan_blocks.go diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index 040c7bd9537..a84581953e9 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -3,7 +3,6 @@ package ingester import ( "context" "fmt" - "path/filepath" "sync" "time" @@ -340,6 +339,25 @@ func (i *Ingester) replayWal() error { return fmt.Errorf("fatal error replaying search wal %w", err) } + // clear any searchBlock that does not have a corresponding entry in tracesBlocks + for j := len(searchBlocks) - 1; j >= 0; j-- { + clear := true + for _, tracesBlock := range blocks { + if searchBlocks[j].BlockID == tracesBlock.BlockID() { + clear = false + break + } + } + + if clear { + err := searchBlocks[j].Clear() + if err != nil { // just log the error + level.Warn(log.Logger).Log("msg", "error clearing search WAL file", "blockID", searchBlocks[j].BlockID, "err", err) + } + searchBlocks = append(searchBlocks[:j], searchBlocks[j+1:]...) 
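+			// (in-place removal is safe here because the outer loop walks j backwards)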
+ } + } + for _, b := range blocks { tenantID := b.Meta().TenantID @@ -359,13 +377,12 @@ func (i *Ingester) replayWal() error { return err } - // replay search WAL - filename := filepath.Join(i.store.WAL().GetFilepath(), searchDir, fmt.Sprintf("%v:%v:%v", b.BlockID().String(), tenantID, "seachdata")) - searchWALBlock, err := search.NewStreamingSearchBlockFromWALReplay(filename) - if err != nil { - return err + var searchWALBlock *search.StreamingSearchBlock + for _, s := range searchBlocks { + if b.BlockID() == s.BlockID { + searchWALBlock = s + } } - instance.AddCompletingBlock(b, searchWALBlock) i.enqueue(&flushOp{ diff --git a/tempodb/search/rescan_blocks.go b/tempodb/search/rescan_blocks.go new file mode 100644 index 00000000000..8fb9cd4005f --- /dev/null +++ b/tempodb/search/rescan_blocks.go @@ -0,0 +1,120 @@ +package search + +import ( + "io/ioutil" + "os" + "path/filepath" + "strings" + "time" + + cortex_util "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/log/level" + "github.com/google/uuid" + + "github.com/grafana/tempo/pkg/tempofb" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding" + "github.com/grafana/tempo/tempodb/wal" +) + +// RescanBlocks scans through the search directory in the WAL folder and replays files +// todo: copied from wal.RescanBlocks(), see if we can reduce duplication? +func RescanBlocks(walPath string) ([]*StreamingSearchBlock, error) { + searchFilepath := filepath.Join(walPath, "search") + files, err := ioutil.ReadDir(searchFilepath) + if err != nil { + return nil, err + } + + blocks := make([]*StreamingSearchBlock, 0, len(files)) + for _, f := range files { + if f.IsDir() { + continue + } + start := time.Now() + level.Info(cortex_util.Logger).Log("msg", "beginning replay", "file", f.Name(), "size", f.Size()) + + b, warning, err := newStreamingSearchBlockFromWALReplay(f.Name()) + + remove := false + if err != nil { + // wal replay failed, clear and warn + level.Warn(cortex_util.Logger).Log("msg", "failed to replay block. removing.", "file", f.Name(), "err", err) + remove = true + } + + if b != nil && b.appender.Length() == 0 { + level.Warn(cortex_util.Logger).Log("msg", "empty wal file. ignoring.", "file", f.Name(), "err", err) + remove = true + } + + if warning != nil { + level.Warn(cortex_util.Logger).Log("msg", "received warning while replaying block. 
partial replay likely.", "file", f.Name(), "warning", warning, "records", b.appender.Length()) + } + + if remove { + err = os.Remove(filepath.Join(searchFilepath, f.Name())) + if err != nil { + return nil, err + } + continue + } + + level.Info(cortex_util.Logger).Log("msg", "replay complete", "file", f.Name(), "duration", time.Since(start)) + + blocks = append(blocks, b) + } + + return blocks, nil +} + +// newStreamingSearchBlockFromWALReplay creates a StreamingSearchBlock with in-memory records from a search WAL file +func newStreamingSearchBlockFromWALReplay(filename string) (*StreamingSearchBlock, error, error) { + f, err := os.OpenFile(filename, os.O_RDONLY, 0644) + if err != nil { + return nil, nil, err + } + + blockID, _, _, _, _, err := parseFilename(filename) + if err != nil { + return nil, nil, err + } + + // version is pinned to v2 for now + v, err := encoding.FromVersion("v2") + if err != nil { + return nil, nil, err + } + + blockHeader := tempofb.NewSearchBlockHeaderMutable() + records, warning, err := wal.ReplayWALAndGetRecords(f, v, backend.EncNone, func(bytes []byte) error { + entry := tempofb.SearchEntryFromBytes(bytes) + blockHeader.AddEntry(entry) + return nil + }) + if err != nil { + return nil, nil, err + } + return &StreamingSearchBlock{ + BlockID: blockID, + file: f, + appender: encoding.NewRecordAppender(records), + header: blockHeader, + encoding: v, + }, warning, nil +} + +func parseFilename(filename string) (blockID uuid.UUID, tenantID string, version string, encoding string, dataEncoding string, err error) { + splits := strings.Split(filename, ":") + + if len(splits) < 6 { + return uuid.Nil, "", "", "", "", err + } + + id, err := uuid.Parse(splits[0]) + if err != nil { + return uuid.Nil, "", "", "", "", err + } + // todo: any other validation? + return id, splits[1], splits[2], splits[3], splits[4], nil +} diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 3b68405af36..b36232550a7 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -4,31 +4,32 @@ import ( "bytes" "context" "io" - "io/ioutil" "os" - "path/filepath" - "time" - cortex_util "github.com/cortexproject/cortex/pkg/util/log" - "github.com/go-kit/log/level" + "github.com/google/uuid" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - "github.com/grafana/tempo/tempodb/wal" ) var _ SearchableBlock = (*StreamingSearchBlock)(nil) // StreamingSearchBlock is search data that is read/write, i.e. for traces in the WAL. type StreamingSearchBlock struct { + BlockID uuid.UUID // todo: add the full meta? appender encoding.Appender file *os.File header *tempofb.SearchBlockHeaderMutable encoding encoding.VersionedEncoding } +// Close closes the WAL file. Used in tests +func (s *StreamingSearchBlock) Close() error { + return s.file.Close() +} + // Clear deletes the files for this block. func (s *StreamingSearchBlock) Clear() error { s.file.Close() @@ -60,82 +61,6 @@ func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) { return s, nil } -// todo: can we reduce duplication between this and wal.RescanBlocks() ? 
-func RescanBlocks(walPath string) ([]*StreamingSearchBlock, error) { - searchFilepath := filepath.Join(walPath, "search") - files, err := ioutil.ReadDir(searchFilepath) - if err != nil { - return nil, err - } - - blocks := make([]*StreamingSearchBlock, 0, len(files)) - for _, f := range files { - if f.IsDir() { - continue - } - start := time.Now() - level.Info(cortex_util.Logger).Log("msg", "beginning replay", "file", f.Name(), "size", f.Size()) - - b, warning, err := newStreamingSearchBlockFromWALReplay(f.Name()) - - remove := false - if err != nil { - // wal replay failed, clear and warn - level.Warn(cortex_util.Logger).Log("msg", "failed to replay block. removing.", "file", f.Name(), "err", err) - remove = true - } - - if b != nil && b.appender.Length() == 0 { - level.Warn(cortex_util.Logger).Log("msg", "empty wal file. ignoring.", "file", f.Name(), "err", err) - remove = true - } - - if warning != nil { - level.Warn(cortex_util.Logger).Log("msg", "received warning while replaying block. partial replay likely.", "file", f.Name(), "warning", warning, "records", b.appender.Length()) - } - - if remove { - err = os.Remove(filepath.Join(searchFilepath, f.Name())) - if err != nil { - return nil, err - } - continue - } - - level.Info(cortex_util.Logger).Log("msg", "replay complete", "file", f.Name(), "duration", time.Since(start)) - - blocks = append(blocks, b) - } - - return blocks, nil -} - -func newStreamingSearchBlockFromWALReplay(filename string) (*StreamingSearchBlock, error, error) { - f, err := os.OpenFile(filename, os.O_RDONLY, 0644) - if err != nil { - if os.IsNotExist(err) { // this is fine, just issue a warning - return nil, err, nil - } - return nil, nil, err - } - - // version is pinned to v2 for now - v, err := encoding.FromVersion("v2") - if err != nil { - return nil, nil, err - } - records, warning, err := wal.ReplayWALAndGetRecords(f, v, backend.EncNone) - if err != nil { - return nil, nil, err - } - return &StreamingSearchBlock{ - file: f, - appender: encoding.NewRecordAppender(records), - header: tempofb.NewSearchBlockHeaderMutable(), - encoding: v, - }, warning, nil -} - // Append the given search data to the streaming block. Multiple byte buffers of search data for // the same trace can be passed and are merged into one entry. func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchData [][]byte) error { @@ -152,7 +77,6 @@ func (s *StreamingSearchBlock) Append(ctx context.Context, id common.ID, searchD // Search the streaming block. 
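// The flow below: the pipeline may reject the whole block up front via its
// header (MatchesBlock); otherwise every appended record is read back through
// the versioned DataReader, unmarshalled into a tempofb.SearchEntry, and
// tested with p.Matches before being added to sr.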
func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results) error { - if !p.MatchesBlock(s.header) { sr.AddBlockSkipped() return nil diff --git a/tempodb/search/streaming_search_block_test.go b/tempodb/search/streaming_search_block_test.go index f2b228af503..6d849031635 100644 --- a/tempodb/search/streaming_search_block_test.go +++ b/tempodb/search/streaming_search_block_test.go @@ -3,6 +3,7 @@ package search import ( "context" "fmt" + "github.com/stretchr/testify/assert" "os" "path" "sync" @@ -38,6 +39,42 @@ func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) *StreamingS return sb } +func TestStreamingSearchBlockReplay(t *testing.T) { + traceCount := 100 + sb := newStreamingSearchBlockWithTraces(traceCount, t) + assert.NotNil(t, sb) + + walFile := sb.file.Name() + + assert.NoError(t, sb.Close()) + + newSearchBlock, warning, err := newStreamingSearchBlockFromWALReplay(walFile) + assert.NoError(t, warning) + assert.NoError(t, err) + assert.Equal(t, traceCount, len(newSearchBlock.appender.Records())) + + p := NewSearchPipeline(&tempopb.SearchRequest{ + Tags: map[string]string{"key1": "value10"}, + }) + + sr := NewResults() + + sr.StartWorker() + go func() { + defer sr.FinishWorker() + err := newSearchBlock.Search(context.TODO(), p, sr) + require.NoError(t, err) + }() + sr.AllWorkersStarted() + + var results []*tempopb.TraceSearchMetadata + for r := range sr.Results() { + results = append(results, r) + } + + require.Equal(t, traceCount, len(results)) +} + func TestStreamingSearchBlockSearchBlock(t *testing.T) { traceCount := 10 sb := newStreamingSearchBlockWithTraces(traceCount, t) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 723012ae892..25712e0b938 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -91,7 +91,9 @@ func newAppendBlockFromFile(filename string, path string) (*AppendBlock, error, return nil, nil, err } - records, warning, err := ReplayWALAndGetRecords(f, v, e) + records, warning, err := ReplayWALAndGetRecords(f, v, e, func(bytes []byte) error { + return nil + }) if err != nil { return nil, nil, err } diff --git a/tempodb/wal/replay.go b/tempodb/wal/replay.go index a36edf22a87..29ac8e5a9cf 100644 --- a/tempodb/wal/replay.go +++ b/tempodb/wal/replay.go @@ -12,7 +12,7 @@ import ( // ReplayWALAndGetRecords replays a WAL file that could contain either traces or searchdata //nolint:interfacer -func ReplayWALAndGetRecords(file *os.File, v encoding.VersionedEncoding, enc backend.Encoding) ([]common.Record, error, error) { +func ReplayWALAndGetRecords(file *os.File, v encoding.VersionedEncoding, enc backend.Encoding, handleObj func([]byte) error) ([]common.Record, error, error) { dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(file), enc) if err != nil { return nil, nil, err @@ -21,10 +21,12 @@ func ReplayWALAndGetRecords(file *os.File, v encoding.VersionedEncoding, enc bac var buffer []byte var records []common.Record var warning error + var pageLen uint32 + var id []byte objectReader := v.NewObjectReaderWriter() currentOffset := uint64(0) for { - buffer, pageLen, err := dataReader.NextPage(buffer) + buffer, pageLen, err = dataReader.NextPage(buffer) if err == io.EOF { break } @@ -34,7 +36,7 @@ func ReplayWALAndGetRecords(file *os.File, v encoding.VersionedEncoding, enc bac } reader := bytes.NewReader(buffer) - id, _, err := objectReader.UnmarshalObjectFromReader(reader) + id, buffer, err = objectReader.UnmarshalObjectFromReader(reader) if err != nil { warning = 
err break @@ -46,6 +48,13 @@ func ReplayWALAndGetRecords(file *os.File, v encoding.VersionedEncoding, enc bac break } + // handleObj is primarily used by search replay to record search data in block header + err = handleObj(buffer) + if err != nil { + warning = err + break + } + // make a copy so we don't hold onto the iterator buffer recordID := append([]byte(nil), id...) records = append(records, common.Record{ diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 99d83341adf..afbeef1cd4c 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -135,7 +135,10 @@ func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string, name strin if err != nil { return nil, err } - return os.OpenFile(filepath.Join(p, fmt.Sprintf("%v:%v:%v", blockid, tenantid, name)), os.O_CREATE|os.O_RDWR, 0644) + + // blockID, tenantID, version, encoding (compression), dataEncoding, searchFileSuffix + filename := fmt.Sprintf("%v:%v:%v:%v:%v:%v", blockid, tenantid, "v2", backend.EncNone, "", name) + return os.OpenFile(filepath.Join(p, filename), os.O_CREATE|os.O_RDWR, 0644) } func (w *WAL) GetFilepath() string { From dfbbd023c4123ddcde4bc08defd5cf738e1ec0a8 Mon Sep 17 00:00:00 2001 From: Annanay Date: Thu, 30 Sep 2021 20:38:09 +0530 Subject: [PATCH 05/22] wip: ingester search test Signed-off-by: Annanay --- modules/ingester/ingester_test.go | 36 +++++++++++++++++++ tempodb/search/streaming_search_block_test.go | 4 ++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index d20bcbd5ab9..718c96453aa 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -212,6 +212,42 @@ func TestWal(t *testing.T) { } } +//func TestSearchWAL(t *testing.T) { +// tmpDir := t.TempDir() +// +// i, _, _ := defaultIngester(t, tmpDir) +// inst, ok := i.getInstanceByID("test") +// require.True(t, ok) +// +// // Write wal +// err := inst.CutCompleteTraces(0, true) +// require.NoError(t, err) +// _, err = inst.CutBlockIfReady(0, 0, true) +// require.NoError(t, err) +// +// // assert that search WAL is being searched +// ctx := user.InjectOrgID(context.Background(), "test") +// searchReq := &tempopb.SearchRequest{Tags: map[string]string{ +// search.SecretExhaustiveSearchTag: "", +// }} +// results, err := inst.Search(ctx, searchReq) +// assert.NoError(t, err) +// assert.Greater(t, results.Metrics.InspectedTraces, 0) +// +// // Shutdown +// err = i.stopping(nil) +// require.NoError(t, err) +// +// // replay wal +// i, _, _ = defaultIngester(t, tmpDir) +// inst, ok = i.getInstanceByID("test") +// require.True(t, ok) +// +// results, err = inst.Search(ctx, searchReq) +// assert.NoError(t, err) +// assert.Greater(t, results.Metrics.InspectedTraces, 0) +//} + // TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed // to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs // on the side of caution and chooses to replay the wal instead of rediscovering the local block. 
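// Illustrative sketch of the replay hook from patch 04: wal.ReplayWALAndGetRecords
// hands every decoded object to handleObj, which the search side uses to rebuild
// the block header while records are collected (this mirrors rescan_blocks.go;
// f and v stand in for an open WAL file and its versioned encoding):
//
//	blockHeader := tempofb.NewSearchBlockHeaderMutable()
//	records, warning, err := wal.ReplayWALAndGetRecords(f, v, backend.EncNone,
//		func(obj []byte) error {
//			blockHeader.AddEntry(tempofb.SearchEntryFromBytes(obj))
//			return nil
//		})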
diff --git a/tempodb/search/streaming_search_block_test.go b/tempodb/search/streaming_search_block_test.go index 6d849031635..8ce1d70aac7 100644 --- a/tempodb/search/streaming_search_block_test.go +++ b/tempodb/search/streaming_search_block_test.go @@ -44,15 +44,17 @@ func TestStreamingSearchBlockReplay(t *testing.T) { sb := newStreamingSearchBlockWithTraces(traceCount, t) assert.NotNil(t, sb) + // grab the wal filename from the block and close the old file handler walFile := sb.file.Name() - assert.NoError(t, sb.Close()) + // create new block from the same wal file newSearchBlock, warning, err := newStreamingSearchBlockFromWALReplay(walFile) assert.NoError(t, warning) assert.NoError(t, err) assert.Equal(t, traceCount, len(newSearchBlock.appender.Records())) + // search the new block p := NewSearchPipeline(&tempopb.SearchRequest{ Tags: map[string]string{"key1": "value10"}, }) From 5fa8cc9722966a0fef7708c61c0fd58a0b0e960b Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 1 Oct 2021 17:15:31 +0530 Subject: [PATCH 06/22] Fix Rescan search blocks, move ParseFilename into wal folder Signed-off-by: Annanay --- tempodb/backend/encoding.go | 2 +- tempodb/search/rescan_blocks.go | 33 ++++--------- tempodb/search/streaming_search_block_test.go | 33 ++++++++----- tempodb/wal/append_block_test.go | 24 ++++++++-- tempodb/wal/wal.go | 47 ++++++++++++++++++- 5 files changed, 95 insertions(+), 44 deletions(-) diff --git a/tempodb/backend/encoding.go b/tempodb/backend/encoding.go index 13bb8cba8ef..62d4c35c906 100644 --- a/tempodb/backend/encoding.go +++ b/tempodb/backend/encoding.go @@ -105,7 +105,7 @@ func (e Encoding) MarshalJSON() ([]byte, error) { return buffer.Bytes(), nil } -// ParseEncoding parses an chunk encoding (compression algorithm) by its name. +// ParseEncoding parses a chunk encoding (compression algorithm) by its name. 
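// For example, ParseEncoding("snappy") returns EncSnappy; matching is
// case-insensitive because the body compares with strings.EqualFold against
// each entry of SupportedEncoding.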
func ParseEncoding(enc string) (Encoding, error) { for _, e := range SupportedEncoding { if strings.EqualFold(e.String(), enc) { diff --git a/tempodb/search/rescan_blocks.go b/tempodb/search/rescan_blocks.go index 8fb9cd4005f..a7f132e02f6 100644 --- a/tempodb/search/rescan_blocks.go +++ b/tempodb/search/rescan_blocks.go @@ -4,15 +4,12 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "time" cortex_util "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/log/level" - "github.com/google/uuid" "github.com/grafana/tempo/pkg/tempofb" - "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/wal" ) @@ -34,7 +31,9 @@ func RescanBlocks(walPath string) ([]*StreamingSearchBlock, error) { start := time.Now() level.Info(cortex_util.Logger).Log("msg", "beginning replay", "file", f.Name(), "size", f.Size()) - b, warning, err := newStreamingSearchBlockFromWALReplay(f.Name()) + // pass the path to search subdirectory and filename + // here f.Name() does not have full path + b, warning, err := newStreamingSearchBlockFromWALReplay(searchFilepath, f.Name()) remove := false if err != nil { @@ -69,25 +68,24 @@ func RescanBlocks(walPath string) ([]*StreamingSearchBlock, error) { } // newStreamingSearchBlockFromWALReplay creates a StreamingSearchBlock with in-memory records from a search WAL file -func newStreamingSearchBlockFromWALReplay(filename string) (*StreamingSearchBlock, error, error) { - f, err := os.OpenFile(filename, os.O_RDONLY, 0644) +func newStreamingSearchBlockFromWALReplay(searchFilepath, filename string) (*StreamingSearchBlock, error, error) { + f, err := os.OpenFile(filepath.Join(searchFilepath, filename), os.O_RDONLY, 0644) if err != nil { return nil, nil, err } - blockID, _, _, _, _, err := parseFilename(filename) + blockID, _, version, enc, _, err := wal.ParseFilename(filename) if err != nil { return nil, nil, err } - // version is pinned to v2 for now - v, err := encoding.FromVersion("v2") + v, err := encoding.FromVersion(version) if err != nil { return nil, nil, err } blockHeader := tempofb.NewSearchBlockHeaderMutable() - records, warning, err := wal.ReplayWALAndGetRecords(f, v, backend.EncNone, func(bytes []byte) error { + records, warning, err := wal.ReplayWALAndGetRecords(f, v, enc, func(bytes []byte) error { entry := tempofb.SearchEntryFromBytes(bytes) blockHeader.AddEntry(entry) return nil @@ -103,18 +101,3 @@ func newStreamingSearchBlockFromWALReplay(filename string) (*StreamingSearchBloc encoding: v, }, warning, nil } - -func parseFilename(filename string) (blockID uuid.UUID, tenantID string, version string, encoding string, dataEncoding string, err error) { - splits := strings.Split(filename, ":") - - if len(splits) < 6 { - return uuid.Nil, "", "", "", "", err - } - - id, err := uuid.Parse(splits[0]) - if err != nil { - return uuid.Nil, "", "", "", "", err - } - // todo: any other validation? 
- return id, splits[1], splits[2], splits[3], splits[4], nil -} diff --git a/tempodb/search/streaming_search_block_test.go b/tempodb/search/streaming_search_block_test.go index 8ce1d70aac7..63b4baf937c 100644 --- a/tempodb/search/streaming_search_block_test.go +++ b/tempodb/search/streaming_search_block_test.go @@ -3,19 +3,27 @@ package search import ( "context" "fmt" - "github.com/stretchr/testify/assert" "os" "path" + "path/filepath" "sync" "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/pkg/tempopb" "github.com/stretchr/testify/require" ) -func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) *StreamingSearchBlock { +// newStreamingSearchBlockWithTraces returns (tmpDir path, *StreamingSearchBlock) +func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) (string, *StreamingSearchBlock) { + tmpDir := t.TempDir() + + // create search sub-directory + require.NoError(t, os.MkdirAll(filepath.Join(tmpDir, "search"), os.ModePerm)) + id := []byte{1, 2, 3, 4, 5, 6, 7, 8} searchData := [][]byte{(&tempofb.SearchEntryMutable{ TraceID: id, @@ -26,7 +34,7 @@ func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) *StreamingS "key4": {"value40", "value41"}, }}).ToBytes()} - f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) + f, err := os.OpenFile(path.Join(tmpDir, "search", "1c505e8b-26cd-4621-ba7d-792bb55282d5:single-tenant:v2:none:"), os.O_CREATE|os.O_RDWR, 0644) require.NoError(t, err) sb, err := NewStreamingSearchBlockForFile(f) @@ -36,23 +44,22 @@ func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) *StreamingS require.NoError(t, sb.Append(context.Background(), id, searchData)) } - return sb + return tmpDir, sb } func TestStreamingSearchBlockReplay(t *testing.T) { traceCount := 100 - sb := newStreamingSearchBlockWithTraces(traceCount, t) + walDir, sb := newStreamingSearchBlockWithTraces(traceCount, t) assert.NotNil(t, sb) - // grab the wal filename from the block and close the old file handler - walFile := sb.file.Name() + // close the old file handler assert.NoError(t, sb.Close()) // create new block from the same wal file - newSearchBlock, warning, err := newStreamingSearchBlockFromWALReplay(walFile) - assert.NoError(t, warning) + blocks, err := RescanBlocks(walDir) assert.NoError(t, err) - assert.Equal(t, traceCount, len(newSearchBlock.appender.Records())) + require.Len(t, blocks, 1) + assert.Equal(t, traceCount, len(blocks[0].appender.Records())) // search the new block p := NewSearchPipeline(&tempopb.SearchRequest{ @@ -64,7 +71,7 @@ func TestStreamingSearchBlockReplay(t *testing.T) { sr.StartWorker() go func() { defer sr.FinishWorker() - err := newSearchBlock.Search(context.TODO(), p, sr) + err := blocks[0].Search(context.TODO(), p, sr) require.NoError(t, err) }() sr.AllWorkersStarted() @@ -79,7 +86,7 @@ func TestStreamingSearchBlockReplay(t *testing.T) { func TestStreamingSearchBlockSearchBlock(t *testing.T) { traceCount := 10 - sb := newStreamingSearchBlockWithTraces(traceCount, t) + _, sb := newStreamingSearchBlockWithTraces(traceCount, t) testCases := []struct { name string @@ -134,7 +141,7 @@ func TestStreamingSearchBlockSearchBlock(t *testing.T) { func BenchmarkStreamingSearchBlockSearch(b *testing.B) { - sb := newStreamingSearchBlockWithTraces(b.N, b) + _, sb := newStreamingSearchBlockWithTraces(b.N, b) p := NewSearchPipeline(&tempopb.SearchRequest{ Tags: map[string]string{"nomatch": "nomatch"}, diff --git 
a/tempodb/wal/append_block_test.go b/tempodb/wal/append_block_test.go index 257d26f9b21..c587bc021fb 100644 --- a/tempodb/wal/append_block_test.go +++ b/tempodb/wal/append_block_test.go @@ -140,14 +140,32 @@ func TestParseFilename(t *testing.T) { expectedEncoding: backend.EncSnappy, }, { - name: "version, encoding and dataencoding", - filename: "123e4567-e89b-12d3-a456-426614174000:foo:v1:snappy:dataencoding", + name: "version, enc snappy and dataencoding", + filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:dataencoding", expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), expectTenant: "foo", - expectedVersion: "v1", + expectedVersion: "v2", expectedEncoding: backend.EncSnappy, expectedDataEncoding: "dataencoding", }, + { + name: "version, enc none and dataencoding", + filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:none:dataencoding", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + expectedVersion: "v2", + expectedEncoding: backend.EncNone, + expectedDataEncoding: "dataencoding", + }, + { + name: "empty dataencoding", + filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + expectedVersion: "v2", + expectedEncoding: backend.EncSnappy, + expectedDataEncoding: "", + }, { name: "path fails", filename: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo", diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index afbeef1cd4c..a0e94b79903 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -5,13 +5,16 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "time" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/google/uuid" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/backend/local" + versioned_encoding "github.com/grafana/tempo/tempodb/encoding" ) const ( @@ -136,11 +139,51 @@ func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string, name strin return nil, err } - // blockID, tenantID, version, encoding (compression), dataEncoding, searchFileSuffix - filename := fmt.Sprintf("%v:%v:%v:%v:%v:%v", blockid, tenantid, "v2", backend.EncNone, "", name) + // blockID, tenantID, version, encoding (compression), dataEncoding + filename := fmt.Sprintf("%v:%v:%v:%v:%v", blockid, tenantid, "v2", backend.EncNone, "") return os.OpenFile(filepath.Join(p, filename), os.O_CREATE|os.O_RDWR, 0644) } +// ParseFilename returns (blockID, tenant, version, encoding, dataEncoding, error) +func ParseFilename(filename string) (uuid.UUID, string, string, backend.Encoding, string, error) { + splits := strings.Split(filename, ":") + + if len(splits) != 5 { + return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. unexpected number of segments", filename) + } + + // first segment is blockID + id, err := uuid.Parse(splits[0]) + if err != nil { + return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. error parsing uuid: %w", filename, err) + } + + // second segment is tenant + tenant := splits[1] + if len(tenant) == 0 { + return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. missing fields", filename) + } + + // third segment is version + version := splits[2] + _, err = versioned_encoding.FromVersion(version) + if err != nil { + return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. 
error parsing version: %w", filename, err) + } + + // fourth is encoding + encodingString := splits[3] + encoding, err := backend.ParseEncoding(encodingString) + if err != nil { + return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. error parsing encoding: %w", filename, err) + } + + // fifth is dataEncoding + dataEncoding := splits[4] + + return id, tenant, version, encoding, dataEncoding, nil +} + func (w *WAL) GetFilepath() string { return w.c.Filepath } From 1823cb1fe192fead6af301dca1fdd342df23d18b Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 1 Oct 2021 17:50:08 +0530 Subject: [PATCH 07/22] Append block uses new ParseFilename Signed-off-by: Annanay --- tempodb/wal/append_block.go | 41 +------------------------------- tempodb/wal/append_block_test.go | 37 +++++++++++++++------------- tempodb/wal/wal.go | 7 ++++-- 3 files changed, 26 insertions(+), 59 deletions(-) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 25712e0b938..5ef96149cb7 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -69,7 +69,7 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.En // be completed. It can return a warning or a fatal error func newAppendBlockFromFile(filename string, path string) (*AppendBlock, error, error) { var warning error - blockID, tenantID, version, e, dataEncoding, err := parseFilename(filename) + blockID, tenantID, version, e, dataEncoding, err := ParseFilename(filename) if err != nil { return nil, nil, err } @@ -219,42 +219,3 @@ func (a *AppendBlock) file() (*os.File, error) { return a.readFile, err } - -func parseFilename(name string) (uuid.UUID, string, string, backend.Encoding, string, error) { - splits := strings.Split(name, ":") - - if len(splits) != 2 && len(splits) != 4 && len(splits) != 5 { - return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. unexpected number of segments", name) - } - - blockIDString := splits[0] - tenantID := splits[1] - - version := "v0" - encodingString := backend.EncNone.String() - dataEncoding := "" - if len(splits) >= 4 { - version = splits[2] - encodingString = splits[3] - } - - if len(splits) >= 5 { - dataEncoding = splits[4] - } - - blockID, err := uuid.Parse(blockIDString) - if err != nil { - return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. error parsing uuid: %w", name, err) - } - - encoding, err := backend.ParseEncoding(encodingString) - if err != nil { - return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. error parsing encoding: %w", name, err) - } - - if len(tenantID) == 0 || len(version) == 0 { - return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. 
missing fields", name) - } - - return blockID, tenantID, version, encoding, dataEncoding, nil -} diff --git a/tempodb/wal/append_block_test.go b/tempodb/wal/append_block_test.go index c587bc021fb..fc8e6c1492f 100644 --- a/tempodb/wal/append_block_test.go +++ b/tempodb/wal/append_block_test.go @@ -123,22 +123,6 @@ func TestParseFilename(t *testing.T) { expectedDataEncoding string expectError bool }{ - { - name: "ez-mode", - filename: "123e4567-e89b-12d3-a456-426614174000:foo", - expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), - expectTenant: "foo", - expectedVersion: "v0", - expectedEncoding: backend.EncNone, - }, - { - name: "version and encoding", - filename: "123e4567-e89b-12d3-a456-426614174000:foo:v1:snappy", - expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), - expectTenant: "foo", - expectedVersion: "v1", - expectedEncoding: backend.EncSnappy, - }, { name: "version, enc snappy and dataencoding", filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:dataencoding", @@ -159,6 +143,15 @@ func TestParseFilename(t *testing.T) { }, { name: "empty dataencoding", + filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + expectedVersion: "v2", + expectedEncoding: backend.EncSnappy, + expectedDataEncoding: "", + }, + { + name: "empty dataencoding with semicolon", filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:", expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), expectTenant: "foo", @@ -216,11 +209,21 @@ func TestParseFilename(t *testing.T) { filename: "123e4567-e89b-12d3-a456-426614174000:test:v1:asdf", expectError: true, }, + { + name: "ez-mode old format", + filename: "123e4567-e89b-12d3-a456-426614174000:foo", + expectError: true, + }, + { + name: "version and encoding old format", + filename: "123e4567-e89b-12d3-a456-426614174000:foo:v1:snappy", + expectError: true, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - actualUUID, actualTenant, actualVersion, actualEncoding, actualDataEncoding, err := parseFilename(tc.filename) + actualUUID, actualTenant, actualVersion, actualEncoding, actualDataEncoding, err := ParseFilename(tc.filename) if tc.expectError { assert.Error(t, err) diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index a0e94b79903..505e5e65e79 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -148,7 +148,7 @@ func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string, name strin func ParseFilename(filename string) (uuid.UUID, string, string, backend.Encoding, string, error) { splits := strings.Split(filename, ":") - if len(splits) != 5 { + if len(splits) != 4 && len(splits) != 5 { return uuid.UUID{}, "", "", backend.EncNone, "", fmt.Errorf("unable to parse %s. 
unexpected number of segments", filename) } @@ -179,7 +179,10 @@ func ParseFilename(filename string) (uuid.UUID, string, string, backend.Encoding } // fifth is dataEncoding - dataEncoding := splits[4] + dataEncoding := "" + if len(splits) == 5 { + dataEncoding = splits[4] + } return id, tenant, version, encoding, dataEncoding, nil } From 18d2e6432b2deb42cdc2503de16f6b004708fc5e Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 1 Oct 2021 18:32:59 +0530 Subject: [PATCH 08/22] Add tests, benchmarks, pass encoding along correctly Signed-off-by: Annanay --- modules/ingester/instance.go | 2 +- tempodb/search/backend_search_block_test.go | 43 +++--- tempodb/search/block_meta.go | 2 +- tempodb/search/rescan_blocks.go | 3 +- tempodb/search/streaming_search_block.go | 36 +++-- tempodb/search/streaming_search_block_test.go | 142 ++++++++++-------- 6 files changed, 124 insertions(+), 104 deletions(-) diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 5dc45847941..1c7bffd0cb8 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -504,7 +504,7 @@ func (i *instance) resetHeadBlock() error { return err } - b, err := search.NewStreamingSearchBlockForFile(f) + b, err := search.NewStreamingSearchBlockForFile(f, "v2", backend.EncNone) if err != nil { return err } diff --git a/tempodb/search/backend_search_block_test.go b/tempodb/search/backend_search_block_test.go index 2756c45ab60..93719aae4c3 100644 --- a/tempodb/search/backend_search_block_test.go +++ b/tempodb/search/backend_search_block_test.go @@ -31,7 +31,7 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) require.NoError(t, err) - b1, err := NewStreamingSearchBlockForFile(f) + b1, err := NewStreamingSearchBlockForFile(f, "v2", enc) require.NoError(t, err) for i := 0; i < traceCount; i++ { @@ -57,28 +57,33 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E func TestBackendSearchBlockSearch(t *testing.T) { traceCount := 50_000 - b2 := newBackendSearchBlockWithTraces(t, traceCount, backend.EncNone, 0) + for _, enc := range backend.SupportedEncoding { + t.Run(enc.String(), func(t *testing.T) { - p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: map[string]string{"key20": "value_B_20"}, - }) + b2 := newBackendSearchBlockWithTraces(t, traceCount, enc, 0) - sr := NewResults() + p := NewSearchPipeline(&tempopb.SearchRequest{ + Tags: map[string]string{"key20": "value_B_20"}, + }) - sr.StartWorker() - go func() { - defer sr.FinishWorker() - err := b2.Search(context.TODO(), p, sr) - require.NoError(t, err) - }() - sr.AllWorkersStarted() + sr := NewResults() + + sr.StartWorker() + go func() { + defer sr.FinishWorker() + err := b2.Search(context.TODO(), p, sr) + require.NoError(t, err) + }() + sr.AllWorkersStarted() - var results []*tempopb.TraceSearchMetadata - for r := range sr.Results() { - results = append(results, r) + var results []*tempopb.TraceSearchMetadata + for r := range sr.Results() { + results = append(results, r) + } + require.Equal(t, 1, len(results)) + require.Equal(t, traceCount, int(sr.TracesInspected())) + }) } - require.Equal(t, 1, len(results)) - require.Equal(t, traceCount, int(sr.TracesInspected())) } func TestBackendSearchBlockDedupesWAL(t *testing.T) { @@ -114,7 +119,7 @@ func TestBackendSearchBlockDedupesWAL(t *testing.T) { f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) 
require.NoError(t, err) - b1, err := NewStreamingSearchBlockForFile(f) + b1, err := NewStreamingSearchBlockForFile(f, "v2", backend.EncNone) require.NoError(t, err) id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} diff --git a/tempodb/search/block_meta.go b/tempodb/search/block_meta.go index e89db64a12a..f6cebfc3854 100644 --- a/tempodb/search/block_meta.go +++ b/tempodb/search/block_meta.go @@ -12,7 +12,7 @@ import ( type BlockMeta struct { Version string `json:"version"` - Encoding backend.Encoding `json:"encoding"` // Encoding/compression format + Encoding backend.Encoding `json:"v"` // Encoding/compression format IndexPageSize uint32 `json:"indexPageSize"` IndexRecords uint32 `json:"indexRecords"` } diff --git a/tempodb/search/rescan_blocks.go b/tempodb/search/rescan_blocks.go index a7f132e02f6..40145b01a3f 100644 --- a/tempodb/search/rescan_blocks.go +++ b/tempodb/search/rescan_blocks.go @@ -98,6 +98,7 @@ func newStreamingSearchBlockFromWALReplay(searchFilepath, filename string) (*Str file: f, appender: encoding.NewRecordAppender(records), header: blockHeader, - encoding: v, + v: v, + enc: enc, }, warning, nil } diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index b36232550a7..096ba518eb3 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -22,7 +22,8 @@ type StreamingSearchBlock struct { appender encoding.Appender file *os.File header *tempofb.SearchBlockHeaderMutable - encoding encoding.VersionedEncoding + v encoding.VersionedEncoding + enc backend.Encoding } // Close closes the WAL file. Used in tests @@ -38,19 +39,20 @@ func (s *StreamingSearchBlock) Clear() error { // NewStreamingSearchBlockForFile creates a new streaming block that will read/write the given file. // File must be opened for read/write permissions. 
-func NewStreamingSearchBlockForFile(f *os.File) (*StreamingSearchBlock, error) { - v, err := encoding.FromVersion("v2") +func NewStreamingSearchBlockForFile(f *os.File, version string, enc backend.Encoding) (*StreamingSearchBlock, error) { + v, err := encoding.FromVersion(version) if err != nil { return nil, err } s := &StreamingSearchBlock{ - file: f, - header: tempofb.NewSearchBlockHeaderMutable(), - encoding: v, + file: f, + header: tempofb.NewSearchBlockHeaderMutable(), + v: v, + enc: enc, } // Use versioned encoding to create paged entries - dataWriter, err := s.encoding.NewDataWriter(f, backend.EncNone) + dataWriter, err := s.v.NewDataWriter(f, enc) if err != nil { return nil, err } @@ -86,12 +88,12 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul sr.AddBlockInspected() - dr, err := s.encoding.NewDataReader(backend.NewContextReaderWithAllReader(s.file), backend.EncNone) + dr, err := s.v.NewDataReader(backend.NewContextReaderWithAllReader(s.file), s.enc) if err != nil { return err } - orw := s.encoding.NewObjectReaderWriter() + orw := s.v.NewObjectReaderWriter() rr := s.appender.Records() var pagesBuffer [][]byte @@ -144,17 +146,18 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul func (s *StreamingSearchBlock) Iterator() (encoding.Iterator, error) { iter := &streamingSearchBlockIterator{ - records: s.appender.Records(), - file: s.file, + records: s.appender.Records(), + file: s.file, + pagesBuffer: make([][]byte, 1), } - dr, err := s.encoding.NewDataReader(backend.NewContextReaderWithAllReader(s.file), backend.EncNone) + dr, err := s.v.NewDataReader(backend.NewContextReaderWithAllReader(s.file), s.enc) if err != nil { return nil, err } iter.dataReader = dr - iter.objectRW = s.encoding.NewObjectReaderWriter() + iter.objectRW = s.v.NewObjectReaderWriter() combiner := &DataCombiner{} @@ -168,6 +171,8 @@ type streamingSearchBlockIterator struct { file *os.File dataReader common.DataReader objectRW common.ObjectReaderWriter + + pagesBuffer [][]byte } var _ encoding.Iterator = (*streamingSearchBlockIterator)(nil) @@ -182,9 +187,8 @@ func (s *streamingSearchBlockIterator) Next(ctx context.Context) (common.ID, []b // Use unique buffer that can be returned to the caller. // This is primarily for DedupingIterator which uses 2 buffers at once. 
var buffer []byte - pagesBuffer := make([][]byte, 1) - pagesBuffer[0] = make([]byte, currentRecord.Length) - pagesBuffer, _, err := s.dataReader.Read(ctx, []common.Record{currentRecord}, pagesBuffer, buffer) + s.pagesBuffer[0] = make([]byte, currentRecord.Length) + pagesBuffer, _, err := s.dataReader.Read(ctx, []common.Record{currentRecord}, s.pagesBuffer, buffer) if err != nil { return nil, nil, err } diff --git a/tempodb/search/streaming_search_block_test.go b/tempodb/search/streaming_search_block_test.go index 63b4baf937c..e921e10e34e 100644 --- a/tempodb/search/streaming_search_block_test.go +++ b/tempodb/search/streaming_search_block_test.go @@ -11,14 +11,15 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/pkg/tempopb" - "github.com/stretchr/testify/require" + "github.com/grafana/tempo/tempodb/backend" ) // newStreamingSearchBlockWithTraces returns (tmpDir path, *StreamingSearchBlock) -func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) (string, *StreamingSearchBlock) { +func newStreamingSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.Encoding) (string, *StreamingSearchBlock) { tmpDir := t.TempDir() // create search sub-directory @@ -34,10 +35,10 @@ func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) (string, *S "key4": {"value40", "value41"}, }}).ToBytes()} - f, err := os.OpenFile(path.Join(tmpDir, "search", "1c505e8b-26cd-4621-ba7d-792bb55282d5:single-tenant:v2:none:"), os.O_CREATE|os.O_RDWR, 0644) + f, err := os.OpenFile(path.Join(tmpDir, "search", fmt.Sprintf("1c505e8b-26cd-4621-ba7d-792bb55282d5:single-tenant:v2:%s:", enc.String())), os.O_CREATE|os.O_RDWR, 0644) require.NoError(t, err) - sb, err := NewStreamingSearchBlockForFile(f) + sb, err := NewStreamingSearchBlockForFile(f, "v2", enc) require.NoError(t, err) for i := 0; i < traceCount; i++ { @@ -49,44 +50,49 @@ func newStreamingSearchBlockWithTraces(traceCount int, t testing.TB) (string, *S func TestStreamingSearchBlockReplay(t *testing.T) { traceCount := 100 - walDir, sb := newStreamingSearchBlockWithTraces(traceCount, t) - assert.NotNil(t, sb) - - // close the old file handler - assert.NoError(t, sb.Close()) - - // create new block from the same wal file - blocks, err := RescanBlocks(walDir) - assert.NoError(t, err) - require.Len(t, blocks, 1) - assert.Equal(t, traceCount, len(blocks[0].appender.Records())) - - // search the new block - p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: map[string]string{"key1": "value10"}, - }) - - sr := NewResults() - - sr.StartWorker() - go func() { - defer sr.FinishWorker() - err := blocks[0].Search(context.TODO(), p, sr) - require.NoError(t, err) - }() - sr.AllWorkersStarted() - - var results []*tempopb.TraceSearchMetadata - for r := range sr.Results() { - results = append(results, r) - } - require.Equal(t, traceCount, len(results)) + for _, enc := range backend.SupportedEncoding { + t.Run(enc.String(), func(t *testing.T) { + walDir, sb := newStreamingSearchBlockWithTraces(t, traceCount, enc) + assert.NotNil(t, sb) + + // close the old file handler + assert.NoError(t, sb.Close()) + + // create new block from the same wal file + blocks, err := RescanBlocks(walDir) + assert.NoError(t, err) + require.Len(t, blocks, 1) + assert.Equal(t, traceCount, len(blocks[0].appender.Records())) + + // search the new block + p := NewSearchPipeline(&tempopb.SearchRequest{ + Tags: map[string]string{"key1": "value10"}, + }) + + sr := 
NewResults() + + sr.StartWorker() + go func() { + defer sr.FinishWorker() + err := blocks[0].Search(context.TODO(), p, sr) + require.NoError(t, err) + }() + sr.AllWorkersStarted() + + var results []*tempopb.TraceSearchMetadata + for r := range sr.Results() { + results = append(results, r) + } + + require.Equal(t, traceCount, len(results)) + }) + } } func TestStreamingSearchBlockSearchBlock(t *testing.T) { traceCount := 10 - _, sb := newStreamingSearchBlockWithTraces(traceCount, t) + _, sb := newStreamingSearchBlockWithTraces(t, traceCount, backend.EncNone) testCases := []struct { name string @@ -141,35 +147,39 @@ func TestStreamingSearchBlockSearchBlock(t *testing.T) { func BenchmarkStreamingSearchBlockSearch(b *testing.B) { - _, sb := newStreamingSearchBlockWithTraces(b.N, b) - - p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: map[string]string{"nomatch": "nomatch"}, - }) - - sr := NewResults() - - b.ResetTimer() - start := time.Now() - // Search 10x10 because reading the search data is much faster than creating it, but we need - // to spend at least 1 second to satisfy go bench minimum elapsed time requirement. - loops := 10 - wg := &sync.WaitGroup{} - for i := 0; i < loops; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < loops; j++ { - err := sb.Search(context.TODO(), p, sr) - require.NoError(b, err) + for _, enc := range backend.SupportedEncoding { + b.Run(enc.String(), func(b *testing.B) { + _, sb := newStreamingSearchBlockWithTraces(b, b.N, enc) + + p := NewSearchPipeline(&tempopb.SearchRequest{ + Tags: map[string]string{SecretExhaustiveSearchTag: "!"}, + }) + + sr := NewResults() + + b.ResetTimer() + start := time.Now() + // Search 10x10 because reading the search data is much faster than creating it, but we need + // to spend at least 1 second to satisfy go bench minimum elapsed time requirement. 
+			loops := 10
+			wg := &sync.WaitGroup{}
+			for i := 0; i < loops; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for j := 0; j < loops; j++ {
+						err := sb.Search(context.TODO(), p, sr)
+						require.NoError(b, err)
+					}
+				}()
+			}
+			wg.Wait()
+			elapsed := time.Since(start)

-	fmt.Printf("StreamingSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s throughput \n",
-		elapsed,
-		float64(sr.bytesInspected.Load())/(1024*1024),
-		float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024))
+			fmt.Printf("StreamingSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s throughput \n",
+				elapsed,
+				float64(sr.bytesInspected.Load())/(1024*1024),
+				float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024))
+		})
+	}
 }

From 481d51f8e04e9bcdb2faeca19c72531f7eac5c70 Mon Sep 17 00:00:00 2001
From: Annanay
Date: Fri, 1 Oct 2021 18:53:01 +0530
Subject: [PATCH 09/22] Changelog

Signed-off-by: Annanay
---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 080a34754de..f15e27bb584 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@
 * [CHANGE] **BREAKING CHANGE** Change ingester metric `ingester_bytes_metric_total` in favor of `ingester_bytes_received_total` [#979](https://github.com/grafana/tempo/pull/979) (@mapno)
 * [FEATURE] Add ability to search ingesters for traces [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
 * [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno)
+* [FEATURE] Search WAL reload and compression (versioned encoding) support [#1000](https://github.com/grafana/tempo/pull/1000) (@annanay25)
 * [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott)
 * [ENHANCEMENT] Added "search blocks" cli option. [#972](https://github.com/grafana/tempo/pull/972) (@joe-elliott)
 * [ENHANCEMENT] Added traceid to `trace too large message`. 
[#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394) From 99506a3c9113c837ed7d1f0542229ae744b32f1f Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 1 Oct 2021 19:09:19 +0530 Subject: [PATCH 10/22] Post merge cleanup Signed-off-by: Annanay --- tempodb/search/backend_search_block_test.go | 78 +++++++++++---------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/tempodb/search/backend_search_block_test.go b/tempodb/search/backend_search_block_test.go index c5ad9277e4c..332ac37c3e1 100644 --- a/tempodb/search/backend_search_block_test.go +++ b/tempodb/search/backend_search_block_test.go @@ -115,51 +115,53 @@ func TestBackendSearchBlockDedupesWAL(t *testing.T) { }, } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) - require.NoError(t, err) - - b1, err := NewStreamingSearchBlockForFile(f, "v2", backend.EncNone) - require.NoError(t, err) + for _, enc := range backend.SupportedEncoding { + for _, tc := range testCases { + t.Run(fmt.Sprint(tc.name, "/", enc.String()), func(t *testing.T) { + f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) + require.NoError(t, err) - id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} - for i := 0; i < traceCount; i++ { - require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i))) - } + b1, err := NewStreamingSearchBlockForFile(f, "v2", enc) + require.NoError(t, err) - l, err := local.NewBackend(&local.Config{ - Path: t.TempDir(), - }) - require.NoError(t, err) + id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} + for i := 0; i < traceCount; i++ { + require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i))) + } - blockID := uuid.New() - err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, backend.EncNone, 0) - require.NoError(t, err) + l, err := local.NewBackend(&local.Config{ + Path: t.TempDir(), + }) + require.NoError(t, err) - b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l)) + blockID := uuid.New() + err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, 0) + require.NoError(t, err) - p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: tc.searchTags, - }) + b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l)) - sr := NewResults() + p := NewSearchPipeline(&tempopb.SearchRequest{ + Tags: tc.searchTags, + }) - sr.StartWorker() - go func() { - defer sr.FinishWorker() - err := b2.Search(context.TODO(), p, sr) - require.NoError(t, err) - }() - sr.AllWorkersStarted() + sr := NewResults() - var results []*tempopb.TraceSearchMetadata - for r := range sr.Results() { - results = append(results, r) - } - require.Equal(t, tc.expectedLenResults, len(results)) - require.Equal(t, tc.expectedLenInspected, int(sr.TracesInspected())) - }) + sr.StartWorker() + go func() { + defer sr.FinishWorker() + err := b2.Search(context.TODO(), p, sr) + require.NoError(t, err) + }() + sr.AllWorkersStarted() + + var results []*tempopb.TraceSearchMetadata + for r := range sr.Results() { + results = append(results, r) + } + require.Equal(t, tc.expectedLenResults, len(results)) + require.Equal(t, tc.expectedLenInspected, int(sr.TracesInspected())) + }) + } } } @@ -217,7 +219,7 @@ func TestBackendSearchBlockFinalSize(t *testing.T) { f, err := 
os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
 	require.NoError(t, err)

-	b1, err := NewStreamingSearchBlockForFile(f)
+	b1, err := NewStreamingSearchBlockForFile(f, "v2", backend.EncNone)
 	require.NoError(t, err)

 	for i := 0; i < traceCount; i++ {

From 98c7e1b982d7301330b9e3ef8ea90af8c965bcbd Mon Sep 17 00:00:00 2001
From: Annanay
Date: Fri, 1 Oct 2021 19:41:23 +0530
Subject: [PATCH 11/22] Err handling for search disabled

Signed-off-by: Annanay
---
 tempodb/search/rescan_blocks.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/tempodb/search/rescan_blocks.go b/tempodb/search/rescan_blocks.go
index 40145b01a3f..bd7cbc413d8 100644
--- a/tempodb/search/rescan_blocks.go
+++ b/tempodb/search/rescan_blocks.go
@@ -20,7 +20,9 @@ func RescanBlocks(walPath string) ([]*StreamingSearchBlock, error) {
 	searchFilepath := filepath.Join(walPath, "search")
 	files, err := ioutil.ReadDir(searchFilepath)
 	if err != nil {
-		return nil, err
+		// this might happen if search is not enabled, don't err here
+		level.Warn(cortex_util.Logger).Log("msg", "failed to open search wal directory", "err", err)
+		return nil, nil
 	}

 	blocks := make([]*StreamingSearchBlock, 0, len(files))

From 29d8eafde2647a1b723ba5c8abb6d8fa5a154b87 Mon Sep 17 00:00:00 2001
From: Annanay
Date: Fri, 1 Oct 2021 19:56:46 +0530
Subject: [PATCH 12/22] Use the right level package, reload backend search blocks

Signed-off-by: Annanay
---
 modules/ingester/instance.go             | 4 ++--
 modules/ingester/instance_search_test.go | 4 ++--
 tempodb/search/backend_search_block.go   | 4 ++++
 tempodb/search/rescan_blocks.go          | 2 +-
 4 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go
index f6aebeb55c6..6b837311469 100644
--- a/modules/ingester/instance.go
+++ b/modules/ingester/instance.go
@@ -623,11 +623,11 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) error {
 			return err
 		}

-		//sb := search.OpenBackendSearchBlock(i.local, b.BlockMeta().BlockID, b.BlockMeta().TenantID)
+		sb := search.OpenBackendSearchBlock(b.BlockMeta().BlockID, b.BlockMeta().TenantID, i.localReader)

 		i.blocksMtx.Lock()
 		i.completeBlocks = append(i.completeBlocks, ib)
-		//i.searchCompleteBlocks[ib] = &searchLocalBlockEntry{b: sb}
+		i.searchCompleteBlocks[ib] = &searchLocalBlockEntry{b: sb}
 		i.blocksMtx.Unlock()

 		level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime())

diff --git a/modules/ingester/instance_search_test.go b/modules/ingester/instance_search_test.go
index 8f82155e96a..499e7f5014e 100644
--- a/modules/ingester/instance_search_test.go
+++ b/modules/ingester/instance_search_test.go
@@ -136,8 +136,8 @@ func TestInstanceSearch(t *testing.T) {
 	sr, err = i.Search(context.Background(), req)
 	assert.NoError(t, err)

-	// note: search is experimental and removed on every startup. 
Verify no search results now - assert.Len(t, sr.Traces, 0) + assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator) + checkEqual(t, ids, sr) } func TestInstanceSearchNoData(t *testing.T) { diff --git a/tempodb/search/backend_search_block.go b/tempodb/search/backend_search_block.go index d7f5926b78a..ba839be0166 100644 --- a/tempodb/search/backend_search_block.go +++ b/tempodb/search/backend_search_block.go @@ -147,6 +147,10 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results meta, err := ReadSearchBlockMeta(ctx, s.r, s.id, s.tenantID) if err != nil { + // we create BackendSearchBlocks even if search files are missing, return nil here if meta does not exist + if err == backend.ErrDoesNotExist { + return nil + } return err } diff --git a/tempodb/search/rescan_blocks.go b/tempodb/search/rescan_blocks.go index bd7cbc413d8..b9e0ae5055d 100644 --- a/tempodb/search/rescan_blocks.go +++ b/tempodb/search/rescan_blocks.go @@ -7,7 +7,7 @@ import ( "time" cortex_util "github.com/cortexproject/cortex/pkg/util/log" - "github.com/go-kit/log/level" + "github.com/go-kit/kit/log/level" "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/tempodb/encoding" From 195b2fa6bca4baeba62aa5ba1b7028204016c1bd Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 1 Oct 2021 20:03:57 +0530 Subject: [PATCH 13/22] never refactor variables using an ide Signed-off-by: Annanay --- tempodb/search/block_meta.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tempodb/search/block_meta.go b/tempodb/search/block_meta.go index a9d50291f8d..e153d7baecb 100644 --- a/tempodb/search/block_meta.go +++ b/tempodb/search/block_meta.go @@ -10,7 +10,7 @@ import ( type BlockMeta struct { Version string `json:"version"` - Encoding backend.Encoding `json:"v"` // Encoding/compression format + Encoding backend.Encoding `json:"encoding"` // Encoding/compression format IndexPageSize uint32 `json:"indexPageSize"` IndexRecords uint32 `json:"indexRecords"` } From 0747e091e3ace9c54e1aac35de8364a6fbf08fdf Mon Sep 17 00:00:00 2001 From: Annanay Date: Fri, 1 Oct 2021 23:57:32 +0530 Subject: [PATCH 14/22] Address comments, fix test Signed-off-by: Annanay --- modules/ingester/ingester.go | 2 +- modules/ingester/ingester_test.go | 94 ++++++++++++++---------- modules/ingester/instance.go | 2 +- tempodb/search/streaming_search_block.go | 2 +- tempodb/wal/wal.go | 2 +- 5 files changed, 61 insertions(+), 41 deletions(-) diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go index a84581953e9..2d413cee501 100644 --- a/modules/ingester/ingester.go +++ b/modules/ingester/ingester.go @@ -339,7 +339,7 @@ func (i *Ingester) replayWal() error { return fmt.Errorf("fatal error replaying search wal %w", err) } - // clear any searchBlock that does not have a corresponding entry in tracesBlocks + // clear any searchBlock that does not have a matching wal block for j := len(searchBlocks) - 1; j >= 0; j-- { clear := true for _, tracesBlock := range blocks { diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index d5c9b29c7c9..64d870964d6 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -19,6 +19,7 @@ import ( "github.com/grafana/tempo/modules/overrides" "github.com/grafana/tempo/modules/storage" "github.com/grafana/tempo/pkg/model" + "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/pkg/tempopb" v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1" "github.com/grafana/tempo/pkg/util/test" @@ -211,41 
+212,54 @@ func TestWal(t *testing.T) { } } -//func TestSearchWAL(t *testing.T) { -// tmpDir := t.TempDir() -// -// i, _, _ := defaultIngester(t, tmpDir) -// inst, ok := i.getInstanceByID("test") -// require.True(t, ok) -// -// // Write wal -// err := inst.CutCompleteTraces(0, true) -// require.NoError(t, err) -// _, err = inst.CutBlockIfReady(0, 0, true) -// require.NoError(t, err) -// -// // assert that search WAL is being searched -// ctx := user.InjectOrgID(context.Background(), "test") -// searchReq := &tempopb.SearchRequest{Tags: map[string]string{ -// search.SecretExhaustiveSearchTag: "", -// }} -// results, err := inst.Search(ctx, searchReq) -// assert.NoError(t, err) -// assert.Greater(t, results.Metrics.InspectedTraces, 0) -// -// // Shutdown -// err = i.stopping(nil) -// require.NoError(t, err) -// -// // replay wal -// i, _, _ = defaultIngester(t, tmpDir) -// inst, ok = i.getInstanceByID("test") -// require.True(t, ok) -// -// results, err = inst.Search(ctx, searchReq) -// assert.NoError(t, err) -// assert.Greater(t, results.Metrics.InspectedTraces, 0) -//} +func TestSearchWAL(t *testing.T) { + tmpDir := t.TempDir() + + i := defaultIngesterModule(t, tmpDir) + inst, _ := i.getOrCreateInstance("test") + assert.NotNil(t, inst) + + // create some search data + id := make([]byte, 16) + _, err := rand.Read(id) + require.NoError(t, err) + trace := test.MakeTrace(10, id) + traceBytes, err := trace.Marshal() + require.NoError(t, err) + entry := &tempofb.SearchEntryMutable{} + entry.TraceID = id + entry.AddTag("foo", "bar") + searchBytes := entry.ToBytes() + + // push to instance + assert.NoError(t, inst.PushBytes(context.Background(), id, traceBytes, searchBytes)) + + // Write wal + err = inst.CutCompleteTraces(0, true) + require.NoError(t, err) + + // search WAL + ctx := user.InjectOrgID(context.Background(), "test") + searchReq := &tempopb.SearchRequest{Tags: map[string]string{ + "foo": "bar", + }} + results, err := inst.Search(ctx, searchReq) + assert.NoError(t, err) + assert.Equal(t, uint32(1), results.Metrics.InspectedTraces) + + // Shutdown + err = i.stopping(nil) + require.NoError(t, err) + + // replay wal + i = defaultIngesterModule(t, tmpDir) + inst, ok := i.getInstanceByID("test") + require.True(t, ok) + + results, err = inst.Search(ctx, searchReq) + assert.NoError(t, err) + assert.Equal(t, uint32(1), results.Metrics.InspectedTraces) +} // TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed // to the local disk, but before the wal is deleted. 
On startup both blocks exist, and the ingester now errs @@ -328,7 +342,7 @@ func TestFlush(t *testing.T) { } } -func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, [][]byte) { +func defaultIngesterModule(t *testing.T, tmpDir string) *Ingester { ingesterConfig := defaultIngesterTestConfig() limits, err := overrides.NewOverrides(defaultLimitsTestConfig()) require.NoError(t, err, "unexpected error creating overrides") @@ -360,13 +374,19 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, err = ingester.starting(context.Background()) require.NoError(t, err, "unexpected error starting ingester") + return ingester +} + +func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, [][]byte) { + ingester := defaultIngesterModule(t, tmpDir) + // make some fake traceIDs/requests traces := make([]*tempopb.Trace, 0) traceIDs := make([][]byte, 0) for i := 0; i < 10; i++ { id := make([]byte, 16) - _, err = rand.Read(id) + _, err := rand.Read(id) require.NoError(t, err) trace := test.MakeTrace(10, id) diff --git a/modules/ingester/instance.go b/modules/ingester/instance.go index 6b837311469..d39ddab4d74 100644 --- a/modules/ingester/instance.go +++ b/modules/ingester/instance.go @@ -497,7 +497,7 @@ func (i *instance) resetHeadBlock() error { i.lastBlockCut = time.Now() // Create search data wal file - f, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir, "searchdata") + f, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir) if err != nil { return err } diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index 096ba518eb3..c7abe88a563 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -124,7 +124,7 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul return err } - sr.AddBytesInspected(uint64(r.Length)) + sr.AddBytesInspected(uint64(len(pagesBuffer[0]))) sr.AddTraceInspected(1) entry := tempofb.SearchEntryFromBytes(pagesBuffer[0]) diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 85ab28c743c..bebaf4addd8 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -136,7 +136,7 @@ func (w *WAL) NewBlock(id uuid.UUID, tenantID string, dataEncoding string) (*App return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding) } -func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string, name string) (*os.File, error) { +func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, error) { p := filepath.Join(w.c.Filepath, dir) err := os.MkdirAll(p, os.ModePerm) if err != nil { From b5e594150eb9a6f3e86e60ceb6ec3e9663a45cb0 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Mon, 4 Oct 2021 17:19:28 -0400 Subject: [PATCH 15/22] Reuse StreamingSearchBlock iterator in search, relocate dedupe test Signed-off-by: Martin Disibio --- tempodb/search/backend_search_block_test.go | 127 ---------------- tempodb/search/streaming_search_block.go | 34 ++--- tempodb/search/streaming_search_block_test.go | 135 ++++++++++++++++-- 3 files changed, 133 insertions(+), 163 deletions(-) diff --git a/tempodb/search/backend_search_block_test.go b/tempodb/search/backend_search_block_test.go index 332ac37c3e1..22341195fb9 100644 --- a/tempodb/search/backend_search_block_test.go +++ b/tempodb/search/backend_search_block_test.go @@ -7,9 +7,7 @@ import ( "os" "path" "strconv" - "sync" "testing" - "time" "github.com/google/uuid" 
"github.com/grafana/tempo/pkg/tempofb" @@ -87,131 +85,6 @@ func TestBackendSearchBlockSearch(t *testing.T) { } } -func TestBackendSearchBlockDedupesWAL(t *testing.T) { - traceCount := 1_000 - - testCases := []struct { - name string - searchDataGenerator func(traceID []byte, i int) [][]byte - searchTags map[string]string - expectedLenResults int - expectedLenInspected int - }{ - { - name: "distinct traces", - searchDataGenerator: genSearchData, - searchTags: map[string]string{"key10": "value_A_10", "key20": "value_B_20"}, - expectedLenResults: 1, - expectedLenInspected: 1, - }, - { - name: "empty traces", - searchDataGenerator: func(traceID []byte, i int) [][]byte { - return [][]byte{} - }, - searchTags: map[string]string{"key10": "value_A_10"}, - expectedLenResults: 0, - expectedLenInspected: 0, - }, - } - - for _, enc := range backend.SupportedEncoding { - for _, tc := range testCases { - t.Run(fmt.Sprint(tc.name, "/", enc.String()), func(t *testing.T) { - f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) - require.NoError(t, err) - - b1, err := NewStreamingSearchBlockForFile(f, "v2", enc) - require.NoError(t, err) - - id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} - for i := 0; i < traceCount; i++ { - require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i))) - } - - l, err := local.NewBackend(&local.Config{ - Path: t.TempDir(), - }) - require.NoError(t, err) - - blockID := uuid.New() - err = NewBackendSearchBlock(b1, backend.NewWriter(l), blockID, testTenantID, enc, 0) - require.NoError(t, err) - - b2 := OpenBackendSearchBlock(blockID, testTenantID, backend.NewReader(l)) - - p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: tc.searchTags, - }) - - sr := NewResults() - - sr.StartWorker() - go func() { - defer sr.FinishWorker() - err := b2.Search(context.TODO(), p, sr) - require.NoError(t, err) - }() - sr.AllWorkersStarted() - - var results []*tempopb.TraceSearchMetadata - for r := range sr.Results() { - results = append(results, r) - } - require.Equal(t, tc.expectedLenResults, len(results)) - require.Equal(t, tc.expectedLenInspected, int(sr.TracesInspected())) - }) - } - } - -} - -func BenchmarkBackendSearchBlockSearch(b *testing.B) { - pageSizesMB := []float32{0.5, 1, 2} - - for _, enc := range backend.SupportedEncoding { - for _, sz := range pageSizesMB { - b.Run(fmt.Sprint(enc.String(), "/", sz, "MiB"), func(b *testing.B) { - - b2 := newBackendSearchBlockWithTraces(b, b.N, enc, int(sz*1024*1024)) - - // Use secret tag to perform exhaustive search - p := NewSearchPipeline(&tempopb.SearchRequest{ - Tags: map[string]string{SecretExhaustiveSearchTag: "!"}, - }) - - sr := NewResults() - - b.ResetTimer() - start := time.Now() - // Search 10x10 because reading the search data is much faster than creating it, but we need - // to spend at least 1 second to satisfy go bench minimum elapsed time requirement. 
- loops := 10 - wg := &sync.WaitGroup{} - for i := 0; i < loops; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for j := 0; j < loops; j++ { - err := b2.Search(context.TODO(), p, sr) - require.NoError(b, err) - } - }() - } - wg.Wait() - elapsed := time.Since(start) - fmt.Printf("BackendSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s \t %d traces = %.2fM traces/s \n", - elapsed, - float64(sr.bytesInspected.Load())/(1024*1024), - float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024), - sr.TracesInspected(), - float64(sr.TracesInspected())/(elapsed.Seconds())/1_000_000, - ) - }) - } - } -} - func TestBackendSearchBlockFinalSize(t *testing.T) { traceCount := 10000 pageSizesMB := []float32{1} diff --git a/tempodb/search/streaming_search_block.go b/tempodb/search/streaming_search_block.go index c7abe88a563..c52d6c8025a 100644 --- a/tempodb/search/streaming_search_block.go +++ b/tempodb/search/streaming_search_block.go @@ -84,50 +84,32 @@ func (s *StreamingSearchBlock) Search(ctx context.Context, p Pipeline, sr *Resul return nil } - var buf []byte - sr.AddBlockInspected() - dr, err := s.v.NewDataReader(backend.NewContextReaderWithAllReader(s.file), s.enc) + iter, err := s.Iterator() if err != nil { return err } + defer iter.Close() - orw := s.v.NewObjectReaderWriter() - - rr := s.appender.Records() - var pagesBuffer [][]byte - var buffer []byte - for _, r := range rr { + for { if sr.Quit() { return nil } - if r.Length == 0 { - continue - } - - // Reset/resize buffer - if cap(buf) < int(r.Length) { - buf = make([]byte, r.Length) - } - buf = buf[:r.Length] - - pagesBuffer, buffer, err = dr.Read(ctx, []common.Record{r}, pagesBuffer, buffer) - if err != nil { - return err + _, obj, err := iter.Next(ctx) + if err == io.EOF { + break } - - _, pagesBuffer[0], err = orw.UnmarshalObjectFromReader(bytes.NewReader(pagesBuffer[0])) if err != nil { return err } - sr.AddBytesInspected(uint64(len(pagesBuffer[0]))) + sr.AddBytesInspected(uint64(len(obj))) sr.AddTraceInspected(1) - entry := tempofb.SearchEntryFromBytes(pagesBuffer[0]) + entry := tempofb.SearchEntryFromBytes(obj) if !p.Matches(entry) { continue diff --git a/tempodb/search/streaming_search_block_test.go b/tempodb/search/streaming_search_block_test.go index e921e10e34e..510b1fa2072 100644 --- a/tempodb/search/streaming_search_block_test.go +++ b/tempodb/search/streaming_search_block_test.go @@ -2,7 +2,9 @@ package search import ( "context" + "encoding/binary" "fmt" + "io" "os" "path" "path/filepath" @@ -16,6 +18,7 @@ import ( "github.com/grafana/tempo/pkg/tempofb" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/common" ) // newStreamingSearchBlockWithTraces returns (tmpDir path, *StreamingSearchBlock) @@ -25,16 +28,6 @@ func newStreamingSearchBlockWithTraces(t testing.TB, traceCount int, enc backend // create search sub-directory require.NoError(t, os.MkdirAll(filepath.Join(tmpDir, "search"), os.ModePerm)) - id := []byte{1, 2, 3, 4, 5, 6, 7, 8} - searchData := [][]byte{(&tempofb.SearchEntryMutable{ - TraceID: id, - Tags: tempofb.SearchDataMap{ - "key1": {"value10", "value11"}, - "key2": {"value20", "value21"}, - "key3": {"value30", "value31"}, - "key4": {"value40", "value41"}, - }}).ToBytes()} - f, err := os.OpenFile(path.Join(tmpDir, "search", fmt.Sprintf("1c505e8b-26cd-4621-ba7d-792bb55282d5:single-tenant:v2:%s:", enc.String())), os.O_CREATE|os.O_RDWR, 0644) require.NoError(t, err) @@ -42,6 +35,20 @@ func 
newStreamingSearchBlockWithTraces(t testing.TB, traceCount int, enc backend require.NoError(t, err) for i := 0; i < traceCount; i++ { + id := []byte{0, 0, 0, 0, 0, 0, 0, 0} + + // ensure unique ids + binary.LittleEndian.PutUint32(id, uint32(i)) + + searchData := [][]byte{(&tempofb.SearchEntryMutable{ + TraceID: id, + Tags: tempofb.SearchDataMap{ + "key1": {"value10", "value11"}, + "key2": {"value20", "value21"}, + "key3": {"value30", "value31"}, + "key4": {"value40", "value41"}, + }}).ToBytes()} + require.NoError(t, sb.Append(context.Background(), id, searchData)) } @@ -145,6 +152,114 @@ func TestStreamingSearchBlockSearchBlock(t *testing.T) { } } +func TestStreamingSearchBlockIteratorDedupes(t *testing.T) { + traceCount := 1_000 + + testCases := []struct { + name string + searchDataGenerator func(traceID []byte, i int) [][]byte + searchTags map[string]string + expectedLenResults int + expectedLenInspected int + }{ + { + name: "distinct traces", + searchDataGenerator: genSearchData, + searchTags: map[string]string{"key10": "value_A_10", "key20": "value_B_20"}, + expectedLenResults: 1, + expectedLenInspected: 1, + }, + { + name: "empty traces", + searchDataGenerator: func(traceID []byte, i int) [][]byte { + return [][]byte{} + }, + searchTags: map[string]string{"key10": "value_A_10"}, + expectedLenResults: 0, + expectedLenInspected: 0, + }, + } + + for _, enc := range backend.SupportedEncoding { + for _, tc := range testCases { + t.Run(fmt.Sprint(tc.name, "/", enc.String()), func(t *testing.T) { + f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644) + require.NoError(t, err) + + b1, err := NewStreamingSearchBlockForFile(f, "v2", enc) + require.NoError(t, err) + + id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F} + for i := 0; i < traceCount; i++ { + require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i))) + } + + iter, err := b1.Iterator() + require.NoError(t, err) + + var results []common.ID + for { + id, _, err := iter.Next(context.TODO()) + if err == io.EOF { + break + } + require.NoError(t, err) + results = append(results, id) + } + + require.Equal(t, tc.expectedLenResults, len(results)) + }) + } + } + +} + +func BenchmarkBackendSearchBlockSearch(b *testing.B) { + pageSizesMB := []float32{0.5, 1, 2} + + for _, enc := range backend.SupportedEncoding { + for _, sz := range pageSizesMB { + b.Run(fmt.Sprint(enc.String(), "/", sz, "MiB"), func(b *testing.B) { + + b2 := newBackendSearchBlockWithTraces(b, b.N, enc, int(sz*1024*1024)) + + // Use secret tag to perform exhaustive search + p := NewSearchPipeline(&tempopb.SearchRequest{ + Tags: map[string]string{SecretExhaustiveSearchTag: "!"}, + }) + + sr := NewResults() + + b.ResetTimer() + start := time.Now() + // Search 10x10 because reading the search data is much faster than creating it, but we need + // to spend at least 1 second to satisfy go bench minimum elapsed time requirement. 
+			loops := 10
+			wg := &sync.WaitGroup{}
+			for i := 0; i < loops; i++ {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					for j := 0; j < loops; j++ {
+						err := b2.Search(context.TODO(), p, sr)
+						require.NoError(b, err)
+					}
+				}()
+			}
+			wg.Wait()
+			elapsed := time.Since(start)
+			fmt.Printf("BackendSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s \t %d traces = %.2fM traces/s \n",
+				elapsed,
+				float64(sr.bytesInspected.Load())/(1024*1024),
+				float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024),
+				sr.TracesInspected(),
+				float64(sr.TracesInspected())/(elapsed.Seconds())/1_000_000,
+			)
+		})
+	}
+}
+
 func BenchmarkStreamingSearchBlockSearch(b *testing.B) {

 	for _, enc := range backend.SupportedEncoding {

From 5a163fef1b384fb2f7ecf66fa81e136abbdd4784 Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Tue, 5 Oct 2021 14:55:36 -0400
Subject: [PATCH 16/22] Make wal search encoding configurable, default to gzip
 like backend block encoding

Signed-off-by: Martin Disibio
---
 modules/storage/config.go | 1 +
 tempodb/wal/wal.go        | 1 +
 2 files changed, 2 insertions(+)

diff --git a/modules/storage/config.go b/modules/storage/config.go
index 2d827f180b6..018bacc134f 100644
--- a/modules/storage/config.go
+++ b/modules/storage/config.go
@@ -35,6 +35,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
 	cfg.Trace.WAL = &wal.Config{}
 	f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.")
 	cfg.Trace.WAL.Encoding = backend.EncSnappy
+	cfg.Trace.WAL.SearchEncoding = backend.EncGZIP

 	cfg.Trace.Block = &encoding.BlockConfig{}
 	f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .01, "Bloom Filter False Positive.")

diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go
index bebaf4addd8..772f5a5e017 100644
--- a/tempodb/wal/wal.go
+++ b/tempodb/wal/wal.go
@@ -31,6 +31,7 @@ type Config struct {
 	CompletedFilepath string
 	BlocksFilepath    string
 	Encoding          backend.Encoding `yaml:"encoding"`
+	SearchEncoding    backend.Encoding `yaml:"search_encoding"`
 }

 func New(c *Config) (*WAL, error) {

From de3a1e22103736fc0e2b38103945c87f0d274ca8 Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Tue, 5 Oct 2021 14:56:26 -0400
Subject: [PATCH 17/22] Make wal search encoding configurable, default to gzip
 like backend block encoding

Signed-off-by: Martin Disibio
---
 docs/tempo/website/configuration/_index.md | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md
index 6a85fd24c31..3b9174c3a6d 100644
--- a/docs/tempo/website/configuration/_index.md
+++ b/docs/tempo/website/configuration/_index.md
@@ -532,6 +532,10 @@ storage:
       # (default: snappy)
       [encoding: <string>]
 
+      # search data encoding/compression. same options as wal encoding.
+      # (default: gzip)
+      [search_encoding: <string>]
+
     # block configuration
     block:
 
@@ -550,7 +554,7 @@ storage:
       # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
       [encoding: <string>]
 
-      # search data encoding/compression. same options as blocks.
+      # search data encoding/compression. same options as block encoding. 
# (default: gzip)
       [search_encoding: <string>]

From 1aaca429b003a85c9375a6ca69ccbeaa0ca8b1c3 Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Tue, 5 Oct 2021 14:57:03 -0400
Subject: [PATCH 18/22] Simplify some search tests which were doing more work
 than seemed necessary

Signed-off-by: Martin Disibio
---
 tempodb/search/backend_search_block_test.go   |  2 +-
 tempodb/search/streaming_search_block_test.go | 47 +++++++++----------
 2 files changed, 23 insertions(+), 26 deletions(-)

diff --git a/tempodb/search/backend_search_block_test.go b/tempodb/search/backend_search_block_test.go
index 22341195fb9..c95f87b2ae9 100644
--- a/tempodb/search/backend_search_block_test.go
+++ b/tempodb/search/backend_search_block_test.go
@@ -54,7 +54,7 @@ func newBackendSearchBlockWithTraces(t testing.TB, traceCount int, enc backend.E
 }

 func TestBackendSearchBlockSearch(t *testing.T) {
-	traceCount := 50_000
+	traceCount := 10_000

 	for _, enc := range backend.SupportedEncoding {
 		t.Run(enc.String(), func(t *testing.T) {

diff --git a/tempodb/search/streaming_search_block_test.go b/tempodb/search/streaming_search_block_test.go
index 510b1fa2072..769882c2cc0 100644
--- a/tempodb/search/streaming_search_block_test.go
+++ b/tempodb/search/streaming_search_block_test.go
@@ -180,38 +180,35 @@ func TestStreamingSearchBlockIteratorDedupes(t *testing.T) {
 		},
 	}

-	for _, enc := range backend.SupportedEncoding {
-		for _, tc := range testCases {
-			t.Run(fmt.Sprint(tc.name, "/", enc.String()), func(t *testing.T) {
-				f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
-				require.NoError(t, err)
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			f, err := os.OpenFile(path.Join(t.TempDir(), "searchdata"), os.O_CREATE|os.O_RDWR, 0644)
+			require.NoError(t, err)

-			b1, err := NewStreamingSearchBlockForFile(f, "v2", enc)
-			require.NoError(t, err)
+			b1, err := NewStreamingSearchBlockForFile(f, "v2", backend.EncNone)
+			require.NoError(t, err)

-			id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
-			for i := 0; i < traceCount; i++ {
-				require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i)))
-			}
+			id := []byte{0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F}
+			for i := 0; i < traceCount; i++ {
+				require.NoError(t, b1.Append(context.Background(), id, tc.searchDataGenerator(id, i)))
+			}

-			iter, err := b1.Iterator()
-			require.NoError(t, err)
+			iter, err := b1.Iterator()
+			require.NoError(t, err)

-			var results []common.ID
-			for {
-				id, _, err := iter.Next(context.TODO())
-				if err == io.EOF {
-					break
-				}
-				require.NoError(t, err)
-				results = append(results, id)
+			var results []common.ID
+			for {
+				id, _, err := iter.Next(context.TODO())
+				if err == io.EOF {
+					break
 				}
+				require.NoError(t, err)
+				results = append(results, id)
+			}

-				require.Equal(t, tc.expectedLenResults, len(results))
-			})
-		}
+			require.Equal(t, tc.expectedLenResults, len(results))
+		})
 	}
 }

From 5b7485f4674dd46aab737d98b7032949dd2cac08 Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Tue, 5 Oct 2021 15:15:58 -0400
Subject: [PATCH 19/22] Comment out flaky test as discussed

Signed-off-by: Martin Disibio
---
 modules/ingester/ingester_test.go | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go
index 64d870964d6..e9a8eb1fcbc 100644
--- 
a/modules/ingester/ingester_test.go
+++ b/modules/ingester/ingester_test.go
@@ -261,10 +261,11 @@ func TestSearchWAL(t *testing.T) {
 	assert.Equal(t, uint32(1), results.Metrics.InspectedTraces)
 }

+// TODO - This test is flaky and commented out until it's fixed
 // TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed
 // to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs
 // on the side of caution and chooses to replay the wal instead of rediscovering the local block.
-func TestWalReplayDeletesLocalBlocks(t *testing.T) {
+/*func TestWalReplayDeletesLocalBlocks(t *testing.T) {
 	tmpDir := t.TempDir()

 	i, _, _ := defaultIngester(t, tmpDir)
@@ -299,6 +300,7 @@ func TestWalReplayDeletesLocalBlocks(t *testing.T) {
 	require.True(t, ok)

 	// After restart we only have the 1 wal block
+	// TODO - fix race conditions here around accessing inst fields outside of the mutex
 	require.Len(t, inst.completingBlocks, 1)
 	require.Len(t, inst.completeBlocks, 0)
 	require.Equal(t, blockID, inst.completingBlocks[0].BlockID())
@@ -307,6 +309,7 @@ func TestWalReplayDeletesLocalBlocks(t *testing.T) {
 	err = i.stopping(nil)
 	require.NoError(t, err)
 }
+*/

 func TestFlush(t *testing.T) {
 	tmpDir, err := os.MkdirTemp("/tmp", "")

From 97dafa21c23939f4116b3aa959385fb90b3ebfc1 Mon Sep 17 00:00:00 2001
From: Martin Disibio
Date: Wed, 6 Oct 2021 09:26:07 -0400
Subject: [PATCH 20/22] Code review suggestions

Signed-off-by: Martin Disibio
---
 modules/ingester/ingester.go      | 1 +
 modules/ingester/ingester_test.go | 6 ++----
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/modules/ingester/ingester.go b/modules/ingester/ingester.go
index 2d413cee501..876e791241f 100644
--- a/modules/ingester/ingester.go
+++ b/modules/ingester/ingester.go
@@ -381,6 +381,7 @@ func (i *Ingester) replayWal() error {
 		for _, s := range searchBlocks {
 			if b.BlockID() == s.BlockID {
 				searchWALBlock = s
+				break
 			}
 		}
 		instance.AddCompletingBlock(b, searchWALBlock)

diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go
index e9a8eb1fcbc..351eabde94a 100644
--- a/modules/ingester/ingester_test.go
+++ b/modules/ingester/ingester_test.go
@@ -248,8 +248,7 @@ func TestSearchWAL(t *testing.T) {
 	assert.Equal(t, uint32(1), results.Metrics.InspectedTraces)

 	// Shutdown
-	err = i.stopping(nil)
-	require.NoError(t, err)
+	require.NoError(t, i.stopping(nil))

 	// replay wal
 	i = defaultIngesterModule(t, tmpDir)
@@ -328,8 +327,7 @@ func TestFlush(t *testing.T) {
 	}

 	// stopping the ingester should force cut all live traces to disk
-	err = ingester.stopping(nil)
-	require.NoError(t, err)
+	require.NoError(t, ingester.stopping(nil))

 	// create new ingester. this should replay wal! 
ingester, _, _ = defaultIngester(t, tmpDir) From 7b6a81f87f3d48ab3da5e0c053c3bce6fc1fa142 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 6 Oct 2021 09:26:32 -0400 Subject: [PATCH 21/22] Code review suggestions, add tests for ParseFileName Signed-off-by: Martin Disibio --- tempodb/wal/wal.go | 3 ++- tempodb/wal/wal_test.go | 48 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 772f5a5e017..0fa45811d78 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -149,7 +149,8 @@ func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, return os.OpenFile(filepath.Join(p, filename), os.O_CREATE|os.O_RDWR, 0644) } -// ParseFilename returns (blockID, tenant, version, encoding, dataEncoding, error) +// ParseFilename returns (blockID, tenant, version, encoding, dataEncoding, error). +// Example: "00000000-0000-0000-0000-000000000000:1:v2:snappy:v1" func ParseFilename(filename string) (uuid.UUID, string, string, backend.Encoding, string, error) { splits := strings.Split(filename, ":") diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 441bb797dbe..f5b42b86a2c 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -3,6 +3,7 @@ package wal import ( "bytes" "context" + "errors" "io" "math/rand" "os" @@ -269,6 +270,53 @@ func testAppendReplayFind(t *testing.T, e backend.Encoding) { require.NoError(t, err) } +func TestParseFileName(t *testing.T) { + + testCases := []struct { + name string + filename string + blockID uuid.UUID + tenant string + version string + encoding backend.Encoding + dataEncoding string + err error + }{ + { + "wal", + "00000000-0000-0000-0000-000000000000:1:v2:snappy:v1", + uuid.MustParse("00000000-0000-0000-0000-000000000000"), "1", "v2", backend.EncSnappy, "v1", nil, + }, + { + "search wal", + "00000000-0000-0000-0000-000000000000:1:v2:snappy:", + uuid.MustParse("00000000-0000-0000-0000-000000000000"), "1", "v2", backend.EncSnappy, "", nil, + }, + { + "missing segments", + "xyz", + uuid.Nil, "", "", backend.EncNone, "", errors.New("unable to parse xyz. 
unexpected number of segments"), + }, + { + "no data encoding", + "00000000-0000-0000-0000-000000000000:1:v2:snappy", + uuid.MustParse("00000000-0000-0000-0000-000000000000"), "1", "v2", backend.EncSnappy, "", nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + b, tn, v, e, de, err := ParseFilename(tc.filename) + require.Equal(t, tc.blockID, b) + require.Equal(t, tc.tenant, tn) + require.Equal(t, tc.version, v) + require.Equal(t, tc.encoding, e) + require.Equal(t, tc.dataEncoding, de) + require.Equal(t, tc.err, err) + }) + } +} + func BenchmarkWALNone(b *testing.B) { benchmarkWriteFindReplay(b, backend.EncNone) } From 7d6a3b2b312070d3e45bca9a35d105975ab36abb Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 6 Oct 2021 09:39:11 -0400 Subject: [PATCH 22/22] Code review suggestions Signed-off-by: Martin Disibio --- modules/ingester/ingester_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 351eabde94a..61d561e5359 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -235,8 +235,7 @@ func TestSearchWAL(t *testing.T) { assert.NoError(t, inst.PushBytes(context.Background(), id, traceBytes, searchBytes)) // Write wal - err = inst.CutCompleteTraces(0, true) - require.NoError(t, err) + require.NoError(t, inst.CutCompleteTraces(0, true)) // search WAL ctx := user.InjectOrgID(context.Background(), "test")
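
---

For reference, the WAL-level search encoding wired up in patches 16 and 17 above is set under `storage.trace.wal`. Below is a minimal configuration sketch, assuming the defaults registered in modules/storage/config.go (snappy for trace data, gzip for search data); it is illustrative only and not part of the patch series itself:

    storage:
      trace:
        wal:
          path: /var/tempo/wal
          # trace data encoding/compression (default: snappy)
          encoding: snappy
          # search data encoding/compression, added by this PR (default: gzip)
          search_encoding: gzip

Any of the supported encodings (none, gzip, lz4 variants, snappy, zstd, s2) should be accepted for `search_encoding`, since replay derives the encoding from the search WAL filename rather than from config.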