diff --git a/CHANGELOG.md b/CHANGELOG.md
index 851e85e0a13..44472c71c11 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,12 +8,15 @@
   `ingestion_burst_size` limit override has been removed in favour of `ingestion_burst_size_bytes`. This is a **breaking change** to the overrides config section. [#630](https://github.com/grafana/tempo/pull/630)
 * [FEATURE] Add page based access to the index file. [#557](https://github.com/grafana/tempo/pull/557)
+* [FEATURE] WAL Compression/checksums. [#638](https://github.com/grafana/tempo/pull/638)
 * [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)
 * [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups. This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557)
 * [ENHANCEMENT] Add list compaction-summary command to tempo-cli [#588](https://github.com/grafana/tempo/pull/588)
 * [ENHANCEMENT] Add list and view index commands to tempo-cli [#611](https://github.com/grafana/tempo/pull/611)
 * [ENHANCEMENT] Add a configurable prefix for HTTP endpoints. [#631](https://github.com/grafana/tempo/pull/631)
+* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)
+* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#627](https://github.com/grafana/tempo/pull/627)
 * [BUGFIX] Fixes permissions errors on startup in GCS. [#554](https://github.com/grafana/tempo/pull/554)
 * [BUGFIX] Fixes error where Dell ECS cannot list objects. [#561](https://github.com/grafana/tempo/pull/561)
 * [BUGFIX] Fixes listing blocks in S3 when the list is truncated. [#567](https://github.com/grafana/tempo/pull/567)
@@ -23,8 +26,6 @@
 * [BUGFIX] Fixes issue where Tempo would not parse odd length trace ids [#605](https://github.com/grafana/tempo/pull/605)
 * [BUGFIX] Sort traces on flush to reduce unexpected recombination work by compactors [#606](https://github.com/grafana/tempo/pull/606)
 * [BUGFIX] Ingester fully persists blocks locally to reduce amount of work done after restart [#628](https://github.com/grafana/tempo/pull/628)
-* [ENHANCEMENT] Add kafka receiver. [#613](https://github.com/grafana/tempo/pull/613)
-* [ENHANCEMENT] Upgrade OTel collector to `v0.21.0`. [#613](https://github.com/grafana/tempo/pull/627)

 ## v0.6.0

diff --git a/docs/tempo/website/configuration/_index.md b/docs/tempo/website/configuration/_index.md
index df4b299f2a1..b3615d47d4c 100644
--- a/docs/tempo/website/configuration/_index.md
+++ b/docs/tempo/website/configuration/_index.md
@@ -121,7 +121,6 @@ storage:
         backend: gcs # store traces in gcs
         gcs:
             bucket_name: ops-tools-tracing-ops # store traces in this bucket
-        blocklist_poll: 5m # how often to repoll the backend for new blocks
         blocklist_poll_concurrency: 50 # optional. Number of blocks to process in parallel during polling. Default is 50.
         cache: memcached # optional cache configuration
@@ -138,6 +137,11 @@ storage:
             queue_depth: 2000 # length of job queue
         wal:
             path: /var/tempo/wal # where to store the head blocks while they are being appended to
+            encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
+        block:
+            bloom_filter_false_positive: .05 # bloom filter false positive rate. 
lower values create larger filters but fewer false positives
+            index_downsample_bytes: 1_000_000 # number of bytes per index record
+            encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
 ```

 ## Memberlist

diff --git a/docs/tempo/website/configuration/compression.md b/docs/tempo/website/configuration/compression.md
index dc1bac98e1b..acf7efd5b48 100644
--- a/docs/tempo/website/configuration/compression.md
+++ b/docs/tempo/website/configuration/compression.md
@@ -29,4 +29,18 @@ The following options are supported:
 It is important to note that although all of these compression formats are supported in Tempo, at Grafana
 we use zstd and it's possible/probable that the other compression algorithms may have issues at scale. Please
-file an issue if you stumble upon any problems!
\ No newline at end of file
+file an issue if you stumble upon any problems!
+
+## WAL
+
+The WAL also supports compression. By default this is turned off because it comes with a small performance penalty.
+However, it reduces disk i/o and adds checksums to the WAL, which are valuable in higher-volume installations.
+
+```
+storage:
+  trace:
+    wal:
+      encoding: none
+```
+
+If WAL compression is turned on it is recommended to use snappy. All of the above options are supported.
\ No newline at end of file
diff --git a/example/docker-compose/etc/tempo-azure.yaml b/example/docker-compose/etc/tempo-azure.yaml
index f00fa7858a7..abe80b8c7b9 100644
--- a/example/docker-compose/etc/tempo-azure.yaml
+++ b/example/docker-compose/etc/tempo-azure.yaml
@@ -39,6 +39,7 @@ storage:
       encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
+      encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
     azure:
       container-name: tempo # how to store data in azure
       endpoint-suffix: azurite:10000
diff --git a/example/docker-compose/etc/tempo-gcs-fake.yaml b/example/docker-compose/etc/tempo-gcs-fake.yaml
index ed81e74c9c2..36979af3c97 100644
--- a/example/docker-compose/etc/tempo-gcs-fake.yaml
+++ b/example/docker-compose/etc/tempo-gcs-fake.yaml
@@ -40,6 +40,7 @@ storage:
       encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
+      encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
     gcs:
       bucket_name: tempo
       endpoint: https://gcs:4443/storage/v1/
diff --git a/example/docker-compose/etc/tempo-local.yaml b/example/docker-compose/etc/tempo-local.yaml
index 0843053e85e..2e83856c3b4 100644
--- a/example/docker-compose/etc/tempo-local.yaml
+++ b/example/docker-compose/etc/tempo-local.yaml
@@ -39,6 +39,7 @@ storage:
      encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
     wal:
       path: /tmp/tempo/wal # where to store the wal locally
+      encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
     local:
       path: /tmp/tempo/blocks
     pool:
diff --git a/example/docker-compose/etc/tempo-s3-minio.yaml b/example/docker-compose/etc/tempo-s3-minio.yaml
index b1d7a1b914a..82c9223cf06 100644
--- a/example/docker-compose/etc/tempo-s3-minio.yaml
+++ b/example/docker-compose/etc/tempo-s3-minio.yaml
@@ -40,6 +40,7 @@ storage:
       encoding: zstd # block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
    wal:
       path: /tmp/tempo/wal # where to store the wal locally
+      encoding: none # wal encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
     s3:
       bucket: tempo # how to store data in s3
       endpoint: minio:9000
diff --git a/modules/storage/config.go b/modules/storage/config.go
index 97703b5e316..d43485a2588 100644
--- a/modules/storage/config.go
+++ b/modules/storage/config.go
@@ -33,6 +33,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
 	cfg.Trace.WAL = &wal.Config{}
 	f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which to store WAL blocks.")
+	cfg.Trace.WAL.Encoding = backend.EncNone

 	cfg.Trace.Block = &encoding.BlockConfig{}
 	f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
diff --git a/tempodb/backend/readerat.go b/tempodb/backend/readerat.go
index 401b58416c2..72457305a87 100644
--- a/tempodb/backend/readerat.go
+++ b/tempodb/backend/readerat.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"io"
 	"io/ioutil"
+
+	"github.com/grafana/tempo/tempodb/encoding/common"
 )

 // ContextReader is an io.ReaderAt interface that passes context. It is used to simplify access to backend objects
@@ -11,6 +13,9 @@ import (
 type ContextReader interface {
 	ReadAt(ctx context.Context, p []byte, off int64) (int, error)
 	ReadAll(ctx context.Context) ([]byte, error)
+
+	// Return an io.Reader representing the underlying contents. May not be supported by all implementations
+	Reader() (io.Reader, error)
 }

 // backendReader is a shim that allows a backend.Reader to be used as a ContextReader
@@ -40,6 +45,11 @@ func (b *backendReader) ReadAll(ctx context.Context) ([]byte, error) {
 	return b.r.Read(ctx, b.name, b.meta.BlockID, b.meta.TenantID)
 }

+// Reader implements ContextReader
+func (b *backendReader) Reader() (io.Reader, error) {
+	return nil, common.ErrUnsupported
+}
+
 // AllReader is an interface that supports both io.Reader and io.ReaderAt methods
 type AllReader interface {
 	io.Reader
@@ -67,3 +77,8 @@ func (r *allReader) ReadAt(ctx context.Context, p []byte, off int64) (int, error
 func (r *allReader) ReadAll(ctx context.Context) ([]byte, error) {
 	return ioutil.ReadAll(r.r)
 }
+
+// Reader implements ContextReader
+func (r *allReader) Reader() (io.Reader, error) {
+	return r.r, nil
+}
diff --git a/tempodb/encoding/appender.go b/tempodb/encoding/appender.go
index 7f8bf04d4b8..150f2505c99 100644
--- a/tempodb/encoding/appender.go
+++ b/tempodb/encoding/appender.go
@@ -33,7 +33,12 @@ func NewAppender(dataWriter common.DataWriter) Appender {
 // Append appends the id/object to the writer. Note that the caller is giving up ownership of the two byte arrays backing the slices.
// Copies should be made and passed in if this is a problem func (a *appender) Append(id common.ID, b []byte) error { - length, err := a.dataWriter.Write(id, b) + _, err := a.dataWriter.Write(id, b) + if err != nil { + return err + } + + bytesWritten, err := a.dataWriter.CutPage() if err != nil { return err } @@ -46,10 +51,11 @@ func (a *appender) Append(id common.ID, b []byte) error { a.records[i] = &common.Record{ ID: id, Start: a.currentOffset, - Length: uint32(length), + Length: uint32(bytesWritten), } - a.currentOffset += uint64(length) + a.currentOffset += uint64(bytesWritten) + return nil } diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index d61aa3cd27a..b3cf5000bab 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -14,7 +14,7 @@ import ( // BackendBlock represents a block already in the backend. type BackendBlock struct { - encoding versionedEncoding + encoding VersionedEncoding meta *backend.BlockMeta reader backend.Reader @@ -23,17 +23,9 @@ type BackendBlock struct { // NewBackendBlock returns a BackendBlock for the given backend.BlockMeta // It is version aware. func NewBackendBlock(meta *backend.BlockMeta, r backend.Reader) (*BackendBlock, error) { - var encoding versionedEncoding - - switch meta.Version { - case "v0": - encoding = v0Encoding{} - case "v1": - encoding = v1Encoding{} - case "v2": - encoding = v2Encoding{} - default: - return nil, fmt.Errorf("%s is not a valid block version", meta.Version) + encoding, err := FromVersion(meta.Version) + if err != nil { + return nil, err } return &BackendBlock{ @@ -76,20 +68,20 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { } indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader) - indexReader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) + indexReader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) if err != nil { return nil, fmt.Errorf("error building index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } ra := backend.NewContextReader(b.meta, nameObjects, b.reader) - dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding) + dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding) if err != nil { return nil, fmt.Errorf("error building page reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } defer dataReader.Close() // passing nil for objectCombiner here. 
this is fine b/c a backend block should never have dupes
-	finder := NewPagedFinder(indexReader, dataReader, nil, b.encoding.newObjectReaderWriter())
+	finder := NewPagedFinder(indexReader, dataReader, nil, b.encoding.NewObjectReaderWriter())

 	objectBytes, err := finder.Find(ctx, id)
 	if err != nil {
@@ -103,7 +95,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
 func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) {
 	// read index
 	ra := backend.NewContextReader(b.meta, nameObjects, b.reader)
-	dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding)
+	dataReader, err := b.encoding.NewDataReader(ra, b.meta.Encoding)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create dataReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
 	}
@@ -113,12 +105,12 @@ func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) {
 		return nil, err
 	}

-	return newPagedIterator(chunkSizeBytes, reader, dataReader, b.encoding.newObjectReaderWriter()), nil
+	return newPagedIterator(chunkSizeBytes, reader, dataReader, b.encoding.NewObjectReaderWriter()), nil
 }

 func (b *BackendBlock) NewIndexReader() (common.IndexReader, error) {
 	indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader)
-	reader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
+	reader, err := b.encoding.NewIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
 	if err != nil {
 		return nil, fmt.Errorf("failed to create index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
 	}
diff --git a/tempodb/encoding/common/types.go b/tempodb/encoding/common/types.go
index 009103ec1be..2a4a01d9f53 100644
--- a/tempodb/encoding/common/types.go
+++ b/tempodb/encoding/common/types.go
@@ -2,11 +2,15 @@ package common

 import (
 	"context"
+	"fmt"
 	"io"
 )

 // This file contains types that need to be referenced by both the ./encoding and ./encoding/vX packages.
 // It primarily exists here to break dependency loops.
+var (
+	ErrUnsupported = fmt.Errorf("unsupported")
+)

 // ID in TempoDB
 type ID []byte
@@ -31,6 +35,9 @@ type ObjectCombiner interface {
 type DataReader interface {
 	Read(context.Context, []*Record) ([][]byte, error)
 	Close()
+
+	// NextPage can be used to iterate one page at a time. May return ErrUnsupported for older formats
+	NextPage() ([]byte, error)
 }

 // IndexReader is used to abstract away the details of an index. Currently
diff --git a/tempodb/encoding/iterator_record.go b/tempodb/encoding/iterator_record.go
index d2dfba900e5..978f367103a 100644
--- a/tempodb/encoding/iterator_record.go
+++ b/tempodb/encoding/iterator_record.go
@@ -3,26 +3,27 @@ package encoding

 import (
 	"bytes"
 	"context"
-	"io"
+	"errors"

 	"github.com/grafana/tempo/tempodb/encoding/common"
 )

 type recordIterator struct {
-	records []*common.Record
-	ra      io.ReaderAt
+	records  []*common.Record
+	objectRW common.ObjectReaderWriter
+	dataR    common.DataReader

 	currentIterator Iterator
 }

 // NewRecordIterator returns a recordIterator. This iterator is used for iterating through
 // a series of objects by reading them one at a time from Records. 
-func NewRecordIterator(r []*common.Record, ra io.ReaderAt, objectRW common.ObjectReaderWriter) Iterator { +func NewRecordIterator(r []*common.Record, dataR common.DataReader, objectRW common.ObjectReaderWriter) Iterator { return &recordIterator{ records: r, - ra: ra, objectRW: objectRW, + dataR: dataR, } } @@ -39,14 +40,15 @@ func (i *recordIterator) Next(ctx context.Context) (common.ID, []byte, error) { // read the next record and create an iterator if len(i.records) > 0 { - record := i.records[0] - - buff := make([]byte, record.Length) - _, err := i.ra.ReadAt(buff, int64(record.Start)) + pages, err := i.dataR.Read(ctx, i.records[:1]) if err != nil { return nil, nil, err } + if len(pages) == 0 { + return nil, nil, errors.New("unexpected 0 length pages from dataReader") + } + buff := pages[0] i.currentIterator = NewIterator(bytes.NewReader(buff), i.objectRW) i.records = i.records[1:] @@ -58,4 +60,5 @@ func (i *recordIterator) Next(ctx context.Context) (common.ID, []byte, error) { } func (i *recordIterator) Close() { + i.dataR.Close() } diff --git a/tempodb/encoding/iterator_recordless.go b/tempodb/encoding/iterator_recordless.go new file mode 100644 index 00000000000..386d23e30af --- /dev/null +++ b/tempodb/encoding/iterator_recordless.go @@ -0,0 +1,44 @@ +package encoding + +import ( + "context" + "io" + + "github.com/grafana/tempo/tempodb/encoding/common" +) + +type recordlessIterator struct { + o common.ObjectReaderWriter + d common.DataReader + + currentPage []byte +} + +// NewRecordlessIterator iterates over pages from a datareader directly without requiring records +func NewRecordlessIterator(d common.DataReader, o common.ObjectReaderWriter) Iterator { + return &recordlessIterator{ + o: o, + d: d, + } +} + +func (i *recordlessIterator) Next(_ context.Context) (common.ID, []byte, error) { + var ( + id common.ID + obj []byte + err error + ) + i.currentPage, id, obj, err = i.o.UnmarshalAndAdvanceBuffer(i.currentPage) + if err == io.EOF { + i.currentPage, err = i.d.NextPage() + if err != nil { + return nil, nil, err + } + i.currentPage, id, obj, err = i.o.UnmarshalAndAdvanceBuffer(i.currentPage) + } + return id, obj, err +} + +func (i *recordlessIterator) Close() { + i.d.Close() +} diff --git a/tempodb/encoding/streaming_block.go b/tempodb/encoding/streaming_block.go index 10e91c28b40..505bb2c3315 100644 --- a/tempodb/encoding/streaming_block.go +++ b/tempodb/encoding/streaming_block.go @@ -11,7 +11,7 @@ import ( ) type StreamingBlock struct { - encoding versionedEncoding + encoding VersionedEncoding compactedMeta *backend.BlockMeta inMetas []*backend.BlockMeta @@ -32,7 +32,7 @@ func NewStreamingBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] } c := &StreamingBlock{ - encoding: latestEncoding(), + encoding: LatestEncoding(), compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding), bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), inMetas: metas, @@ -40,7 +40,7 @@ func NewStreamingBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] } c.appendBuffer = &bytes.Buffer{} - dataWriter, err := c.encoding.newDataWriter(c.appendBuffer, cfg.Encoding) + dataWriter, err := c.encoding.NewDataWriter(c.appendBuffer, cfg.Encoding) if err != nil { return nil, fmt.Errorf("failed to create page writer: %w", err) } @@ -117,7 +117,7 @@ func (c *StreamingBlock) Complete(ctx context.Context, tracker backend.AppendTra records := c.appender.Records() meta := c.BlockMeta() - indexWriter := c.encoding.newIndexWriter(c.cfg.IndexPageSizeBytes) + 
indexWriter := c.encoding.NewIndexWriter(c.cfg.IndexPageSizeBytes) indexBytes, err := indexWriter.Write(records) if err != nil { return 0, err diff --git a/tempodb/encoding/v0/data_reader.go b/tempodb/encoding/v0/data_reader.go index baec5f7199c..7f2650c1243 100644 --- a/tempodb/encoding/v0/data_reader.go +++ b/tempodb/encoding/v0/data_reader.go @@ -2,6 +2,7 @@ package v0 import ( "context" + "encoding/binary" "fmt" "github.com/grafana/tempo/tempodb/backend" @@ -64,5 +65,31 @@ func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]by return slicePages, nil } +// Close implements common.DataReader func (r *dataReader) Close() { } + +// NextPage implements common.DataReader +func (r *dataReader) NextPage() ([]byte, error) { + reader, err := r.r.Reader() + if err != nil { + return nil, err + } + + // v0 pages are just single objects. this method will return one object at a time from the encapsulated reader + var totalLength uint32 + err = binary.Read(reader, binary.LittleEndian, &totalLength) + if err != nil { + return nil, err + } + + page := make([]byte, totalLength) + binary.LittleEndian.PutUint32(page, totalLength) + + _, err = reader.Read(page[uint32Size:]) + if err != nil { + return nil, err + } + + return page, nil +} diff --git a/tempodb/encoding/v0/data_reader_test.go b/tempodb/encoding/v0/data_reader_test.go index 149eec3d7a8..456d6e7d812 100644 --- a/tempodb/encoding/v0/data_reader_test.go +++ b/tempodb/encoding/v0/data_reader_test.go @@ -3,15 +3,17 @@ package v0 import ( "bytes" "context" + "io" + "math/rand" "testing" "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestDataReader(t *testing.T) { - tests := []struct { readerBytes []byte records []*common.Record @@ -136,3 +138,40 @@ func TestDataReader(t *testing.T) { assert.Equal(t, tc.expectedBytes, actual) } } + +func TestWriterReaderNextPage(t *testing.T) { + buff := bytes.NewBuffer(nil) + writer := NewDataWriter(buff) + + pageCount := 10 + pages := make([][]byte, 0, pageCount) + for i := 0; i < pageCount; i++ { + data := make([]byte, 200) + rand.Read(data) + pages = append(pages, data) + + _, err := writer.Write([]byte{0x01}, data) + require.NoError(t, err) + _, err = writer.CutPage() + require.NoError(t, err) + } + err := writer.Complete() + require.NoError(t, err) + + reader := NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(buff.Bytes()))) + i := 0 + for { + page, err := reader.NextPage() + if err == io.EOF { + break + } + require.NoError(t, err) + + _, id, obj, err := staticObject.UnmarshalAndAdvanceBuffer(page) + require.NoError(t, err) + + assert.Equal(t, pages[i], obj) + assert.Equal(t, []byte{0x01}, []byte(id)) + i++ + } +} diff --git a/tempodb/encoding/v1/data_reader.go b/tempodb/encoding/v1/data_reader.go index c266821132b..62d2acf5712 100644 --- a/tempodb/encoding/v1/data_reader.go +++ b/tempodb/encoding/v1/data_reader.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" + v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) type dataReader struct { @@ -17,8 +18,13 @@ type dataReader struct { compressedReader io.Reader } -// NewDataReader is useful for nesting compression inside of a different reader -func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { +// NewDataReader creates a datareader that supports compression +func 
NewDataReader(r backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { + return NewNestedDataReader(v0.NewDataReader(r), encoding) +} + +// NewNestedDataReader is useful for nesting compression inside of a different reader +func NewNestedDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { pool, err := getReaderPool(encoding) if err != nil { return nil, err @@ -42,16 +48,12 @@ func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]by // now decompress decompressedPages := make([][]byte, 0, len(compressedPages)) for _, page := range compressedPages { - if r.compressedReader == nil { - r.compressedReader, err = r.pool.GetReader(bytes.NewReader(page)) - } else { - r.compressedReader, err = r.pool.ResetReader(bytes.NewReader(page), r.compressedReader) - } + reader, err := r.getCompressedReader(page) if err != nil { return nil, err } - page, err := ioutil.ReadAll(r.compressedReader) + page, err := ioutil.ReadAll(reader) if err != nil { return nil, err } @@ -69,3 +71,30 @@ func (r *dataReader) Close() { r.pool.PutReader(r.compressedReader) } } + +// NextPage implements common.DataReader (kind of) +func (r *dataReader) NextPage() ([]byte, error) { + page, err := r.dataReader.NextPage() + if err != nil { + return nil, err + } + reader, err := r.getCompressedReader(page) + if err != nil { + return nil, err + } + page, err = ioutil.ReadAll(reader) + if err != nil { + return nil, err + } + return page, nil +} + +func (r *dataReader) getCompressedReader(page []byte) (io.Reader, error) { + var err error + if r.compressedReader == nil { + r.compressedReader, err = r.pool.GetReader(bytes.NewReader(page)) + } else { + r.compressedReader, err = r.pool.ResetReader(bytes.NewReader(page), r.compressedReader) + } + return r.compressedReader, err +} diff --git a/tempodb/encoding/v1/data_reader_test.go b/tempodb/encoding/v1/data_reader_test.go index ee21812706b..cf41ebaa37b 100644 --- a/tempodb/encoding/v1/data_reader_test.go +++ b/tempodb/encoding/v1/data_reader_test.go @@ -7,7 +7,6 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -40,7 +39,7 @@ func TestAllEncodings(t *testing.T) { require.NoError(t, err) encryptedBytes := buff.Bytes() - reader, err := NewDataReader(v0.NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(encryptedBytes))), enc) + reader, err := NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(encryptedBytes)), enc) require.NoError(t, err) defer reader.Close() diff --git a/tempodb/encoding/v2/data_reader.go b/tempodb/encoding/v2/data_reader.go index 55dba6a8351..738579946ca 100644 --- a/tempodb/encoding/v2/data_reader.go +++ b/tempodb/encoding/v2/data_reader.go @@ -5,11 +5,13 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" + v0 "github.com/grafana/tempo/tempodb/encoding/v0" v1 "github.com/grafana/tempo/tempodb/encoding/v1" ) type dataReader struct { - dataReader common.DataReader + contextReader backend.ContextReader + dataReader common.DataReader } // constDataHeader is a singleton data header. 
the data header is @@ -18,13 +20,14 @@ type dataReader struct { var constDataHeader = &dataHeader{} // NewDataReader constructs a v2 DataReader that handles paged...reading -func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { +func NewDataReader(r backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { v2DataReader := &dataReader{ - dataReader: r, + contextReader: r, + dataReader: v0.NewDataReader(r), } // wrap the paged reader in a compressed/v1 reader and return that - v1DataReader, err := v1.NewDataReader(v2DataReader, encoding) + v1DataReader, err := v1.NewNestedDataReader(v2DataReader, encoding) if err != nil { return nil, err } @@ -32,6 +35,7 @@ func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataR return v1DataReader, nil } +// Read implements common.DataReader func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]byte, error) { v0Pages, err := r.dataReader.Read(ctx, records) if err != nil { @@ -54,3 +58,18 @@ func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]by func (r *dataReader) Close() { r.dataReader.Close() } + +// NextPage implements common.DataReader +func (r *dataReader) NextPage() ([]byte, error) { + reader, err := r.contextReader.Reader() + if err != nil { + return nil, err + } + + page, err := unmarshalPageFromReader(reader, constDataHeader) + if err != nil { + return nil, err + } + + return page.data, nil +} diff --git a/tempodb/encoding/v2/page.go b/tempodb/encoding/v2/page.go index fd35a3d281a..fc472afccd1 100644 --- a/tempodb/encoding/v2/page.go +++ b/tempodb/encoding/v2/page.go @@ -51,6 +51,42 @@ func unmarshalPageFromBytes(b []byte, header pageHeader) (*page, error) { }, nil } +func unmarshalPageFromReader(r io.Reader, header pageHeader) (*page, error) { + totalHeaderSize := baseHeaderSize + header.headerLength() + + var totalLength uint32 + var headerLength uint16 + + err := binary.Read(r, binary.LittleEndian, &totalLength) + if err != nil { + return nil, err + } + err = binary.Read(r, binary.LittleEndian, &headerLength) + if err != nil { + return nil, err + } + headerBytes := make([]byte, headerLength) + _, err = r.Read(headerBytes) + if err != nil { + return nil, err + } + err = header.unmarshalHeader(headerBytes) + if err != nil { + return nil, err + } + dataLength := int(totalLength) - totalHeaderSize + pageBytes := make([]byte, dataLength) + _, err = r.Read(pageBytes) + if err != nil { + return nil, err + } + + return &page{ + data: pageBytes, + header: header, + }, nil +} + // marshalPageToWriter marshals the page bytes to the passed writer func marshalPageToWriter(b []byte, w io.Writer, header pageHeader) (int, error) { var headerLength uint16 diff --git a/tempodb/encoding/v2/page_test.go b/tempodb/encoding/v2/page_test.go index 99ad141312d..c8b84b6ac46 100644 --- a/tempodb/encoding/v2/page_test.go +++ b/tempodb/encoding/v2/page_test.go @@ -67,6 +67,11 @@ func TestPageMarshalToWriter(t *testing.T) { require.NoError(t, err) assert.Equal(t, tc.expected, page.data) assert.Equal(t, tc.field, page.header.(*testHeader).field) + + page, err = unmarshalPageFromReader(buff, &testHeader{}) + require.NoError(t, err) + assert.Equal(t, tc.expected, page.data) + assert.Equal(t, tc.field, page.header.(*testHeader).field) } } diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go index 73d581f276a..cf6aeb60d53 100644 --- a/tempodb/encoding/versioned.go +++ b/tempodb/encoding/versioned.go @@ -1,6 +1,7 @@ 
package encoding import ( + "fmt" "io" "github.com/grafana/tempo/tempodb/backend" @@ -12,96 +13,121 @@ import ( const currentVersion = "v2" -// latestEncoding is used by Compactor and Complete block -func latestEncoding() versionedEncoding { +// VersionedEncoding has a whole bunch of versioned functionality. This is +// currently quite sloppy and could easily be tightened up to just a few methods +// but it is what it is for now! +type VersionedEncoding interface { + Version() string + + NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) + NewIndexWriter(pageSizeBytes int) common.IndexWriter + + NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) + NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) + + NewObjectReaderWriter() common.ObjectReaderWriter + NewRecordReaderWriter() common.RecordReaderWriter +} + +// FromVersion returns a versioned encoding for the provided string +func FromVersion(v string) (VersionedEncoding, error) { + switch v { + case "v0": + return v0Encoding{}, nil + case "v1": + return v1Encoding{}, nil + case "v2": + return v2Encoding{}, nil + } + + return nil, fmt.Errorf("%s is not a valid block version", v) +} + +// LatestEncoding is used by Compactor and Complete block +func LatestEncoding() VersionedEncoding { return v2Encoding{} } // allEncodings returns all encodings -func allEncodings() []versionedEncoding { - return []versionedEncoding{ +func allEncodings() []VersionedEncoding { + return []VersionedEncoding{ v0Encoding{}, v1Encoding{}, v2Encoding{}, } } -// versionedEncoding has a whole bunch of versioned functionality. This is -// currently quite sloppy and could easily be tightened up to just a few methods -// but it is what it is for now! -type versionedEncoding interface { - newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) - newIndexWriter(pageSizeBytes int) common.IndexWriter - - newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) - newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) - - newObjectReaderWriter() common.ObjectReaderWriter - newRecordReaderWriter() common.RecordReaderWriter -} - // v0Encoding type v0Encoding struct{} -func (v v0Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { +func (v v0Encoding) Version() string { + return "v0" +} +func (v v0Encoding) NewIndexWriter(pageSizeBytes int) common.IndexWriter { return v0.NewIndexWriter() } -func (v v0Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { +func (v v0Encoding) NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { return v0.NewDataWriter(writer), nil // ignore encoding. 
v0 DataWriter writes raw bytes } -func (v v0Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { +func (v v0Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { return v0.NewIndexReader(ra) } -func (v v0Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { +func (v v0Encoding) NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { return v0.NewDataReader(ra), nil } -func (v v0Encoding) newObjectReaderWriter() common.ObjectReaderWriter { +func (v v0Encoding) NewObjectReaderWriter() common.ObjectReaderWriter { return v0.NewObjectReaderWriter() } -func (v v0Encoding) newRecordReaderWriter() common.RecordReaderWriter { +func (v v0Encoding) NewRecordReaderWriter() common.RecordReaderWriter { return v0.NewRecordReaderWriter() } // v1Encoding type v1Encoding struct{} -func (v v1Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { +func (v v1Encoding) Version() string { + return "v1" +} +func (v v1Encoding) NewIndexWriter(pageSizeBytes int) common.IndexWriter { return v1.NewIndexWriter() } -func (v v1Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { +func (v v1Encoding) NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { return v1.NewDataWriter(writer, encoding) } -func (v v1Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { +func (v v1Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { return v1.NewIndexReader(ra) } -func (v v1Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { - return v1.NewDataReader(v0.NewDataReader(ra), encoding) +func (v v1Encoding) NewDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { + return v1.NewDataReader(ra, encoding) } -func (v v1Encoding) newObjectReaderWriter() common.ObjectReaderWriter { +func (v v1Encoding) NewObjectReaderWriter() common.ObjectReaderWriter { return v1.NewObjectReaderWriter() } -func (v v1Encoding) newRecordReaderWriter() common.RecordReaderWriter { +func (v v1Encoding) NewRecordReaderWriter() common.RecordReaderWriter { return v1.NewRecordReaderWriter() } // v2Encoding type v2Encoding struct{} -func (v v2Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter { +func (v v2Encoding) Version() string { + return "v2" +} +func (v v2Encoding) NewIndexWriter(pageSizeBytes int) common.IndexWriter { return v2.NewIndexWriter(pageSizeBytes) } -func (v v2Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { +func (v v2Encoding) NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { return v2.NewDataWriter(writer, encoding) } -func (v v2Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { +func (v v2Encoding) NewIndexReader(ra backend.ContextReader, pageSizeBytes int, totalPages int) (common.IndexReader, error) { return v2.NewIndexReader(ra, pageSizeBytes, totalPages) } -func (v v2Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { - return v2.NewDataReader(v0.NewDataReader(ra), encoding) +func (v v2Encoding) NewDataReader(ra 
backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) { + return v2.NewDataReader(ra, encoding) } -func (v v2Encoding) newObjectReaderWriter() common.ObjectReaderWriter { +func (v v2Encoding) NewObjectReaderWriter() common.ObjectReaderWriter { return v2.NewObjectReaderWriter() } -func (v v2Encoding) newRecordReaderWriter() common.RecordReaderWriter { +func (v v2Encoding) NewRecordReaderWriter() common.RecordReaderWriter { return v2.NewRecordReaderWriter() } diff --git a/tempodb/encoding/versioned_test.go b/tempodb/encoding/versioned_test.go index 544e9a35914..efa4db15f61 100644 --- a/tempodb/encoding/versioned_test.go +++ b/tempodb/encoding/versioned_test.go @@ -11,15 +11,26 @@ import ( "github.com/stretchr/testify/require" ) +func TestFromVersionErrors(t *testing.T) { + encoding, err := FromVersion("definitely-not-a-real-version") + assert.Error(t, err) + assert.Nil(t, encoding) +} + func TestAllVersions(t *testing.T) { for _, v := range allEncodings() { + encoding, err := FromVersion(v.Version()) + + require.Equal(t, v.Version(), encoding.Version()) + require.NoError(t, err) + for _, e := range backend.SupportedEncoding { testDataWriterReader(t, v, e) } } } -func testDataWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding) { +func testDataWriterReader(t *testing.T, v VersionedEncoding, e backend.Encoding) { tests := []struct { readerBytes []byte }{ @@ -33,7 +44,7 @@ func testDataWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding) for _, tc := range tests { buff := bytes.NewBuffer([]byte{}) - dataWriter, err := v.newDataWriter(buff, e) + dataWriter, err := v.NewDataWriter(buff, e) require.NoError(t, err) _, err = dataWriter.Write([]byte{0x01}, tc.readerBytes) @@ -46,7 +57,7 @@ func testDataWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding) require.NoError(t, err) reader := bytes.NewReader(buff.Bytes()) - dataReader, err := v.newDataReader(backend.NewContextReaderWithAllReader(reader), e) + dataReader, err := v.NewDataReader(backend.NewContextReaderWithAllReader(reader), e) require.NoError(t, err) defer dataReader.Close() @@ -59,7 +70,7 @@ func testDataWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding) require.NoError(t, err) require.Len(t, actual, 1) - i := NewIterator(bytes.NewReader(actual[0]), v.newObjectReaderWriter()) + i := NewIterator(bytes.NewReader(actual[0]), v.NewObjectReaderWriter()) defer i.Close() id, obj, err := i.Next(context.Background()) diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 78e0eb555f6..b5c74722738 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -8,28 +8,28 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) -// these values should never be used. these dummy values can help detect -// if they leak elsewhere. append blocks are not versioned and do not -// support different encodings -const appendBlockVersion = "append" -const appendBlockEncoding = backend.EncNone - // AppendBlock is a block that is actively used to append new objects to. It stores all data in the appendFile // in the order it was received and an in memory sorted index. 
type AppendBlock struct { block + encoding encoding.VersionedEncoding appendFile *os.File appender encoding.Appender } -func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBlock, error) { +func newAppendBlock(id uuid.UUID, tenantID string, filepath string, e backend.Encoding) (*AppendBlock, error) { + v, err := encoding.FromVersion("v2") // let's pin wal files instead of tracking latest for safety + if err != nil { + return nil, err + } + h := &AppendBlock{ + encoding: v, block: block{ - meta: backend.NewBlockMeta(tenantID, id, appendBlockVersion, appendBlockEncoding), + meta: backend.NewBlockMeta(tenantID, id, v.Version(), e), filepath: filepath, }, } @@ -41,7 +41,13 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc return nil, err } h.appendFile = f - h.appender = encoding.NewAppender(v0.NewDataWriter(f)) + + dataWriter, err := h.encoding.NewDataWriter(f, e) + if err != nil { + return nil, err + } + + h.appender = encoding.NewAppender(dataWriter) return h, nil } @@ -82,7 +88,12 @@ func (h *AppendBlock) GetIterator(combiner common.ObjectCombiner) (encoding.Iter return nil, err } - iterator := encoding.NewRecordIterator(records, readFile, v0.NewObjectReaderWriter()) + dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(readFile), h.meta.Encoding) + if err != nil { + return nil, err + } + + iterator := encoding.NewRecordIterator(records, dataReader, h.encoding.NewObjectReaderWriter()) iterator, err = encoding.NewDedupingIterator(iterator, combiner) if err != nil { return nil, err @@ -98,9 +109,12 @@ func (h *AppendBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte return nil, err } - dataReader := v0.NewDataReader(backend.NewContextReaderWithAllReader(file)) + dataReader, err := h.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), h.meta.Encoding) + if err != nil { + return nil, err + } defer dataReader.Close() - finder := encoding.NewPagedFinder(common.Records(records), dataReader, combiner, v0.NewObjectReaderWriter()) + finder := encoding.NewPagedFinder(common.Records(records), dataReader, combiner, h.encoding.NewObjectReaderWriter()) return finder.Find(context.Background(), id) } diff --git a/tempodb/wal/block.go b/tempodb/wal/block.go index 8f06ae93449..7f15a8cdedf 100644 --- a/tempodb/wal/block.go +++ b/tempodb/wal/block.go @@ -3,8 +3,11 @@ package wal import ( "fmt" "os" + "path/filepath" + "strings" "sync" + "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" ) @@ -17,7 +20,7 @@ type block struct { } func (b *block) fullFilename() string { - return fmt.Sprintf("%s/%v:%v", b.filepath, b.meta.BlockID, b.meta.TenantID) + return filepath.Join(b.filepath, fmt.Sprintf("%v:%v:%v:%v", b.meta.BlockID, b.meta.TenantID, b.meta.Version, b.meta.Encoding)) } func (b *block) file() (*os.File, error) { @@ -32,3 +35,37 @@ func (b *block) file() (*os.File, error) { return b.readFile, err } + +func parseFilename(name string) (uuid.UUID, string, string, backend.Encoding, error) { + splits := strings.Split(name, ":") + + if len(splits) != 2 && len(splits) != 4 { + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. 
unexpected number of segments", name) + } + + blockIDString := splits[0] + tenantID := splits[1] + + version := "v0" + encodingString := backend.EncNone.String() + if len(splits) == 4 { + version = splits[2] + encodingString = splits[3] + } + + blockID, err := uuid.Parse(blockIDString) + if err != nil { + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. error parsing uuid: %w", name, err) + } + + encoding, err := backend.ParseEncoding(encodingString) + if err != nil { + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. error parsing encoding: %w", name, err) + } + + if len(tenantID) == 0 || len(version) == 0 { + return uuid.UUID{}, "", "", backend.EncNone, fmt.Errorf("unable to parse %s. missing fields", name) + } + + return blockID, tenantID, version, encoding, nil +} diff --git a/tempodb/wal/block_test.go b/tempodb/wal/block_test.go new file mode 100644 index 00000000000..d4b2aaad7b5 --- /dev/null +++ b/tempodb/wal/block_test.go @@ -0,0 +1,193 @@ +package wal + +import ( + "testing" + + "github.com/google/uuid" + "github.com/grafana/tempo/tempodb/backend" + "github.com/stretchr/testify/assert" +) + +func TestFullFilename(t *testing.T) { + tests := []struct { + name string + b *block + expected string + }{ + { + name: "ez-mode", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone), + filepath: "/blerg", + }, + expected: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo:v1:none", + }, + { + name: "nopath", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v1", backend.EncNone), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v1:none", + }, + { + name: "gzip", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncGZIP), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:gzip", + }, + { + name: "lz41M", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_1M), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4-1M", + }, + { + name: "lz4256k", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_256k), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4-256k", + }, + { + name: "lz4M", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_4M), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4", + }, + { + name: "lz64k", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncLZ4_64k), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:lz4-64k", + }, + { + name: "snappy", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncSnappy), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy", + }, + { + name: "zstd", + b: &block{ + meta: backend.NewBlockMeta("foo", uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), "v2", backend.EncZstd), + filepath: "", + }, + expected: "123e4567-e89b-12d3-a456-426614174000:foo:v2:zstd", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { 
+ actual := tc.b.fullFilename() + assert.Equal(t, tc.expected, actual) + }) + } +} + +func TestParseFilename(t *testing.T) { + tests := []struct { + name string + filename string + expectUUID uuid.UUID + expectTenant string + expectedVersion string + expectedEncoding backend.Encoding + expectError bool + }{ + { + name: "ez-mode", + filename: "123e4567-e89b-12d3-a456-426614174000:foo", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + expectedVersion: "v0", + expectedEncoding: backend.EncNone, + }, + { + name: "version and encoding", + filename: "123e4567-e89b-12d3-a456-426614174000:foo:v1:snappy", + expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"), + expectTenant: "foo", + expectedVersion: "v1", + expectedEncoding: backend.EncSnappy, + }, + { + name: "path fails", + filename: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo", + expectError: true, + }, + { + name: "no :", + filename: "123e4567-e89b-12d3-a456-426614174000", + expectError: true, + }, + { + name: "empty string", + filename: "", + expectError: true, + }, + { + name: "bad uuid", + filename: "123e4:foo", + expectError: true, + }, + { + name: "no tenant", + filename: "123e4567-e89b-12d3-a456-426614174000:", + expectError: true, + }, + { + name: "no version", + filename: "123e4567-e89b-12d3-a456-426614174000:test::none", + expectError: true, + }, + { + name: "wrong splits - 5", + filename: "123e4567-e89b-12d3-a456-426614174000:test:test:test:test", + expectError: true, + }, + { + name: "wrong splits - 3", + filename: "123e4567-e89b-12d3-a456-426614174000:test:test", + expectError: true, + }, + { + name: "wrong splits - 1", + filename: "123e4567-e89b-12d3-a456-426614174000", + expectError: true, + }, + { + name: "bad encoding", + filename: "123e4567-e89b-12d3-a456-426614174000:test:v1:asdf", + expectError: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + actualUUID, actualTenant, actualVersion, actualEncoding, err := parseFilename(tc.filename) + + if tc.expectError { + assert.Error(t, err) + return + } + + assert.NoError(t, err) + assert.Equal(t, tc.expectUUID, actualUUID) + assert.Equal(t, tc.expectTenant, actualTenant) + assert.Equal(t, tc.expectedEncoding, actualEncoding) + assert.Equal(t, tc.expectedVersion, actualVersion) + }) + } +} diff --git a/tempodb/wal/replay_block.go b/tempodb/wal/replay_block.go index f3457b05ad1..24ce0c95a4e 100644 --- a/tempodb/wal/replay_block.go +++ b/tempodb/wal/replay_block.go @@ -3,12 +3,33 @@ package wal import ( "os" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) type ReplayBlock struct { block + encoding encoding.VersionedEncoding +} + +func NewReplayBlock(filename string, path string) (*ReplayBlock, error) { + blockID, tenantID, version, e, err := parseFilename(filename) + if err != nil { + return nil, err + } + + v, err := encoding.FromVersion(version) + if err != nil { + return nil, err + } + + return &ReplayBlock{ + block: block{ + meta: backend.NewBlockMeta(tenantID, blockID, version, e), + filepath: path, + }, + encoding: v, + }, nil } func (r *ReplayBlock) Iterator() (encoding.Iterator, error) { @@ -18,7 +39,12 @@ func (r *ReplayBlock) Iterator() (encoding.Iterator, error) { return nil, err } - return encoding.NewIterator(f, v0.NewObjectReaderWriter()), nil + dataReader, err := r.encoding.NewDataReader(backend.NewContextReaderWithAllReader(f), r.meta.Encoding) + if err != nil { + return nil, 
err + } + + return encoding.NewRecordlessIterator(dataReader, r.encoding.NewObjectReaderWriter()), nil } func (r *ReplayBlock) TenantID() string { diff --git a/tempodb/wal/wal.go b/tempodb/wal/wal.go index 17cf87f96b2..cb98d81e10b 100644 --- a/tempodb/wal/wal.go +++ b/tempodb/wal/wal.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "github.com/google/uuid" "github.com/grafana/tempo/tempodb/backend" @@ -26,6 +25,7 @@ type Config struct { Filepath string `yaml:"path"` CompletedFilepath string BlocksFilepath string + Encoding backend.Encoding `yaml:"encoding"` } func New(c *Config) (*WAL, error) { @@ -85,45 +85,21 @@ func (w *WAL) AllBlocks() ([]*ReplayBlock, error) { continue } - name := f.Name() - blockID, tenantID, err := parseFilename(name) + r, err := NewReplayBlock(f.Name(), w.c.Filepath) if err != nil { return nil, err } - blocks = append(blocks, &ReplayBlock{ - block: block{ - meta: backend.NewBlockMeta(tenantID, blockID, appendBlockVersion, appendBlockEncoding), - filepath: w.c.Filepath, - }, - }) + blocks = append(blocks, r) } return blocks, nil } func (w *WAL) NewBlock(id uuid.UUID, tenantID string) (*AppendBlock, error) { - return newAppendBlock(id, tenantID, w.c.Filepath) + return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding) } func (w *WAL) LocalBackend() *local.Backend { return w.l } - -func parseFilename(name string) (uuid.UUID, string, error) { - i := strings.Index(name, ":") - - if i < 0 { - return uuid.UUID{}, "", fmt.Errorf("unable to parse %s", name) - } - - blockIDString := name[:i] - tenantID := name[i+1:] - - blockID, err := uuid.Parse(blockIDString) - if err != nil { - return uuid.UUID{}, "", err - } - - return blockID, tenantID, nil -} diff --git a/tempodb/wal/wal_test.go b/tempodb/wal/wal_test.go index 93e46cf7ea3..fd6f6cf83f9 100644 --- a/tempodb/wal/wal_test.go +++ b/tempodb/wal/wal_test.go @@ -2,6 +2,7 @@ package wal import ( "context" + "io" "io/ioutil" "math/rand" "os" @@ -11,11 +12,12 @@ import ( "github.com/golang/protobuf/proto" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) const ( @@ -100,7 +102,7 @@ func TestAppend(t *testing.T) { block, err := wal.NewBlock(blockID, testTenantID) assert.NoError(t, err, "unexpected error creating block") - numMsgs := 1 + numMsgs := 100 reqs := make([]*tempopb.PushRequest, 0, numMsgs) for i := 0; i < numMsgs; i++ { req := test.MakeRequest(rand.Int()%1000, []byte{0x01}) @@ -114,7 +116,11 @@ func TestAppend(t *testing.T) { records := block.appender.Records() file, err := block.file() assert.NoError(t, err) - iterator := encoding.NewRecordIterator(records, file, v0.NewObjectReaderWriter()) + + dataReader, err := block.encoding.NewDataReader(backend.NewContextReaderWithAllReader(file), backend.EncNone) + assert.NoError(t, err) + iterator := encoding.NewRecordIterator(records, dataReader, block.encoding.NewObjectReaderWriter()) + defer iterator.Close() i := 0 for { @@ -156,31 +162,147 @@ func TestCompletedDirIsRemoved(t *testing.T) { assert.Error(t, err, "completedDir should not exist") } -func BenchmarkWriteRead(b *testing.B) { - tempDir, _ := ioutil.TempDir("/tmp", "") +func TestAppendReplayFind(t *testing.T) { + for _, e := range backend.SupportedEncoding { + testAppendReplayFind(t, e) + } +} + +func 
testAppendReplayFind(t *testing.T, e backend.Encoding) { + tempDir, err := ioutil.TempDir("/tmp", "") defer os.RemoveAll(tempDir) + require.NoError(t, err, "unexpected error creating temp dir") - wal, _ := New(&Config{ + wal, err := New(&Config{ Filepath: tempDir, + Encoding: e, }) + require.NoError(t, err, "unexpected error creating temp wal") blockID := uuid.New() - // 1 million requests, 10k spans per request - block, _ := wal.NewBlock(blockID, testTenantID) - numMsgs := 100 - reqs := make([]*tempopb.PushRequest, 0, numMsgs) - for i := 0; i < numMsgs; i++ { - req := test.MakeRequest(100, []byte{}) - reqs = append(reqs, req) + block, err := wal.NewBlock(blockID, testTenantID) + require.NoError(t, err, "unexpected error creating block") + + objects := 1000 + objs := make([][]byte, 0, objects) + ids := make([][]byte, 0, objects) + for i := 0; i < objects; i++ { + id := make([]byte, 16) + rand.Read(id) + obj := test.MakeRequest(rand.Int()%10, id) + ids = append(ids, id) + bObj, err := proto.Marshal(obj) + require.NoError(t, err) + objs = append(objs, bObj) + + err = block.Write(id, bObj) + require.NoError(t, err, "unexpected error writing req") + } + + for i, id := range ids { + obj, err := block.Find(id, &mockCombiner{}) + require.NoError(t, err) + assert.Equal(t, objs[i], obj) + } + + blocks, err := wal.AllBlocks() + require.NoError(t, err, "unexpected error getting blocks") + require.Len(t, blocks, 1) + + replay := blocks[0] + iterator, err := replay.Iterator() + require.NoError(t, err) + defer iterator.Close() + + i := 0 + for { + id, obj, err := iterator.Next(context.Background()) + if err == io.EOF { + break + } else { + require.NoError(t, err) + } + assert.Equal(t, objs[i], obj) + assert.Equal(t, ids[i], []byte(id)) + i++ } + assert.Equal(t, objects, i) +} + +func BenchmarkWALNone(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncNone) +} +func BenchmarkWALSnappy(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncSnappy) +} +func BenchmarkWALLZ4(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncLZ4_1M) +} +func BenchmarkWALGZIP(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncGZIP) +} +func BenchmarkWALZSTD(b *testing.B) { + benchmarkWriteFindReplay(b, backend.EncZstd) +} + +func benchmarkWriteFindReplay(b *testing.B, encoding backend.Encoding) { + objects := 1000 + objs := make([][]byte, 0, objects) + ids := make([][]byte, 0, objects) + for i := 0; i < objects; i++ { + id := make([]byte, 16) + rand.Read(id) + obj := test.MakeRequest(rand.Int()%10, id) + ids = append(ids, id) + bObj, err := proto.Marshal(obj) + require.NoError(b, err) + objs = append(objs, bObj) + } + mockCombiner := &mockCombiner{} b.ResetTimer() + for i := 0; i < b.N; i++ { - for _, req := range reqs { - bytes, _ := proto.Marshal(req) - _ = block.Write(test.MustTraceID(req), bytes) - _, _ = block.Find(test.MustTraceID(req), &mockCombiner{}) + tempDir, _ := ioutil.TempDir("/tmp", "") + wal, _ := New(&Config{ + Filepath: tempDir, + Encoding: encoding, + }) + + blockID := uuid.New() + block, err := wal.NewBlock(blockID, testTenantID) + require.NoError(b, err) + + // write + for j, obj := range objs { + err := block.Write(ids[j], obj) + require.NoError(b, err) } + + // find + for _, id := range ids { + _, err := block.Find(id, mockCombiner) + require.NoError(b, err) + } + + // replay + j := 0 + replayBlocks, err := wal.AllBlocks() + require.NoError(b, err) + iter, err := replayBlocks[0].Iterator() + require.NoError(b, err) + for { + _, _, err := iter.Next(context.Background()) + if err == 
io.EOF { + break + } + require.NoError(b, err) + j++ + } + + err = block.Clear() + require.NoError(b, err) + os.RemoveAll(tempDir) } }
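
---

For illustration only (not part of the patch): a minimal config sketch of the option this change introduces. WAL compression is selected via the new `encoding` key under `storage.trace.wal`; per the compression.md addition above, snappy is the recommended codec when enabling it, and `/var/tempo/wal` is the default path registered in `modules/storage/config.go`:

```
storage:
  trace:
    wal:
      path: /var/tempo/wal # default WAL location
      encoding: snappy # recommended when enabling WAL compression; options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd
```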