
Block v2: Paged indexes/data (#577)
* Added v2

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added v2 page reader

Signed-off-by: Joe Elliott <number101010@gmail.com>

* simplified bytes counting test

Signed-off-by: Joe Elliott <number101010@gmail.com>

* possibly cleaned up or made pagereaders more complicated

Signed-off-by: Joe Elliott <number101010@gmail.com>

* paged index writer/reader

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Cleaned up tests

Signed-off-by: Joe Elliott <number101010@gmail.com>

* moved 'base' objects to a 'base' folder

Signed-off-by: Joe Elliott <number101010@gmail.com>

* tests cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* clean up

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added header logic

Signed-off-by: Joe Elliott <number101010@gmail.com>

* added page header

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added min/max ids and checksums to index header

Signed-off-by: Joe Elliott <number101010@gmail.com>

* cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* note

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added binary record search

Signed-off-by: Joe Elliott <number101010@gmail.com>

* cleanup

Signed-off-by: Joe Elliott <number101010@gmail.com>

* switched to xxhash

Signed-off-by: Joe Elliott <number101010@gmail.com>

* removed min/max ids

Signed-off-by: Joe Elliott <number101010@gmail.com>

* all tests pass

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added require

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added/adjusted defaults

Signed-off-by: Joe Elliott <number101010@gmail.com>

* lint/casting

Signed-off-by: Joe Elliott <number101010@gmail.com>

* changelog

Signed-off-by: Joe Elliott <number101010@gmail.com>

* go mod

Signed-off-by: Joe Elliott <number101010@gmail.com>

* pageReader/Writer => dataReader/Writer

Signed-off-by: Joe Elliott <number101010@gmail.com>

* Added search with errors

Signed-off-by: Joe Elliott <number101010@gmail.com>
joe-elliott committed Mar 10, 2021
1 parent 8e8dd62 commit fc4f477
Showing 49 changed files with 1,041 additions and 201 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,6 +1,7 @@
## master / unreleased

* [CHANGE] Update to Go 1.16, latest OpenTelemetry proto definition and collector [#546](https://github.com/grafana/tempo/pull/546)
* [FEATURE] Add page based access to the index file. [#557](https://github.com/grafana/tempo/pull/557)
* [ENHANCEMENT] Add a Shutdown handler to flush data to backend, at "/shutdown". [#526](https://github.com/grafana/tempo/pull/526)
* [ENHANCEMENT] Queriers now query all (healthy) ingesters for a trace to mitigate 404s on ingester rollouts/scaleups.
This is a **breaking change** and will likely result in query errors on rollout as the query signature b/n QueryFrontend & Querier has changed. [#557](https://github.com/grafana/tempo/pull/557)
1 change: 1 addition & 0 deletions go.mod
@@ -7,6 +7,7 @@ require (
contrib.go.opencensus.io/exporter/prometheus v0.2.0
github.com/Azure/azure-storage-blob-go v0.8.0
github.com/alecthomas/kong v0.2.11
github.com/cespare/xxhash v1.1.0
github.com/cortexproject/cortex v1.6.1-0.20210205171041-527f9b58b93c
github.com/dustin/go-humanize v1.0.0
github.com/go-kit/kit v0.10.0
1 change: 1 addition & 0 deletions modules/ingester/ingester_test.go
@@ -194,6 +194,7 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: tmpDir,
1 change: 1 addition & 0 deletions modules/ingester/instance_test.go
@@ -474,6 +474,7 @@ func defaultInstance(t assert.TestingT, tmpDir string) *instance {
IndexDownsampleBytes: 2,
BloomFP: .01,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: tmpDir,
1 change: 1 addition & 0 deletions modules/querier/querier_test.go
@@ -57,6 +57,7 @@ func TestReturnAllHits(t *testing.T) {
Encoding: backend.EncNone,
IndexDownsampleBytes: 10,
BloomFP: .05,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
3 changes: 2 additions & 1 deletion modules/storage/config.go
@@ -36,7 +36,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 2*1024*1024, "Number of bytes (before compression) per index record.")
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.")
f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.")
cfg.Trace.Block.Encoding = backend.EncZstd

cfg.Trace.Azure = &azure.Config{}
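
Taken together, the flags above correspond to a default block configuration along these lines. This is an illustrative fragment only (it assumes the tempodb/encoding and tempodb/backend imports); the field names match those set in the test diffs above, and EncZstd is the default assigned just after the flags:

block := &encoding.BlockConfig{
	BloomFP:              0.05,        // trace.block.bloom-filter-false-positive
	IndexDownsampleBytes: 1024 * 1024, // trace.block.index-downsample-bytes (new default)
	IndexPageSizeBytes:   250 * 1024,  // trace.block.index-page-size-bytes (new flag)
	Encoding:             backend.EncZstd,
}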
24 changes: 24 additions & 0 deletions pkg/sort/search.go
@@ -0,0 +1,24 @@
package sort

// SearchWithErrors is forked from https://golang.org/src/sort/search.go
// but with added support for errors
func SearchWithErrors(n int, f func(int) (bool, error)) (int, error) {
// Define f(-1) == false and f(n) == true.
// Invariant: f(i-1) == false, f(j) == true.
i, j := 0, n
for i < j {
h := int(uint(i+j) >> 1) // avoid overflow when computing h
// i ≤ h < j
b, e := f(h)
if e != nil {
return -1, e
}
if !b {
i = h + 1 // preserves f(i-1) == false
} else {
j = h // preserves f(j) == true
}
}
// i == j, f(i-1) == false, and f(j) (= f(i)) == true => answer is i.
return i, nil
}
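
For illustration, here is a minimal, self-contained sketch of SearchWithErrors in use. The data and the failure branch are hypothetical stand-ins, not taken from this commit; the import path assumes the upstream module github.com/grafana/tempo:

package main

import (
	"errors"
	"fmt"

	"github.com/grafana/tempo/pkg/sort"
)

func main() {
	ids := []int{2, 4, 6, 8, 10} // sorted, standing in for index record IDs
	target := 8

	// f(i) may fail, e.g. when reading record i requires a page read that errors.
	i, err := sort.SearchWithErrors(len(ids), func(i int) (bool, error) {
		if ids[i] < 0 { // stand-in for a failed page read
			return false, errors.New("bad record")
		}
		return ids[i] >= target, nil
	})
	if err != nil {
		fmt.Println("search failed:", err)
		return
	}
	fmt.Println("found at index", i) // prints: found at index 3
}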
2 changes: 2 additions & 0 deletions tempodb/backend/block_meta.go
@@ -25,6 +25,8 @@ type BlockMeta struct {
Size uint64 `json:"size"`
CompactionLevel uint8 `json:"compactionLevel"`
Encoding Encoding `json:"encoding"`
IndexPageSize uint32 `json:"indexPageSize"`
TotalRecords uint32 `json:"totalRecords"`
}

func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding) *BlockMeta {
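
The two new fields give a reader enough information to address the index by page: IndexPageSize fixes the on-disk page stride and TotalRecords bounds the search. A sketch of the implied arithmetic (an assumption about how the paged index reader uses these fields, not code from this commit):

// indexPageOffset returns the byte offset of index page i within the
// index file, assuming fixed-size pages of meta.IndexPageSize bytes.
func indexPageOffset(meta *backend.BlockMeta, page uint32) uint64 {
	return uint64(page) * uint64(meta.IndexPageSize)
}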
10 changes: 6 additions & 4 deletions tempodb/compactor_bookmark_test.go
@@ -19,12 +19,13 @@ import (
"github.com/grafana/tempo/tempodb/encoding"
"github.com/grafana/tempo/tempodb/wal"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCurrentClear(t *testing.T) {
tempDir, err := ioutil.TempDir("/tmp", "")
defer os.RemoveAll(tempDir)
assert.NoError(t, err, "unexpected error creating temp dir")
require.NoError(t, err, "unexpected error creating temp dir")

r, w, c, err := New(&Config{
Backend: "local",
@@ -35,13 +36,14 @@ func TestCurrentClear(t *testing.T) {
IndexDownsampleBytes: 17,
BloomFP: .01,
Encoding: backend.EncGZIP,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
},
BlocklistPoll: 0,
}, log.NewNopLogger())
assert.NoError(t, err)
require.NoError(t, err)

c.EnableCompaction(&CompactorConfig{
ChunkSizeBytes: 10,
@@ -51,12 +53,12 @@ }, &mockSharder{}, &mockOverrides{})
}, &mockSharder{}, &mockOverrides{})

wal := w.WAL()
assert.NoError(t, err)
require.NoError(t, err)

recordCount := 10
blockID := uuid.New()
head, err := wal.NewBlock(blockID, testTenantID)
assert.NoError(t, err)
require.NoError(t, err)

for i := 0; i < recordCount; i++ {
id := make([]byte, 16)
14 changes: 8 additions & 6 deletions tempodb/compactor_test.go
@@ -13,6 +13,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/tempo/pkg/tempopb"
"github.com/grafana/tempo/pkg/util/test"
@@ -64,12 +65,14 @@ func TestCompaction(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_4M,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
},
BlocklistPoll: 0,
}, log.NewNopLogger())
require.NoError(t, err)

c.EnableCompaction(&CompactorConfig{
ChunkSizeBytes: 10,
@@ -190,6 +193,7 @@ func TestSameIDCompaction(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncSnappy,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -277,6 +281,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -342,6 +347,7 @@ func TestCompactionMetrics(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
@@ -390,12 +396,7 @@

bytesEnd, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0")
assert.NoError(t, err)
bytesPerRecord :=
4 /* total length */ +
4 /* id length */ +
16 /* id */ +
3 /* test record length */
assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart)
assert.Greater(t, bytesEnd, bytesStart) // calculating the exact bytes requires knowledge of the bytes as written in the blocks. just make sure it goes up
}

func TestCompactionIteratesThroughTenants(t *testing.T) {
Expand All @@ -416,6 +417,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
IndexDownsampleBytes: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_64k,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
12 changes: 6 additions & 6 deletions tempodb/encoding/appender.go
@@ -17,23 +17,23 @@ type Appender interface {
}

type appender struct {
pageWriter common.PageWriter
dataWriter common.DataWriter
records []*common.Record
currentOffset uint64
}

// NewAppender returns an appender. This appender simply appends new objects
// to the provided pageWriter.
func NewAppender(pageWriter common.PageWriter) Appender {
// to the provided dataWriter.
func NewAppender(dataWriter common.DataWriter) Appender {
return &appender{
pageWriter: pageWriter,
dataWriter: dataWriter,
}
}

// Append appends the id/object to the writer. Note that the caller is giving up ownership of the two byte arrays backing the slices.
// Copies should be made and passed in if this is a problem
func (a *appender) Append(id common.ID, b []byte) error {
length, err := a.pageWriter.Write(id, b)
length, err := a.dataWriter.Write(id, b)
if err != nil {
return err
}
@@ -66,5 +66,5 @@ func (a *appender) DataLength() uint64 {
}

func (a *appender) Complete() error {
return a.pageWriter.Complete()
return a.dataWriter.Complete()
}
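
The rename does not change how the appender is driven. A sketch of the call sequence, as a fragment inside some writing function; dataWriter, id, and objectBytes are hypothetical, and construction of the underlying DataWriter lives elsewhere in this commit:

app := encoding.NewAppender(dataWriter)
// Append takes ownership of both slices; copy them first if they are reused.
if err := app.Append(id, objectBytes); err != nil {
	return err
}
// Complete flushes anything the underlying DataWriter is still holding.
if err := app.Complete(); err != nil {
	return err
}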
4 changes: 2 additions & 2 deletions tempodb/encoding/appender_buffered.go
@@ -8,7 +8,7 @@ import (
// index
type bufferedAppender struct {
// output writer
writer common.PageWriter
writer common.DataWriter

// record keeping
records []*common.Record
@@ -23,7 +23,7 @@

// NewBufferedAppender returns a bufferedAppender. This appender buffers writes to
// the provided writer and also builds a downsampled records slice.
func NewBufferedAppender(writer common.PageWriter, indexDownsample int, totalObjectsEstimate int) (Appender, error) {
func NewBufferedAppender(writer common.DataWriter, indexDownsample int, totalObjectsEstimate int) (Appender, error) {
return &bufferedAppender{
writer: writer,
indexDownsampleBytes: indexDownsample,
18 changes: 10 additions & 8 deletions tempodb/encoding/backend_block.go
@@ -30,6 +30,8 @@ func NewBackendBlock(meta *backend.BlockMeta, r backend.Reader) (*BackendBlock,
encoding = v0Encoding{}
case "v1":
encoding = v1Encoding{}
case "v2":
encoding = v2Encoding{}
default:
return nil, fmt.Errorf("%s is not a valid block version", meta.Version)
}
@@ -74,20 +76,20 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
}

indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader)
indexReader, err := b.encoding.newIndexReader(indexReaderAt)
indexReader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
if err != nil {
return nil, fmt.Errorf("error building index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}

ra := backend.NewContextReader(b.meta, nameObjects, b.reader)
pageReader, err := b.encoding.newPageReader(ra, b.meta.Encoding)
dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding)
if err != nil {
return nil, fmt.Errorf("error building page reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}
defer pageReader.Close()
defer dataReader.Close()

// passing nil for objectCombiner here. this is fine b/c a backend block should never have dupes
finder := NewPagedFinder(indexReader, pageReader, nil)
finder := NewPagedFinder(indexReader, dataReader, nil)
objectBytes, err := finder.Find(ctx, id)

if err != nil {
@@ -101,16 +103,16 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {
func (b *BackendBlock) Iterator(chunkSizeBytes uint32) (Iterator, error) {
// read index
ra := backend.NewContextReader(b.meta, nameObjects, b.reader)
pageReader, err := b.encoding.newPageReader(ra, b.meta.Encoding)
dataReader, err := b.encoding.newDataReader(ra, b.meta.Encoding)
if err != nil {
return nil, fmt.Errorf("failed to create pageReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
return nil, fmt.Errorf("failed to create dataReader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}

indexReaderAt := backend.NewContextReader(b.meta, nameIndex, b.reader)
reader, err := b.encoding.newIndexReader(indexReaderAt)
reader, err := b.encoding.newIndexReader(indexReaderAt, int(b.meta.IndexPageSize), int(b.meta.TotalRecords))
if err != nil {
return nil, fmt.Errorf("failed to create index reader (%s, %s): %w", b.meta.TenantID, b.meta.BlockID, err)
}

return newPagedIterator(chunkSizeBytes, reader, pageReader), nil
return newPagedIterator(chunkSizeBytes, reader, dataReader), nil
}
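
A sketch of the read path this enables, as a fragment; backendReader and traceID are hypothetical names, and error handling is abbreviated. It assumes a v2 block whose meta carries IndexPageSize and TotalRecords:

block, err := encoding.NewBackendBlock(meta, backendReader) // meta.Version == "v2"
if err != nil {
	return err
}
// Find binary-searches the paged index, then reads only the matching data page.
objBytes, err := block.Find(ctx, traceID)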
@@ -1,4 +1,4 @@
package v0
package base

import (
"encoding/binary"
@@ -1,4 +1,4 @@
package v0
package base

import (
"bytes"
