From ba4bf66b8937ee00ce34e9c50f91ad95d03a847c Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 7 Apr 2021 13:01:52 -0400 Subject: [PATCH 01/21] Exposed versioned encoding Signed-off-by: Joe Elliott --- tempodb/encoding/backend_block.go | 16 ++------ tempodb/encoding/compactor_block.go | 4 +- tempodb/encoding/complete_block.go | 4 +- tempodb/encoding/versioned.go | 58 +++++++++++++++++++++-------- tempodb/encoding/versioned_test.go | 13 ++++++- 5 files changed, 62 insertions(+), 33 deletions(-) diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index 3a768b1432c..6130f3e5696 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -14,7 +14,7 @@ import ( // BackendBlock represents a block already in the backend. type BackendBlock struct { - encoding versionedEncoding + encoding VersionedEncoding meta *backend.BlockMeta reader backend.Reader @@ -23,17 +23,9 @@ type BackendBlock struct { // NewBackendBlock returns a BackendBlock for the given backend.BlockMeta // It is version aware. func NewBackendBlock(meta *backend.BlockMeta, r backend.Reader) (*BackendBlock, error) { - var encoding versionedEncoding - - switch meta.Version { - case "v0": - encoding = v0Encoding{} - case "v1": - encoding = v1Encoding{} - case "v2": - encoding = v2Encoding{} - default: - return nil, fmt.Errorf("%s is not a valid block version", meta.Version) + encoding, err := EncodingByVersion(meta.Version) + if err != nil { + return nil, err } return &BackendBlock{ diff --git a/tempodb/encoding/compactor_block.go b/tempodb/encoding/compactor_block.go index 65c0182e69a..60a022c687b 100644 --- a/tempodb/encoding/compactor_block.go +++ b/tempodb/encoding/compactor_block.go @@ -11,7 +11,7 @@ import ( ) type CompactorBlock struct { - encoding versionedEncoding + encoding VersionedEncoding compactedMeta *backend.BlockMeta inMetas []*backend.BlockMeta @@ -36,7 +36,7 @@ func NewCompactorBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] } c := &CompactorBlock{ - encoding: latestEncoding(), + encoding: LatestEncoding(), compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding), bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), inMetas: metas, diff --git a/tempodb/encoding/complete_block.go b/tempodb/encoding/complete_block.go index 599ae854803..465fc9d695b 100644 --- a/tempodb/encoding/complete_block.go +++ b/tempodb/encoding/complete_block.go @@ -17,7 +17,7 @@ import ( // A CompleteBlock also knows the filepath of the append wal file it was cut from. It is responsible for // cleaning this block up once it has been flushed to the backend. 
type CompleteBlock struct { - encoding versionedEncoding + encoding VersionedEncoding meta *backend.BlockMeta bloom *common.ShardedBloomFilter @@ -35,7 +35,7 @@ type CompleteBlock struct { // NewCompleteBlock creates a new block and takes _ALL_ the parameters necessary to build the ordered, deduped file on disk func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iterator Iterator, estimatedObjects int, filepath string) (*CompleteBlock, error) { c := &CompleteBlock{ - encoding: latestEncoding(), + encoding: LatestEncoding(), meta: backend.NewBlockMeta(originatingMeta.TenantID, originatingMeta.BlockID, currentVersion, cfg.Encoding), bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), records: make([]*common.Record, 0), diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index 73d581f276a..ff9a62a9d71 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -1,6 +1,7 @@ package encoding import ( + "fmt" "io" "github.com/grafana/tempo/tempodb/backend" @@ -12,24 +13,12 @@ import ( const currentVersion = "v2" -// latestEncoding is used by Compactor and Complete block -func latestEncoding() versionedEncoding { - return v2Encoding{} -} - -// allEncodings returns all encodings -func allEncodings() []versionedEncoding { - return []versionedEncoding{ - v0Encoding{}, - v1Encoding{}, - v2Encoding{}, - } -} - -// versionedEncoding has a whole bunch of versioned functionality. This is +// VersionedEncoding has a whole bunch of versioned functionality. This is // currently quite sloppy and could easily be tightened up to just a few methods // but it is what it is for now! -type versionedEncoding interface { +type VersionedEncoding interface { + Version() string + newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) newIndexWriter(pageSizeBytes int) common.IndexWriter @@ -40,9 +29,40 @@ type versionedEncoding interface { newRecordReaderWriter() common.RecordReaderWriter } +// EncodingByVersion returns a versioned encoding for the provided string +func EncodingByVersion(v string) (VersionedEncoding, error) { + switch v { + case "v0": + return v0Encoding{}, nil + case "v1": + return v1Encoding{}, nil + case "v2": + return v2Encoding{}, nil + } + + return nil, fmt.Errorf("%s is not a valid block version", v) +} + +// LatestEncoding is used by Compactor and Complete block +func LatestEncoding() VersionedEncoding { + return v2Encoding{} +} + +// allEncodings returns all encodings +func allEncodings() []VersionedEncoding { + return []VersionedEncoding{ + v0Encoding{}, + v1Encoding{}, + v2Encoding{}, + } +} + // v0Encoding type v0Encoding struct{} +func (v v0Encoding) Version() string { + return "v0" +} func (v v0Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { return v0.NewIndexWriter() } @@ -65,6 +85,9 @@ func (v v0Encoding) newRecordReaderWriter() common.RecordReaderWriter { // v1Encoding type v1Encoding struct{} +func (v v1Encoding) Version() string { + return "v1" +} func (v v1Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { return v1.NewIndexWriter() } @@ -87,6 +110,9 @@ func (v v1Encoding) newRecordReaderWriter() common.RecordReaderWriter { // v2Encoding type v2Encoding struct{} +func (v v2Encoding) Version() string { + return "v2" +} func (v v2Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { return v2.NewIndexWriter(pageSizeBytes) } diff --git a/tempodb/encoding/versioned_test.go b/tempodb/encoding/versioned_test.go index 
544e9a35914..47508c5382b 100644 --- a/tempodb/encoding/versioned_test.go +++ b/tempodb/encoding/versioned_test.go @@ -11,15 +11,26 @@ import ( "github.com/stretchr/testify/require" ) +func TestEncodingByVersionErrors(t *testing.T) { + encoding, err := EncodingByVersion("definitely-not-a-real-version") + assert.Error(t, err) + assert.Nil(t, encoding) +} + func TestAllVersions(t *testing.T) { for _, v := range allEncodings() { + encoding, err := EncodingByVersion(v.Version()) + + require.Equal(t, v.Version(), encoding.Version()) + require.NoError(t, err) + for _, e := range backend.SupportedEncoding { testDataWriterReader(t, v, e) } } } -func testDataWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding) { +func testDataWriterReader(t *testing.T, v VersionedEncoding, e backend.Encoding) { tests := []struct { readerBytes []byte }{ From 0cefb0d14595815af4abf924777863de8ef18983 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 7 Apr 2021 13:06:21 -0400 Subject: [PATCH 02/21] Exposed versioned encoding methods Signed-off-by: Joe Elliott --- tempodb/encoding/backend_block.go | 12 +++---- tempodb/encoding/compactor_block.go | 4 +-- tempodb/encoding/complete_block.go | 8 ++--- tempodb/encoding/complete_block_test.go | 2 +- tempodb/encoding/versioned.go | 48 ++++++++++++------------- tempodb/encoding/versioned_test.go | 6 ++-- 6 files changed, 40 insertions(+), 40 deletions(-) diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index 6130f3e5696..fb3ef3e9e32 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -68,20 +68,20 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { } indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader) - indexReader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) + indexReader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) if err != nil { return nil, fmt.Errorf("error building index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } ra := backend.NewContextReader(b.meta, nameObjects, b.reader) - dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding) + dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding) if err != nil { return nil, fmt.Errorf("error building page reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } defer dataReader.Close() // passing nil for objectCombiner here. 
this is fine b/c a backend block should never have dupes - finder := NewPagedFinder(indexReader, dataReader, nil, b.encoding.newObjectReaderWriter()) + finder := NewPagedFinder(indexReader, dataReader, nil, b.encoding.NewObjectReaderWriter()) objectBytes, err := finder.Find(ctx, id) if err != nil { @@ -95,7 +95,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) { // read index ra := backend.NewContextReader(b.meta, nameObjects, b.reader) - dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding) + dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding) if err != nil { return nil, fmt.Errorf("failed to create dataReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } @@ -105,12 +105,12 @@ func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) { return nil, err } - return newPagedIterator(chunkSizeBytes, reader, dataReader, b.encoding.newObjectReaderWriter()), nil + return newPagedIterator(chunkSizeBytes, reader, dataReader, b.encoding.NewObjectReaderWriter()), nil } func (b *BackendBlock) NewIndexReader() (common.IndexReader, error) { indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader) - reader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) + reader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) if err != nil { return nil, fmt.Errorf("failed to create index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } diff --git a/tempodb/encoding/compactor_block.go b/tempodb/encoding/compactor_block.go index 60a022c687b..3eadbb58938 100644 --- a/tempodb/encoding/compactor_block.go +++ b/tempodb/encoding/compactor_block.go @@ -44,7 +44,7 @@ func NewCompactorBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] } c.appendBuffer = &bytes.Buffer{} - dataWriter, err := c.encoding.newDataWriter(c.appendBuffer, cfg.Encoding) + dataWriter, err := c.encoding.NewDataWriter(c.appendBuffer, cfg.Encoding) if err != nil { return nil, fmt.Errorf("failed to create page writer: %w", err) } @@ -115,7 +115,7 @@ func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTra records := c.appender.Records() meta := c.BlockMeta() - indexWriter := c.encoding.newIndexWriter(c.cfg.IndexPageSizeBytes) + indexWriter := c.encoding.NewIndexWriter(c.cfg.IndexPageSizeBytes) indexBytes, err := indexWriter.Write(records) if err != nil { return 0, err diff --git a/tempodb/encoding/complete_block.go b/tempodb/encoding/complete_block.go index 465fc9d695b..19fb9ba6563 100644 --- a/tempodb/encoding/complete_block.go +++ b/tempodb/encoding/complete_block.go @@ -49,7 +49,7 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter } defer appendFile.Close() - dataWriter, err := c.encoding.newDataWriter(appendFile, cfg.Encoding) + dataWriter, err := c.encoding.NewDataWriter(appendFile, cfg.Encoding) if err != nil { return nil, err } @@ -117,7 +117,7 @@ func (c *CompleteBlock) Write(ctx context.Context, w backend.Writer) error { return err } - indexWriter := c.encoding.newIndexWriter(c.cfg.IndexPageSizeBytes) + indexWriter := c.encoding.NewIndexWriter(c.cfg.IndexPageSizeBytes) indexBytes, err := indexWriter.Write(c.records) if err != nil { return err @@ -148,13 +148,13 @@ func (c *CompleteBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]by return nil, err } - dataReader, err := 
c.encoding.newDataReader(backend.NewContextReaderWithAllReader(file), c.meta.Encoding) + dataReader, err := c.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), c.meta.Encoding) if err != nil { return nil, err } defer dataReader.Close() - finder := NewPagedFinder(common.Records(c.records), dataReader, combiner, c.encoding.newObjectReaderWriter()) + finder := NewPagedFinder(common.Records(c.records), dataReader, combiner, c.encoding.NewObjectReaderWriter()) return finder.Find(context.Background(), id) } diff --git a/tempodb/encoding/complete_block_test.go b/tempodb/encoding/complete_block_test.go index a7de7966aa7..457eb5a697e 100644 --- a/tempodb/encoding/complete_block_test.go +++ b/tempodb/encoding/complete_block_test.go @@ -301,7 +301,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa require.NoError(b, err) pr, err := v2.NewDataReader(v0.NewDataReader(backend.NewContextReaderWithAllReader(file)), encoding) require.NoError(b, err) - iterator = newPagedIterator(10*1024*1024, common.Records(cb.records), pr, backendBlock.encoding.newObjectReaderWriter()) + iterator = newPagedIterator(10*1024*1024, common.Records(cb.records), pr, backendBlock.encoding.NewObjectReaderWriter()) for { id, _, err := iterator.Next(context.Background()) diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index ff9a62a9d71..82a2b3c800b 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -19,14 +19,14 @@ const currentVersion = "v2" type VersionedEncoding interface { Version() string - newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) - newIndexWriter(pageSizeBytes int) common.IndexWriter + NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) + NewIndexWriter(pageSizeBytes int) common.IndexWriter - newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) - newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) + NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) + NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) - newObjectReaderWriter() common.ObjectReaderWriter - newRecordReaderWriter() common.RecordReaderWriter + NewObjectReaderWriter() common.ObjectReaderWriter + NewRecordReaderWriter() common.RecordReaderWriter } // EncodingByVersion returns a versioned encoding for the provided string @@ -63,22 +63,22 @@ type v0Encoding struct{} func (v v0Encoding) Version() string { return "v0" } -func (v v0Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { +func (v v0Encoding) NewIndexWriter(pageSizeBytes int) common.IndexWriter { return v0.NewIndexWriter() } -func (v v0Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { +func (v v0Encoding) NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { return v0.NewDataWriter(writer), nil // ignore encoding. 
v0 DataWriter writes raw bytes } -func (v v0Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { +func (v v0Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { return v0.NewIndexReader(ra) } -func (v v0Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { +func (v v0Encoding) NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { return v0.NewDataReader(ra), nil } -func (v v0Encoding) newObjectReaderWriter() common.ObjectReaderWriter { +func (v v0Encoding) NewObjectReaderWriter() common.ObjectReaderWriter { return v0.NewObjectReaderWriter() } -func (v v0Encoding) newRecordReaderWriter() common.RecordReaderWriter { +func (v v0Encoding) NewRecordReaderWriter() common.RecordReaderWriter { return v0.NewRecordReaderWriter() } @@ -88,22 +88,22 @@ type v1Encoding struct{} func (v v1Encoding) Version() string { return "v1" } -func (v v1Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { +func (v v1Encoding) NewIndexWriter(pageSizeBytes int) common.IndexWriter { return v1.NewIndexWriter() } -func (v v1Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { +func (v v1Encoding) NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { return v1.NewDataWriter(writer, encoding) } -func (v v1Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { +func (v v1Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { return v1.NewIndexReader(ra) } -func (v v1Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { +func (v v1Encoding) NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { return v1.NewDataReader(v0.NewDataReader(ra), encoding) } -func (v v1Encoding) newObjectReaderWriter() common.ObjectReaderWriter { +func (v v1Encoding) NewObjectReaderWriter() common.ObjectReaderWriter { return v1.NewObjectReaderWriter() } -func (v v1Encoding) newRecordReaderWriter() common.RecordReaderWriter { +func (v v1Encoding) NewRecordReaderWriter() common.RecordReaderWriter { return v1.NewRecordReaderWriter() } @@ -113,21 +113,21 @@ type v2Encoding struct{} func (v v2Encoding) Version() string { return "v2" } -func (v v2Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { +func (v v2Encoding) NewIndexWriter(pageSizeBytes int) common.IndexWriter { return v2.NewIndexWriter(pageSizeBytes) } -func (v v2Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { +func (v v2Encoding) NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { return v2.NewDataWriter(writer, encoding) } -func (v v2Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { +func (v v2Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { return v2.NewIndexReader(ra, pageSizeBytes, totalPages) } -func (v v2Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { +func (v v2Encoding) NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { return 
v2.NewDataReader(v0.NewDataReader(ra), encoding) } -func (v v2Encoding) newObjectReaderWriter() common.ObjectReaderWriter { +func (v v2Encoding) NewObjectReaderWriter() common.ObjectReaderWriter { return v2.NewObjectReaderWriter() } -func (v v2Encoding) newRecordReaderWriter() common.RecordReaderWriter { +func (v v2Encoding) NewRecordReaderWriter() common.RecordReaderWriter { return v2.NewRecordReaderWriter() } diff --git a/tempodb/encoding/versioned_test.go b/tempodb/encoding/versioned_test.go index 47508c5382b..514ba61cba0 100644 --- a/tempodb/encoding/versioned_test.go +++ b/tempodb/encoding/versioned_test.go @@ -44,7 +44,7 @@ func testDataWriterReader(t *testing.T, v VersionedEncoding, e backend.Encoding) for _, tc := range tests { buff := bytes.NewBuffer([]byte{}) - dataWriter, err := v.newDataWriter(buff, e) + dataWriter, err := v.NewDataWriter(buff, e) require.NoError(t, err) _, err = dataWriter.Write([]byte{0x01}, tc.readerBytes) @@ -57,7 +57,7 @@ func testDataWriterReader(t *testing.T, v VersionedEncoding, e backend.Encoding) require.NoError(t, err) reader := bytes.NewReader(buff.Bytes()) - dataReader, err := v.newDataReader(backend.NewContextReaderWithAllReader(reader), e) + dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(reader), e) require.NoError(t, err) defer dataReader.Close() @@ -70,7 +70,7 @@ func testDataWriterReader(t *testing.T, v VersionedEncoding, e backend.Encoding) require.NoError(t, err) require.Len(t, actual, 1) - i := NewIterator(bytes.NewReader(actual[0]), v.newObjectReaderWriter()) + i := NewIterator(bytes.NewReader(actual[0]), v.NewObjectReaderWriter()) defer i.Close() id, obj, err := i.Next(context.Background()) From 17da83030236a9f990712f5ce6a5118ecf003674 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 7 Apr 2021 14:25:52 -0400 Subject: [PATCH 03/21] use versioned encoding Signed-off-by: Joe Elliott --- tempodb/wal/append_block.go | 20 +++++++++++++++----- tempodb/wal/replay_block.go | 22 ++++++++++++++++++++-- tempodb/wal/wal.go | 11 ++--------- tempodb/wal/wal_test.go | 3 +-- 4 files changed, 38 insertions(+), 18 deletions(-) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 8c409256f0d..24827e5dbf8 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -8,7 +8,6 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) // these values should never be used. these dummy values can help detect @@ -21,13 +20,16 @@ const appendBlockEncoding = backend.EncNone // in the order it was received and an in memory sorted index. 
type AppendBlock struct { block + encoding encoding.VersionedEncoding appendFile *os.File appender encoding.Appender } func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBlock, error) { + v, _ := encoding.EncodingByVersion("v0") h := &AppendBlock{ + encoding: v, block: block{ meta: backend.NewBlockMeta(tenantID, id, appendBlockVersion, appendBlockEncoding), filepath: filepath, @@ -41,7 +43,12 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc return nil, err } h.appendFile = f - h.appender = encoding.NewAppender(v0.NewDataWriter(f)) + + dataWriter, err := h.encoding.NewDataWriter(f, appendBlockEncoding) + if err != nil { + return nil, err + } + h.appender = encoding.NewAppender(dataWriter) return h, nil } @@ -82,7 +89,7 @@ func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner commo return nil, err } - iterator := encoding.NewRecordIterator(records, readFile, v0.NewObjectReaderWriter()) + iterator := encoding.NewRecordIterator(records, readFile, h.encoding.NewObjectReaderWriter()) iterator, err = encoding.NewDedupingIterator(iterator, combiner) if err != nil { return nil, err @@ -104,9 +111,12 @@ func (h *AppendBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte return nil, err } - dataReader := v0.NewDataReader(backend.NewContextReaderWithAllReader(file)) + dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), appendBlockEncoding) + if err != nil { + return nil, err + } defer dataReader.Close() - finder := encoding.NewPagedFinder(common.Records(records), dataReader, combiner, v0.NewObjectReaderWriter()) + finder := encoding.NewPagedFinder(common.Records(records), dataReader, combiner, h.encoding.NewObjectReaderWriter()) return finder.Find(context.Background(), id) } diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index f3457b05ad1..4fd683c9563 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -3,12 +3,30 @@ package wal import ( "os" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) type ReplayBlock struct { block + encoding encoding.VersionedEncoding +} + +func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { + v, _ := encoding.EncodingByVersion("v1") // jpe :( + + blockID, tenantID, err := parseFilename(filename) + if err != nil { + return nil, err + } + + return &ReplayBlock{ + block: block{ + meta: backend.NewBlockMeta(tenantID, blockID, appendBlockVersion, appendBlockEncoding), + filepath: path, + }, + encoding: v, // jpe actually get the right one + }, nil } func (r *ReplayBlock) Iterator() (encoding.Iterator, error) { @@ -18,7 +36,7 @@ func (r *ReplayBlock) Iterator() (encoding.Iterator, error) { return nil, err } - return encoding.NewIterator(f, v0.NewObjectReaderWriter()), nil + return encoding.NewIterator(f, r.encoding.NewObjectReaderWriter()), nil } func (r *ReplayBlock) TenantID() string { diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 6f4978e5633..0fa94c51bf7 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/google/uuid" - "github.com/grafana/tempo/tempodb/backend" ) const ( @@ -66,18 +65,12 @@ func (w *WAL) AllBlocks() ([]*ReplayBlock, error) { continue } - name := f.Name() - blockID, tenantID, err := parseFilename(name) + r, err := NewReplayBlock(f.Name(), w.c.Filepath) if err != nil { return nil, err } - blocks = 
append(blocks, &ReplayBlock{ - block: block{ - meta: backend.NewBlockMeta(tenantID, blockID, appendBlockVersion, appendBlockEncoding), - filepath: w.c.Filepath, - }, - }) + blocks = append(blocks, r) } return blocks, nil diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 6efd8c05587..052e462262d 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -16,7 +16,6 @@ import ( "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) const ( @@ -115,7 +114,7 @@ func TestAppend(t *testing.T) { records := block.appender.Records() file, err := block.file() assert.NoError(t, err) - iterator := encoding.NewRecordIterator(records, file, v0.NewObjectReaderWriter()) + iterator := encoding.NewRecordIterator(records, file, block.encoding.NewObjectReaderWriter()) i := 0 for { From 12c1747e00e6253f17f9a2b20c98947b3c56345b Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 7 Apr 2021 14:37:51 -0400 Subject: [PATCH 04/21] Handle v1 encoding Signed-off-by: Joe Elliott --- tempodb/encoding/appender.go | 12 +++++++++--- tempodb/wal/append_block.go | 1 + tempodb/wal/replay_block.go | 2 +- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/tempodb/encoding/appender.go b/tempodb/encoding/appender.go index 7f8bf04d4b8..150f2505c99 100644 --- a/tempodb/encoding/appender.go +++ b/tempodb/encoding/appender.go @@ -33,7 +33,12 @@ func NewAppender(dataWriter common.DataWriter) Appender { // Append appends the id/object to the writer. Note that the caller is giving up ownership of the two byte arrays backing the slices. // Copies should be made and passed in if this is a problem func (a *appender) Append(id common.ID, b []byte) error { - length, err := a.dataWriter.Write(id, b) + _, err := a.dataWriter.Write(id, b) + if err != nil { + return err + } + + bytesWritten, err := a.dataWriter.CutPage() if err != nil { return err } @@ -46,10 +51,11 @@ func (a *appender) Append(id common.ID, b []byte) error { a.records[i] = &common.Record{ ID: id, Start: a.currentOffset, - Length: uint32(length), + Length: uint32(bytesWritten), } - a.currentOffset += uint64(length) + a.currentOffset += uint64(bytesWritten) + return nil } diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 24827e5dbf8..71a80aa7c39 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -48,6 +48,7 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc if err != nil { return nil, err } + h.appender = encoding.NewAppender(dataWriter) return h, nil diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index 4fd683c9563..d7b84a54bb7 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -13,7 +13,7 @@ type ReplayBlock struct { } func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { - v, _ := encoding.EncodingByVersion("v1") // jpe :( + v, _ := encoding.EncodingByVersion("v0") // jpe blockID, tenantID, err := parseFilename(filename) if err != nil { From bd27012e34a76b4a09a615567742163a177bb19a Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Wed, 7 Apr 2021 14:54:21 -0400 Subject: [PATCH 05/21] working with v2 - kind of Signed-off-by: Joe Elliott --- tempodb/encoding/complete_block_test.go | 2 +- tempodb/encoding/iterator_record.go | 21 ++++++++++++--------- tempodb/wal/append_block.go | 11 ++++++++--- tempodb/wal/replay_block.go | 2 +- tempodb/wal/wal_test.go | 6 
+++++- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/tempodb/encoding/complete_block_test.go b/tempodb/encoding/complete_block_test.go index 457eb5a697e..eb8582031f1 100644 --- a/tempodb/encoding/complete_block_test.go +++ b/tempodb/encoding/complete_block_test.go @@ -198,7 +198,7 @@ func completeBlock(t *testing.T, cfg *BlockConfig, tempDir string) (*CompleteBlo expectedRecords++ } - iterator := NewRecordIterator(appender.Records(), bytes.NewReader(buffer.Bytes()), v0.NewObjectReaderWriter()) + iterator := NewRecordIterator(appender.Records(), v0.NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(buffer.Bytes()))), v0.NewObjectReaderWriter()) block, err := NewCompleteBlock(cfg, originatingMeta, iterator, numMsgs, tempDir) require.NoError(t, err, "unexpected error completing block") diff --git a/tempodb/encoding/iterator_record.go b/tempodb/encoding/iterator_record.go index d2dfba900e5..978f367103a 100644 --- a/tempodb/encoding/iterator_record.go +++ b/tempodb/encoding/iterator_record.go @@ -3,26 +3,27 @@ package encoding import ( "bytes" "context" - "io" + "errors" "github.com/grafana/tempo/tempodb/encoding/common" ) type recordIterator struct { - records []*common.Record - ra io.ReaderAt + records []*common.Record + objectRW common.ObjectReaderWriter + dataR common.DataReader currentIterator Iterator } // NewRecordIterator returns a recordIterator. This iterator is used for iterating through // a series of objects by reading them one at a time from Records. -func NewRecordIterator(r []*common.Record, ra io.ReaderAt, objectRW common.ObjectReaderWriter) Iterator { +func NewRecordIterator(r []*common.Record, dataR common.DataReader, objectRW common.ObjectReaderWriter) Iterator { return &recordIterator{ records: r, - ra: ra, objectRW: objectRW, + dataR: dataR, } } @@ -39,14 +40,15 @@ func (i *recordIterator) Next(ctx context.Context) (common.ID, []byte, error) { // read the next record and create an iterator if len(i.records) > 0 { - record := i.records[0] - - buff := make([]byte, record.Length) - _, err := i.ra.ReadAt(buff, int64(record.Start)) + pages, err := i.dataR.Read(ctx, i.records[:1]) if err != nil { return nil, nil, err } + if len(pages) == 0 { + return nil, nil, errors.New("unexpected 0 length pages from dataReader") + } + buff := pages[0] i.currentIterator = NewIterator(bytes.NewReader(buff), i.objectRW) i.records = i.records[1:] @@ -58,4 +60,5 @@ func (i *recordIterator) Next(ctx context.Context) (common.ID, []byte, error) { } func (i *recordIterator) Close() { + i.dataR.Close() } diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 71a80aa7c39..3dd58cf8181 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -14,7 +14,7 @@ import ( // if they leak elsewhere. append blocks are not versioned and do not // support different encodings const appendBlockVersion = "append" -const appendBlockEncoding = backend.EncNone +const appendBlockEncoding = backend.EncNone // jpe make configurable // AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile // in the order it was received and an in memory sorted index. 
@@ -27,7 +27,7 @@ type AppendBlock struct { } func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBlock, error) { - v, _ := encoding.EncodingByVersion("v0") + v, _ := encoding.EncodingByVersion("v2") h := &AppendBlock{ encoding: v, block: block{ @@ -90,7 +90,12 @@ func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner commo return nil, err } - iterator := encoding.NewRecordIterator(records, readFile, h.encoding.NewObjectReaderWriter()) + dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(readFile), appendBlockEncoding) + if err != nil { + return nil, err + } + + iterator := encoding.NewRecordIterator(records, dataReader, h.encoding.NewObjectReaderWriter()) iterator, err = encoding.NewDedupingIterator(iterator, combiner) if err != nil { return nil, err diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index d7b84a54bb7..a63ad727500 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -13,7 +13,7 @@ type ReplayBlock struct { } func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { - v, _ := encoding.EncodingByVersion("v0") // jpe + v, _ := encoding.EncodingByVersion("v2") // jpe - derps mcgurps, iterate over datareader? blockID, tenantID, err := parseFilename(filename) if err != nil { diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 052e462262d..a7925a50712 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -114,7 +114,11 @@ func TestAppend(t *testing.T) { records := block.appender.Records() file, err := block.file() assert.NoError(t, err) - iterator := encoding.NewRecordIterator(records, file, block.encoding.NewObjectReaderWriter()) + + dataReader, err := block.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), appendBlockEncoding) + assert.NoError(t, err) + iterator := encoding.NewRecordIterator(records, dataReader, block.encoding.NewObjectReaderWriter()) + defer iterator.Close() i := 0 for { From 6567f68f299636f36b173672b2e2f79fa9d07072 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 8 Apr 2021 09:52:19 -0400 Subject: [PATCH 06/21] Added filename tests Signed-off-by: Joe Elliott --- tempodb/wal/append_block.go | 2 +- tempodb/wal/block.go | 27 ++++++++++- tempodb/wal/block_test.go | 97 +++++++++++++++++++++++++++++++++++++ tempodb/wal/replay_block.go | 2 +- tempodb/wal/wal.go | 19 -------- 5 files changed, 125 insertions(+), 22 deletions(-) create mode 100644 tempodb/wal/block_test.go diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 3dd58cf8181..1bb31eae75a 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -27,7 +27,7 @@ type AppendBlock struct { } func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBlock, error) { - v, _ := encoding.EncodingByVersion("v2") + v, _ := encoding.EncodingByVersion("v2") // jpe check error? 
get the right one h := &AppendBlock{ encoding: v, block: block{ diff --git a/tempodb/wal/block.go b/tempodb/wal/block.go index 8f06ae93449..707f31501a6 100644 --- a/tempodb/wal/block.go +++ b/tempodb/wal/block.go @@ -3,8 +3,11 @@ package wal import ( "fmt" "os" + "path/filepath" + "strings" "sync" + "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" ) @@ -17,7 +20,7 @@ type block struct { } func (b *block) fullFilename() string { - return fmt.Sprintf("%s/%v:%v", b.filepath, b.meta.BlockID, b.meta.TenantID) + return filepath.Join(b.filepath, fmt.Sprintf("%v:%v", b.meta.BlockID, b.meta.TenantID)) } func (b *block) file() (*os.File, error) { @@ -32,3 +35,25 @@ func (b *block) file() (*os.File, error) { return b.readFile, err } + +func parseFilename(name string) (uuid.UUID, string, error) { + i := strings.Index(name, ":") + + if i < 0 { + return uuid.UUID{}, "", fmt.Errorf("unable to parse %s. no colon", name) + } + + blockIDString := name[:i] + tenantID := name[i+1:] + + blockID, err := uuid.Parse(blockIDString) + if err != nil { + return uuid.UUID{}, "", err + } + + if len(tenantID) == 0 { + return uuid.UUID{}, "", fmt.Errorf("unable to parse %s. empty tenant", name) + } + + return blockID, tenantID, nil +} diff --git a/tempodb/wal/block_test.go b/tempodb/wal/block_test.go new file mode 100644 index 00000000000..6514b31abad --- /dev/null +++ b/tempodb/wal/block_test.go @@ -0,0 +1,97 @@ +package wal + +import ( + "testing" + + "github.com/google/uuid" + "github.com/grafana/tempo/tempodb/backend" + "github.com/stretchr/testify/assert" +) + +func TestFullFilename(t *testing.T) { + tests := []struct { + name string + b *block + expected string + }{ + { + name: "ez-mode", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone), + filepath: "/blerg", + }, + expected: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo", + }, + { + name: "nopath", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expected, tc.b.fullFilename()) + }) + } +} + +func TestParseFilename(t *testing.T) { + tests := []struct { + name string + filename string + expectUUID uuid.UUID + expectTenant string + expectError bool + }{ + { + name: "ez-mode", + filename: "123e4567-e89b-12d3-a456-426614174000:foo", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + }, + { + name: "path fails", + filename: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo", + expectError: true, + }, + { + name: "no :", + filename: "123e4567-e89b-12d3-a456-426614174000", + expectError: true, + }, + { + name: "empty string", + filename: "", + expectError: true, + }, + { + name: "bad uuid", + filename: "123e4:foo", + expectError: true, + }, + { + name: "no tenant", + filename: "123e4567-e89b-12d3-a456-426614174000:", + expectError: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actualUUID, actualTenant, err := parseFilename(tc.filename) + + if tc.expectError { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tc.expectUUID, actualUUID) + assert.Equal(t, tc.expectTenant, actualTenant) + }) + } +} diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index a63ad727500..31bae8697d8 100644 --- 
a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -15,7 +15,7 @@ type ReplayBlock struct { func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { v, _ := encoding.EncodingByVersion("v2") // jpe - derps mcgurps, iterate over datareader? - blockID, tenantID, err := parseFilename(filename) + blockID, tenantID, err := parseFilename(filename) // jpe add version/encoding here if err != nil { return nil, err } diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 0fa94c51bf7..40ac0abe11b 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "github.com/google/uuid" ) @@ -79,21 +78,3 @@ func (w *WAL) AllBlocks() ([]*ReplayBlock, error) { func (w *WAL) NewBlock(id uuid.UUID, tenantID string) (*AppendBlock, error) { return newAppendBlock(id, tenantID, w.c.Filepath) } - -func parseFilename(name string) (uuid.UUID, string, error) { - i := strings.Index(name, ":") - - if i < 0 { - return uuid.UUID{}, "", fmt.Errorf("unable to parse %s", name) - } - - blockIDString := name[:i] - tenantID := name[i+1:] - - blockID, err := uuid.Parse(blockIDString) - if err != nil { - return uuid.UUID{}, "", err - } - - return blockID, tenantID, nil -} From 8258eb8144d0cd378d10c51c6d290e30897a5e78 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 8 Apr 2021 09:55:54 -0400 Subject: [PATCH 07/21] Added block tests Signed-off-by: Joe Elliott --- tempodb/wal/block_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tempodb/wal/block_test.go b/tempodb/wal/block_test.go index 6514b31abad..ba9f1c0de4a 100644 --- a/tempodb/wal/block_test.go +++ b/tempodb/wal/block_test.go @@ -34,7 +34,8 @@ func TestFullFilename(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - assert.Equal(t, tc.expected, tc.b.fullFilename()) + actual := tc.b.fullFilename() + assert.Equal(t, tc.expected, actual) }) } } From 7c8838d64d4efd88792661781fc3cf8f942750e2 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 8 Apr 2021 09:56:24 -0400 Subject: [PATCH 08/21] Added block tests Signed-off-by: Joe Elliott --- tempodb/wal/block.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tempodb/wal/block.go b/tempodb/wal/block.go index 707f31501a6..36e3b8155e9 100644 --- a/tempodb/wal/block.go +++ b/tempodb/wal/block.go @@ -20,7 +20,7 @@ type block struct { } func (b *block) fullFilename() string { - return filepath.Join(b.filepath, fmt.Sprintf("%v:%v", b.meta.BlockID, b.meta.TenantID)) + return filepath.Join(b.filepath, fmt.Sprintf("%v:%v:%v:%v", b.meta.BlockID, b.meta.TenantID, b.meta.Version, b.meta.Encoding)) } func (b *block) file() (*os.File, error) { From c119b1932d099ec373103805bc42b865becbc053 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 8 Apr 2021 14:33:13 -0400 Subject: [PATCH 09/21] Improved parsefilename Signed-off-by: Joe Elliott --- tempodb/wal/append_block.go | 6 +- tempodb/wal/block.go | 32 +++++++--- tempodb/wal/block_test.go | 119 ++++++++++++++++++++++++++++++++---- tempodb/wal/replay_block.go | 9 ++- 4 files changed, 138 insertions(+), 28 deletions(-) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 1bb31eae75a..21159446a8b 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -13,7 +13,6 @@ import ( // these values should never be used. these dummy values can help detect // if they leak elsewhere. 
append blocks are not versioned and do not // support different encodings -const appendBlockVersion = "append" const appendBlockEncoding = backend.EncNone // jpe make configurable // AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile @@ -27,11 +26,12 @@ type AppendBlock struct { } func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBlock, error) { - v, _ := encoding.EncodingByVersion("v2") // jpe check error? get the right one + v := encoding.LatestEncoding() + h := &AppendBlock{ encoding: v, block: block{ - meta: backend.NewBlockMeta(tenantID, id, appendBlockVersion, appendBlockEncoding), + meta: backend.NewBlockMeta(tenantID, id, v.Version(), appendBlockEncoding), filepath: filepath, }, } diff --git a/tempodb/wal/block.go b/tempodb/wal/block.go index 36e3b8155e9..7f15a8cdedf 100644 --- a/tempodb/wal/block.go +++ b/tempodb/wal/block.go @@ -36,24 +36,36 @@ func (b *block) file() (*os.File, error) { return b.readFile, err } -func parseFilename(name string) (uuid.UUID, string, error) { - i := strings.Index(name, ":") +func parseFilename(name string) (uuid.UUID, string, string, backend.Encoding, error) { + splits := strings.Split(name, ":") - if i < 0 { - return uuid.UUID{}, "", fmt.Errorf("unable to parse %s. no colon", name) + if len(splits) != 2 && len(splits) != 4 { + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. unexpected number of segments", name) } - blockIDString := name[:i] - tenantID := name[i+1:] + blockIDString := splits[0] + tenantID := splits[1] + + version := "v0" + encodingString := backend.EncNone.String() + if len(splits) == 4 { + version = splits[2] + encodingString = splits[3] + } blockID, err := uuid.Parse(blockIDString) if err != nil { - return uuid.UUID{}, "", err + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. error parsing uuid: %w", name, err) + } + + encoding, err := backend.ParseEncoding(encodingString) + if err != nil { + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. error parsing encoding: %w", name, err) } - if len(tenantID) == 0 { - return uuid.UUID{}, "", fmt.Errorf("unable to parse %s. empty tenant", name) + if len(tenantID) == 0 || len(version) == 0 { + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. 
missing fields", name) } - return blockID, tenantID, nil + return blockID, tenantID, version, encoding, nil } diff --git a/tempodb/wal/block_test.go b/tempodb/wal/block_test.go index ba9f1c0de4a..d4b2aaad7b5 100644 --- a/tempodb/wal/block_test.go +++ b/tempodb/wal/block_test.go @@ -20,7 +20,7 @@ func TestFullFilename(t *testing.T) { meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone), filepath: "/blerg", }, - expected: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo", + expected: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo:v1:none", }, { name: "nopath", @@ -28,7 +28,63 @@ func TestFullFilename(t *testing.T) { meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone), filepath: "", }, - expected: "123e4567-e89b-12d3-a456-426614174000:foo", + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v1:none", + }, + { + name: "gzip", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncGZIP), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:gzip", + }, + { + name: "lz41M", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_1M), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4-1M", + }, + { + name: "lz4256k", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_256k), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4-256k", + }, + { + name: "lz4M", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_4M), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4", + }, + { + name: "lz64k", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_64k), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4-64k", + }, + { + name: "snappy", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncSnappy), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy", + }, + { + name: "zstd", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncZstd), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:zstd", }, } @@ -42,17 +98,29 @@ func TestFullFilename(t *testing.T) { func TestParseFilename(t *testing.T) { tests := []struct { - name string - filename string - expectUUID uuid.UUID - expectTenant string - expectError bool + name string + filename string + expectUUID uuid.UUID + expectTenant string + expectedVersion string + expectedEncoding backend.Encoding + expectError bool }{ { - name: "ez-mode", - filename: "123e4567-e89b-12d3-a456-426614174000:foo", - expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), - expectTenant: "foo", + name: "ez-mode", + filename: "123e4567-e89b-12d3-a456-426614174000:foo", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + expectedVersion: "v0", + expectedEncoding: backend.EncNone, + }, + { + name: "version and encoding", + filename: "123e4567-e89b-12d3-a456-426614174000:foo:v1:snappy", + expectUUID: 
uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + expectedVersion: "v1", + expectedEncoding: backend.EncSnappy, }, { name: "path fails", @@ -79,11 +147,36 @@ func TestParseFilename(t *testing.T) { filename: "123e4567-e89b-12d3-a456-426614174000:", expectError: true, }, + { + name: "no version", + filename: "123e4567-e89b-12d3-a456-426614174000:test::none", + expectError: true, + }, + { + name: "wrong splits - 5", + filename: "123e4567-e89b-12d3-a456-426614174000:test:test:test:test", + expectError: true, + }, + { + name: "wrong splits - 3", + filename: "123e4567-e89b-12d3-a456-426614174000:test:test", + expectError: true, + }, + { + name: "wrong splits - 1", + filename: "123e4567-e89b-12d3-a456-426614174000", + expectError: true, + }, + { + name: "bad encoding", + filename: "123e4567-e89b-12d3-a456-426614174000:test:v1:asdf", + expectError: true, + }, } for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - actualUUID, actualTenant, err := parseFilename(tc.filename) + actualUUID, actualTenant, actualVersion, actualEncoding, err := parseFilename(tc.filename) if tc.expectError { assert.Error(t, err) @@ -93,6 +186,8 @@ func TestParseFilename(t *testing.T) { assert.NoError(t, err) assert.Equal(t, tc.expectUUID, actualUUID) assert.Equal(t, tc.expectTenant, actualTenant) + assert.Equal(t, tc.expectedEncoding, actualEncoding) + assert.Equal(t, tc.expectedVersion, actualVersion) }) } } diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index 31bae8697d8..022090379d3 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -13,16 +13,19 @@ type ReplayBlock struct { } func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { - v, _ := encoding.EncodingByVersion("v2") // jpe - derps mcgurps, iterate over datareader? + blockID, tenantID, version, e, err := parseFilename(filename) + if err != nil { + return nil, err + } - blockID, tenantID, err := parseFilename(filename) // jpe add version/encoding here + v, err := encoding.EncodingByVersion(version) if err != nil { return nil, err } return &ReplayBlock{ block: block{ - meta: backend.NewBlockMeta(tenantID, blockID, appendBlockVersion, appendBlockEncoding), + meta: backend.NewBlockMeta(tenantID, blockID, version, e), filepath: path, }, encoding: v, // jpe actually get the right one From 0362400f90f6a8dec9854f8cffe1f3aa226adf17 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 8 Apr 2021 17:05:48 -0400 Subject: [PATCH 10/21] add backend/encoding support for wal iteration. get v0 working Signed-off-by: Joe Elliott --- tempodb/backend/readerat.go | 15 +++++++++++++++ tempodb/encoding/common/types.go | 7 +++++++ tempodb/encoding/v0/data_reader.go | 27 +++++++++++++++++++++++++++ tempodb/encoding/v0/object.go | 2 +- tempodb/encoding/v1/data_reader.go | 5 +++++ tempodb/encoding/v2/data_reader.go | 6 ++++++ tempodb/wal/append_block.go | 2 +- tempodb/wal/replay_block.go | 9 +++++++-- 8 files changed, 69 insertions(+), 4 deletions(-) diff --git a/tempodb/backend/readerat.go b/tempodb/backend/readerat.go index 401b58416c2..72457305a87 100644 --- a/tempodb/backend/readerat.go +++ b/tempodb/backend/readerat.go @@ -4,6 +4,8 @@ import ( "context" "io" "io/ioutil" + + "github.com/grafana/tempo/tempodb/encoding/common" ) // ContextReader is an io.ReaderAt interface that passes context. 
It is used to simplify access to backend objects @@ -11,6 +13,9 @@ import ( type ContextReader interface { ReadAt(ctx context.Context, p []byte, off int64) (int, error) ReadAll(ctx context.Context) ([]byte, error) + + // Return an io.Reader representing the underlying. May not be supported by all implementations + Reader() (io.Reader, error) } // backendReader is a shim that allows a backend.Reader to be used as a ContextReader @@ -40,6 +45,11 @@ func (b *backendReader) ReadAll(ctx context.Context) ([]byte, error) { return b.r.Read(ctx, b.name, b.meta.BlockID, b.meta.TenantID) } +// Reader implements ContextReader +func (b *backendReader) Reader() (io.Reader, error) { + return nil, common.ErrUnsupported +} + // AllReader is an interface that supports both io.Reader and io.ReaderAt methods type AllReader interface { io.Reader @@ -67,3 +77,8 @@ func (r *allReader) ReadAt(ctx context.Context, p []byte, off int64) (int, error func (r *allReader) ReadAll(ctx context.Context) ([]byte, error) { return ioutil.ReadAll(r.r) } + +// Reader implements ContextReader +func (r *allReader) Reader() (io.Reader, error) { + return r.r, nil +} diff --git a/tempodb/encoding/common/types.go b/tempodb/encoding/common/types.go index 009103ec1be..69c6378f68c 100644 --- a/tempodb/encoding/common/types.go +++ b/tempodb/encoding/common/types.go @@ -2,11 +2,15 @@ package common import ( "context" + "fmt" "io" ) // This file contains types that need to be referenced by both the ./encoding and ./encoding/vX packages. // It primarily exists here to break dependency loops. +var ( + ErrUnsupported = fmt.Errorf("meta does not exist") +) // ID in TempoDB type ID []byte @@ -31,6 +35,9 @@ type ObjectCombiner interface { type DataReader interface { Read(context.Context, []*Record) ([][]byte, error) Close() + + // NextPage can be used to iterate at a page at a time. May return ErrUnsupported for older formats + NextPage() ([]byte, error) } // IndexReader is used to abstract away the details of an index. Currently diff --git a/tempodb/encoding/v0/data_reader.go b/tempodb/encoding/v0/data_reader.go index baec5f7199c..b0e29588963 100644 --- a/tempodb/encoding/v0/data_reader.go +++ b/tempodb/encoding/v0/data_reader.go @@ -2,6 +2,7 @@ package v0 import ( "context" + "encoding/binary" "fmt" "github.com/grafana/tempo/tempodb/backend" @@ -64,5 +65,31 @@ func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]by return slicePages, nil } +// Close implements common.DataReader func (r *dataReader) Close() { } + +// NextPage implements common.DataReader +func (r *dataReader) NextPage() ([]byte, error) { // jpe test + reader, err := r.r.Reader() + if err != nil { + return nil, err + } + + // v0 pages are just single objects. 
this method will return one object at a time from the encapsulated reader + var totalLength uint32 + err = binary.Read(reader, binary.LittleEndian, &totalLength) + if err != nil { + return nil, err + } + + page := make([]byte, totalLength) + binary.LittleEndian.PutUint32(page, totalLength) + + _, err = reader.Read(page[uint32Size:]) + if err != nil { + return nil, err + } + + return page, nil +} diff --git a/tempodb/encoding/v0/object.go b/tempodb/encoding/v0/object.go index 82e87a88c1b..43054489430 100644 --- a/tempodb/encoding/v0/object.go +++ b/tempodb/encoding/v0/object.go @@ -54,7 +54,7 @@ func (object) MarshalObjectToWriter(id common.ID, b []byte, w io.Writer) (int, e func (object) UnmarshalObjectFromReader(r io.Reader) (common.ID, []byte, error) { var totalLength uint32 err := binary.Read(r, binary.LittleEndian, &totalLength) - if err == io.EOF { + if err == io.EOF { // jpe this needs to return io.EOF return nil, nil, nil } else if err != nil { return nil, nil, err diff --git a/tempodb/encoding/v1/data_reader.go b/tempodb/encoding/v1/data_reader.go index c266821132b..541536e50a6 100644 --- a/tempodb/encoding/v1/data_reader.go +++ b/tempodb/encoding/v1/data_reader.go @@ -69,3 +69,8 @@ func (r *dataReader) Close() { r.pool.PutReader(r.compressedReader) } } + +// NextPage implements common.DataReader (kind of) +func (r *dataReader) NextPage() ([]byte, error) { + return nil, common.ErrUnsupported +} diff --git a/tempodb/encoding/v2/data_reader.go b/tempodb/encoding/v2/data_reader.go index 55dba6a8351..b56a9e7087d 100644 --- a/tempodb/encoding/v2/data_reader.go +++ b/tempodb/encoding/v2/data_reader.go @@ -32,6 +32,7 @@ func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataR return v1DataReader, nil } +// Read implements common.DataReader func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]byte, error) { v0Pages, err := r.dataReader.Read(ctx, records) if err != nil { @@ -54,3 +55,8 @@ func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]by func (r *dataReader) Close() { r.dataReader.Close() } + +// NextPage implements common.DataReader (kind of) +func (r *dataReader) NextPage() ([]byte, error) { + return nil, common.ErrUnsupported // jpe add support +} diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 21159446a8b..0e980e14474 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -26,7 +26,7 @@ type AppendBlock struct { } func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBlock, error) { - v := encoding.LatestEncoding() + v := encoding.LatestEncoding() // jpe get to work with latest encoding h := &AppendBlock{ encoding: v, diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index 022090379d3..1645cba17f2 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -28,7 +28,7 @@ func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { meta: backend.NewBlockMeta(tenantID, blockID, version, e), filepath: path, }, - encoding: v, // jpe actually get the right one + encoding: v, }, nil } @@ -39,7 +39,12 @@ func (r *ReplayBlock) Iterator() (encoding.Iterator, error) { return nil, err } - return encoding.NewIterator(f, r.encoding.NewObjectReaderWriter()), nil + dataReader, err := r.encoding.NewDataReader(backend.NewContextReaderWithAllReader(f), r.meta.Encoding) + if err != nil { + return nil, err + } + + return encoding.NewWALIterator(dataReader, r.encoding.NewObjectReaderWriter()), nil } func 
(r *ReplayBlock) TenantID() string { From 17120fff64ceb5b3bd90803c6db34f110ad5a808 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 8 Apr 2021 17:06:30 -0400 Subject: [PATCH 11/21] comment Signed-off-by: Joe Elliott --- tempodb/encoding/v1/data_reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tempodb/encoding/v1/data_reader.go b/tempodb/encoding/v1/data_reader.go index 541536e50a6..50abd5df205 100644 --- a/tempodb/encoding/v1/data_reader.go +++ b/tempodb/encoding/v1/data_reader.go @@ -72,5 +72,5 @@ func (r *dataReader) Close() { // NextPage implements common.DataReader (kind of) func (r *dataReader) NextPage() ([]byte, error) { - return nil, common.ErrUnsupported + return nil, common.ErrUnsupported // v1 can never support this b/c the pages don't have any header information attached. v1 data blocks can only be accessed by record } From a83e2f0f724608002ab292874b90cf67bea80550 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 09:10:23 -0400 Subject: [PATCH 12/21] v2 wal Signed-off-by: Joe Elliott --- tempodb/encoding/common/types.go | 2 +- tempodb/encoding/complete_block_test.go | 2 +- tempodb/encoding/iterator_wal.go | 44 ++++++++++++++++++ tempodb/encoding/v1/data_reader.go | 42 +++++++++++++---- tempodb/encoding/v1/data_reader_test.go | 3 +- tempodb/encoding/v2/data_reader.go | 25 ++++++++--- tempodb/encoding/v2/page.go | 36 +++++++++++++++ tempodb/encoding/versioned.go | 4 +- tempodb/wal/wal_test.go | 60 ++++++++++++++++++++++++- 9 files changed, 196 insertions(+), 22 deletions(-) create mode 100644 tempodb/encoding/iterator_wal.go diff --git a/tempodb/encoding/common/types.go b/tempodb/encoding/common/types.go index 69c6378f68c..2a4a01d9f53 100644 --- a/tempodb/encoding/common/types.go +++ b/tempodb/encoding/common/types.go @@ -9,7 +9,7 @@ import ( // This file contains types that need to be referenced by both the ./encoding and ./encoding/vX packages. // It primarily exists here to break dependency loops. 
var ( - ErrUnsupported = fmt.Errorf("meta does not exist") + ErrUnsupported = fmt.Errorf("unsupported") ) // ID in TempoDB diff --git a/tempodb/encoding/complete_block_test.go b/tempodb/encoding/complete_block_test.go index eb8582031f1..814604023be 100644 --- a/tempodb/encoding/complete_block_test.go +++ b/tempodb/encoding/complete_block_test.go @@ -299,7 +299,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa b.ResetTimer() file, err := os.Open(cb.fullFilename()) require.NoError(b, err) - pr, err := v2.NewDataReader(v0.NewDataReader(backend.NewContextReaderWithAllReader(file)), encoding) + pr, err := v2.NewDataReader(backend.NewContextReaderWithAllReader(file), encoding) require.NoError(b, err) iterator = newPagedIterator(10*1024*1024, common.Records(cb.records), pr, backendBlock.encoding.NewObjectReaderWriter()) diff --git a/tempodb/encoding/iterator_wal.go b/tempodb/encoding/iterator_wal.go new file mode 100644 index 00000000000..b6b997fe439 --- /dev/null +++ b/tempodb/encoding/iterator_wal.go @@ -0,0 +1,44 @@ +package encoding + +import ( + "context" + "io" + + "github.com/grafana/tempo/tempodb/encoding/common" +) + +type walIterator struct { + o common.ObjectReaderWriter + d common.DataReader + + currentPage []byte +} + +// NewWALIterator iterates over pages from a datareader directly like one would for the WAL +func NewWALIterator(d common.DataReader, o common.ObjectReaderWriter) Iterator { + return &walIterator{ + o: o, + d: d, + } +} + +func (i *walIterator) Next(_ context.Context) (common.ID, []byte, error) { + var ( + id common.ID + obj []byte + err error + ) + i.currentPage, id, obj, err = i.o.UnmarshalAndAdvanceBuffer(i.currentPage) + if err == io.EOF { + i.currentPage, err = i.d.NextPage() + if err != nil { + return nil, nil, err + } + i.currentPage, id, obj, err = i.o.UnmarshalAndAdvanceBuffer(i.currentPage) + } + return id, obj, err +} + +func (i *walIterator) Close() { + i.d.Close() +} diff --git a/tempodb/encoding/v1/data_reader.go b/tempodb/encoding/v1/data_reader.go index 50abd5df205..62d2acf5712 100644 --- a/tempodb/encoding/v1/data_reader.go +++ b/tempodb/encoding/v1/data_reader.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" + v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) type dataReader struct { @@ -17,8 +18,13 @@ type dataReader struct { compressedReader io.Reader } -// NewDataReader is useful for nesting compression inside of a different reader -func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { +// NewDataReader creates a datareader that supports compression +func NewDataReader(r backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { + return NewNestedDataReader(v0.NewDataReader(r), encoding) +} + +// NewNestedDataReader is useful for nesting compression inside of a different reader +func NewNestedDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { pool, err := getReaderPool(encoding) if err != nil { return nil, err @@ -42,16 +48,12 @@ func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]by // now decompress decompressedPages := make([][]byte, 0, len(compressedPages)) for _, page := range compressedPages { - if r.compressedReader == nil { - r.compressedReader, err = r.pool.GetReader(bytes.NewReader(page)) - } else { - r.compressedReader, err = r.pool.ResetReader(bytes.NewReader(page), r.compressedReader) - } + reader, err 
:= r.getCompressedReader(page) if err != nil { return nil, err } - page, err := ioutil.ReadAll(r.compressedReader) + page, err := ioutil.ReadAll(reader) if err != nil { return nil, err } @@ -72,5 +74,27 @@ func (r *dataReader) Close() { // NextPage implements common.DataReader (kind of) func (r *dataReader) NextPage() ([]byte, error) { - return nil, common.ErrUnsupported // v1 can never support this b/c the pages don't have any header information attached. v1 data blocks can only be accessed by record + page, err := r.dataReader.NextPage() + if err != nil { + return nil, err + } + reader, err := r.getCompressedReader(page) + if err != nil { + return nil, err + } + page, err = ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + return page, nil +} + +func (r *dataReader) getCompressedReader(page []byte) (io.Reader, error) { + var err error + if r.compressedReader == nil { + r.compressedReader, err = r.pool.GetReader(bytes.NewReader(page)) + } else { + r.compressedReader, err = r.pool.ResetReader(bytes.NewReader(page), r.compressedReader) + } + return r.compressedReader, err } diff --git a/tempodb/encoding/v1/data_reader_test.go b/tempodb/encoding/v1/data_reader_test.go index ee21812706b..cf41ebaa37b 100644 --- a/tempodb/encoding/v1/data_reader_test.go +++ b/tempodb/encoding/v1/data_reader_test.go @@ -7,7 +7,6 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -40,7 +39,7 @@ func TestAllEncodings(t *testing.T) { require.NoError(t, err) encryptedBytes := buff.Bytes() - reader, err := NewDataReader(v0.NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(encryptedBytes))), enc) + reader, err := NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(encryptedBytes)), enc) require.NoError(t, err) defer reader.Close() diff --git a/tempodb/encoding/v2/data_reader.go b/tempodb/encoding/v2/data_reader.go index b56a9e7087d..738579946ca 100644 --- a/tempodb/encoding/v2/data_reader.go +++ b/tempodb/encoding/v2/data_reader.go @@ -5,11 +5,13 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" + v0 "github.com/grafana/tempo/tempodb/encoding/v0" v1 "github.com/grafana/tempo/tempodb/encoding/v1" ) type dataReader struct { - dataReader common.DataReader + contextReader backend.ContextReader + dataReader common.DataReader } // constDataHeader is a singleton data header. 
the data header is @@ -18,13 +20,14 @@ type dataReader struct { var constDataHeader = &dataHeader{} // NewDataReader constructs a v2 DataReader that handles paged...reading -func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { +func NewDataReader(r backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { v2DataReader := &dataReader{ - dataReader: r, + contextReader: r, + dataReader: v0.NewDataReader(r), } // wrap the paged reader in a compressed/v1 reader and return that - v1DataReader, err := v1.NewDataReader(v2DataReader, encoding) + v1DataReader, err := v1.NewNestedDataReader(v2DataReader, encoding) if err != nil { return nil, err } @@ -56,7 +59,17 @@ func (r *dataReader) Close() { r.dataReader.Close() } -// NextPage implements common.DataReader (kind of) +// NextPage implements common.DataReader func (r *dataReader) NextPage() ([]byte, error) { - return nil, common.ErrUnsupported // jpe add support + reader, err := r.contextReader.Reader() + if err != nil { + return nil, err + } + + page, err := unmarshalPageFromReader(reader, constDataHeader) + if err != nil { + return nil, err + } + + return page.data, nil } diff --git a/tempodb/encoding/v2/page.go b/tempodb/encoding/v2/page.go index fd35a3d281a..2d361d62a2c 100644 --- a/tempodb/encoding/v2/page.go +++ b/tempodb/encoding/v2/page.go @@ -51,6 +51,42 @@ func unmarshalPageFromBytes(b []byte, header pageHeader) (*page, error) { }, nil } +func unmarshalPageFromReader(r io.Reader, header pageHeader) (*page, error) { // jpe test + totalHeaderSize := baseHeaderSize + header.headerLength() + + var totalLength uint32 + var headerLength uint16 + + err := binary.Read(r, binary.LittleEndian, &totalLength) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &headerLength) + if err != nil { + return nil, err + } + headerBytes := make([]byte, headerLength) + _, err = r.Read(headerBytes) + if err != nil { + return nil, err + } + err = header.unmarshalHeader(headerBytes) + if err != nil { + return nil, err + } + dataLength := int(totalLength) - totalHeaderSize + pageBytes := make([]byte, dataLength) + _, err = r.Read(pageBytes) + if err != nil { + return nil, err + } + + return &page{ + data: pageBytes, + header: header, + }, nil +} + // marshalPageToWriter marshals the page bytes to the passed writer func marshalPageToWriter(b []byte, w io.Writer, header pageHeader) (int, error) { var headerLength uint16 diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index 82a2b3c800b..f982b8320f4 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -98,7 +98,7 @@ func (v v1Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, return v1.NewIndexReader(ra) } func (v v1Encoding) NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { - return v1.NewDataReader(v0.NewDataReader(ra), encoding) + return v1.NewDataReader(ra, encoding) } func (v v1Encoding) NewObjectReaderWriter() common.ObjectReaderWriter { return v1.NewObjectReaderWriter() @@ -123,7 +123,7 @@ func (v v2Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, return v2.NewIndexReader(ra, pageSizeBytes, totalPages) } func (v v2Encoding) NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { - return v2.NewDataReader(v0.NewDataReader(ra), encoding) + return v2.NewDataReader(ra, encoding) } func (v v2Encoding) NewObjectReaderWriter() 
common.ObjectReaderWriter { return v2.NewObjectReaderWriter() diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index a7925a50712..aec7b63c2e8 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -2,6 +2,7 @@ package wal import ( "context" + "io" "io/ioutil" "math/rand" "os" @@ -11,6 +12,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" @@ -100,7 +102,7 @@ func TestAppend(t *testing.T) { block, err := wal.NewBlock(blockID, testTenantID) assert.NoError(t, err, "unexpected error creating block") - numMsgs := 1 + numMsgs := 100 reqs := make([]*tempopb.PushRequest, 0, numMsgs) for i := 0; i < numMsgs; i++ { req := test.MakeRequest(rand.Int()%1000, []byte{0x01}) @@ -212,6 +214,62 @@ func TestWorkDir(t *testing.T) { assert.Len(t, files, 0, "work dir should be empty") } +func TestAppendReplay(t *testing.T) { + tempDir, err := ioutil.TempDir("/tmp", "") + defer os.RemoveAll(tempDir) + require.NoError(t, err, "unexpected error creating temp dir") + + wal, err := New(&Config{ + Filepath: tempDir, + }) + require.NoError(t, err, "unexpected error creating temp wal") + + blockID := uuid.New() + + block, err := wal.NewBlock(blockID, testTenantID) + require.NoError(t, err, "unexpected error creating block") + + objects := 1000 + objs := make([][]byte, 0, objects) + ids := make([][]byte, 0, objects) + for i := 0; i < objects; i++ { + id := make([]byte, 16) + rand.Read(id) + obj := test.MakeRequest(rand.Int()%10, id) + ids = append(ids, id) + bObj, err := proto.Marshal(obj) + require.NoError(t, err) + objs = append(objs, bObj) + + block.Write(id, bObj) + require.NoError(t, err, "unexpected error writing req") + } + + blocks, err := wal.AllBlocks() + require.NoError(t, err, "unexpected error getting blocks") + require.Len(t, blocks, 1) + + replay := blocks[0] + iterator, err := replay.Iterator() + require.NoError(t, err) + defer iterator.Close() + + i := 0 + for { + id, obj, err := iterator.Next(context.Background()) + if err == io.EOF { + break + } else { + require.NoError(t, err) + } + assert.Equal(t, objs[i], obj) + assert.Equal(t, ids[i], []byte(id)) + i++ + } + + assert.Equal(t, objects, i) +} + func BenchmarkWriteRead(b *testing.B) { tempDir, _ := ioutil.TempDir("/tmp", "") defer os.RemoveAll(tempDir) From 753aa417ebde31263e89e95cb10da8ecaa19339b Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 09:27:25 -0400 Subject: [PATCH 13/21] Added bench Signed-off-by: Joe Elliott --- tempodb/wal/wal_test.go | 40 +++++++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index aec7b63c2e8..6b64a05cd45 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -282,19 +282,41 @@ func BenchmarkWriteRead(b *testing.B) { // 1 million requests, 10k spans per request block, _ := wal.NewBlock(blockID, testTenantID) - numMsgs := 100 - reqs := make([]*tempopb.PushRequest, 0, numMsgs) - for i := 0; i < numMsgs; i++ { - req := test.MakeRequest(100, []byte{}) - reqs = append(reqs, req) + objects := 1000 + objs := make([][]byte, 0, objects) + ids := make([][]byte, 0, objects) + for i := 0; i < objects; i++ { + id := make([]byte, 16) + rand.Read(id) + obj := test.MakeRequest(rand.Int()%10, id) + ids = append(ids, id) + bObj, err := proto.Marshal(obj) + require.NoError(b, err) + objs = append(objs, 
bObj) + + block.Write(id, bObj) + require.NoError(b, err, "unexpected error writing req") } b.ResetTimer() for i := 0; i < b.N; i++ { - for _, req := range reqs { - bytes, _ := proto.Marshal(req) - _ = block.Write(test.MustTraceID(req), bytes) - _, _ = block.Find(test.MustTraceID(req), &mockCombiner{}) + for j, obj := range objs { + err := block.Write(ids[j], obj) + require.NoError(b, err) + } + j := 0 + replayBlocks, err := wal.AllBlocks() + require.NoError(b, err) + iter, err := replayBlocks[0].Iterator() + require.NoError(b, err) + + for { + _, _, err := iter.Next(context.Background()) + if err == io.EOF { + break + } + require.NoError(b, err) + j++ } } } From c7e0b6bce78e83cb311f9d3df7535afdbc31dab5 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 09:39:26 -0400 Subject: [PATCH 14/21] made encoding configurable Signed-off-by: Joe Elliott --- example/docker-compose/etc/tempo-azure.yaml | 1 + .../docker-compose/etc/tempo-gcs-fake.yaml | 1 + example/docker-compose/etc/tempo-local.yaml | 1 + .../docker-compose/etc/tempo-s3-minio.yaml | 1 + modules/storage/config.go | 1 + tempodb/wal/append_block.go | 20 +++++++++---------- tempodb/wal/wal.go | 8 +++++--- tempodb/wal/wal_test.go | 4 ++-- 8 files changed, 21 insertions(+), 16 deletions(-) diff --git a/example/docker-compose/etc/tempo-azure.yaml b/example/docker-compose/etc/tempo-azure.yaml index f00fa7858a7..4e080f4e354 100644 --- a/example/docker-compose/etc/tempo-azure.yaml +++ b/example/docker-compose/etc/tempo-azure.yaml @@ -39,6 +39,7 @@ storage: encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally + encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd azure: container-name: tempo # how to store data in azure endpoint-suffix: azurite:10000 diff --git a/example/docker-compose/etc/tempo-gcs-fake.yaml b/example/docker-compose/etc/tempo-gcs-fake.yaml index ed81e74c9c2..e86f2718eff 100644 --- a/example/docker-compose/etc/tempo-gcs-fake.yaml +++ b/example/docker-compose/etc/tempo-gcs-fake.yaml @@ -40,6 +40,7 @@ storage: encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally + encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd gcs: bucket_name: tempo endpoint: https://gcs:4443/storage/v1/ diff --git a/example/docker-compose/etc/tempo-local.yaml b/example/docker-compose/etc/tempo-local.yaml index 0843053e85e..64d69bc0467 100644 --- a/example/docker-compose/etc/tempo-local.yaml +++ b/example/docker-compose/etc/tempo-local.yaml @@ -39,6 +39,7 @@ storage: encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally + encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd local: path: /tmp/tempo/blocks pool: diff --git a/example/docker-compose/etc/tempo-s3-minio.yaml b/example/docker-compose/etc/tempo-s3-minio.yaml index b1d7a1b914a..5af7d6e8b04 100644 --- a/example/docker-compose/etc/tempo-s3-minio.yaml +++ b/example/docker-compose/etc/tempo-s3-minio.yaml @@ -40,6 +40,7 @@ storage: encoding: zstd # block encoding/compression. 
options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally + encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd s3: bucket: tempo # how to store data in s3 endpoint: minio:9000 diff --git a/modules/storage/config.go b/modules/storage/config.go index 97703b5e316..49fef348b9f 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -33,6 +33,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.WAL = &wal.Config{} f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") + cfg.Trace.WAL.Encoding = backend.EncNone // jpe snappy? cfg.Trace.Block = &encoding.BlockConfig{} f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.") diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 0e980e14474..545f9afb345 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -10,11 +10,6 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -// these values should never be used. these dummy values can help detect -// if they leak elsewhere. append blocks are not versioned and do not -// support different encodings -const appendBlockEncoding = backend.EncNone // jpe make configurable - // AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile // in the order it was received and an in memory sorted index. type AppendBlock struct { @@ -25,13 +20,16 @@ type AppendBlock struct { appender encoding.Appender } -func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBlock, error) { - v := encoding.LatestEncoding() // jpe get to work with latest encoding +func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding) (*AppendBlock, error) { + v, err := encoding.EncodingByVersion("v2") // let's pin wal files instead of tracking latest for safety + if err != nil { + return nil, err + } h := &AppendBlock{ encoding: v, block: block{ - meta: backend.NewBlockMeta(tenantID, id, v.Version(), appendBlockEncoding), + meta: backend.NewBlockMeta(tenantID, id, v.Version(), e), filepath: filepath, }, } @@ -44,7 +42,7 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc } h.appendFile = f - dataWriter, err := h.encoding.NewDataWriter(f, appendBlockEncoding) + dataWriter, err := h.encoding.NewDataWriter(f, e) if err != nil { return nil, err } @@ -90,7 +88,7 @@ func (h *AppendBlock) Complete(cfg *encoding.BlockConfig, w *WAL, combiner commo return nil, err } - dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(readFile), appendBlockEncoding) + dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(readFile), h.meta.Encoding) if err != nil { return nil, err } @@ -117,7 +115,7 @@ func (h *AppendBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte return nil, err } - dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), appendBlockEncoding) + dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), h.meta.Encoding) if err != nil { return nil, err } diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 40ac0abe11b..3ae11d564e2 100644 --- a/tempodb/wal/wal.go +++ 
b/tempodb/wal/wal.go @@ -7,6 +7,7 @@ import ( "path/filepath" "github.com/google/uuid" + "github.com/grafana/tempo/tempodb/backend" ) const ( @@ -18,8 +19,9 @@ type WAL struct { } type Config struct { - Filepath string `yaml:"path"` - CompletedFilepath string + Filepath string `yaml:"path"` + CompletedFilepath string `yaml:"completed_file_path"` + Encoding backend.Encoding `yaml:"encoding"` } func New(c *Config) (*WAL, error) { @@ -76,5 +78,5 @@ func (w *WAL) AllBlocks() ([]*ReplayBlock, error) { } func (w *WAL) NewBlock(id uuid.UUID, tenantID string) (*AppendBlock, error) { - return newAppendBlock(id, tenantID, w.c.Filepath) + return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding) } diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 6b64a05cd45..05482a5f134 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -117,7 +117,7 @@ func TestAppend(t *testing.T) { file, err := block.file() assert.NoError(t, err) - dataReader, err := block.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), appendBlockEncoding) + dataReader, err := block.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), backend.EncNone) assert.NoError(t, err) iterator := encoding.NewRecordIterator(records, dataReader, block.encoding.NewObjectReaderWriter()) defer iterator.Close() @@ -214,7 +214,7 @@ func TestWorkDir(t *testing.T) { assert.Len(t, files, 0, "work dir should be empty") } -func TestAppendReplay(t *testing.T) { +func TestAppendReplay(t *testing.T) { // jpe add find and test all encodings tempDir, err := ioutil.TempDir("/tmp", "") defer os.RemoveAll(tempDir) require.NoError(t, err, "unexpected error creating temp dir") From 78dae823ea4e30e88a91d2d1dfd064d39dd7c635 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 10:30:51 -0400 Subject: [PATCH 15/21] improved testing and benchmarking Signed-off-by: Joe Elliott --- tempodb/wal/wal_test.go | 71 +++++++++++++++++++++++++++++++---------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 05482a5f134..b31a625ab94 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -214,13 +214,20 @@ func TestWorkDir(t *testing.T) { assert.Len(t, files, 0, "work dir should be empty") } -func TestAppendReplay(t *testing.T) { // jpe add find and test all encodings +func TestAppendReplayFind(t *testing.T) { + for _, e := range backend.SupportedEncoding { + testAppendReplayFind(t, e) + } +} + +func testAppendReplayFind(t *testing.T, e backend.Encoding) { // jpe add find and test all encodings tempDir, err := ioutil.TempDir("/tmp", "") defer os.RemoveAll(tempDir) require.NoError(t, err, "unexpected error creating temp dir") wal, err := New(&Config{ Filepath: tempDir, + Encoding: e, }) require.NoError(t, err, "unexpected error creating temp wal") @@ -245,6 +252,12 @@ func TestAppendReplay(t *testing.T) { // jpe add find and test all encodings require.NoError(t, err, "unexpected error writing req") } + for i, id := range ids { + obj, err := block.Find(id, &mockCombiner{}) + require.NoError(t, err) + assert.Equal(t, objs[i], obj) + } + blocks, err := wal.AllBlocks() require.NoError(t, err, "unexpected error getting blocks") require.Len(t, blocks, 1) @@ -270,18 +283,23 @@ func TestAppendReplay(t *testing.T) { // jpe add find and test all encodings assert.Equal(t, objects, i) } -func BenchmarkWriteRead(b *testing.B) { - tempDir, _ := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - - wal, _ := New(&Config{ - 
Filepath: tempDir, - }) - - blockID := uuid.New() +func BenchmarkWALNone(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncNone) +} +func BenchmarkWALSnappy(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncSnappy) +} +func BenchmarkWALLZ4(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncLZ4_1M) +} +func BenchmarkWALGZIP(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncGZIP) +} +func BenchmarkWALZSTD(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncZstd) +} - // 1 million requests, 10k spans per request - block, _ := wal.NewBlock(blockID, testTenantID) +func benchmarkWriteFindReplay(b *testing.B, encoding backend.Encoding) { objects := 1000 objs := make([][]byte, 0, objects) ids := make([][]byte, 0, objects) @@ -293,23 +311,39 @@ func BenchmarkWriteRead(b *testing.B) { bObj, err := proto.Marshal(obj) require.NoError(b, err) objs = append(objs, bObj) - - block.Write(id, bObj) - require.NoError(b, err, "unexpected error writing req") } - + mockCombiner := &mockCombiner{} b.ResetTimer() + for i := 0; i < b.N; i++ { + tempDir, _ := ioutil.TempDir("/tmp", "") + wal, _ := New(&Config{ + Filepath: tempDir, + Encoding: encoding, + }) + + blockID := uuid.New() + block, err := wal.NewBlock(blockID, testTenantID) + require.NoError(b, err) + + // write for j, obj := range objs { err := block.Write(ids[j], obj) require.NoError(b, err) } + + // find + for _, id := range ids { + _, err := block.Find(id, mockCombiner) + require.NoError(b, err) + } + + // replay j := 0 replayBlocks, err := wal.AllBlocks() require.NoError(b, err) iter, err := replayBlocks[0].Iterator() require.NoError(b, err) - for { _, _, err := iter.Next(context.Background()) if err == io.EOF { @@ -318,5 +352,8 @@ func BenchmarkWriteRead(b *testing.B) { require.NoError(b, err) j++ } + + block.Clear() + os.RemoveAll(tempDir) } } From de1639aed1647db430fa30bf7be6e2e6bd378a48 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 10:59:20 -0400 Subject: [PATCH 16/21] Test additional functionality Signed-off-by: Joe Elliott --- modules/storage/config.go | 2 +- tempodb/encoding/v0/data_reader.go | 2 +- tempodb/encoding/v0/data_reader_test.go | 41 ++++++++++++++++++++++++- tempodb/encoding/v0/object.go | 2 +- tempodb/encoding/v2/page.go | 2 +- tempodb/encoding/v2/page_test.go | 5 +++ tempodb/wal/wal_test.go | 2 +- 7 files changed, 50 insertions(+), 6 deletions(-) diff --git a/modules/storage/config.go b/modules/storage/config.go index 49fef348b9f..d43485a2588 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -33,7 +33,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.WAL = &wal.Config{} f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.") - cfg.Trace.WAL.Encoding = backend.EncNone // jpe snappy? 
+ cfg.Trace.WAL.Encoding = backend.EncNone cfg.Trace.Block = &encoding.BlockConfig{} f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.") diff --git a/tempodb/encoding/v0/data_reader.go b/tempodb/encoding/v0/data_reader.go index b0e29588963..7f2650c1243 100644 --- a/tempodb/encoding/v0/data_reader.go +++ b/tempodb/encoding/v0/data_reader.go @@ -70,7 +70,7 @@ func (r *dataReader) Close() { } // NextPage implements common.DataReader -func (r *dataReader) NextPage() ([]byte, error) { // jpe test +func (r *dataReader) NextPage() ([]byte, error) { reader, err := r.r.Reader() if err != nil { return nil, err diff --git a/tempodb/encoding/v0/data_reader_test.go b/tempodb/encoding/v0/data_reader_test.go index 149eec3d7a8..456d6e7d812 100644 --- a/tempodb/encoding/v0/data_reader_test.go +++ b/tempodb/encoding/v0/data_reader_test.go @@ -3,15 +3,17 @@ package v0 import ( "bytes" "context" + "io" + "math/rand" "testing" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestDataReader(t *testing.T) { - tests := []struct { readerBytes []byte records []*common.Record @@ -136,3 +138,40 @@ func TestDataReader(t *testing.T) { assert.Equal(t, tc.expectedBytes, actual) } } + +func TestWriterReaderNextPage(t *testing.T) { + buff := bytes.NewBuffer(nil) + writer := NewDataWriter(buff) + + pageCount := 10 + pages := make([][]byte, 0, pageCount) + for i := 0; i < pageCount; i++ { + data := make([]byte, 200) + rand.Read(data) + pages = append(pages, data) + + _, err := writer.Write([]byte{0x01}, data) + require.NoError(t, err) + _, err = writer.CutPage() + require.NoError(t, err) + } + err := writer.Complete() + require.NoError(t, err) + + reader := NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(buff.Bytes()))) + i := 0 + for { + page, err := reader.NextPage() + if err == io.EOF { + break + } + require.NoError(t, err) + + _, id, obj, err := staticObject.UnmarshalAndAdvanceBuffer(page) + require.NoError(t, err) + + assert.Equal(t, pages[i], obj) + assert.Equal(t, []byte{0x01}, []byte(id)) + i++ + } +} diff --git a/tempodb/encoding/v0/object.go b/tempodb/encoding/v0/object.go index 43054489430..82e87a88c1b 100644 --- a/tempodb/encoding/v0/object.go +++ b/tempodb/encoding/v0/object.go @@ -54,7 +54,7 @@ func (object) MarshalObjectToWriter(id common.ID, b []byte, w io.Writer) (int, e func (object) UnmarshalObjectFromReader(r io.Reader) (common.ID, []byte, error) { var totalLength uint32 err := binary.Read(r, binary.LittleEndian, &totalLength) - if err == io.EOF { // jpe this needs to return io.EOF + if err == io.EOF { return nil, nil, nil } else if err != nil { return nil, nil, err diff --git a/tempodb/encoding/v2/page.go b/tempodb/encoding/v2/page.go index 2d361d62a2c..fc472afccd1 100644 --- a/tempodb/encoding/v2/page.go +++ b/tempodb/encoding/v2/page.go @@ -51,7 +51,7 @@ func unmarshalPageFromBytes(b []byte, header pageHeader) (*page, error) { }, nil } -func unmarshalPageFromReader(r io.Reader, header pageHeader) (*page, error) { // jpe test +func unmarshalPageFromReader(r io.Reader, header pageHeader) (*page, error) { totalHeaderSize := baseHeaderSize + header.headerLength() var totalLength uint32 diff --git a/tempodb/encoding/v2/page_test.go b/tempodb/encoding/v2/page_test.go index 99ad141312d..c8b84b6ac46 100644 --- a/tempodb/encoding/v2/page_test.go +++ 
b/tempodb/encoding/v2/page_test.go @@ -67,6 +67,11 @@ func TestPageMarshalToWriter(t *testing.T) { require.NoError(t, err) assert.Equal(t, tc.expected, page.data) assert.Equal(t, tc.field, page.header.(*testHeader).field) + + page, err = unmarshalPageFromReader(buff, &testHeader{}) + require.NoError(t, err) + assert.Equal(t, tc.expected, page.data) + assert.Equal(t, tc.field, page.header.(*testHeader).field) } } diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 1b58cb5d0c6..a9e1d7453bc 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -168,7 +168,7 @@ func TestAppendReplayFind(t *testing.T) { } } -func testAppendReplayFind(t *testing.T, e backend.Encoding) { // jpe add find and test all encodings +func testAppendReplayFind(t *testing.T, e backend.Encoding) { tempDir, err := ioutil.TempDir("/tmp", "") defer os.RemoveAll(tempDir) require.NoError(t, err, "unexpected error creating temp dir") From 5138bbd632dea6fb7ae785ce4f9213e9a2d45011 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 11:06:19 -0400 Subject: [PATCH 17/21] Updated docs Signed-off-by: Joe Elliott --- docs/tempo/website/configuration/_index.md | 6 +++++- docs/tempo/website/configuration/compression.md | 16 +++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md index df4b299f2a1..b3615d47d4c 100644 --- a/docs/tempo/website/configuration/_index.md +++ b/docs/tempo/website/configuration/_index.md @@ -121,7 +121,6 @@ storage: backend: gcs # store traces in gcs gcs: bucket_name: ops-tools-tracing-ops # store traces in this bucket - blocklist_poll: 5m # how often to repoll the backend for new blocks blocklist_poll_concurrency: 50 # optional. Number of blocks to process in parallel during polling. Default is 50. cache: memcached # optional cache configuration @@ -138,6 +137,11 @@ storage: queue_depth: 2000 # length of job queue wal: path: /var/tempo/wal # where to store the head blocks while they are being appended to + encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd + block: + bloom_filter_false_positive: .05 # bloom filter false positive rate. lower values create larger filters but fewer false positives + index_downsample_bytes: 1_000_000 # number of bytes per index record + encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd ``` ## Memberlist diff --git a/docs/tempo/website/configuration/compression.md b/docs/tempo/website/configuration/compression.md index dc1bac98e1b..79fc3baa66c 100644 --- a/docs/tempo/website/configuration/compression.md +++ b/docs/tempo/website/configuration/compression.md @@ -29,4 +29,18 @@ The following options are supported: It is important to note that although all of these compression formats are supported in Tempo, at Grafana we use zstd and it's possible/probable that the other compression algorithms may have issue at scale. Please -file an issue if you stumble upon any problems! \ No newline at end of file +file an issue if you stumble upon any problems! + +## WAL + +The WAL also supports compression. By default this is turned off because it comes with a small performance penalty. +However, it does reduce disk i/o and add checksums to the WAL which are valuable in higher volume installations. + +``` +storage: + trace: + wal: + encoding: snappy +``` + +If WAL compression is turned on it is recommend to use snappy. 
All of the above options are supported. \ No newline at end of file From 86ec2cbb169061890b77a933c3aca7c8b095a228 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 11:12:23 -0400 Subject: [PATCH 18/21] changelog Signed-off-by: Joe Elliott --- CHANGELOG.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 851e85e0a13..44472c71c11 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,12 +8,15 @@ `ingestion_burst_size` limit override has been removed in favour of `ingestion_burst_size_bytes`. This is a **breaking change** to the overrides config section. [#630](https://github.com/grafana/tempo/pull/630) * [FEATURE] Add page based access to the index file. [#557](https://github.com/grafana/tempo/pull/557) +* [FEATURE] WAL Compression/checksums. [#638](https://github.com/grafana/tempo/pull/638) * [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526) * [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups. This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557) * [ENHANCEMENT] Add list compaction-summary command to tempo-cli [#588](https://github.com/grafana/tempo/pull/588) * [ENHANCEMENT] Add list and view index commands to tempo-cli [#611](https://github.com/grafana/tempo/pull/611) * [ENHANCEMENT] Add a configurable prefix for HTTP endpoints. [#631](https://github.com/grafana/tempo/pull/631) +* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613) +* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#613](https://github.com/grafana/tempo/pull/627) * [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554) * [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561) * [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567) @@ -23,8 +26,6 @@ * [BUGFIX] Fixes issue where Tempo would not parse odd length trace ids [#605](https://github.com/grafana/tempo/pull/605) * [BUGFIX] Sort traces on flush to reduce unexpected recombination work by compactors [#606](https://github.com/grafana/tempo/pull/606) * [BUGFIX] Ingester fully persists blocks locally to reduce amount of work done after restart [#628](https://github.com/grafana/tempo/pull/628) -* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613) -* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. 
[#613](https://github.com/grafana/tempo/pull/627) ## v0.6.0 From 55c00730658b29371a2b0cd566420303a1105eaa Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Fri, 9 Apr 2021 11:24:15 -0400 Subject: [PATCH 19/21] lint Signed-off-by: Joe Elliott --- tempodb/encoding/backend_block.go | 2 +- tempodb/encoding/versioned.go | 4 ++-- tempodb/encoding/versioned_test.go | 6 +++--- tempodb/wal/append_block.go | 2 +- tempodb/wal/replay_block.go | 2 +- tempodb/wal/wal_test.go | 5 +++-- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index 88b05c1f828..b3cf5000bab 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -23,7 +23,7 @@ type BackendBlock struct { // NewBackendBlock returns a BackendBlock for the given backend.BlockMeta // It is version aware. func NewBackendBlock(meta *backend.BlockMeta, r backend.Reader) (*BackendBlock, error) { - encoding, err := EncodingByVersion(meta.Version) + encoding, err := FromVersion(meta.Version) if err != nil { return nil, err } diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index f982b8320f4..cf6aeb60d53 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -29,8 +29,8 @@ type VersionedEncoding interface { NewRecordReaderWriter() common.RecordReaderWriter } -// EncodingByVersion returns a versioned encoding for the provided string -func EncodingByVersion(v string) (VersionedEncoding, error) { +// FromVersion returns a versioned encoding for the provided string +func FromVersion(v string) (VersionedEncoding, error) { switch v { case "v0": return v0Encoding{}, nil diff --git a/tempodb/encoding/versioned_test.go b/tempodb/encoding/versioned_test.go index 514ba61cba0..efa4db15f61 100644 --- a/tempodb/encoding/versioned_test.go +++ b/tempodb/encoding/versioned_test.go @@ -11,15 +11,15 @@ import ( "github.com/stretchr/testify/require" ) -func TestEncodingByVersionErrors(t *testing.T) { - encoding, err := EncodingByVersion("definitely-not-a-real-version") +func TestFromVersionErrors(t *testing.T) { + encoding, err := FromVersion("definitely-not-a-real-version") assert.Error(t, err) assert.Nil(t, encoding) } func TestAllVersions(t *testing.T) { for _, v := range allEncodings() { - encoding, err := EncodingByVersion(v.Version()) + encoding, err := FromVersion(v.Version()) require.Equal(t, v.Version(), encoding.Version()) require.NoError(t, err) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 76b8b2e398e..b5c74722738 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -21,7 +21,7 @@ type AppendBlock struct { } func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding) (*AppendBlock, error) { - v, err := encoding.EncodingByVersion("v2") // let's pin wal files instead of tracking latest for safety + v, err := encoding.FromVersion("v2") // let's pin wal files instead of tracking latest for safety if err != nil { return nil, err } diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index 1645cba17f2..6b679e32a92 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -18,7 +18,7 @@ func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { return nil, err } - v, err := encoding.EncodingByVersion(version) + v, err := encoding.FromVersion(version) if err != nil { return nil, err } diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 
a9e1d7453bc..fd6f6cf83f9 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -196,7 +196,7 @@ func testAppendReplayFind(t *testing.T, e backend.Encoding) { require.NoError(t, err) objs = append(objs, bObj) - block.Write(id, bObj) + err = block.Write(id, bObj) require.NoError(t, err, "unexpected error writing req") } @@ -301,7 +301,8 @@ func benchmarkWriteFindReplay(b *testing.B, encoding backend.Encoding) { j++ } - block.Clear() + err = block.Clear() + require.NoError(b, err) os.RemoveAll(tempDir) } } From 7482bcc91a8e8d4be3b6773fe0da917871d14865 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 12 Apr 2021 07:56:26 -0400 Subject: [PATCH 20/21] snappy => none Signed-off-by: Joe Elliott --- docs/tempo/website/configuration/compression.md | 2 +- example/docker-compose/etc/tempo-azure.yaml | 2 +- example/docker-compose/etc/tempo-gcs-fake.yaml | 2 +- example/docker-compose/etc/tempo-local.yaml | 2 +- example/docker-compose/etc/tempo-s3-minio.yaml | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/tempo/website/configuration/compression.md b/docs/tempo/website/configuration/compression.md index 79fc3baa66c..acf7efd5b48 100644 --- a/docs/tempo/website/configuration/compression.md +++ b/docs/tempo/website/configuration/compression.md @@ -40,7 +40,7 @@ However, it does reduce disk i/o and add checksums to the WAL which are valuable storage: trace: wal: - encoding: snappy + encoding: none ``` If WAL compression is turned on it is recommend to use snappy. All of the above options are supported. \ No newline at end of file diff --git a/example/docker-compose/etc/tempo-azure.yaml b/example/docker-compose/etc/tempo-azure.yaml index 4e080f4e354..abe80b8c7b9 100644 --- a/example/docker-compose/etc/tempo-azure.yaml +++ b/example/docker-compose/etc/tempo-azure.yaml @@ -39,7 +39,7 @@ storage: encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally - encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd + encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd azure: container-name: tempo # how to store data in azure endpoint-suffix: azurite:10000 diff --git a/example/docker-compose/etc/tempo-gcs-fake.yaml b/example/docker-compose/etc/tempo-gcs-fake.yaml index e86f2718eff..36979af3c97 100644 --- a/example/docker-compose/etc/tempo-gcs-fake.yaml +++ b/example/docker-compose/etc/tempo-gcs-fake.yaml @@ -40,7 +40,7 @@ storage: encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally - encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd + encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd gcs: bucket_name: tempo endpoint: https://gcs:4443/storage/v1/ diff --git a/example/docker-compose/etc/tempo-local.yaml b/example/docker-compose/etc/tempo-local.yaml index 64d69bc0467..2e83856c3b4 100644 --- a/example/docker-compose/etc/tempo-local.yaml +++ b/example/docker-compose/etc/tempo-local.yaml @@ -39,7 +39,7 @@ storage: encoding: zstd # block encoding/compression. 
options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally - encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd + encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd local: path: /tmp/tempo/blocks pool: diff --git a/example/docker-compose/etc/tempo-s3-minio.yaml b/example/docker-compose/etc/tempo-s3-minio.yaml index 5af7d6e8b04..82c9223cf06 100644 --- a/example/docker-compose/etc/tempo-s3-minio.yaml +++ b/example/docker-compose/etc/tempo-s3-minio.yaml @@ -40,7 +40,7 @@ storage: encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd wal: path: /tmp/tempo/wal # where to store the the wal locally - encoding: snappy # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd + encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd s3: bucket: tempo # how to store data in s3 endpoint: minio:9000 From 2891fc37421ffc84b3bfaadc03581d2838335d50 Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Mon, 12 Apr 2021 08:05:52 -0400 Subject: [PATCH 21/21] walIterator => recordlessIterator Signed-off-by: Joe Elliott --- .../{iterator_wal.go => iterator_recordless.go} | 12 ++++++------ tempodb/wal/replay_block.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) rename tempodb/encoding/{iterator_wal.go => iterator_recordless.go} (59%) diff --git a/tempodb/encoding/iterator_wal.go b/tempodb/encoding/iterator_recordless.go similarity index 59% rename from tempodb/encoding/iterator_wal.go rename to tempodb/encoding/iterator_recordless.go index b6b997fe439..386d23e30af 100644 --- a/tempodb/encoding/iterator_wal.go +++ b/tempodb/encoding/iterator_recordless.go @@ -7,22 +7,22 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -type walIterator struct { +type recordlessIterator struct { o common.ObjectReaderWriter d common.DataReader currentPage []byte } -// NewWALIterator iterates over pages from a datareader directly like one would for the WAL -func NewWALIterator(d common.DataReader, o common.ObjectReaderWriter) Iterator { - return &walIterator{ +// NewRecordlessIterator iterates over pages from a datareader directly without requiring records +func NewRecordlessIterator(d common.DataReader, o common.ObjectReaderWriter) Iterator { + return &recordlessIterator{ o: o, d: d, } } -func (i *walIterator) Next(_ context.Context) (common.ID, []byte, error) { +func (i *recordlessIterator) Next(_ context.Context) (common.ID, []byte, error) { var ( id common.ID obj []byte @@ -39,6 +39,6 @@ func (i *walIterator) Next(_ context.Context) (common.ID, []byte, error) { return id, obj, err } -func (i *walIterator) Close() { +func (i *recordlessIterator) Close() { i.d.Close() } diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index 6b679e32a92..24ce0c95a4e 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -44,7 +44,7 @@ func (r *ReplayBlock) Iterator() (encoding.Iterator, error) { return nil, err } - return encoding.NewWALIterator(dataReader, r.encoding.NewObjectReaderWriter()), nil + return encoding.NewRecordlessIterator(dataReader, r.encoding.NewObjectReaderWriter()), nil } func (r *ReplayBlock) TenantID() string {
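
The series above wires configurable WAL compression and page-based replay end to end. As a quick orientation, the sketch below shows how the pieces fit together after the final patch: a WAL is created with an explicit encoding, objects are appended to a block, and the block is replayed through the recordless iterator. It is a minimal illustration modeled on TestAppendReplayFind, not part of the patch series; the directory, tenant ID, object IDs and payloads are made-up placeholder values, and error handling is abbreviated.

```
package main

import (
	"context"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math/rand"
	"os"

	"github.com/google/uuid"
	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/wal"
)

func main() {
	// Scratch directory for the WAL files; a real deployment points this at
	// storage.trace.wal.path.
	dir, err := ioutil.TempDir("", "wal-sketch")
	if err != nil {
		log.Fatal(err)
	}
	defer os.RemoveAll(dir)

	// Encoding is the knob added by this series; backend.EncNone is the default.
	w, err := wal.New(&wal.Config{
		Filepath: dir,
		Encoding: backend.EncSnappy,
	})
	if err != nil {
		log.Fatal(err)
	}

	// "demo-tenant" is a placeholder tenant ID.
	block, err := w.NewBlock(uuid.New(), "demo-tenant")
	if err != nil {
		log.Fatal(err)
	}

	// Append a few opaque objects. In Tempo these would be trace IDs and
	// marshalled traces; random IDs and small byte slices stand in here.
	for i := 0; i < 3; i++ {
		id := make([]byte, 16)
		rand.Read(id)
		if err := block.Write(id, []byte(fmt.Sprintf("object-%d", i))); err != nil {
			log.Fatal(err)
		}
	}

	// Replay: AllBlocks scans the WAL directory, and each ReplayBlock's
	// Iterator walks the file page by page via the recordless iterator.
	replayBlocks, err := w.AllBlocks()
	if err != nil {
		log.Fatal(err)
	}
	iter, err := replayBlocks[0].Iterator()
	if err != nil {
		log.Fatal(err)
	}
	defer iter.Close()

	for {
		id, obj, err := iter.Next(context.Background())
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("replayed id=%x len=%d\n", id, len(obj))
	}
}
```

Two design choices from the series are worth keeping in mind when reading the sketch: WAL files are pinned to the v2 encoding rather than tracking LatestEncoding(), and the WAL encoding defaults to none, so compression (and the checksums that come with it) remains opt-in.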