Skip to content

Commit

Permalink
Add configurable bloom filters (#644)
Browse files Browse the repository at this point in the history
* Add configurable bloom filters

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Create filter based on size and fp

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Use bloom.EstimateParameters and check for bloom size rather than fp

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Lint

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Use uint16 in block meta

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Increase test timeout to 20m

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Add more testing

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Address comments, rename config key, Write() => Marshal()

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Fix tests

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Sanity checks on shardCount

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Fix test

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* More testing around bloom shard counts

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* fix TestBlockCleanup

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* CHANGELOG

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Correctly set meta.TotalObjects in newAppendBlockFromFile

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* make fmt

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* reduce maxShardCount, use appender.length() over len(records)

Signed-off-by: Annanay <annanayagarwal@gmail.com>
  • Loading branch information
annanay25 authored May 20, 2021
1 parent b666b00 commit c61b678
Show file tree
Hide file tree
Showing 20 changed files with 245 additions and 208 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* [ENHANCEMENT] Zipkin support - Dedupe span IDs based on span.Kind (client/server) in Query Frontend. [#687](https://github.com/grafana/tempo/pull/687)
* [ENHANCEMENT] Reduce marshalling in the ingesters to improve performance. [#694](https://github.com/grafana/tempo/pull/694)
This is kind of a **breaking change**. Rollout all ingesters before any other component to prevent dropped spans.
* [ENHANCEMENT] Allow setting the bloom filter shard size, with support for dynamic shard counts. [#644](https://github.com/grafana/tempo/pull/644)
* [CHANGE] Fix Query Frontend grpc settings to avoid noisy error log. [#690](https://github.com/grafana/tempo/pull/690)
* [CHANGE] GCS SDK update v1.12.0 => v.15.0, ReadAllWithEstimate used in GCS/S3 backends. [#693](https://github.com/grafana/tempo/pull/693)

Expand Down
3 changes: 2 additions & 1 deletion modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
},
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 2,
BloomFP: .01,
BloomFP: 0.01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
Expand Down
3 changes: 2 additions & 1 deletion modules/ingester/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,8 @@ func defaultInstance(t require.TestingT, tmpDir string) *instance {
},
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 2,
BloomFP: .01,
BloomFP: 0.01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncLZ4_1M,
IndexPageSizeBytes: 1000,
},
Expand Down
3 changes: 2 additions & 1 deletion modules/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func TestReturnAllHits(t *testing.T) {
Block: &encoding.BlockConfig{
Encoding: backend.EncNone,
IndexDownsampleBytes: 10,
BloomFP: .05,
BloomFP: 0.01,
BloomShardSizeBytes: 100_000,
IndexPageSizeBytes: 1000,
},
WAL: &wal.Config{
Expand Down
4 changes: 3 additions & 1 deletion modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

cortex_cache "github.com/cortexproject/cortex/pkg/chunk/cache"

"github.com/grafana/tempo/pkg/util"
"github.com/grafana/tempo/tempodb"
"github.com/grafana/tempo/tempodb/backend"
Expand Down Expand Up @@ -37,7 +38,8 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Trace.WAL.Encoding = backend.EncNone

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom False Positive.")
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .05, "Bloom Filter False Positive.")
f.IntVar(&cfg.Trace.Block.BloomShardSizeBytes, util.PrefixConfig(prefix, "trace.block.bloom-filter-shard-size-bytes"), 250*1024, "Bloom Filter Shard Size in bytes.")
f.IntVar(&cfg.Trace.Block.IndexDownsampleBytes, util.PrefixConfig(prefix, "trace.block.index-downsample-bytes"), 1024*1024, "Number of bytes (before compression) per index record.")
f.IntVar(&cfg.Trace.Block.IndexPageSizeBytes, util.PrefixConfig(prefix, "trace.block.index-page-size-bytes"), 250*1024, "Number of bytes per index page.")
cfg.Trace.Block.Encoding = backend.EncZstd
Expand Down
1 change: 1 addition & 0 deletions tempodb/backend/block_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type BlockMeta struct {
IndexPageSize uint32 `json:"indexPageSize"` // Size of each index page in bytes
TotalRecords uint32 `json:"totalRecords"` // Total Records stored in the index file
DataEncoding string `json:"dataEncoding"` // DataEncoding is a string provided externally, but tracked by tempodb that indicates the way the bytes are encoded
BloomShardCount uint16 `json:"bloomShards"` // Number of bloom filter shards
}

func NewBlockMeta(tenantID string, blockID uuid.UUID, version string, encoding Encoding, dataEncoding string) *BlockMeta {
Expand Down
27 changes: 27 additions & 0 deletions tempodb/backend/block_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package backend

import (
"bytes"
"encoding/json"
"math/rand"
"testing"

Expand Down Expand Up @@ -41,3 +42,29 @@ func TestBlockMeta(t *testing.T) {
assert.Equal(t, 1, bytes.Compare(b.MaxID, b.MinID))
assert.Equal(t, 2, b.TotalObjects)
}

// TestBlockMetaParsing verifies that a BlockMeta round-trips from its JSON
// representation, including the bloomShards field added for sharded bloom
// filters. It also pins the parsed values so a silently dropped or renamed
// JSON tag fails the test rather than just yielding a zero value.
func TestBlockMetaParsing(t *testing.T) {
	inputJSON := `
{
"format": "v0",
"blockID": "00000000-0000-0000-0000-000000000000",
"minID": "AAAAAAAAAAAAOO0z0LnnHg==",
"maxID": "AAAAAAAAAAD/o61w2bYIDg==",
"tenantID": "single-tenant",
"startTime": "2021-01-01T00:00:00.0000000Z",
"endTime": "2021-01-02T00:00:00.0000000Z",
"totalObjects": 10,
"size": 12345,
"compactionLevel": 0,
"encoding": "zstd",
"indexPageSize": 250000,
"totalRecords": 124356,
"dataEncoding": "",
"bloomShards": 244
}
`

	blockMeta := BlockMeta{}
	err := json.Unmarshal([]byte(inputJSON), &blockMeta)
	assert.NoError(t, err, "expected to be able to unmarshal from JSON")

	// Spot-check parsed fields so the JSON tags stay wired up.
	assert.Equal(t, "single-tenant", blockMeta.TenantID)
	assert.Equal(t, 10, blockMeta.TotalObjects)
	assert.Equal(t, uint32(250000), blockMeta.IndexPageSize)
	assert.Equal(t, uint32(124356), blockMeta.TotalRecords)
	assert.Equal(t, uint16(244), blockMeta.BloomShardCount)
}
5 changes: 3 additions & 2 deletions tempodb/backend/local/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
"testing"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding/common"
"github.com/stretchr/testify/assert"
)

const objectName = "test"
Expand Down Expand Up @@ -107,7 +108,7 @@ func TestCompaction(t *testing.T) {
BlockID: blockID,
}

shardNum := common.GetShardNum()
shardNum := common.ValidateShardCount(int(fakeMeta.BloomShardCount))
fakeBloom := make([][]byte, shardNum)
fakeIndex := make([]byte, 20)
fakeTraces := make([]byte, 200)
Expand Down
1 change: 1 addition & 0 deletions tempodb/compactor_bookmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func TestCurrentClear(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 17,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncGZIP,
IndexPageSizeBytes: 1000,
},
Expand Down
5 changes: 5 additions & 0 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func TestCompaction(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncLZ4_4M,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -192,6 +193,7 @@ func TestSameIDCompaction(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncSnappy,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -279,6 +281,7 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -345,6 +348,7 @@ func TestCompactionMetrics(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncNone,
IndexPageSizeBytes: 1000,
},
Expand Down Expand Up @@ -415,6 +419,7 @@ func TestCompactionIteratesThroughTenants(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsampleBytes: 11,
BloomFP: .01,
BloomShardSizeBytes: 100_000,
Encoding: backend.EncLZ4_64k,
IndexPageSizeBytes: 1000,
},
Expand Down
2 changes: 1 addition & 1 deletion tempodb/encoding/backend_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (b *BackendBlock) Find(ctx context.Context, id common.ID) ([]byte, error) {

span.SetTag("block", b.meta.BlockID.String())

shardKey := common.ShardKeyForTraceID(id)
shardKey := common.ShardKeyForTraceID(id, int(b.meta.BloomShardCount))
blockID := b.meta.BlockID
tenantID := b.meta.TenantID

Expand Down
4 changes: 2 additions & 2 deletions tempodb/encoding/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func bloomName(shard int) string {

// writeBlockMeta writes the bloom filter, meta and index to the passed in backend.Writer
func writeBlockMeta(ctx context.Context, w backend.Writer, meta *backend.BlockMeta, indexBytes []byte, b *common.ShardedBloomFilter) error {
blooms, err := b.WriteTo()
blooms, err := b.Marshal()
if err != nil {
return err
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func CopyBlock(ctx context.Context, meta *backend.BlockMeta, src backend.Reader,
}

// Bloom
for i := 0; i < common.GetShardNum(); i++ {
for i := 0; i < common.ValidateShardCount(int(meta.BloomShardCount)); i++ {
err = copy(bloomName(i))
if err != nil {
return err
Expand Down
68 changes: 49 additions & 19 deletions tempodb/encoding/common/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,63 @@ package common

import (
"bytes"
"math"

"github.com/grafana/tempo/pkg/util"
cortex_util "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/willf/bloom"

"github.com/grafana/tempo/pkg/util"
)

const shardNum = 10
const (
	// legacyShardCount is the fixed shard count used by blocks written
	// before the count was configurable and persisted in the block meta;
	// a zero BloomShardCount in meta falls back to this value.
	legacyShardCount = 10
	// minShardCount and maxShardCount bound the shard count estimated
	// from the configured shard size in NewBloom.
	minShardCount = 1
	maxShardCount = 1000
)

// ShardedBloomFilter splits one logical bloom filter across multiple
// smaller shards; a trace ID is assigned to a shard via ShardKeyForTraceID.
type ShardedBloomFilter struct {
	blooms []*bloom.BloomFilter // one filter per shard
}

func NewWithEstimates(n uint, fp float64) *ShardedBloomFilter {
b := &ShardedBloomFilter{
blooms: make([]*bloom.BloomFilter, shardNum),
// NewBloom creates a ShardedBloomFilter
func NewBloom(fp float64, shardSize, estimatedObjects uint) *ShardedBloomFilter {
// estimate the number of shards needed
// m: number of bits in the filter
// k: number of hash functions
var shardCount uint
m, k := bloom.EstimateParameters(estimatedObjects, fp)
shardCount = uint(math.Ceil(float64(m) / (float64(shardSize) * 8.0)))

if shardCount < minShardCount {
shardCount = minShardCount
}

itemsPerBloom := n / shardNum
if itemsPerBloom == 0 {
itemsPerBloom = 1
if shardCount > maxShardCount {
shardCount = maxShardCount
level.Warn(cortex_util.Logger).Log("msg", "required bloom filter shard count exceeded max. consider increasing bloom_filter_shard_size_bytes")
}
for i := 0; i < shardNum; i++ {
b.blooms[i] = bloom.NewWithEstimates(itemsPerBloom, fp)

b := &ShardedBloomFilter{
blooms: make([]*bloom.BloomFilter, shardCount),
}

for i := 0; i < int(shardCount); i++ {
// New(m uint, k uint) creates a new Bloom filter with _m_ bits and _k_ hashing functions
b.blooms[i] = bloom.New(shardSize*8, k)
}

return b
}

// Add records traceID in the shard it hashes to.
func (b *ShardedBloomFilter) Add(traceID []byte) {
	shardKey := ShardKeyForTraceID(traceID, len(b.blooms))
	b.blooms[shardKey].Add(traceID)
}

// WriteTo is a wrapper around bloom.WriteTo
func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) {
bloomBytes := make([][]byte, shardNum)
// Marshal is a wrapper around bloom.WriteTo
func (b *ShardedBloomFilter) Marshal() ([][]byte, error) {
bloomBytes := make([][]byte, len(b.blooms))
for i, f := range b.blooms {
bloomBuffer := &bytes.Buffer{}
_, err := f.WriteTo(bloomBuffer)
Expand All @@ -48,16 +70,24 @@ func (b *ShardedBloomFilter) WriteTo() ([][]byte, error) {
return bloomBytes, nil
}

func ShardKeyForTraceID(traceID []byte) int {
return int(util.TokenForTraceID(traceID)) % shardNum
func (b *ShardedBloomFilter) GetShardCount() int {
return len(b.blooms)
}

// Test reports whether traceID may have been added to the filter, by
// testing the shard traceID hashes to. Implements bloom.Test -> required
// only for testing.
func (b *ShardedBloomFilter) Test(traceID []byte) bool {
	shardKey := ShardKeyForTraceID(traceID, len(b.blooms))
	return b.blooms[shardKey].Test(traceID)
}

func GetShardNum() int {
return shardNum
func ShardKeyForTraceID(traceID []byte, shardCount int) int {
return int(util.TokenForTraceID(traceID)) % ValidateShardCount(shardCount)
}

// ValidateShardCount returns a usable bloom shard count. Blocks written
// before shard counts were persisted carry a zero value in their meta, so
// zero (and, defensively, any non-positive value, which would otherwise
// yield a negative modulo in ShardKeyForTraceID) falls back to the legacy
// fixed count. For backward compatibility.
func ValidateShardCount(shardCount int) int {
	if shardCount <= 0 {
		return legacyShardCount
	}
	return shardCount
}
Loading

0 comments on commit c61b678

Please sign in to comment.