diff --git a/CHANGELOG.md b/CHANGELOG.md index 08acb4d0fab..8967a98b396 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ ## master / unreleased * [CHANGE] Update to Go 1.16, latest OpenTelemetry proto definition and collector [#546](https://github.com/grafana/tempo/pull/546) +* [FEATURE] Add page based access to the index file. [#557](https://github.com/grafana/tempo/pull/557) * [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526) * [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups. This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557) diff --git a/go.mod b/go.mod index 60d701394f0..d2c66fa1e47 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( contrib.go.opencensus.io/exporter/prometheus v0.2.0 github.com/Azure/azure-storage-blob-go v0.8.0 github.com/alecthomas/kong v0.2.11 + github.com/cespare/xxhash v1.1.0 github.com/cortexproject/cortex v1.6.1-0.20210205171041-527f9b58b93c github.com/dustin/go-humanize v1.0.0 github.com/go-kit/kit v0.10.0 diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 92172c556aa..c72e8621d7a 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -194,6 +194,7 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, IndexDownsampleBytes: 2, BloomFP: .01, Encoding: backend.EncLZ4_1M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: tmpDir, diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index 9aad0b32bf3..4879061bf14 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -474,6 +474,7 @@ func defaultInstance(t assert.TestingT, tmpDir string) *instance { IndexDownsampleBytes: 2, BloomFP: .01, Encoding: backend.EncLZ4_1M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: tmpDir, diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go index bbdf312b330..62e77b20a31 100644 --- a/modules/querier/querier_test.go +++ b/modules/querier/querier_test.go @@ -57,6 +57,7 @@ func TestReturnAllHits(t *testing.T) { Encoding: backend.EncNone, IndexDownsampleBytes: 10, BloomFP: .05, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/modules/storage/config.go b/modules/storage/config.go index ccdf3f658f3..97703b5e316 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -36,7 +36,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.Block = &encoding.BlockConfig{} f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.") - f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 2*1024*1024, "Number of bytes (before compression) per index record.") + f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.") + f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.") cfg.Trace.Block.Encoding = backend.EncZstd 
cfg.Trace.Azure = &azure.Config{} diff --git a/pkg/sort/search.go b/pkg/sort/search.go new file mode 100644 index 00000000000..2dc1dcaddb2 --- /dev/null +++ b/pkg/sort/search.go @@ -0,0 +1,24 @@ +package sort + +// SearchWithErrors is forked from https://golang.org/src/sort/search.go +// but with added support for errors +func SearchWithErrors(n int, f func(int) (bool, error)) (int, error) { + // Define f(-1) == false and f(n) == true. + // Invariant: f(i-1) == false, f(j) == true. + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) // avoid overflow when computing h + // i ≤ h < j + b, e := f(h) + if e != nil { + return -1, e + } + if !b { + i = h + 1 // preserves f(i-1) == false + } else { + j = h // preserves f(j) == true + } + } + // i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i. + return i, nil +} diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index 544f037bd52..36d7a36837a 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -25,6 +25,8 @@ type BlockMeta struct { Size uint64 `json:"size"` CompactionLevel uint8 `json:"compactionLevel"` Encoding Encoding `json:"encoding"` + IndexPageSize uint32 `json:"indexPageSize"` + TotalRecords uint32 `json:"totalRecords"` } func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding) *BlockMeta { diff --git a/tempodb/compactor_bookmark_test.go b/tempodb/compactor_bookmark_test.go index ae7970e2aa1..1030cca9b2e 100644 --- a/tempodb/compactor_bookmark_test.go +++ b/tempodb/compactor_bookmark_test.go @@ -19,12 +19,13 @@ import ( "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/wal" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestCurrentClear(t *testing.T) { tempDir, err := ioutil.TempDir("/tmp", "") defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") + require.NoError(t, err, "unexpected error creating temp dir") r, w, c, err := New(&Config{ Backend: "local", @@ -35,13 +36,14 @@ func TestCurrentClear(t *testing.T) { IndexDownsampleBytes: 17, BloomFP: .01, Encoding: backend.EncGZIP, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), }, BlocklistPoll: 0, }, log.NewNopLogger()) - assert.NoError(t, err) + require.NoError(t, err) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -51,12 +53,12 @@ func TestCurrentClear(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) wal := w.WAL() - assert.NoError(t, err) + require.NoError(t, err) recordCount := 10 blockID := uuid.New() head, err := wal.NewBlock(blockID, testTenantID) - assert.NoError(t, err) + require.NoError(t, err) for i := 0; i < recordCount; i++ { id := make([]byte, 16) diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index e58366b64c1..c60cbcc5aa0 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/google/uuid" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" @@ -64,12 +65,14 @@ func TestCompaction(t *testing.T) { IndexDownsampleBytes: 11, BloomFP: .01, Encoding: backend.EncLZ4_4M, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), }, BlocklistPoll: 0, }, log.NewNopLogger()) + require.NoError(t, err) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -190,6 +193,7 @@ func 
TestSameIDCompaction(t *testing.T) { IndexDownsampleBytes: 11, BloomFP: .01, Encoding: backend.EncSnappy, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -277,6 +281,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { IndexDownsampleBytes: 11, BloomFP: .01, Encoding: backend.EncNone, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -342,6 +347,7 @@ func TestCompactionMetrics(t *testing.T) { IndexDownsampleBytes: 11, BloomFP: .01, Encoding: backend.EncNone, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -390,12 +396,7 @@ func TestCompactionMetrics(t *testing.T) { bytesEnd, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0") assert.NoError(t, err) - bytesPerRecord := - 4 /* total length */ + - 4 /* id length */ + - 16 /* id */ + - 3 /* test record length */ - assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart) + assert.Greater(t, bytesEnd, bytesStart) // calculating the exact bytes requires knowledge of the bytes as written in the blocks. just make sure it goes up } func TestCompactionIteratesThroughTenants(t *testing.T) { @@ -416,6 +417,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { IndexDownsampleBytes: 11, BloomFP: .01, Encoding: backend.EncLZ4_64k, + IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), diff --git a/tempodb/encoding/appender.go b/tempodb/encoding/appender.go index 889ecf63315..7f8bf04d4b8 100644 --- a/tempodb/encoding/appender.go +++ b/tempodb/encoding/appender.go @@ -17,23 +17,23 @@ type Appender interface { } type appender struct { - pageWriter common.PageWriter + dataWriter common.DataWriter records []*common.Record currentOffset uint64 } // NewAppender returns an appender. This appender simply appends new objects -// to the provided pageWriter. -func NewAppender(pageWriter common.PageWriter) Appender { return &appender{ - pageWriter: pageWriter, + dataWriter: dataWriter, } } // Append appends the id/object to the writer. Note that the caller is giving up ownership of the two byte arrays backing the slices. // Copies should be made and passed in if this is a problem func (a *appender) Append(id common.ID, b []byte) error { - length, err := a.pageWriter.Write(id, b) + length, err := a.dataWriter.Write(id, b) if err != nil { return err } @@ -66,5 +66,5 @@ func (a *appender) DataLength() uint64 { } func (a *appender) Complete() error { - return a.pageWriter.Complete() + return a.dataWriter.Complete() } diff --git a/tempodb/encoding/appender_buffered.go b/tempodb/encoding/appender_buffered.go index 8bcba96f2db..3226572c20b 100644 --- a/tempodb/encoding/appender_buffered.go +++ b/tempodb/encoding/appender_buffered.go @@ -8,7 +8,7 @@ import ( // index type bufferedAppender struct { // output writer - writer common.PageWriter + writer common.DataWriter // record keeping records []*common.Record @@ -23,7 +23,7 @@ type bufferedAppender struct { // NewBufferedAppender returns a bufferedAppender. This appender writes to // the provided writer and also builds a downsampled records slice.
-func NewBufferedAppender(writer common.PageWriter, indexDownsample int, totalObjectsEstimate int) (Appender, error) { +func NewBufferedAppender(writer common.DataWriter, indexDownsample int, totalObjectsEstimate int) (Appender, error) { return &bufferedAppender{ writer: writer, indexDownsampleBytes: indexDownsample, diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index 9f916e51650..b1de6887ee9 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -30,6 +30,8 @@ func NewBackendBlock(meta *backend.BlockMeta, r backend.Reader) (*BackendBlock, encoding = v0Encoding{} case "v1": encoding = v1Encoding{} + case "v2": + encoding = v2Encoding{} default: return nil, fmt.Errorf("%s is not a valid block version", meta.Version) } @@ -74,20 +76,20 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { } indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader) - indexReader, err := b.encoding.newIndexReader(indexReaderAt) + indexReader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) if err != nil { return nil, fmt.Errorf("error building index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } ra := backend.NewContextReader(b.meta, nameObjects, b.reader) - pageReader, err := b.encoding.newPageReader(ra, b.meta.Encoding) + dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding) if err != nil { return nil, fmt.Errorf("error building page reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } - defer pageReader.Close() + defer dataReader.Close() // passing nil for objectCombiner here. this is fine b/c a backend block should never have dupes - finder := NewPagedFinder(indexReader, pageReader, nil) + finder := NewPagedFinder(indexReader, dataReader, nil) objectBytes, err := finder.Find(ctx, id) if err != nil { @@ -101,16 +103,16 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) { // read index ra := backend.NewContextReader(b.meta, nameObjects, b.reader) - pageReader, err := b.encoding.newPageReader(ra, b.meta.Encoding) + dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding) if err != nil { - return nil, fmt.Errorf("failed to create pageReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) + return nil, fmt.Errorf("failed to create dataReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader) - reader, err := b.encoding.newIndexReader(indexReaderAt) + reader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords)) if err != nil { return nil, fmt.Errorf("failed to create index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err) } - return newPagedIterator(chunkSizeBytes, reader, pageReader), nil + return newPagedIterator(chunkSizeBytes, reader, dataReader), nil } diff --git a/tempodb/encoding/v0/object.go b/tempodb/encoding/base/object.go similarity index 99% rename from tempodb/encoding/v0/object.go rename to tempodb/encoding/base/object.go index 40c25c7e66d..3054304b17b 100644 --- a/tempodb/encoding/v0/object.go +++ b/tempodb/encoding/base/object.go @@ -1,4 +1,4 @@ -package v0 +package base import ( "encoding/binary" diff --git a/tempodb/encoding/v0/object_test.go b/tempodb/encoding/base/object_test.go similarity index 99% rename from tempodb/encoding/v0/object_test.go 
rename to tempodb/encoding/base/object_test.go index a0ed462fc79..c55d2e945ff 100644 --- a/tempodb/encoding/v0/object_test.go +++ b/tempodb/encoding/base/object_test.go @@ -1,4 +1,4 @@ -package v0 +package base import ( "bytes" diff --git a/tempodb/encoding/v0/record.go b/tempodb/encoding/base/record.go similarity index 56% rename from tempodb/encoding/v0/record.go rename to tempodb/encoding/base/record.go index daca7e752b9..141a2569935 100644 --- a/tempodb/encoding/v0/record.go +++ b/tempodb/encoding/base/record.go @@ -1,8 +1,9 @@ -package v0 +package base import ( "bytes" "encoding/binary" + "errors" "fmt" "sort" @@ -10,13 +11,15 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -const recordLength = 28 // 28 = 128 bit ID, 64bit start, 32bit length +// RecordLength holds the size of a single record in bytes +const RecordLength = 28 // 28 = 128 bit ID, 64bit start, 32bit length type recordSorter struct { records []*common.Record } -func sortRecords(records []*common.Record) { +// SortRecords sorts a slice of record pointers +func SortRecords(records []*common.Record) { sort.Sort(&recordSorter{ records: records, }) @@ -39,34 +42,48 @@ func (t *recordSorter) Swap(i, j int) { // MarshalRecords converts a slice of records into a byte slice func MarshalRecords(records []*common.Record) ([]byte, error) { - recordBytes := make([]byte, len(records)*recordLength) + recordBytes := make([]byte, len(records)*RecordLength) + + err := MarshalRecordsToBuffer(records, recordBytes) + if err != nil { + return nil, err + } + + return recordBytes, nil +} + +// MarshalRecordsToBuffer converts a slice of records and marshals them to an existing byte slice +func MarshalRecordsToBuffer(records []*common.Record, buffer []byte) error { + if len(records)*RecordLength > len(buffer) { + return fmt.Errorf("buffer %d is not big enough for records %d", len(buffer), len(records)*RecordLength) + } for i, r := range records { - buff := recordBytes[i*recordLength : (i+1)*recordLength] + buff := buffer[i*RecordLength : (i+1)*RecordLength] if !validation.ValidTraceID(r.ID) { // todo: remove this check. maybe have a max id size of 128 bits? 
- return nil, fmt.Errorf("Ids must be 128 bit") + return errors.New("ids must be 128 bit") } marshalRecord(r, buff) } - return recordBytes, nil + return nil } func unmarshalRecords(recordBytes []byte) ([]*common.Record, error) { - mod := len(recordBytes) % recordLength + mod := len(recordBytes) % RecordLength if mod != 0 { return nil, fmt.Errorf("records are an unexpected number of bytes %d", mod) } - numRecords := recordCount(recordBytes) + numRecords := RecordCount(recordBytes) records := make([]*common.Record, 0, numRecords) for i := 0; i < numRecords; i++ { - buff := recordBytes[i*recordLength : (i+1)*recordLength] + buff := recordBytes[i*RecordLength : (i+1)*RecordLength] - r := unmarshalRecord(buff) + r := UnmarshalRecord(buff) records = append(records, r) } @@ -74,10 +91,12 @@ func unmarshalRecords(recordBytes []byte) ([]*common.Record, error) { return records, nil } -func recordCount(b []byte) int { - return len(b) / recordLength +// RecordCount returns the number of records in a byte slice +func RecordCount(b []byte) int { + return len(b) / RecordLength } +// marshalRecord writes a record to an existing byte slice func marshalRecord(r *common.Record, buff []byte) { copy(buff, r.ID) @@ -85,7 +104,8 @@ func marshalRecord(r *common.Record, buff []byte) { binary.LittleEndian.PutUint32(buff[24:], r.Length) } -func unmarshalRecord(buff []byte) *common.Record { +// UnmarshalRecord creates a new record from the contents of a byte slice +func UnmarshalRecord(buff []byte) *common.Record { r := newRecord() copy(r.ID, buff[:16]) diff --git a/tempodb/encoding/v0/record_test.go b/tempodb/encoding/base/record_test.go similarity index 94% rename from tempodb/encoding/v0/record_test.go rename to tempodb/encoding/base/record_test.go index 117c523339a..0ea795111dd 100644 --- a/tempodb/encoding/v0/record_test.go +++ b/tempodb/encoding/base/record_test.go @@ -1,4 +1,4 @@ -package v0 +package base import ( "bytes" @@ -13,10 +13,10 @@ func TestEncodeDecodeRecord(t *testing.T) { expected, err := makeRecord(t) assert.NoError(t, err, "unexpected error making trace record") - buff := make([]byte, recordLength) + buff := make([]byte, RecordLength) marshalRecord(expected, buff) - actual := unmarshalRecord(buff) + actual := UnmarshalRecord(buff) assert.Equal(t, expected, actual) } @@ -55,7 +55,7 @@ func TestSortRecord(t *testing.T) { expected = append(expected, r) } - sortRecords(expected) + SortRecords(expected) for i := range expected { if i == 0 { diff --git a/tempodb/encoding/common/types.go b/tempodb/encoding/common/types.go index 5503b692b19..290a2817d17 100644 --- a/tempodb/encoding/common/types.go +++ b/tempodb/encoding/common/types.go @@ -20,12 +20,12 @@ type ObjectCombiner interface { Combine(objA []byte, objB []byte) []byte } -// PageReader returns a slice of pages in the encoding/v0 format referenced by +// DataReader returns a slice of pages in the encoding/v0 format referenced by // the slice of *Records passed in. The length of the returned slice is guaranteed // to be equal to the length of the provided records unless error is non nil. -// PageReader is the primary abstraction point for supporting multiple data +// DataReader is the primary abstraction point for supporting multiple data // formats.
-type PageReader interface { +type DataReader interface { Read(context.Context, []*Record) ([][]byte, error) Close() } @@ -40,14 +40,14 @@ type IndexReader interface { Find(ctx context.Context, id ID) (*Record, int, error) } -// PageWriter is used to write paged data to the backend -type PageWriter interface { +// DataWriter is used to write paged data to the backend +type DataWriter interface { // Write writes the passed ID/byte to the current page Write(ID, []byte) (int, error) // CutPage completes the current page and starts a new one. It // returns the length in bytes of the cut page. CutPage() (int, error) - // Complete must be called when the operation pagewriter is done. + // Complete must be called when the DataWriter operation is done. Complete() error } diff --git a/tempodb/encoding/compactor_block.go b/tempodb/encoding/compactor_block.go index 68eeec3b2f9..65c0182e69a 100644 --- a/tempodb/encoding/compactor_block.go +++ b/tempodb/encoding/compactor_block.go @@ -21,6 +21,8 @@ type CompactorBlock struct { bufferedObjects int appendBuffer *bytes.Buffer appender Appender + + cfg *BlockConfig } // NewCompactorBlock creates a ... new compactor block! @@ -38,15 +40,16 @@ func NewCompactorBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding), bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), inMetas: metas, + cfg: cfg, } c.appendBuffer = &bytes.Buffer{} - pageWriter, err := c.encoding.newPageWriter(c.appendBuffer, cfg.Encoding) + dataWriter, err := c.encoding.newDataWriter(c.appendBuffer, cfg.Encoding) if err != nil { return nil, fmt.Errorf("failed to create page writer: %w", err) } - c.appender, err = NewBufferedAppender(pageWriter, cfg.IndexDownsampleBytes, estimatedObjects) + c.appender, err = NewBufferedAppender(dataWriter, cfg.IndexDownsampleBytes, estimatedObjects) if err != nil { return nil, fmt.Errorf("failed to create appender: %w", err) } @@ -112,12 +115,15 @@ func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTra records := c.appender.Records() meta := c.BlockMeta() - indexWriter := c.encoding.newIndexWriter() + indexWriter := c.encoding.newIndexWriter(c.cfg.IndexPageSizeBytes) indexBytes, err := indexWriter.Write(records) if err != nil { return 0, err } + meta.TotalRecords = uint32(len(records)) // casting + meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) + err = writeBlockMeta(ctx, w, meta, indexBytes, c.bloom) if err != nil { return 0, err } diff --git a/tempodb/encoding/complete_block.go b/tempodb/encoding/complete_block.go index b3e3aed13fc..1f9633b74e4 100644 --- a/tempodb/encoding/complete_block.go +++ b/tempodb/encoding/complete_block.go @@ -29,6 +29,8 @@ type CompleteBlock struct { filepath string readFile *os.File once sync.Once + + cfg *BlockConfig } // NewCompleteBlock creates a new block and takes _ALL_ the parameters necessary to build the ordered, deduped file on disk @@ -39,6 +41,7 @@ func NewCompleteBlock(cfg *BlockConfig, originatingMeta *backend.BlockMeta, iter bloom: common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), records: make([]*common.Record, 0), filepath: filepath, + cfg: cfg, } appendFile, err := os.OpenFile(c.fullFilename(), os.O_APPEND|os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) if err != nil { return nil, err } defer appendFile.Close() - pageWriter, err := c.encoding.newPageWriter(appendFile, cfg.Encoding) + dataWriter, err =
c.encoding.newDataWriter(appendFile, cfg.Encoding) if err != nil { return nil, err } - appender, err := NewBufferedAppender(pageWriter, cfg.IndexDownsampleBytes, estimatedObjects) + appender, err := NewBufferedAppender(dataWriter, cfg.IndexDownsampleBytes, estimatedObjects) if err != nil { return nil, err } @@ -115,12 +118,15 @@ func (c *CompleteBlock) Write(ctx context.Context, w backend.Writer) error { return err } - indexWriter := c.encoding.newIndexWriter() + indexWriter := c.encoding.newIndexWriter(c.cfg.IndexPageSizeBytes) indexBytes, err := indexWriter.Write(c.records) if err != nil { return err } + c.meta.TotalRecords = uint32(len(c.records)) + c.meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) + err = writeBlockMeta(ctx, w, c.meta, indexBytes, c.bloom) if err != nil { return err @@ -143,13 +149,13 @@ func (c *CompleteBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]by return nil, err } - pageReader, err := c.encoding.newPageReader(backend.NewContextReaderWithAllReader(file), c.meta.Encoding) + dataReader, err := c.encoding.newDataReader(backend.NewContextReaderWithAllReader(file), c.meta.Encoding) if err != nil { return nil, err } - defer pageReader.Close() + defer dataReader.Close() - finder := NewPagedFinder(common.Records(c.records), pageReader, combiner) + finder := NewPagedFinder(common.Records(c.records), dataReader, combiner) return finder.Find(context.Background(), id) } diff --git a/tempodb/encoding/complete_block_test.go b/tempodb/encoding/complete_block_test.go index 59dde652674..7c4bf5c30e3 100644 --- a/tempodb/encoding/complete_block_test.go +++ b/tempodb/encoding/complete_block_test.go @@ -21,7 +21,7 @@ import ( "github.com/grafana/tempo/tempodb/backend/local" "github.com/grafana/tempo/tempodb/encoding/common" v0 "github.com/grafana/tempo/tempodb/encoding/v0" - v1 "github.com/grafana/tempo/tempodb/encoding/v1" + v2 "github.com/grafana/tempo/tempodb/encoding/v2" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -76,6 +76,7 @@ func TestCompleteBlockAll(t *testing.T) { IndexDownsampleBytes: 1000, BloomFP: .01, Encoding: enc, + IndexPageSizeBytes: 1000, }, ) }) @@ -149,7 +150,7 @@ func completeBlock(t *testing.T, cfg *BlockConfig, tempDir string) (*CompleteBlo buffer := &bytes.Buffer{} writer := bufio.NewWriter(buffer) - appender := NewAppender(v0.NewPageWriter(writer)) + appender := NewAppender(v0.NewDataWriter(writer)) numMsgs := 1000 reqs := make([][]byte, 0, numMsgs) @@ -298,7 +299,7 @@ func benchmarkCompressBlock(b *testing.B, encoding backend.Encoding, indexDownsa b.ResetTimer() file, err := os.Open(cb.fullFilename()) require.NoError(b, err) - pr, err := v1.NewPageReader(backend.NewContextReaderWithAllReader(file), encoding) + pr, err := v2.NewDataReader(v0.NewDataReader(backend.NewContextReaderWithAllReader(file)), encoding) require.NoError(b, err) iterator = newPagedIterator(10*1024*1024, common.Records(cb.records), pr) diff --git a/tempodb/encoding/config.go b/tempodb/encoding/config.go index f8e4314f859..13dea780ff0 100644 --- a/tempodb/encoding/config.go +++ b/tempodb/encoding/config.go @@ -9,6 +9,7 @@ import ( // BlockConfig holds configuration options for newly created blocks type BlockConfig struct { IndexDownsampleBytes int `yaml:"index_downsample_bytes"` + IndexPageSizeBytes int `yaml:"index_page_size_bytes"` BloomFP float64 `yaml:"bloom_filter_false_positive"` Encoding backend.Encoding `yaml:"encoding"` } @@ -19,6 +20,10 @@ func ValidateConfig(b *BlockConfig) error { return fmt.Errorf("Positive index 
downsample required") } + if b.IndexPageSizeBytes <= 0 { + return fmt.Errorf("Positive index page size required") + } + if b.BloomFP <= 0.0 { return fmt.Errorf("invalid bloom filter fp rate %v", b.BloomFP) } diff --git a/tempodb/encoding/finder_paged.go b/tempodb/encoding/finder_paged.go index c95c6f58fc6..df97c7faa2d 100644 --- a/tempodb/encoding/finder_paged.go +++ b/tempodb/encoding/finder_paged.go @@ -14,7 +14,7 @@ type Finder interface { } type pagedFinder struct { - r common.PageReader + r common.DataReader index common.IndexReader combiner common.ObjectCombiner } @@ -22,7 +22,7 @@ type pagedFinder struct { // NewPagedFinder returns a paged. This finder is used for searching // a set of records and returning an object. If a set of consecutive records has // matching ids they will be combined using the ObjectCombiner. -func NewPagedFinder(index common.IndexReader, r common.PageReader, combiner common.ObjectCombiner) Finder { +func NewPagedFinder(index common.IndexReader, r common.DataReader, combiner common.ObjectCombiner) Finder { return &pagedFinder{ r: r, index: index, @@ -80,7 +80,7 @@ func (f *pagedFinder) findOne(ctx context.Context, id common.ID, record *common. return nil, errors.New("unexpected 0 length pages in findOne") } - // pageReader is expected to return pages in the v0 format. so this works + // dataReader is expected to return pages in the v0 format. so this works iter := NewIterator(bytes.NewReader(pages[0])) if f.combiner != nil { iter, err = NewDedupingIterator(iter, f.combiner) diff --git a/tempodb/encoding/iterator.go b/tempodb/encoding/iterator.go index 03aa2bcf453..1e87ec048bf 100644 --- a/tempodb/encoding/iterator.go +++ b/tempodb/encoding/iterator.go @@ -4,8 +4,8 @@ import ( "context" "io" + "github.com/grafana/tempo/tempodb/encoding/base" "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) // Iterator is capable of iterating through a set of objects @@ -27,7 +27,7 @@ func NewIterator(reader io.Reader) Iterator { } func (i *iterator) Next(_ context.Context) (common.ID, []byte, error) { - return v0.UnmarshalObjectFromReader(i.reader) + return base.UnmarshalObjectFromReader(i.reader) } func (i *iterator) Close() { diff --git a/tempodb/encoding/iterator_paged.go b/tempodb/encoding/iterator_paged.go index fbe5ed0252d..3eb3eb2d890 100644 --- a/tempodb/encoding/iterator_paged.go +++ b/tempodb/encoding/iterator_paged.go @@ -4,14 +4,13 @@ import ( "context" "io" + "github.com/grafana/tempo/tempodb/encoding/base" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/pkg/errors" - - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) type pagedIterator struct { - pageReader common.PageReader + dataReader common.DataReader indexReader common.IndexReader currentIndex int @@ -22,9 +21,9 @@ type pagedIterator struct { // newPagedIterator returns a backendIterator. This iterator is used to iterate // through objects stored in object storage. 
-func newPagedIterator(chunkSizeBytes uint32, indexReader common.IndexReader, pageReader common.PageReader) Iterator { +func newPagedIterator(chunkSizeBytes uint32, indexReader common.IndexReader, dataReader common.DataReader) Iterator { return &pagedIterator{ - pageReader: pageReader, + dataReader: dataReader, indexReader: indexReader, chunkSizeBytes: chunkSizeBytes, } @@ -44,8 +43,8 @@ func (i *pagedIterator) Next(ctx context.Context) (common.ID, []byte, error) { i.pages = i.pages[1:] // advance pages } - // pageReader returns pages in the v0 format, so this works - i.activePage, id, object, err = v0.UnmarshalAndAdvanceBuffer(i.activePage) + // dataReader returns pages in the raw format, so this works + i.activePage, id, object, err = base.UnmarshalAndAdvanceBuffer(i.activePage) if err != nil && err != io.EOF { return nil, nil, errors.Wrap(err, "error iterating through object in backend") } else if err != io.EOF { @@ -84,7 +83,7 @@ func (i *pagedIterator) Next(ctx context.Context) (common.ID, []byte, error) { } } - i.pages, err = i.pageReader.Read(ctx, records) + i.pages, err = i.dataReader.Read(ctx, records) if err != nil { return nil, nil, errors.Wrap(err, "error iterating through object in backend") } @@ -96,7 +95,7 @@ func (i *pagedIterator) Next(ctx context.Context) (common.ID, []byte, error) { i.pages = i.pages[1:] // advance pages // attempt to get next object from objects - i.activePage, id, object, err = v0.UnmarshalAndAdvanceBuffer(i.activePage) + i.activePage, id, object, err = base.UnmarshalAndAdvanceBuffer(i.activePage) if err != nil { return nil, nil, errors.Wrap(err, "error iterating through object in backend") } @@ -105,5 +104,5 @@ func (i *pagedIterator) Next(ctx context.Context) (common.ID, []byte, error) { } func (i *pagedIterator) Close() { - i.pageReader.Close() + i.dataReader.Close() } diff --git a/tempodb/encoding/v0/page_reader.go b/tempodb/encoding/v0/data_reader.go similarity index 79% rename from tempodb/encoding/v0/page_reader.go rename to tempodb/encoding/v0/data_reader.go index b78115a7d33..baec5f7199c 100644 --- a/tempodb/encoding/v0/page_reader.go +++ b/tempodb/encoding/v0/data_reader.go @@ -8,17 +8,17 @@ import ( "github.com/grafana/tempo/tempodb/encoding/common" ) -type pageReader struct { +type dataReader struct { r backend.ContextReader } -// NewPageReader returns a new v0 pageReader. A v0 pageReader +// NewDataReader returns a new v0 dataReader. A v0 dataReader // is basically a no-op. It retrieves the requested byte // ranges and returns them as is. // A pages "format" is a contiguous collection of objects // | -- object -- | -- object -- | ... -func NewPageReader(r backend.ContextReader) common.PageReader { - return &pageReader{ +func NewDataReader(r backend.ContextReader) common.DataReader { + return &dataReader{ r: r, } } @@ -26,7 +26,7 @@ func NewPageReader(r backend.ContextReader) common.PageReader { // Read returns the pages requested in the passed records. 
It // assumes that if there are multiple records they are ordered // and contiguous -func (r *pageReader) Read(ctx context.Context, records []*common.Record) ([][]byte, error) { +func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]byte, error) { if len(records) == 0 { return nil, nil } @@ -53,7 +53,7 @@ func (r *pageReader) Read(ctx context.Context, records []*common.Record) ([][]by } if previousEnd != record.Start && previousEnd != 0 { - return nil, fmt.Errorf("non-contiguous pages requested from pageReader: %d, %+v", previousEnd, record) + return nil, fmt.Errorf("non-contiguous pages requested from dataReader: %d, %+v", previousEnd, record) } slicePages = append(slicePages, contiguousPages[cursor:end]) @@ -64,5 +64,5 @@ func (r *pageReader) Read(ctx context.Context, records []*common.Record) ([][]by return slicePages, nil } -func (r *pageReader) Close() { +func (r *dataReader) Close() { } diff --git a/tempodb/encoding/v0/page_reader_test.go b/tempodb/encoding/v0/data_reader_test.go similarity index 95% rename from tempodb/encoding/v0/page_reader_test.go rename to tempodb/encoding/v0/data_reader_test.go index 9345953ce9e..149eec3d7a8 100644 --- a/tempodb/encoding/v0/page_reader_test.go +++ b/tempodb/encoding/v0/data_reader_test.go @@ -10,7 +10,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestPageReader(t *testing.T) { +func TestDataReader(t *testing.T) { tests := []struct { readerBytes []byte @@ -123,7 +123,7 @@ func TestPageReader(t *testing.T) { } for _, tc := range tests { - reader := NewPageReader(backend.NewContextReaderWithAllReader(bytes.NewReader(tc.readerBytes))) + reader := NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(tc.readerBytes))) actual, err := reader.Read(context.Background(), tc.records) reader.Close() diff --git a/tempodb/encoding/v0/data_writer.go b/tempodb/encoding/v0/data_writer.go new file mode 100644 index 00000000000..18beff8ff77 --- /dev/null +++ b/tempodb/encoding/v0/data_writer.go @@ -0,0 +1,45 @@ +package v0 + +import ( + "io" + + "github.com/grafana/tempo/tempodb/encoding/base" + "github.com/grafana/tempo/tempodb/encoding/common" +) + +type dataWriter struct { + w io.Writer + bytesWritten int +} + +// NewDataWriter creates a v0 page writer. 
This page writer +// writes raw bytes only +func NewDataWriter(writer io.Writer) common.DataWriter { + return &dataWriter{ + w: writer, + } +} + +// Write implements DataWriter +func (p *dataWriter) Write(id common.ID, obj []byte) (int, error) { + written, err := base.MarshalObjectToWriter(id, obj, p.w) + if err != nil { + return 0, err + } + + p.bytesWritten += written + return written, nil +} + +// CutPage implements DataWriter +func (p *dataWriter) CutPage() (int, error) { + ret := p.bytesWritten + p.bytesWritten = 0 + + return ret, nil +} + +// Complete implements DataWriter +func (p *dataWriter) Complete() error { + return nil +} diff --git a/tempodb/encoding/v0/index_reader.go b/tempodb/encoding/v0/index_reader.go index c4738289ce7..2426f929734 100644 --- a/tempodb/encoding/v0/index_reader.go +++ b/tempodb/encoding/v0/index_reader.go @@ -7,6 +7,7 @@ import ( "sort" "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/base" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -22,7 +23,7 @@ func NewIndexReader(r backend.ContextReader) (common.IndexReader, error) { return nil, err } - mod := len(index) % recordLength + mod := len(index) % base.RecordLength if mod != 0 { return nil, fmt.Errorf("records are an unexpected number of bytes %d", len(index)) } @@ -33,28 +34,28 @@ func NewIndexReader(r backend.ContextReader) (common.IndexReader, error) { } func (r *readerBytes) At(_ context.Context, i int) (*common.Record, error) { - if i < 0 || i >= len(r.index)/recordLength { + if i < 0 || i >= len(r.index)/base.RecordLength { return nil, nil } - buff := r.index[i*recordLength : (i+1)*recordLength] - return unmarshalRecord(buff), nil + buff := r.index[i*base.RecordLength : (i+1)*base.RecordLength] + return base.UnmarshalRecord(buff), nil } func (r *readerBytes) Find(_ context.Context, id common.ID) (*common.Record, int, error) { - numRecords := recordCount(r.index) + numRecords := base.RecordCount(r.index) var record *common.Record i := sort.Search(numRecords, func(i int) bool { - buff := r.index[i*recordLength : (i+1)*recordLength] - record = unmarshalRecord(buff) + buff := r.index[i*base.RecordLength : (i+1)*base.RecordLength] + record = base.UnmarshalRecord(buff) return bytes.Compare(record.ID, id) >= 0 }) if i >= 0 && i < numRecords { - buff := r.index[i*recordLength : (i+1)*recordLength] - record = unmarshalRecord(buff) + buff := r.index[i*base.RecordLength : (i+1)*base.RecordLength] + record = base.UnmarshalRecord(buff) return record, i, nil } diff --git a/tempodb/encoding/v0/index_reader_test.go b/tempodb/encoding/v0/index_reader_test.go index c04efa30876..c8f23ca54e3 100644 --- a/tempodb/encoding/v0/index_reader_test.go +++ b/tempodb/encoding/v0/index_reader_test.go @@ -6,6 +6,7 @@ import ( "testing" "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/base" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -28,7 +29,7 @@ func TestIndexReader(t *testing.T) { Length: 3, } - recordBytes, err := MarshalRecords([]*common.Record{record1, record2, record3}) + recordBytes, err := base.MarshalRecords([]*common.Record{record1, record2, record3}) require.NoError(t, err) tests := []struct { diff --git a/tempodb/encoding/v0/index_writer.go b/tempodb/encoding/v0/index_writer.go index 23cbef2a270..16b795777a4 100644 --- a/tempodb/encoding/v0/index_writer.go +++ b/tempodb/encoding/v0/index_writer.go @@ -1,6 +1,7 @@ package v0 import ( + 
"github.com/grafana/tempo/tempodb/encoding/base" "github.com/grafana/tempo/tempodb/encoding/common" ) @@ -14,5 +15,5 @@ func NewIndexWriter() common.IndexWriter { // Write implements common.IndexWriter func (w *indexWriter) Write(records []*common.Record) ([]byte, error) { - return MarshalRecords(records) + return base.MarshalRecords(records) } diff --git a/tempodb/encoding/v0/page_writer.go b/tempodb/encoding/v0/page_writer.go deleted file mode 100644 index 35d32756f10..00000000000 --- a/tempodb/encoding/v0/page_writer.go +++ /dev/null @@ -1,44 +0,0 @@ -package v0 - -import ( - "io" - - "github.com/grafana/tempo/tempodb/encoding/common" -) - -type pageWriter struct { - w io.Writer - bytesWritten int -} - -// NewPageWriter creates a v0 page writer. This page writer -// writes raw bytes only -func NewPageWriter(writer io.Writer) common.PageWriter { - return &pageWriter{ - w: writer, - } -} - -// Write implements common.PageWriter -func (p *pageWriter) Write(id common.ID, obj []byte) (int, error) { - written, err := MarshalObjectToWriter(id, obj, p.w) - if err != nil { - return 0, err - } - - p.bytesWritten += written - return written, nil -} - -// CutPage implements common.PageWriter -func (p *pageWriter) CutPage() (int, error) { - ret := p.bytesWritten - p.bytesWritten = 0 - - return ret, nil -} - -// Complete implements common.PageWriter -func (p *pageWriter) Complete() error { - return nil -} diff --git a/tempodb/encoding/v1/page_reader.go b/tempodb/encoding/v1/data_reader.go similarity index 68% rename from tempodb/encoding/v1/page_reader.go rename to tempodb/encoding/v1/data_reader.go index c4430d889d8..c266821132b 100644 --- a/tempodb/encoding/v1/page_reader.go +++ b/tempodb/encoding/v1/data_reader.go @@ -8,34 +8,33 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) -type pageReader struct { - v0PageReader common.PageReader +type dataReader struct { + dataReader common.DataReader pool ReaderPool compressedReader io.Reader } -// NewPageReader constructs a v1 PageReader that handles compression -func NewPageReader(r backend.ContextReader, encoding backend.Encoding) (common.PageReader, error) { +// NewDataReader is useful for nesting compression inside of a different reader +func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { pool, err := getReaderPool(encoding) if err != nil { return nil, err } - return &pageReader{ - v0PageReader: v0.NewPageReader(r), - pool: pool, + return &dataReader{ + dataReader: r, + pool: pool, }, nil } // Read returns the pages requested in the passed records. 
It // assumes that if there are multiple records they are ordered // and contiguous -func (r *pageReader) Read(ctx context.Context, records []*common.Record) ([][]byte, error) { - compressedPages, err := r.v0PageReader.Read(ctx, records) +func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]byte, error) { + compressedPages, err := r.dataReader.Read(ctx, records) if err != nil { return nil, err } @@ -63,8 +62,8 @@ func (r *pageReader) Read(ctx context.Context, records []*common.Record) ([][]by return decompressedPages, nil } -func (r *pageReader) Close() { - r.v0PageReader.Close() +func (r *dataReader) Close() { + r.dataReader.Close() if r.compressedReader != nil { r.pool.PutReader(r.compressedReader) diff --git a/tempodb/encoding/v1/page_reader_test.go b/tempodb/encoding/v1/data_reader_test.go similarity index 86% rename from tempodb/encoding/v1/page_reader_test.go rename to tempodb/encoding/v1/data_reader_test.go index 716fcc39676..ee21812706b 100644 --- a/tempodb/encoding/v1/page_reader_test.go +++ b/tempodb/encoding/v1/data_reader_test.go @@ -7,6 +7,7 @@ import ( "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" + v0 "github.com/grafana/tempo/tempodb/encoding/v0" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -39,7 +40,7 @@ func TestAllEncodings(t *testing.T) { require.NoError(t, err) encryptedBytes := buff.Bytes() - reader, err := NewPageReader(backend.NewContextReaderWithAllReader(bytes.NewReader(encryptedBytes)), enc) + reader, err := NewDataReader(v0.NewDataReader(backend.NewContextReaderWithAllReader(bytes.NewReader(encryptedBytes))), enc) require.NoError(t, err) defer reader.Close() diff --git a/tempodb/encoding/v1/page_writer.go b/tempodb/encoding/v1/data_writer.go similarity index 74% rename from tempodb/encoding/v1/page_writer.go rename to tempodb/encoding/v1/data_writer.go index f17213ce621..24971cb5001 100644 --- a/tempodb/encoding/v1/page_writer.go +++ b/tempodb/encoding/v1/data_writer.go @@ -5,8 +5,8 @@ import ( "io" "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/base" "github.com/grafana/tempo/tempodb/encoding/common" - v0 "github.com/grafana/tempo/tempodb/encoding/v0" ) // meteredWriter is a struct that is used to count the number of bytes // written to its wrapped writer type meteredWriter struct { wrappedWriter io.Writer bytesWritten int } func (m *meteredWriter) Write(p []byte) (n int, err error) { m.bytesWritten += len(p) return m.wrappedWriter.Write(p) } -type pageWriter struct { +type dataWriter struct { v0Buffer *bytes.Buffer outputWriter *meteredWriter @@ -31,9 +31,9 @@ type pageWriter struct { pool WriterPool compressionWriter io.WriteCloser } -// NewPageWriter creates a v0 page writer. This page writer -// writes raw bytes only +// NewDataWriter creates a v1 data writer. This data writer +// buffers objects in the v0 format and compresses them when a page is cut -func NewPageWriter(writer io.Writer, encoding backend.Encoding) (common.PageWriter, error) { +func NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { pool, err := GetWriterPool(encoding) if err != nil { return nil, err @@ -48,7 +48,7 @@ func NewPageWriter(writer io.Writer, encoding backend.Encoding) (common.PageWrit return nil, err } - return &pageWriter{ + return &dataWriter{ v0Buffer: &bytes.Buffer{}, outputWriter: outputWriter, pool: pool, @@ -56,13 +56,13 @@ func NewPageWriter(writer io.Writer, encoding backend.Encoding) (common.PageWrit }, nil } -// Write implements common.PageWriter -func (p *pageWriter) Write(id common.ID, obj []byte) (int, error) { - return v0.MarshalObjectToWriter(id, obj, p.v0Buffer) +// Write implements DataWriter +func (p *dataWriter) Write(id common.ID, obj []byte) (int, error) { + return base.MarshalObjectToWriter(id, obj, p.v0Buffer) } -// CutPage implements common.PageWriter -func (p *pageWriter) CutPage() (int, error) { +// CutPage implements DataWriter +func (p *dataWriter) CutPage() (int, error) { var err error p.compressionWriter, err = p.pool.ResetWriter(p.outputWriter, p.compressionWriter) if err != nil { @@ -85,8 +85,8 @@ func (p *pageWriter) CutPage() (int, error) { return bytesWritten, nil } -// Complete implements common.PageWriter -func (p *pageWriter) Complete() error { +// Complete implements DataWriter +func (p *dataWriter) Complete() error { if p.compressionWriter != nil { p.pool.PutWriter(p.compressionWriter) p.compressionWriter = nil diff --git a/tempodb/encoding/v2/data_reader.go b/tempodb/encoding/v2/data_reader.go new file mode 100644 index 00000000000..55dba6a8351 --- /dev/null +++ b/tempodb/encoding/v2/data_reader.go @@ -0,0 +1,56 @@ +package v2 + +import ( + "context" + + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/common" + v1 "github.com/grafana/tempo/tempodb/encoding/v1" +) + +type dataReader struct { + dataReader common.DataReader +} + +// constDataHeader is a singleton data header. The data header is +// stateless because it has no fields, so all data pages share this +// instance to slightly reduce allocations.
+var constDataHeader = &dataHeader{} + +// NewDataReader constructs a v2 DataReader that handles paged...reading +func NewDataReader(r common.DataReader, encoding backend.Encoding) (common.DataReader, error) { + v2DataReader := &dataReader{ + dataReader: r, + } + + // wrap the paged reader in a compressed/v1 reader and return that + v1DataReader, err := v1.NewDataReader(v2DataReader, encoding) + if err != nil { + return nil, err + } + + return v1DataReader, nil +} + +func (r *dataReader) Read(ctx context.Context, records []*common.Record) ([][]byte, error) { + v0Pages, err := r.dataReader.Read(ctx, records) + if err != nil { + return nil, err + } + + pages := make([][]byte, 0, len(v0Pages)) + for _, v0Page := range v0Pages { + page, err := unmarshalPageFromBytes(v0Page, constDataHeader) + if err != nil { + return nil, err + } + + pages = append(pages, page.data) + } + + return pages, nil +} + +func (r *dataReader) Close() { + r.dataReader.Close() +} diff --git a/tempodb/encoding/v2/data_writer.go b/tempodb/encoding/v2/data_writer.go new file mode 100644 index 00000000000..e4420654b19 --- /dev/null +++ b/tempodb/encoding/v2/data_writer.go @@ -0,0 +1,59 @@ +package v2 + +import ( + "bytes" + "io" + + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/common" + v1 "github.com/grafana/tempo/tempodb/encoding/v1" +) + +type dataWriter struct { + v1Writer common.DataWriter + v1Buffer *bytes.Buffer + + outputWriter io.Writer +} + +// NewDataWriter creates a v2 paged data writer +func NewDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) { + v1Buffer := &bytes.Buffer{} + v1Writer, err := v1.NewDataWriter(v1Buffer, encoding) + if err != nil { + return nil, err + } + + return &dataWriter{ + v1Writer: v1Writer, + v1Buffer: v1Buffer, + outputWriter: writer, + }, nil +} + +// Write implements DataWriter +func (p *dataWriter) Write(id common.ID, obj []byte) (int, error) { + return p.v1Writer.Write(id, obj) +} + +// CutPage implements DataWriter +func (p *dataWriter) CutPage() (int, error) { + _, err := p.v1Writer.CutPage() + if err != nil { + return 0, err + } + + // v1Buffer currently has all of the v1 bytes. let's wrap it in our page and write + bytesWritten, err := marshalPageToWriter(p.v1Buffer.Bytes(), p.outputWriter, constDataHeader) + if err != nil { + return 0, err + } + p.v1Buffer.Reset() + + return bytesWritten, err +} + +// Complete implements DataWriter +func (p *dataWriter) Complete() error { + return p.v1Writer.Complete() +} diff --git a/tempodb/encoding/v2/index_reader.go b/tempodb/encoding/v2/index_reader.go new file mode 100644 index 00000000000..491fdd3b510 --- /dev/null +++ b/tempodb/encoding/v2/index_reader.go @@ -0,0 +1,139 @@ +package v2 + +import ( + "bytes" + "context" + "fmt" + + "github.com/grafana/tempo/pkg/sort" + + "github.com/cespare/xxhash" + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/base" + "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/opentracing/opentracing-go" +) + +type indexReader struct { + r backend.ContextReader + + pageSizeBytes int + totalRecords int + + pageCache map[int]*page // indexReader is not concurrency safe, but since it is currently used within one request it is fine. +} + +// NewIndexReader returns a paged index reader for a byte slice of marshalled, +// ordered records.
+func NewIndexReader(r backend.ContextReader, pageSizeBytes int, totalRecords int) (common.IndexReader, error) { + return &indexReader{ + r: r, + + pageSizeBytes: pageSizeBytes, + totalRecords: totalRecords, + + pageCache: map[int]*page{}, + }, nil +} + +// At implements common.indexReader +func (r *indexReader) At(ctx context.Context, i int) (*common.Record, error) { + if i < 0 || i >= r.totalRecords { + return nil, nil + } + + recordsPerPage := objectsPerPage(base.RecordLength, r.pageSizeBytes, IndexHeaderLength) + if recordsPerPage == 0 { + return nil, fmt.Errorf("page size %d is too small for one record", r.pageSizeBytes) + } + pageIdx := i / recordsPerPage + recordIdx := i % recordsPerPage + + page, err := r.getPage(ctx, pageIdx) + if err != nil { + return nil, err + } + + if recordIdx >= len(page.data)/base.RecordLength { + return nil, fmt.Errorf("unexpected out of bounds index %d, %d, %d, %d", i, pageIdx, recordIdx, len(page.data)) + } + + recordBytes := page.data[recordIdx*base.RecordLength : (recordIdx+1)*base.RecordLength] + + // double check the record is not all 0s. this could occur if we read empty buffer space past the final + // record in the final page + allZeros := true + for _, b := range recordBytes { + if b != 0 { + allZeros = false + break + } + } + if allZeros { + return nil, fmt.Errorf("unexpected zero value record %d, %d, %d, %d", i, pageIdx, recordIdx, len(page.data)) + } + + return base.UnmarshalRecord(recordBytes), nil +} + +// Find implements common.indexReader +func (r *indexReader) Find(ctx context.Context, id common.ID) (*common.Record, int, error) { + // with a linear distribution of trace ids we can actually do much better than a normal + // binary search. unfortunately there are edge cases which make this perform far worse. + // for instance, consider a set of trace ids with 90% 64 bit ids and 10% 128 bit ids.
+ span, ctx := opentracing.StartSpanFromContext(ctx, "indexReader.Find") + defer span.Finish() + + i, err := sort.SearchWithErrors(r.totalRecords, func(i int) (bool, error) { + record, err := r.At(ctx, i) + if err != nil { + return true, err + } + + return bytes.Compare(record.ID, id) >= 0, nil + }) + + if err != nil { + return nil, -1, err + } + + var record *common.Record + if i >= 0 && i < r.totalRecords { + record, err = r.At(ctx, i) + if err != nil { + return nil, -1, err + } + return record, i, nil + } + return nil, -1, nil +} + +func (r *indexReader) getPage(ctx context.Context, pageIdx int) (*page, error) { + page, ok := r.pageCache[pageIdx] + if ok { + return page, nil + } + + pageBuffer := make([]byte, r.pageSizeBytes) + _, err := r.r.ReadAt(ctx, pageBuffer, int64(pageIdx*r.pageSizeBytes)) + if err != nil { + return nil, err + } + + page, err = unmarshalPageFromBytes(pageBuffer, &indexHeader{}) + if err != nil { + return nil, err + } + + // checksum + h := xxhash.New() + _, _ = h.Write(page.data) + if page.header.(*indexHeader).checksum != h.Sum64() { + return nil, fmt.Errorf("mismatched checksum: %d", pageIdx) + } + + r.pageCache[pageIdx] = page + + return page, nil +} diff --git a/tempodb/encoding/v2/index_test.go b/tempodb/encoding/v2/index_test.go new file mode 100644 index 00000000000..88bad13d044 --- /dev/null +++ b/tempodb/encoding/v2/index_test.go @@ -0,0 +1,90 @@ +package v2 + +import ( + "bytes" + "context" + "math/rand" + "testing" + + "github.com/grafana/tempo/tempodb/backend" + "github.com/grafana/tempo/tempodb/encoding/base" + "github.com/grafana/tempo/tempodb/encoding/common" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestIndexWriterReader(t *testing.T) { + numRecords := 100000 + pageSize := 210 + + randomRecords := randomOrderedRecords(t, numRecords) + indexWriter := NewIndexWriter(pageSize) + indexBytes, err := indexWriter.Write(randomRecords) + require.NoError(t, err) + + indexReader, err := NewIndexReader(backend.NewContextReaderWithAllReader(bytes.NewReader(indexBytes)), pageSize, numRecords) + require.NoError(t, err) + + i := 0 + for i = 0; i < numRecords; i++ { + expectedRecord := randomRecords[i] + + actualRecord, err := indexReader.At(context.Background(), i) + assert.NoError(t, err) + assert.Equal(t, expectedRecord, actualRecord) + + actualRecord, actualIdx, err := indexReader.Find(context.Background(), expectedRecord.ID) + assert.NoError(t, err) + assert.Equal(t, i, actualIdx) + assert.Equal(t, expectedRecord, actualRecord) + } + + // next read should return nil, nil to indicate end + actualRecord, err := indexReader.At(context.Background(), i) + assert.NoError(t, err) + assert.Nil(t, actualRecord) + + actualRecord, err = indexReader.At(context.Background(), -1) + assert.NoError(t, err) + assert.Nil(t, actualRecord) +} + +func TestIndexHeaderChecksum(t *testing.T) { + recordsPerPage := 4 + numRecords := 4 + pageSize := int(baseHeaderSize) + IndexHeaderLength + base.RecordLength*recordsPerPage + + randomRecords := randomOrderedRecords(t, numRecords) + indexWriter := NewIndexWriter(pageSize) + indexBytes, err := indexWriter.Write(randomRecords) + require.NoError(t, err) + + indexBytes[len(indexBytes)-1]++ + + r, err := NewIndexReader(backend.NewContextReaderWithAllReader(bytes.NewReader(indexBytes)), pageSize, numRecords) + assert.NoError(t, err) + _, err = r.(*indexReader).getPage(context.Background(), 0) + assert.Error(t, err) +} + +func randomOrderedRecords(t *testing.T, num int) []*common.Record { + 
randomRecords := []*common.Record{} + + for i := 0; i < num; i++ { + id := make([]byte, 16) + _, err := rand.Read(id) + require.NoError(t, err) + + rec := &common.Record{ + Start: rand.Uint64(), + Length: rand.Uint32(), + ID: id, + } + + randomRecords = append(randomRecords, rec) + } + + base.SortRecords(randomRecords) + + return randomRecords +} diff --git a/tempodb/encoding/v2/index_writer.go b/tempodb/encoding/v2/index_writer.go new file mode 100644 index 00000000000..deefc49b06a --- /dev/null +++ b/tempodb/encoding/v2/index_writer.go @@ -0,0 +1,76 @@ +package v2 + +import ( + "fmt" + + "github.com/cespare/xxhash" + "github.com/grafana/tempo/tempodb/encoding/base" + "github.com/grafana/tempo/tempodb/encoding/common" +) + +type indexWriter struct { + pageSizeBytes int +} + +// NewIndexWriter returns an index writer that marshals records into +// fixed-size, checksummed index pages. +func NewIndexWriter(pageSizeBytes int) common.IndexWriter { + return &indexWriter{ + pageSizeBytes: pageSizeBytes, + } +} + +// Write implements common.IndexWriter +func (w *indexWriter) Write(records []*common.Record) ([]byte, error) { + // we need to write a page at a time to an output byte slice + // first let's calculate how many pages we need + recordsPerPage := objectsPerPage(base.RecordLength, w.pageSizeBytes, IndexHeaderLength) + totalPages := totalPages(len(records), recordsPerPage) + + if recordsPerPage == 0 { + return nil, fmt.Errorf("pageSize %d too small for one record", w.pageSizeBytes) + } + + totalBytes := totalPages * w.pageSizeBytes + indexBuffer := make([]byte, totalBytes) + + for currentPage := 0; currentPage < totalPages; currentPage++ { + var pageRecords []*common.Record + + if len(records) > recordsPerPage { + pageRecords = records[:recordsPerPage] + records = records[recordsPerPage:] + } else { + pageRecords = records[:] + records = []*common.Record{} + } + + if len(pageRecords) == 0 { + return nil, fmt.Errorf("unexpected 0 length records %d,%d,%d,%d", currentPage, recordsPerPage, w.pageSizeBytes, totalPages) + } + + // header + header := &indexHeader{} + + // page + pageBuffer := indexBuffer[currentPage*w.pageSizeBytes : (currentPage+1)*w.pageSizeBytes] + + // write records and calculate the checksum + pageData := pageBuffer[header.headerLength()+int(baseHeaderSize):] + err := base.MarshalRecordsToBuffer(pageRecords, pageData) + if err != nil { + return nil, err + } + + h := xxhash.New() + _, _ = h.Write(pageData) + header.checksum = h.Sum64() + + _, err = marshalHeaderToPage(pageBuffer, header) + if err != nil { + return nil, err + } + } + + return indexBuffer, nil +} diff --git a/tempodb/encoding/v2/page.go b/tempodb/encoding/v2/page.go new file mode 100644 index 00000000000..ef4adce01f6 --- /dev/null +++ b/tempodb/encoding/v2/page.go @@ -0,0 +1,139 @@ +package v2 + +import ( + "encoding/binary" + "fmt" + "io" +) + +const ( + uint64Size = 8 + uint32Size = 4 + uint16Size = 2 + baseHeaderSize = uint16Size + uint32Size +) + +type page struct { + data []byte + header pageHeader +} + +/* + | -- totalLength -- | + | | | -- headerLength -- | | + | 32 bits | 16 bits | | | + | totalLength | header len | header fields | page bytes | +*/ +func unmarshalPageFromBytes(b []byte, header pageHeader) (*page, error) { + totalHeaderSize := baseHeaderSize + header.headerLength() + if len(b) < totalHeaderSize { + return nil, fmt.Errorf("page of size %d too small", len(b)) + } + + totalLength := binary.LittleEndian.Uint32(b[:uint32Size]) + b = b[uint32Size:] + headerLength :=
binary.LittleEndian.Uint16(b[:uint16Size]) + b = b[uint16Size:] + err := header.unmarshalHeader(b[:headerLength]) + if err != nil { + return nil, err + } + b = b[headerLength:] + + dataLength := int(totalLength) - totalHeaderSize + if len(b) != dataLength { + return nil, fmt.Errorf("expected data len %d does not match actual %d", dataLength, len(b)) + } + + return &page{ + data: b, + header: header, + }, nil +} + +// marshalPageToWriter marshals the page bytes to the passed writer +func marshalPageToWriter(b []byte, w io.Writer, header pageHeader) (int, error) { + var headerLength uint16 + var totalLength uint32 + + headerLength = uint16(header.headerLength()) + totalLength = uint32(headerLength) + baseHeaderSize + uint32(len(b)) + + err := binary.Write(w, binary.LittleEndian, totalLength) + if err != nil { + return 0, err + } + + err = binary.Write(w, binary.LittleEndian, headerLength) + if err != nil { + return 0, err + } + + if headerLength != 0 { + headerBuff := make([]byte, headerLength) + err = header.marshalHeader(headerBuff) + if err != nil { + return 0, err + } + + _, err := w.Write(headerBuff) + if err != nil { + return 0, err + } + } + + _, err = w.Write(b) + if err != nil { + return 0, err + } + + return int(totalLength), nil +} + +// marshalHeaderToPage marshals the header only to the passed in page and then returns +// the rest of the page slice for the caller to finish +func marshalHeaderToPage(page []byte, header pageHeader) ([]byte, error) { + var headerLength uint16 + var totalLength uint32 + + totalHeaderSize := baseHeaderSize + uint32(header.headerLength()) + if len(page) < int(totalHeaderSize) { + return nil, fmt.Errorf("page of size %d too small", len(page)) + } + + headerLength = uint16(header.headerLength()) // jpe casting + totalLength = uint32(len(page)) + + binary.LittleEndian.PutUint32(page[:uint32Size], totalLength) + page = page[uint32Size:] + binary.LittleEndian.PutUint16(page[:uint16Size], headerLength) + page = page[uint16Size:] + + err := header.marshalHeader(page[:headerLength]) + if err != nil { + return nil, err + } + + return page[headerLength:], nil +} + +func objectsPerPage(objectSizeBytes int, pageSizeBytes int, headerSize int) int { + if objectSizeBytes == 0 { + return 0 + } + + // headerSize only accounts for the custom header size. 
also subtract base + return (pageSizeBytes - headerSize - int(baseHeaderSize)) / objectSizeBytes +} + +func totalPages(totalObjects int, objectsPerPage int) int { + if objectsPerPage == 0 { + return 0 + } + + pages := totalObjects / objectsPerPage + if totalObjects%objectsPerPage != 0 { + pages++ + } + return pages +} diff --git a/tempodb/encoding/v2/page_header.go b/tempodb/encoding/v2/page_header.go new file mode 100644 index 00000000000..7348c5788d8 --- /dev/null +++ b/tempodb/encoding/v2/page_header.go @@ -0,0 +1,71 @@ +package v2 + +import ( + "encoding/binary" + "errors" + "fmt" +) + +type pageHeader interface { + unmarshalHeader([]byte) error + headerLength() int + marshalHeader([]byte) error +} + +// DataHeaderLength is the length in bytes for the data header +const DataHeaderLength = 0 + +// IndexHeaderLength is the length in bytes for the record header +const IndexHeaderLength = int(uint64Size) // 64bit checksum (xxhash) + +// dataHeader implements a pageHeader that has no fields +type dataHeader struct { +} + +func (h *dataHeader) unmarshalHeader(b []byte) error { + if len(b) != 0 { + return errors.New("unexpected non-zero len data header") + } + + return nil +} + +func (h *dataHeader) headerLength() int { + return DataHeaderLength +} + +func (h *dataHeader) marshalHeader(b []byte) error { + return nil +} + +// indexHeader implements a pageHeader that has index fields +// checksum - 64 bit xxhash +type indexHeader struct { + checksum uint64 +} + +func (h *indexHeader) unmarshalHeader(b []byte) error { + if len(b) != IndexHeaderLength { + return fmt.Errorf("unexpected index header len of %d", len(b)) + } + + h.checksum = binary.LittleEndian.Uint64(b[:uint64Size]) + // b = b[uint64Size:] + + return nil +} + +func (h *indexHeader) headerLength() int { + return IndexHeaderLength +} + +func (h *indexHeader) marshalHeader(b []byte) error { + if len(b) != IndexHeaderLength { + return fmt.Errorf("unexpected index header len of %d", len(b)) + } + + binary.LittleEndian.PutUint64(b, h.checksum) + // b = b[uint64Size:] + + return nil +} diff --git a/tempodb/encoding/v2/page_test.go b/tempodb/encoding/v2/page_test.go new file mode 100644 index 00000000000..99ad141312d --- /dev/null +++ b/tempodb/encoding/v2/page_test.go @@ -0,0 +1,103 @@ +package v2 + +import ( + "bytes" + "encoding/binary" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testHeader struct { + field uint32 +} + +func (h *testHeader) unmarshalHeader(b []byte) error { + if len(b) != int(uint32Size) { + return errors.New("err") + } + + h.field = binary.LittleEndian.Uint32(b[:uint32Size]) + + return nil +} + +func (h *testHeader) headerLength() int { + return int(uint32Size) +} + +func (h *testHeader) marshalHeader(b []byte) error { + if len(b) != int(uint32Size) { + return errors.New("bad") + } + + binary.LittleEndian.PutUint32(b, h.field) + return nil +} + +func TestPageMarshalToWriter(t *testing.T) { + tests := []struct { + expected []byte + field uint32 + }{ + { + expected: []byte{0x01, 0x02, 0x03}, + field: 15, + }, + { + expected: []byte{}, + field: 0, + }, + } + + for _, tc := range tests { + buff := &bytes.Buffer{} + + header := &testHeader{ + field: tc.field, + } + + bytesWritten, err := marshalPageToWriter(tc.expected, buff, header) + require.NoError(t, err) + assert.Equal(t, len(tc.expected)+header.headerLength()+int(baseHeaderSize), bytesWritten) + + page, err := unmarshalPageFromBytes(buff.Bytes(), &testHeader{}) + require.NoError(t, err) + 
+		assert.Equal(t, tc.expected, page.data)
+		assert.Equal(t, tc.field, page.header.(*testHeader).field)
+	}
+}
+
+func TestPageMarshalToBuffer(t *testing.T) {
+	tests := []struct {
+		expected []byte
+		field    uint32
+	}{
+		{
+			expected: []byte{0x01, 0x02, 0x03},
+			field:    15,
+		},
+		{
+			expected: []byte{},
+			field:    0,
+		},
+	}
+
+	for _, tc := range tests {
+		header := &testHeader{
+			field: tc.field,
+		}
+		outputBuffer := make([]byte, len(tc.expected)+header.headerLength()+int(baseHeaderSize))
+
+		restOfPage, err := marshalHeaderToPage(outputBuffer, header)
+		require.NoError(t, err)
+		copy(restOfPage, tc.expected)
+
+		page, err := unmarshalPageFromBytes(outputBuffer, &testHeader{})
+		require.NoError(t, err)
+		assert.Equal(t, tc.expected, page.data)
+		assert.Equal(t, tc.field, page.header.(*testHeader).field)
+	}
+}
diff --git a/tempodb/encoding/versioned.go b/tempodb/encoding/versioned.go
index 08058db661f..1bebde393bd 100644
--- a/tempodb/encoding/versioned.go
+++ b/tempodb/encoding/versioned.go
@@ -7,13 +7,14 @@ import (
 	"github.com/grafana/tempo/tempodb/encoding/common"
 	v0 "github.com/grafana/tempo/tempodb/encoding/v0"
 	v1 "github.com/grafana/tempo/tempodb/encoding/v1"
+	v2 "github.com/grafana/tempo/tempodb/encoding/v2"
 )
 
-const currentVersion = "v1"
+const currentVersion = "v2"
 
 // latestEncoding is used by Compactor and Complete block
 func latestEncoding() versionedEncoding {
-	return v1Encoding{}
+	return v2Encoding{}
 }
 
 // allEncodings returns all encodings
@@ -21,6 +22,7 @@ func allEncodings() []versionedEncoding {
 	return []versionedEncoding{
 		v0Encoding{},
 		v1Encoding{},
+		v2Encoding{},
 	}
 }
 
@@ -28,41 +30,57 @@ func allEncodings() []versionedEncoding {
 // currently quite sloppy and could easily be tightened up to just a few methods
 // but it is what it is for now!
 type versionedEncoding interface {
-	newPageWriter(writer io.Writer, encoding backend.Encoding) (common.PageWriter, error)
-	newIndexWriter() common.IndexWriter
+	newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error)
+	newIndexWriter(pageSizeBytes int) common.IndexWriter
 
-	newPageReader(ra backend.ContextReader, encoding backend.Encoding) (common.PageReader, error)
-	newIndexReader(ra backend.ContextReader) (common.IndexReader, error)
+	newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error)
+	newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalRecords int) (common.IndexReader, error)
 }
 
 // v0Encoding
 type v0Encoding struct{}
 
-func (v v0Encoding) newIndexWriter() common.IndexWriter {
+func (v v0Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter {
 	return v0.NewIndexWriter()
 }
-func (v v0Encoding) newPageWriter(writer io.Writer, encoding backend.Encoding) (common.PageWriter, error) {
-	return v0.NewPageWriter(writer), nil // ignore encoding. v0 PageWriter writes raw bytes
+func (v v0Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) {
+	return v0.NewDataWriter(writer), nil // ignore encoding. v0 DataWriter writes raw bytes
 }
-func (v v0Encoding) newIndexReader(ra backend.ContextReader) (common.IndexReader, error) {
+func (v v0Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalRecords int) (common.IndexReader, error) {
 	return v0.NewIndexReader(ra)
 }
-func (v v0Encoding) newPageReader(ra backend.ContextReader, encoding backend.Encoding) (common.PageReader, error) {
-	return v0.NewPageReader(ra), nil
+func (v v0Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) {
+	return v0.NewDataReader(ra), nil
 }
 
 // v1Encoding
 type v1Encoding struct{}
 
-func (v v1Encoding) newIndexWriter() common.IndexWriter {
+func (v v1Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter {
 	return v1.NewIndexWriter()
 }
-func (v v1Encoding) newPageWriter(writer io.Writer, encoding backend.Encoding) (common.PageWriter, error) {
-	return v1.NewPageWriter(writer, encoding)
+func (v v1Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) {
+	return v1.NewDataWriter(writer, encoding)
 }
-func (v v1Encoding) newPageReader(ra backend.ContextReader, encoding backend.Encoding) (common.PageReader, error) {
-	return v1.NewPageReader(ra, encoding)
-}
-func (v v1Encoding) newIndexReader(ra backend.ContextReader) (common.IndexReader, error) {
+func (v v1Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalRecords int) (common.IndexReader, error) {
 	return v1.NewIndexReader(ra)
 }
+func (v v1Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) {
+	return v1.NewDataReader(v0.NewDataReader(ra), encoding)
+}
+
+// v2Encoding
+type v2Encoding struct{}
+
+func (v v2Encoding) newIndexWriter(pageSizeBytes int) common.IndexWriter {
+	return v2.NewIndexWriter(pageSizeBytes)
+}
+func (v v2Encoding) newDataWriter(writer io.Writer, encoding backend.Encoding) (common.DataWriter, error) {
+	return v2.NewDataWriter(writer, encoding)
+}
+func (v v2Encoding) newIndexReader(ra backend.ContextReader, pageSizeBytes int, totalRecords int) (common.IndexReader, error) {
+	return v2.NewIndexReader(ra, pageSizeBytes, totalRecords)
+}
+func (v v2Encoding) newDataReader(ra backend.ContextReader, encoding backend.Encoding) (common.DataReader, error) {
+	return v2.NewDataReader(v0.NewDataReader(ra), encoding)
+}
diff --git a/tempodb/encoding/versioned_test.go b/tempodb/encoding/versioned_test.go
index 5e77e5838a0..e0d49bc2dff 100644
--- a/tempodb/encoding/versioned_test.go
+++ b/tempodb/encoding/versioned_test.go
@@ -14,12 +14,12 @@ import (
 func TestAllVersions(t *testing.T) {
 	for _, v := range allEncodings() {
 		for _, e := range backend.SupportedEncoding {
-			testPageWriterReader(t, v, e)
+			testDataWriterReader(t, v, e)
 		}
 	}
 }
 
-func testPageWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding) {
+func testDataWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding) {
 	tests := []struct {
 		readerBytes []byte
 	}{
@@ -33,24 +33,24 @@ func testPageWriterReader(t *testing.T, v versionedEncoding, e backend.Encoding)
 	for _, tc := range tests {
 		buff := bytes.NewBuffer([]byte{})
 
-		pageWriter, err := v.newPageWriter(buff, e)
+		dataWriter, err := v.newDataWriter(buff, e)
 		require.NoError(t, err)
 
-		_, err = pageWriter.Write([]byte{0x01}, tc.readerBytes)
+		_, err = dataWriter.Write([]byte{0x01}, tc.readerBytes)
 		require.NoError(t, err)
 
-		bytesWritten, err := pageWriter.CutPage()
+		bytesWritten, err := dataWriter.CutPage()
 		require.NoError(t, err)
 
-		err = pageWriter.Complete()
+		err = dataWriter.Complete()
 		require.NoError(t, err)
 
 		reader := bytes.NewReader(buff.Bytes())
-		pageReader, err := v.newPageReader(backend.NewContextReaderWithAllReader(reader), e)
+		dataReader, err := v.newDataReader(backend.NewContextReaderWithAllReader(reader), e)
 		require.NoError(t, err)
-		defer pageReader.Close()
+		defer dataReader.Close()
 
-		actual, err := pageReader.Read(context.Background(), []*common.Record{
+		actual, err := dataReader.Read(context.Background(), []*common.Record{
 			{
 				Start:  0,
 				Length: uint32(bytesWritten),
diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go
index 72f294e1a18..f57c2f8778b 100644
--- a/tempodb/retention_test.go
+++ b/tempodb/retention_test.go
@@ -32,6 +32,7 @@ func TestRetention(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
@@ -89,6 +90,7 @@ func TestBlockRetentionOverride(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go
index 5952e99b50d..8b5ad62d454 100644
--- a/tempodb/tempodb_test.go
+++ b/tempodb/tempodb_test.go
@@ -43,6 +43,7 @@ func TestDB(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncGZIP,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
@@ -122,6 +123,7 @@ func TestBlockSharding(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
@@ -197,6 +199,7 @@ func TestNilOnUnknownTenantID(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
@@ -224,6 +227,7 @@ func TestBlockCleanup(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
@@ -306,6 +310,7 @@ func TestCleanMissingTenants(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join("/tmp", "wal"),
@@ -366,6 +371,7 @@ func TestUpdateBlocklist(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
@@ -553,6 +559,7 @@ func TestUpdateBlocklistCompacted(t *testing.T) {
 			IndexDownsampleBytes: 17,
 			BloomFP:              .01,
 			Encoding:             backend.EncLZ4_256k,
+			IndexPageSizeBytes:   1000,
 		},
 		WAL: &wal.Config{
 			Filepath: path.Join(tempDir, "wal"),
diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go
index 99c98e590dc..1216e04e838 100644
--- a/tempodb/wal/append_block.go
+++ b/tempodb/wal/append_block.go
@@ -41,7 +41,7 @@ func newAppendBlock(id uuid.UUID, tenantID string, filepath string) (*AppendBloc
 		return nil, err
 	}
 	h.appendFile = f
-	h.appender = encoding.NewAppender(v0.NewPageWriter(f))
+	h.appender = encoding.NewAppender(v0.NewDataWriter(f))
 
 	return h, nil
 }
@@ -103,9 +103,9 @@ func (h *AppendBlock) Find(id common.ID, combiner common.ObjectCombiner) ([]byte
 		return nil, err
 	}
 
-	pageReader := v0.NewPageReader(backend.NewContextReaderWithAllReader(file))
-	defer pageReader.Close()
-	finder := encoding.NewPagedFinder(common.Records(records), pageReader, combiner)
+	dataReader := v0.NewDataReader(backend.NewContextReaderWithAllReader(file))
+	defer dataReader.Close()
+	finder := encoding.NewPagedFinder(common.Records(records), dataReader, combiner)
 
 	return finder.Find(context.Background(), id)
 }
diff --git a/vendor/modules.txt b/vendor/modules.txt
index c0db5dec2a2..d7036771153 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -112,6 +112,7 @@ github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1
 github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1
 github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1
 # github.com/cespare/xxhash v1.1.0
+## explicit
 github.com/cespare/xxhash
 # github.com/cespare/xxhash/v2 v2.1.1
 github.com/cespare/xxhash/v2
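A minimal usage sketch of the v2 paged index introduced above, mirroring the round trip in index_test.go. The APIs (v2.NewIndexWriter, v2.NewIndexReader, backend.NewContextReaderWithAllReader, common.Record) are taken from this diff; the 210-byte page size is borrowed from the test and assumes base.RecordLength is 28 bytes (16-byte ID + 8-byte start + 4-byte length), so one page holds (210 - 6 base header - 8 checksum) / 28 = 7 records. Package and variable names here are illustrative only:

package main

import (
	"bytes"
	"context"
	"fmt"

	"github.com/grafana/tempo/tempodb/backend"
	"github.com/grafana/tempo/tempodb/encoding/common"
	v2 "github.com/grafana/tempo/tempodb/encoding/v2"
)

func main() {
	// One index record: a 16-byte trace ID plus the byte range it covers
	// in the data file.
	recs := []*common.Record{
		{ID: bytes.Repeat([]byte{0x01}, 16), Start: 0, Length: 42},
	}

	// 6-byte base header + 8-byte xxhash checksum + 7 records at 28 bytes
	// each (assuming base.RecordLength is 28).
	const pageSize = 210

	indexBytes, err := v2.NewIndexWriter(pageSize).Write(recs)
	if err != nil {
		panic(err)
	}

	// The reader must be handed the same page size and the total record
	// count that were used at write time.
	reader, err := v2.NewIndexReader(
		backend.NewContextReaderWithAllReader(bytes.NewReader(indexBytes)),
		pageSize, len(recs))
	if err != nil {
		panic(err)
	}

	// Binary-search the paged index for the record covering this ID.
	rec, idx, err := reader.Find(context.Background(), recs[0].ID)
	fmt.Println(rec.Start, rec.Length, idx, err) // 0 42 0 <nil>
}

Because getPage caches each page in pageCache after its checksum is verified, repeated Find calls against the same reader pay the read and xxhash cost only on first access to a page.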