From c61b67851b576a673176582f3388eb3c7f0c7145 Mon Sep 17 00:00:00 2001
From: Annanay Agarwal
Date: Thu, 20 May 2021 21:30:59 +0530
Subject: [PATCH] Add configurable bloom filters (#644)

* Add configurable bloom filters Signed-off-by: Annanay
* Create filter based on size and fp Signed-off-by: Annanay
* Use bloom.EstimateParameters and check for bloom size rather than fp Signed-off-by: Annanay
* Lint Signed-off-by: Annanay
* Use uint16 in block meta Signed-off-by: Annanay
* Increase test timeout to 20m Signed-off-by: Annanay
* Add more testing Signed-off-by: Annanay
* Address comments, rename config key, Write() => Marshal() Signed-off-by: Annanay
* Fix tests Signed-off-by: Annanay
* Sanity checks on shardCount Signed-off-by: Annanay
* Fix test Signed-off-by: Annanay
* More testing around bloom shard counts Signed-off-by: Annanay
* fix TestBlockCleanup Signed-off-by: Annanay
* CHANGELOG Signed-off-by: Annanay
* Correctly set meta.TotalObjects in newAppendBlockFromFile Signed-off-by: Annanay
* make fmt Signed-off-by: Annanay
* reduce maxShardCount, use appender.length() over len(records) Signed-off-by: Annanay
---
 CHANGELOG.md                             |   1 +
 modules/ingester/ingester_test.go        |   3 +-
 modules/ingester/instance_test.go        |   3 +-
 modules/querier/querier_test.go          |   3 +-
 modules/storage/config.go                |   4 +-
 tempodb/backend/block_meta.go            |   1 +
 tempodb/backend/block_meta_test.go       |  27 ++++
 tempodb/backend/local/local_test.go      |   5 +-
 tempodb/compactor_bookmark_test.go       |   1 +
 tempodb/compactor_test.go                |   5 +
 tempodb/encoding/backend_block.go        |   2 +-
 tempodb/encoding/block.go                |   4 +-
 tempodb/encoding/common/bloom.go         |  68 +++++---
 tempodb/encoding/common/bloom_test.go    | 106 +++++++++++-
 tempodb/encoding/config.go               |   5 +
 tempodb/encoding/streaming_block.go      |   3 +-
 tempodb/encoding/streaming_block_test.go |   9 +-
 tempodb/retention_test.go                |   6 +-
 tempodb/tempodb_test.go                  | 196 +++-------------------
 tempodb/wal/append_block.go              |   1 +
 20 files changed, 245 insertions(+), 208 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5815d680820..92fc12e2701 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,7 @@
 * [ENHANCEMENT] Zipkin support - Dedupe span IDs based on span.Kind (client/server) in Query Frontend. [#687](https://github.com/grafana/tempo/pull/687)
 * [ENHANCEMENT] Reduce marshalling in the ingesters to improve performance. [#694](https://github.com/grafana/tempo/pull/694)
   This is kind of a **breaking change**. Rollout all ingesters before any other component to prevent dropped spans.
+* [ENHANCEMENT] Allow setting the bloom filter shard size, with support for a dynamic shard count. [#644](https://github.com/grafana/tempo/pull/644)
 * [CHANGE] Fix Query Frontend grpc settings to avoid noisy error log. [#690](https://github.com/grafana/tempo/pull/690)
 * [CHANGE] GCS SDK update v1.12.0 => v.15.0, ReadAllWithEstimate used in GCS/S3 backends.
[#693](https://github.com/grafana/tempo/pull/693) diff --git a/modules/ingester/ingester_test.go b/modules/ingester/ingester_test.go index 5620f64a51a..7e0587c62a4 100644 --- a/modules/ingester/ingester_test.go +++ b/modules/ingester/ingester_test.go @@ -259,7 +259,8 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, }, Block: &encoding.BlockConfig{ IndexDownsampleBytes: 2, - BloomFP: .01, + BloomFP: 0.01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_1M, IndexPageSizeBytes: 1000, }, diff --git a/modules/ingester/instance_test.go b/modules/ingester/instance_test.go index 2c5932010fb..c4ba9906e9f 100644 --- a/modules/ingester/instance_test.go +++ b/modules/ingester/instance_test.go @@ -520,7 +520,8 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance { }, Block: &encoding.BlockConfig{ IndexDownsampleBytes: 2, - BloomFP: .01, + BloomFP: 0.01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_1M, IndexPageSizeBytes: 1000, }, diff --git a/modules/querier/querier_test.go b/modules/querier/querier_test.go index b410733dd85..fbbd6fef99b 100644 --- a/modules/querier/querier_test.go +++ b/modules/querier/querier_test.go @@ -57,7 +57,8 @@ func TestReturnAllHits(t *testing.T) { Block: &encoding.BlockConfig{ Encoding: backend.EncNone, IndexDownsampleBytes: 10, - BloomFP: .05, + BloomFP: 0.01, + BloomShardSizeBytes: 100_000, IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ diff --git a/modules/storage/config.go b/modules/storage/config.go index 125810323fc..456d30c9232 100644 --- a/modules/storage/config.go +++ b/modules/storage/config.go @@ -5,6 +5,7 @@ import ( "time" cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache" + "github.com/grafana/tempo/pkg/util" "github.com/grafana/tempo/tempodb" "github.com/grafana/tempo/tempodb/backend" @@ -37,7 +38,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet) cfg.Trace.WAL.Encoding = backend.EncNone cfg.Trace.Block = &encoding.BlockConfig{} - f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.") + f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom Filter False Positive.") + f.IntVar(&cfg.Trace.Block.BloomShardSizeBytes, util.PrefixConfig(prefix, "trace.block.bloom-filter-shard-size-bytes"), 250*1024, "Bloom Filter Shard Size in bytes.") f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.") f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.") cfg.Trace.Block.Encoding = backend.EncZstd diff --git a/tempodb/backend/block_meta.go b/tempodb/backend/block_meta.go index 6115c7ef35a..a1f2597b1eb 100644 --- a/tempodb/backend/block_meta.go +++ b/tempodb/backend/block_meta.go @@ -28,6 +28,7 @@ type BlockMeta struct { IndexPageSize uint32 `json:"indexPageSize"` // Size of each index page in bytes TotalRecords uint32 `json:"totalRecords"` // Total Records stored in the index file DataEncoding string `json:"dataEncoding"` // DataEncoding is a string provided externally, but tracked by tempodb that indicates the way the bytes are encoded + BloomShardCount uint16 `json:"bloomShards"` // Number of bloom filter shards } func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, 
encoding Encoding, dataEncoding string) *BlockMeta { diff --git a/tempodb/backend/block_meta_test.go b/tempodb/backend/block_meta_test.go index 68f980eb5ad..e37fadf72d1 100644 --- a/tempodb/backend/block_meta_test.go +++ b/tempodb/backend/block_meta_test.go @@ -2,6 +2,7 @@ package backend import ( "bytes" + "encoding/json" "math/rand" "testing" @@ -41,3 +42,29 @@ func TestBlockMeta(t *testing.T) { assert.Equal(t, 1, bytes.Compare(b.MaxID, b.MinID)) assert.Equal(t, 2, b.TotalObjects) } + +func TestBlockMetaParsing(t *testing.T) { + inputJSON := ` +{ + "format": "v0", + "blockID": "00000000-0000-0000-0000-000000000000", + "minID": "AAAAAAAAAAAAOO0z0LnnHg==", + "maxID": "AAAAAAAAAAD/o61w2bYIDg==", + "tenantID": "single-tenant", + "startTime": "2021-01-01T00:00:00.0000000Z", + "endTime": "2021-01-02T00:00:00.0000000Z", + "totalObjects": 10, + "size": 12345, + "compactionLevel": 0, + "encoding": "zstd", + "indexPageSize": 250000, + "totalRecords": 124356, + "dataEncoding": "", + "bloomShards": 244 +} +` + + blockMeta := BlockMeta{} + err := json.Unmarshal([]byte(inputJSON), &blockMeta) + assert.NoError(t, err, "expected to be able to unmarshal from JSON") +} diff --git a/tempodb/backend/local/local_test.go b/tempodb/backend/local/local_test.go index 956331de5f8..b69194bd6ce 100644 --- a/tempodb/backend/local/local_test.go +++ b/tempodb/backend/local/local_test.go @@ -10,9 +10,10 @@ import ( "testing" "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/grafana/tempo/tempodb/backend" "github.com/grafana/tempo/tempodb/encoding/common" - "github.com/stretchr/testify/assert" ) const objectName = "test" @@ -107,7 +108,7 @@ func TestCompaction(t *testing.T) { BlockID: blockID, } - shardNum := common.GetShardNum() + shardNum := common.ValidateShardCount(int(fakeMeta.BloomShardCount)) fakeBloom := make([][]byte, shardNum) fakeIndex := make([]byte, 20) fakeTraces := make([]byte, 200) diff --git a/tempodb/compactor_bookmark_test.go b/tempodb/compactor_bookmark_test.go index e2abdc4710c..225e27b527a 100644 --- a/tempodb/compactor_bookmark_test.go +++ b/tempodb/compactor_bookmark_test.go @@ -35,6 +35,7 @@ func TestCurrentClear(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, BloomFP: .01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncGZIP, IndexPageSizeBytes: 1000, }, diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 276db4c94e6..735fb7a5cd0 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -64,6 +64,7 @@ func TestCompaction(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_4M, IndexPageSizeBytes: 1000, }, @@ -192,6 +193,7 @@ func TestSameIDCompaction(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncSnappy, IndexPageSizeBytes: 1000, }, @@ -279,6 +281,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncNone, IndexPageSizeBytes: 1000, }, @@ -345,6 +348,7 @@ func TestCompactionMetrics(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncNone, IndexPageSizeBytes: 1000, }, @@ -415,6 +419,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 11, BloomFP: .01, + 
BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_64k, IndexPageSizeBytes: 1000, }, diff --git a/tempodb/encoding/backend_block.go b/tempodb/encoding/backend_block.go index 4a32db388e7..6578b9c616c 100644 --- a/tempodb/encoding/backend_block.go +++ b/tempodb/encoding/backend_block.go @@ -48,7 +48,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) { span.SetTag("block", b.meta.BlockID.String()) - shardKey := common.ShardKeyForTraceID(id) + shardKey := common.ShardKeyForTraceID(id, int(b.meta.BloomShardCount)) blockID := b.meta.BlockID tenantID := b.meta.TenantID diff --git a/tempodb/encoding/block.go b/tempodb/encoding/block.go index e56a4a1acb9..08f24c7fb1f 100644 --- a/tempodb/encoding/block.go +++ b/tempodb/encoding/block.go @@ -26,7 +26,7 @@ func bloomName(shard int) string { // writeBlockMeta writes the bloom filter, meta and index to the passed in backend.Writer func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, indexBytes []byte, b *common.ShardedBloomFilter) error { - blooms, err := b.WriteTo() + blooms, err := b.Marshal() if err != nil { return err } @@ -81,7 +81,7 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader, } // Bloom - for i := 0; i < common.GetShardNum(); i++ { + for i := 0; i < common.ValidateShardCount(int(meta.BloomShardCount)); i++ { err = copy(bloomName(i)) if err != nil { return err diff --git a/tempodb/encoding/common/bloom.go b/tempodb/encoding/common/bloom.go index d7bea5c0570..ce0490978bb 100644 --- a/tempodb/encoding/common/bloom.go +++ b/tempodb/encoding/common/bloom.go @@ -2,41 +2,63 @@ package common import ( "bytes" + "math" - "github.com/grafana/tempo/pkg/util" + cortex_util "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/kit/log/level" "github.com/willf/bloom" + + "github.com/grafana/tempo/pkg/util" ) -const shardNum = 10 +const ( + legacyShardCount = 10 + minShardCount = 1 + maxShardCount = 1000 +) type ShardedBloomFilter struct { blooms []*bloom.BloomFilter } -func NewWithEstimates(n uint, fp float64) *ShardedBloomFilter { - b := &ShardedBloomFilter{ - blooms: make([]*bloom.BloomFilter, shardNum), +// NewBloom creates a ShardedBloomFilter +func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter { + // estimate the number of shards needed + // m: number of bits in the filter + // k: number of hash functions + var shardCount uint + m, k := bloom.EstimateParameters(estimatedObjects, fp) + shardCount = uint(math.Ceil(float64(m) / (float64(shardSize) * 8.0))) + + if shardCount < minShardCount { + shardCount = minShardCount } - itemsPerBloom := n / shardNum - if itemsPerBloom == 0 { - itemsPerBloom = 1 + if shardCount > maxShardCount { + shardCount = maxShardCount + level.Warn(cortex_util.Logger).Log("msg", "required bloom filter shard count exceeded max. 
consider increasing bloom_filter_shard_size_bytes") } - for i := 0; i < shardNum; i++ { - b.blooms[i] = bloom.NewWithEstimates(itemsPerBloom, fp) + + b := &ShardedBloomFilter{ + blooms: make([]*bloom.BloomFilter, shardCount), + } + + for i := 0; i < int(shardCount); i++ { + // New(m uint, k uint) creates a new Bloom filter with _m_ bits and _k_ hashing functions + b.blooms[i] = bloom.New(shardSize*8, k) } return b } func (b *ShardedBloomFilter) Add(traceID []byte) { - shardKey := ShardKeyForTraceID(traceID) + shardKey := ShardKeyForTraceID(traceID, len(b.blooms)) b.blooms[shardKey].Add(traceID) } -// WriteTo is a wrapper around bloom.WriteTo -func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) { - bloomBytes := make([][]byte, shardNum) +// Marshal is a wrapper around bloom.WriteTo +func (b *ShardedBloomFilter) Marshal() ([][]byte, error) { + bloomBytes := make([][]byte, len(b.blooms)) for i, f := range b.blooms { bloomBuffer := &bytes.Buffer{} _, err := f.WriteTo(bloomBuffer) @@ -48,16 +70,24 @@ func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) { return bloomBytes, nil } -func ShardKeyForTraceID(traceID []byte) int { - return int(util.TokenForTraceID(traceID)) % shardNum +func (b *ShardedBloomFilter) GetShardCount() int { + return len(b.blooms) } // Test implements bloom.Test -> required only for testing func (b *ShardedBloomFilter) Test(traceID []byte) bool { - shardKey := ShardKeyForTraceID(traceID) + shardKey := ShardKeyForTraceID(traceID, len(b.blooms)) return b.blooms[shardKey].Test(traceID) } -func GetShardNum() int { - return shardNum +func ShardKeyForTraceID(traceID []byte, shardCount int) int { + return int(util.TokenForTraceID(traceID)) % ValidateShardCount(shardCount) +} + +// For backward compatibility +func ValidateShardCount(shardCount int) int { + if shardCount == 0 { + return legacyShardCount + } + return shardCount } diff --git a/tempodb/encoding/common/bloom_test.go b/tempodb/encoding/common/bloom_test.go index de692237f84..754f5d53a1c 100644 --- a/tempodb/encoding/common/bloom_test.go +++ b/tempodb/encoding/common/bloom_test.go @@ -12,7 +12,7 @@ import ( func TestShardedBloom(t *testing.T) { // create a bunch of traceIDs var err error - const numTraces = 1000 + const numTraces = 10000 traceIDs := make([][]byte, 0) for i := 0; i < numTraces; i++ { id := make([]byte, 16) @@ -23,7 +23,9 @@ func TestShardedBloom(t *testing.T) { // create sharded bloom filter const bloomFP = .01 - b := NewWithEstimates(uint(numTraces), bloomFP) + shardSize := uint(100) + estimatedObjects := uint(numTraces) + b := NewBloom(bloomFP, shardSize, estimatedObjects) // add traceIDs to sharded bloom filter for _, traceID := range traceIDs { @@ -31,18 +33,21 @@ func TestShardedBloom(t *testing.T) { } // get byte representation - bloomBytes, err := b.WriteTo() + bloomBytes, err := b.Marshal() assert.NoError(t, err) - assert.Len(t, bloomBytes, shardNum) + assert.Len(t, bloomBytes, b.GetShardCount()) // parse byte representation into willf_bloom.Bloomfilter var filters []*willf_bloom.BloomFilter - for i := 0; i < shardNum; i++ { + for i := 0; i < b.GetShardCount(); i++ { filters = append(filters, &willf_bloom.BloomFilter{}) } for i, singleBloom := range bloomBytes { _, err = filters[i].ReadFrom(bytes.NewReader(singleBloom)) assert.NoError(t, err) + + // assert that parsed form has the expected size + assert.Equal(t, shardSize*8, filters[i].Cap()) // * 8 because need bits from bytes } // confirm that the sharded bloom and parsed form give the same result @@ -52,9 +57,98 @@ func 
TestShardedBloom(t *testing.T) { if !found { missingCount++ } - assert.Equal(t, found, filters[ShardKeyForTraceID(traceID)].Test(traceID)) + assert.Equal(t, found, filters[ShardKeyForTraceID(traceID, b.GetShardCount())].Test(traceID)) } // check that missingCount is less than bloomFP assert.LessOrEqual(t, float64(missingCount), bloomFP*numTraces) } + +func TestShardedBloomFalsePositive(t *testing.T) { + tests := []struct { + name string + bloomFP float64 + shardSize uint + estimatedObjects uint + }{ + { + name: "regular", + bloomFP: 0.05, + shardSize: 250 * 1024, + estimatedObjects: 10_000_000, + }, + { + name: "large estimated objects", + bloomFP: 0.01, + shardSize: 100, + estimatedObjects: 10000, + }, + { + name: "large shard size", + bloomFP: 0.01, + shardSize: 100000, + estimatedObjects: 10, + }, + } + + for _, tt := range tests { + tt := tt // capture range variable, needed for running test cases in parallel + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + b := NewBloom(tt.bloomFP, tt.shardSize, tt.estimatedObjects) + + // get byte representation + bloomBytes, err := b.Marshal() + assert.NoError(t, err) + + // parse byte representation into willf_bloom.Bloomfilter + var filters []*willf_bloom.BloomFilter + for i := 0; i < b.GetShardCount(); i++ { + filters = append(filters, &willf_bloom.BloomFilter{}) + } + + for i, singleBloom := range bloomBytes { + _, err = filters[i].ReadFrom(bytes.NewReader(singleBloom)) + assert.NoError(t, err) + assert.LessOrEqual(t, filters[i].EstimateFalsePositiveRate(tt.estimatedObjects/uint(b.GetShardCount())), tt.bloomFP) + } + }) + } +} + +func TestBloomShardCount(t *testing.T) { + tests := []struct { + name string + bloomFP float64 + shardSize uint + estimatedObjects uint + expectedShards uint + }{ + { + name: "too many shards", + bloomFP: 0.01, + shardSize: 1, + estimatedObjects: 100000, + expectedShards: maxShardCount, + }, + { + name: "too few shards", + bloomFP: 0.01, + shardSize: 10, + estimatedObjects: 1, + expectedShards: minShardCount, + }, + } + + for _, tt := range tests { + tt := tt // capture range variable, needed for running test cases in parallel + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + b := NewBloom(tt.bloomFP, tt.shardSize, tt.estimatedObjects) + assert.Equal(t, int(tt.expectedShards), b.GetShardCount()) + }) + } + +} diff --git a/tempodb/encoding/config.go b/tempodb/encoding/config.go index 13dea780ff0..69fe00bc25e 100644 --- a/tempodb/encoding/config.go +++ b/tempodb/encoding/config.go @@ -11,6 +11,7 @@ type BlockConfig struct { IndexDownsampleBytes int `yaml:"index_downsample_bytes"` IndexPageSizeBytes int `yaml:"index_page_size_bytes"` BloomFP float64 `yaml:"bloom_filter_false_positive"` + BloomShardSizeBytes int `yaml:"bloom_filter_shard_size_bytes"` Encoding backend.Encoding `yaml:"encoding"` } @@ -28,5 +29,9 @@ func ValidateConfig(b *BlockConfig) error { return fmt.Errorf("invalid bloom filter fp rate %v", b.BloomFP) } + if b.BloomShardSizeBytes <= 0 { + return fmt.Errorf("Positive value required for bloom-filter shard size") + } + return nil } diff --git a/tempodb/encoding/streaming_block.go b/tempodb/encoding/streaming_block.go index 01887ab078e..0bedfab1712 100644 --- a/tempodb/encoding/streaming_block.go +++ b/tempodb/encoding/streaming_block.go @@ -41,7 +41,7 @@ func NewStreamingBlock(cfg *BlockConfig, id uuid.UUID, tenantID string, metas [] c := &StreamingBlock{ encoding: LatestEncoding(), compactedMeta: backend.NewBlockMeta(tenantID, id, currentVersion, cfg.Encoding, dataEncoding), - bloom: 
common.NewWithEstimates(uint(estimatedObjects), cfg.BloomFP), + bloom: common.NewBloom(cfg.BloomFP, uint(cfg.BloomShardSizeBytes), uint(estimatedObjects)), inMetas: metas, cfg: cfg, } @@ -132,6 +132,7 @@ func (c *StreamingBlock) Complete(ctx context.Context, tracker backend.AppendTra meta.TotalRecords = uint32(len(records)) // casting meta.IndexPageSize = uint32(c.cfg.IndexPageSizeBytes) + meta.BloomShardCount = uint16(c.bloom.GetShardCount()) return bytesFlushed, writeBlockMeta(ctx, w, meta, indexBytes, c.bloom) } diff --git a/tempodb/encoding/streaming_block_test.go b/tempodb/encoding/streaming_block_test.go index b283ee56bd1..b2a348d9646 100644 --- a/tempodb/encoding/streaming_block_test.go +++ b/tempodb/encoding/streaming_block_test.go @@ -59,7 +59,8 @@ func TestStreamingBlockAddObject(t *testing.T) { numObjects := (rand.Int() % 20) + 1 cb, err := NewStreamingBlock(&BlockConfig{ - BloomFP: .01, + BloomFP: 0.01, + BloomShardSizeBytes: 100, IndexDownsampleBytes: indexDownsample, Encoding: backend.EncGZIP, }, uuid.New(), testTenantID, metas, numObjects) @@ -117,6 +118,7 @@ func TestStreamingBlockAddObject(t *testing.T) { assert.Equal(t, testTenantID, meta.TenantID) assert.Equal(t, numObjects, meta.TotalObjects) assert.Greater(t, meta.Size, uint64(0)) + assert.Greater(t, cb.bloom.GetShardCount(), 0) // bloom for _, id := range ids { @@ -136,6 +138,7 @@ func TestStreamingBlockAll(t *testing.T) { &BlockConfig{ IndexDownsampleBytes: 1000, BloomFP: .01, + BloomShardSizeBytes: 10_000, Encoding: enc, IndexPageSizeBytes: 1000, }, @@ -237,6 +240,7 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin originatingMeta.StartTime = time.Now().Add(-5 * time.Minute) originatingMeta.EndTime = time.Now().Add(5 * time.Minute) originatingMeta.DataEncoding = "foo" + originatingMeta.TotalObjects = numMsgs // calc expected records byteCounter := 0 @@ -259,6 +263,8 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin block, err := NewStreamingBlock(cfg, originatingMeta.BlockID, originatingMeta.TenantID, []*backend.BlockMeta{originatingMeta}, originatingMeta.TotalObjects) require.NoError(t, err, "unexpected error completing block") + expectedBloomShards := block.bloom.GetShardCount() + ctx := context.Background() for { id, data, err := iter.Next(ctx) @@ -287,6 +293,7 @@ func streamingBlock(t *testing.T, cfg *BlockConfig, w backend.Writer) (*Streamin require.Equal(t, originatingMeta.EndTime, block.BlockMeta().EndTime) require.Equal(t, originatingMeta.TenantID, block.BlockMeta().TenantID) require.Equal(t, originatingMeta.DataEncoding, block.BlockMeta().DataEncoding) + require.Equal(t, expectedBloomShards, int(block.BlockMeta().BloomShardCount)) // Verify block size was written require.Greater(t, block.BlockMeta().Size, uint64(0)) diff --git a/tempodb/retention_test.go b/tempodb/retention_test.go index 6c8e2225ae9..e9a2cecf5ff 100644 --- a/tempodb/retention_test.go +++ b/tempodb/retention_test.go @@ -29,7 +29,8 @@ func TestRetention(t *testing.T) { }, Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, - BloomFP: .01, + BloomFP: 0.01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_256k, IndexPageSizeBytes: 1000, }, @@ -84,7 +85,8 @@ func TestBlockRetentionOverride(t *testing.T) { }, Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, - BloomFP: .01, + BloomFP: 0.01, + BloomShardSizeBytes: 100_000, Encoding: backend.EncLZ4_256k, IndexPageSizeBytes: 1000, }, diff --git a/tempodb/tempodb_test.go b/tempodb/tempodb_test.go index 
927f176a24c..ebfa8a773a8 100644 --- a/tempodb/tempodb_test.go +++ b/tempodb/tempodb_test.go @@ -10,9 +10,11 @@ import ( "time" "github.com/go-kit/kit/log" - "github.com/golang/protobuf/proto" "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/grafana/tempo/pkg/tempopb" "github.com/grafana/tempo/pkg/util/test" "github.com/grafana/tempo/tempodb/backend" @@ -20,20 +22,18 @@ import ( "github.com/grafana/tempo/tempodb/encoding" "github.com/grafana/tempo/tempodb/encoding/common" "github.com/grafana/tempo/tempodb/wal" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) const ( testTenantID = "fake" testTenantID2 = "fake2" + tmpdir = "/tmp" testDataEncoding = "blerg" ) -func TestDB(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") - defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") +func testConfig(t *testing.T, enc backend.Encoding, blocklistPoll time.Duration) (Reader, Writer, Compactor, string) { + tempDir, err := ioutil.TempDir(tmpdir, "") + require.NoError(t, err) r, w, c, err := New(&Config{ Backend: "local", @@ -43,15 +43,22 @@ func TestDB(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsampleBytes: 17, BloomFP: .01, - Encoding: backend.EncGZIP, + BloomShardSizeBytes: 100_000, + Encoding: enc, IndexPageSizeBytes: 1000, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), }, - BlocklistPoll: 0, + BlocklistPoll: blocklistPoll, }, log.NewNopLogger()) - assert.NoError(t, err) + require.NoError(t, err) + return r, w, c, tempDir +} + +func TestDB(t *testing.T) { + r, w, c, tempDir := testConfig(t, backend.EncGZIP, 0) + defer os.RemoveAll(tempDir) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -108,28 +115,8 @@ func TestBlockSharding(t *testing.T) { // push a req with some traceID // cut headblock & write to backend // search with different shards and check if its respecting the params - - tempDir, err := ioutil.TempDir("/tmp", "") + r, w, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) - assert.NoError(t, err) // create block with known ID blockID := uuid.New() @@ -182,27 +169,8 @@ func TestBlockSharding(t *testing.T) { } func TestNilOnUnknownTenantID(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) - assert.NoError(t, err) buff, _, err := r.Find(context.Background(), "unknown", []byte{0x01}, BlockIDMin, BlockIDMax) assert.Nil(t, buff) @@ -210,27 +178,8 @@ func TestNilOnUnknownTenantID(t *testing.T) { } func TestBlockCleanup(t *testing.T) { - 
tempDir, err := ioutil.TempDir("/tmp", "") + r, w, c, tempDir := testConfig(t, backend.EncLZ4_256k, 0) defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, c, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) - assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -242,7 +191,6 @@ func TestBlockCleanup(t *testing.T) { blockID := uuid.New() wal := w.WAL() - assert.NoError(t, err) head, err := wal.NewBlock(blockID, testTenantID, "") assert.NoError(t, err) @@ -294,23 +242,8 @@ func TestCleanMissingTenants(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join("/tmp", "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join("/tmp", "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) - assert.NoError(t, err) + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) + defer os.RemoveAll(tempDir) rw := r.(*readerWriter) @@ -351,27 +284,8 @@ func checkBlocklists(t *testing.T, expectedID uuid.UUID, expectedB int, expected } func TestUpdateBlocklist(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) - assert.NoError(t, err) rw := r.(*readerWriter) @@ -539,27 +453,8 @@ func TestUpdateBlocklist(t *testing.T) { } func TestUpdateBlocklistCompacted(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") + r, _, _, tempDir := testConfig(t, backend.EncLZ4_256k, 0) defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, _, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: 0, - }, log.NewNopLogger()) - assert.NoError(t, err) rw := r.(*readerWriter) @@ -861,27 +756,8 @@ func TestIncludeCompactedBlock(t *testing.T) { } func TestSearchCompactedBlocks(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") + r, w, c, tempDir := testConfig(t, backend.EncLZ4_256k, time.Minute) defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - r, w, c, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - 
}, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: time.Minute, - }, log.NewNopLogger()) - assert.NoError(t, err) c.EnableCompaction(&CompactorConfig{ ChunkSizeBytes: 10, @@ -891,7 +767,6 @@ func TestSearchCompactedBlocks(t *testing.T) { }, &mockSharder{}, &mockOverrides{}) wal := w.WAL() - assert.NoError(t, err) head, err := wal.NewBlock(uuid.New(), testTenantID, "") assert.NoError(t, err) @@ -967,27 +842,8 @@ func TestSearchCompactedBlocks(t *testing.T) { } func TestCompleteBlock(t *testing.T) { - tempDir, err := ioutil.TempDir("/tmp", "") + _, w, _, tempDir := testConfig(t, backend.EncLZ4_256k, time.Minute) defer os.RemoveAll(tempDir) - assert.NoError(t, err, "unexpected error creating temp dir") - - _, w, _, err := New(&Config{ - Backend: "local", - Local: &local.Config{ - Path: path.Join(tempDir, "traces"), - }, - Block: &encoding.BlockConfig{ - IndexDownsampleBytes: 17, - BloomFP: .01, - Encoding: backend.EncLZ4_256k, - IndexPageSizeBytes: 1000, - }, - WAL: &wal.Config{ - Filepath: path.Join(tempDir, "wal"), - }, - BlocklistPoll: time.Minute, - }, log.NewNopLogger()) - assert.NoError(t, err) wal := w.WAL() diff --git a/tempodb/wal/append_block.go b/tempodb/wal/append_block.go index 6b5bb3eae06..5e485728800 100644 --- a/tempodb/wal/append_block.go +++ b/tempodb/wal/append_block.go @@ -139,6 +139,7 @@ func newAppendBlockFromFile(filename string, path string) (*AppendBlock, error, common.SortRecords(records) b.appender = encoding.NewRecordAppender(records) + b.meta.TotalObjects = b.appender.Length() return b, warning, nil }
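A note on the sizing math for reviewers. The shard count in `NewBloom` falls out of `bloom.EstimateParameters`: the `m` total bits needed for the target false-positive rate are split into fixed-size shards of `shardSize` bytes, and the result is clamped to `[minShardCount, maxShardCount]`. A minimal standalone sketch of that calculation, assuming the `github.com/willf/bloom` package the patch already uses; the `estimateShardCount` helper and `main` wrapper are illustrative, not part of the patch:

```go
package main

import (
	"fmt"
	"math"

	"github.com/willf/bloom"
)

const (
	minShardCount = 1
	maxShardCount = 1000
)

// estimateShardCount mirrors the sizing logic added in NewBloom:
// EstimateParameters returns m (total bits) and k (hash functions) for the
// target false-positive rate, and the filter is then split into shards of
// shardSizeBytes each.
func estimateShardCount(fp float64, shardSizeBytes, estimatedObjects uint) (uint, uint) {
	m, k := bloom.EstimateParameters(estimatedObjects, fp)
	shards := uint(math.Ceil(float64(m) / (float64(shardSizeBytes) * 8.0)))
	if shards < minShardCount {
		shards = minShardCount
	}
	if shards > maxShardCount {
		shards = maxShardCount // clamped; the patch logs a warning suggesting a larger shard size
	}
	return shards, k
}

func main() {
	// Defaults from modules/storage/config.go: fp = 0.05, shard size 250 KiB.
	shards, k := estimateShardCount(0.05, 250*1024, 10_000_000)
	fmt.Printf("shards=%d hashFuncs=%d\n", shards, k)

	// Each shard is then built as bloom.New(shardSizeBytes*8, k), so every
	// shard serializes to roughly shardSizeBytes regardless of object count.
	_ = bloom.New(250*1024*8, k)
}
```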
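Backward compatibility hinges on `ValidateShardCount`: block metas written before this change carry no `bloomShards` field, so the value unmarshals to zero and must resolve to the legacy fixed count of 10 shards. A sketch of the read-path routing under that rule; the FNV-1 hash below is a stand-in for Tempo's `util.TokenForTraceID`, which this example does not vendor:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

const legacyShardCount = 10

// validateShardCount mirrors the backward-compatibility shim in the patch:
// a zero shard count (pre-change block meta) means the old fixed 10 shards.
func validateShardCount(shardCount int) int {
	if shardCount == 0 {
		return legacyShardCount
	}
	return shardCount
}

// shardKeyForTraceID routes a trace ID to one of the bloom shards. The FNV-1
// hash is an assumed stand-in for util.TokenForTraceID.
func shardKeyForTraceID(traceID []byte, shardCount int) int {
	h := fnv.New32()
	_, _ = h.Write(traceID)
	return int(h.Sum32() % uint32(validateShardCount(shardCount)))
}

func main() {
	id := []byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}

	// A legacy block (meta.BloomShardCount == 0) still resolves to 10 shards,
	// so queriers pick the right bloom shard for old and new blocks alike.
	fmt.Println("legacy block shard:", shardKeyForTraceID(id, 0))
	fmt.Println("new block shard:", shardKeyForTraceID(id, 244))
}
```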
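The `Write()` => `Marshal()` rename reflects that each shard now serializes independently and is fetched individually on the query path: `writeBlockMeta` stores one backend object per shard (named via `bloomName(i)`), and `BackendBlock.Find` reads back only the shard that `ShardKeyForTraceID` selects. A single-shard round-trip sketch, again assuming `github.com/willf/bloom`:

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/willf/bloom"
)

func main() {
	// One shard of a sharded filter: m bits from the shard-size budget
	// (100 bytes here), k hash functions from EstimateParameters.
	shard := bloom.New(100*8, 3)
	traceID := []byte("0123456789abcdef")
	shard.Add(traceID)

	// Marshal() in the patch calls WriteTo on every shard and stores the
	// resulting bytes as separate objects next to the block meta.
	buf := &bytes.Buffer{}
	if _, err := shard.WriteTo(buf); err != nil {
		panic(err)
	}

	// The querier reloads just the selected shard and tests membership.
	reloaded := &bloom.BloomFilter{}
	if _, err := reloaded.ReadFrom(bytes.NewReader(buf.Bytes())); err != nil {
		panic(err)
	}
	fmt.Println("trace may be present:", reloaded.Test(traceID))
}
```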