Bloom sharding #192

Merged · 19 commits · Oct 29, 2020
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,3 +1,4 @@
## master / unreleased

* [CHANGE] Bloom filters are now sharded to reduce size and improve caching, as blocks grow. This is a **breaking change** and all data stored before this change will **not** be queryable. [192](https://github.com/grafana/tempo/pull/192)
* [ENHANCEMENT] CI checks for vendored dependencies using `make vendor-check`. Update CONTRIBUTING.md to reflect the same before checking in files in a PR. [#274](https://github.com/grafana/tempo/pull/274)
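To make the CHANGELOG entry above concrete, here is a minimal, hypothetical sketch of the sharding idea (not code from this PR): each block's bloom filter is split into a fixed number of shards, and a lookup hashes the trace ID to pick the single shard to fetch and test, so a query downloads one small, cache-friendly object instead of one large bloom filter per block. The hash function and modulo scheme below are assumptions for illustration only.

package main

import (
	"fmt"
	"hash/fnv"
)

// shardForTraceID picks the bloom shard a trace ID falls into.
// Only the shard-index concept comes from this PR; the FNV hash and
// modulo mapping here are illustrative assumptions.
func shardForTraceID(traceID []byte, shardCount int) int {
	h := fnv.New32()
	_, _ = h.Write(traceID) // fnv's Write never returns an error
	return int(h.Sum32() % uint32(shardCount))
}

func main() {
	fmt.Println(shardForTraceID([]byte("example-trace-id"), 10))
}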
2 changes: 1 addition & 1 deletion go.mod
@@ -43,7 +43,7 @@ require (
go.uber.org/zap v1.15.0
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
google.golang.org/api v0.29.0
google.golang.org/genproto v0.0.0-20201026171402-d4b8fe4fd877 // indirect
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522 // indirect
google.golang.org/grpc v1.33.1
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/yaml.v2 v2.3.0
2 changes: 1 addition & 1 deletion go.sum
@@ -2311,8 +2311,8 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY
google.golang.org/genproto v0.0.0-20200603110839-e855014d5736/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e h1:k+p/u26/lVeNEpdxSeUrm7rTvoFckBKaf7gTzgmHyDA=
google.golang.org/genproto v0.0.0-20200710124503-20a17af7bd0e/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201026171402-d4b8fe4fd877 h1:d4k3uIU763E31Rk4UZPA47oOoBymMsDImV3U4mGhX9E=
google.golang.org/genproto v0.0.0-20201026171402-d4b8fe4fd877/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522 h1:7RoRaOmOAXwqnurgQ5g5/d0yCi9ha2UxuTZULXudK7A=
google.golang.org/genproto v0.0.0-20201028140639-c77dae4b0522/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
2 changes: 1 addition & 1 deletion modules/ingester/instance_test.go
@@ -57,7 +57,7 @@ func TestInstance(t *testing.T) {
block = i.GetBlockToBeFlushed()
}
assert.NotNil(t, block)
assert.Nil(t, i.completingBlock, 1)
assert.Nil(t, i.completingBlock)
assert.Len(t, i.completeBlocks, 1)

err = ingester.store.WriteBlock(context.Background(), block)
6 changes: 3 additions & 3 deletions tempodb/backend/backend.go
@@ -17,17 +17,17 @@ var (
type AppendTracker interface{}

type Writer interface {
Write(ctx context.Context, meta *encoding.BlockMeta, bBloom []byte, bIndex []byte, objectFilePath string) error
Write(ctx context.Context, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte, objectFilePath string) error

WriteBlockMeta(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bBloom []byte, bIndex []byte) error
WriteBlockMeta(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte) error
AppendObject(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bObject []byte) (AppendTracker, error)
}

type Reader interface {
Tenants(ctx context.Context) ([]string, error)
Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error)
BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*encoding.BlockMeta, error)
Bloom(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error)
Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
Object(ctx context.Context, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error

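A hedged caller-side sketch of the widened Reader contract (hypothetical usage, not code from this PR): the new bloomShard argument lets a query path ask the backend for exactly one shard and test only those bytes. The helper name and the test closure below are assumptions; only the Reader.Bloom signature comes from the diff above.

package example

import (
	"context"

	"github.com/google/uuid"

	"github.com/grafana/tempo/tempodb/backend"
)

// checkBloomShard fetches the single bloom shard the caller has already
// mapped a trace ID to, then applies a caller-supplied test to its bytes.
func checkBloomShard(ctx context.Context, r backend.Reader, blockID uuid.UUID, tenantID string, shard int, test func([]byte) bool) (bool, error) {
	shardBytes, err := r.Bloom(ctx, blockID, tenantID, shard)
	if err != nil {
		return false, err
	}
	return test(shardBytes), nil
}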
14 changes: 10 additions & 4 deletions tempodb/backend/diskcache/cache.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"strconv"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@@ -15,7 +16,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type missFunc func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
type bloomMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error)
type indexMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)

const (
typeBloom = "bloom"
@@ -95,8 +97,8 @@ func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID stri
return r.next.BlockMeta(ctx, blockID, tenantID)
}

func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
b, skippableErr, err := r.readOrCacheKeyToDisk(ctx, blockID, tenantID, typeBloom, r.next.Bloom)
func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error) {
b, skippableErr, err := r.readOrCacheBloom(ctx, blockID, tenantID, bloomShard, r.next.Bloom)

if skippableErr != nil {
metricDiskCache.WithLabelValues(typeBloom, "error").Inc()
@@ -109,7 +111,7 @@ func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string)
}

func (r *reader) Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
b, skippableErr, err := r.readOrCacheKeyToDisk(ctx, blockID, tenantID, typeIndex, r.next.Index)
b, skippableErr, err := r.readOrCacheIndex(ctx, blockID, tenantID, r.next.Index)

if skippableErr != nil {
metricDiskCache.WithLabelValues(typeIndex, "error").Inc()
@@ -134,3 +136,7 @@ func (r *reader) Shutdown() {
func key(blockID uuid.UUID, tenantID string, t string) string {
return blockID.String() + ":" + tenantID + ":" + t
}

func bloomKey(blockID uuid.UUID, tenantID string, t string, shardNum int) string {
return blockID.String() + ":" + tenantID + ":" + t + ":" + strconv.Itoa(shardNum)
}
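For reference, with these helpers a disk-cache filename takes the form <blockID>:<tenantID>:<type>, and each bloom shard gets its own entry of the form <blockID>:<tenantID>:bloom:<shardNum> (values illustrative), so shards are cached and evicted independently.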
39 changes: 36 additions & 3 deletions tempodb/backend/diskcache/disk_cache.go
@@ -14,10 +14,11 @@ import (
"github.com/karrick/godirwalk"
)

func (r *reader) readOrCacheKeyToDisk(ctx context.Context, blockID uuid.UUID, tenantID string, t string, miss missFunc) ([]byte, error, error) {
// TODO: factor out common code with readOrCacheIndexToDisk into separate function
func (r *reader) readOrCacheBloom(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int, miss bloomMissFunc) ([]byte, error, error) {
var skippableError error

k := key(blockID, tenantID, t)
k := bloomKey(blockID, tenantID, typeBloom, shardNum)
filename := path.Join(r.cfg.Path, k)

bytes, err := ioutil.ReadFile(filename)
@@ -30,7 +31,39 @@ func (r *reader) readOrCacheKeyToDisk(ctx context.Context, blockID uuid.UUID, te
return bytes, nil, nil
}

metricDiskCacheMiss.WithLabelValues(t).Inc()
metricDiskCacheMiss.WithLabelValues(typeBloom).Inc()
bytes, err = miss(ctx, blockID, tenantID, shardNum)
if err != nil {
return nil, nil, err // backend store error. need to bubble this up
}

if bytes != nil {
err = r.writeKeyToDisk(filename, bytes)
if err != nil {
skippableError = err
}
}

return bytes, skippableError, nil
}

func (r *reader) readOrCacheIndex(ctx context.Context, blockID uuid.UUID, tenantID string, miss indexMissFunc) ([]byte, error, error) {
var skippableError error

k := key(blockID, tenantID, typeIndex)
filename := path.Join(r.cfg.Path, k)

bytes, err := ioutil.ReadFile(filename)

if err != nil && !os.IsNotExist(err) {
skippableError = err
}

if bytes != nil {
return bytes, nil, nil
}

metricDiskCacheMiss.WithLabelValues(typeIndex).Inc()
bytes, err = miss(ctx, blockID, tenantID)
if err != nil {
return nil, nil, err // backend store error. need to bubble this up
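One possible shape for the factoring-out that the TODO above mentions (a sketch, not part of this PR): both readOrCacheBloom and readOrCacheIndex reduce to "read the cached file if present, otherwise call a fetch closure and write the result back", so the shard- and index-specific details can live entirely in the key and the closure. The helper below assumes the file's existing imports (ioutil, os) and its writeKeyToDisk method.

// readOrCacheToDisk is a hypothetical shared helper. The caller builds the
// cache filename and wraps its backend call in a closure; this function
// handles the read / miss / write-back cycle and keeps the same
// (bytes, skippable error, fatal error) return convention used above.
func (r *reader) readOrCacheToDisk(filename string, metricLabel string, fetch func() ([]byte, error)) ([]byte, error, error) {
	var skippableError error

	bytes, err := ioutil.ReadFile(filename)
	if err != nil && !os.IsNotExist(err) {
		skippableError = err
	}
	if bytes != nil {
		return bytes, nil, nil
	}

	metricDiskCacheMiss.WithLabelValues(metricLabel).Inc()
	bytes, err = fetch()
	if err != nil {
		return nil, nil, err // backend store error. need to bubble this up
	}
	if bytes != nil {
		if writeErr := r.writeKeyToDisk(filename, bytes); writeErr != nil {
			skippableError = writeErr
		}
	}
	return bytes, skippableError, nil
}

// readOrCacheBloom could then collapse to roughly:
//   filename := path.Join(r.cfg.Path, bloomKey(blockID, tenantID, typeBloom, shardNum))
//   return r.readOrCacheToDisk(filename, typeBloom, func() ([]byte, error) {
//       return miss(ctx, blockID, tenantID, shardNum)
//   })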
43 changes: 34 additions & 9 deletions tempodb/backend/diskcache/disk_cache_test.go
@@ -21,12 +21,21 @@ func TestReadOrCache(t *testing.T) {
assert.NoError(t, err, "unexpected error creating temp dir")

missBytes := []byte{0x01}
missCalled := 0
missFunc := func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
missCalled++
indexMissCalled := 0
bloomMissCalled := 0

// indexMiss function to be called when the key is not cached
indexMiss := func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
indexMissCalled++
return missBytes, nil
}
// bloomMiss function to be called when the bloomKey is not cached
bloomMiss := func(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int) ([]byte, error) {
bloomMissCalled++
return missBytes, nil
}

// create new cache
cache, err := New(nil, &Config{
Path: tempDir,
MaxDiskMBs: 1024,
@@ -38,17 +47,33 @@
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
// get key from cache
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, indexMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, missCalled)
assert.Equal(t, 1, indexMissCalled)

bytes, skippableErr, err = cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
// make sure the missFunc is not called again since the key is cached already
bytes, skippableErr, err = cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, indexMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, missCalled)
assert.Equal(t, 1, indexMissCalled)

// get key from cache
bytes, skippableErr, err = cache.(*reader).readOrCacheBloom(context.Background(), blockID, tenantID, 0, bloomMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, bloomMissCalled)

// make sure the missFunc is not called again since the key is cached already
bytes, skippableErr, err = cache.(*reader).readOrCacheBloom(context.Background(), blockID, tenantID, 0, bloomMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, bloomMissCalled)
}

func TestJanitor(t *testing.T) {
@@ -75,7 +100,7 @@
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, missFunc)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
@@ -96,7 +121,7 @@
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, missFunc)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
14 changes: 5 additions & 9 deletions tempodb/backend/gcs/compactor.go
@@ -4,19 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"path"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/util"
"github.com/grafana/tempo/tempodb/encoding"
"google.golang.org/api/iterator"
)

func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error {
// move meta file to a new location
metaFilename := rw.metaFileName(blockID, tenantID)
compactedMetaFilename := rw.compactedMetaFileName(blockID, tenantID)
metaFilename := util.MetaFileName(blockID, tenantID)
compactedMetaFilename := util.CompactedMetaFileName(blockID, tenantID)

src := rw.bucket.Object(metaFilename)
dst := rw.bucket.Object(compactedMetaFilename)
Expand All @@ -41,7 +41,7 @@ func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error {

ctx := context.TODO()
iter := rw.bucket.Objects(ctx, &storage.Query{
Prefix: rw.rootPath(blockID, tenantID),
Prefix: util.RootPath(blockID, tenantID),
Versions: false,
})

Expand All @@ -65,7 +65,7 @@ func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error {
}

func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*encoding.CompactedBlockMeta, error) {
name := rw.compactedMetaFileName(blockID, tenantID)
name := util.CompactedMetaFileName(blockID, tenantID)

bytes, modTime, err := rw.readAllWithModTime(context.Background(), name)
if err == storage.ErrObjectNotExist {
Expand All @@ -84,7 +84,3 @@ func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (

return out, err
}

func (rw *readerWriter) compactedMetaFileName(blockID uuid.UUID, tenantID string) string {
return path.Join(rw.rootPath(blockID, tenantID), "meta.compacted.json")
}