
Bloom sharding #192

Merged · 19 commits · Oct 29, 2020
Changes from 12 commits
6 changes: 3 additions & 3 deletions tempodb/backend/backend.go
@@ -17,17 +17,17 @@ var (
type AppendTracker interface{}

type Writer interface {
Write(ctx context.Context, meta *encoding.BlockMeta, bBloom []byte, bIndex []byte, objectFilePath string) error
Write(ctx context.Context, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte, objectFilePath string) error

WriteBlockMeta(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bBloom []byte, bIndex []byte) error
WriteBlockMeta(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte) error
AppendObject(ctx context.Context, tracker AppendTracker, meta *encoding.BlockMeta, bObject []byte) (AppendTracker, error)
}

type Reader interface {
Tenants(ctx context.Context) ([]string, error)
Blocks(ctx context.Context, tenantID string) ([]uuid.UUID, error)
BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*encoding.BlockMeta, error)
Bloom(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error)
Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
Object(ctx context.Context, blockID uuid.UUID, tenantID string, offset uint64, buffer []byte) error

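
The interface change above is the core of the PR: writers now hand the backend one serialized bloom filter per shard (bBloom [][]byte) instead of a single blob, and readers fetch exactly one shard by index. The minimal, self-contained Go sketch below illustrates the routing idea only; the shard count and the FNV hash are illustrative assumptions, not the hash the tempodb bloom package actually uses.

package main

import (
	"fmt"
	"hash/fnv"
)

// shardCount stands in for the configured number of bloom shards per block
// (bloom.GetShardNum() in the diff); 10 is an arbitrary value for illustration.
const shardCount = 10

// shardForTraceID routes a trace ID to one bloom shard, so a query only has to
// download and test a single small filter per block instead of one large one.
func shardForTraceID(traceID []byte) int {
	h := fnv.New32a()
	h.Write(traceID)
	return int(h.Sum32()) % shardCount
}

func main() {
	traceID := []byte{0xde, 0xad, 0xbe, 0xef}
	shard := shardForTraceID(traceID)
	fmt.Printf("trace %x -> bloom shard %d of %d\n", traceID, shard, shardCount)
	// A reader would then call backend.Reader.Bloom(ctx, blockID, tenantID, shard)
	// and test the trace ID against only that shard's filter.
}
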
9 changes: 5 additions & 4 deletions tempodb/backend/diskcache/cache.go
@@ -15,7 +15,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
)

type missFunc func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)
type bloomMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error)
type indexMissFunc func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error)

const (
typeBloom = "bloom"
@@ -95,8 +96,8 @@ func (r *reader) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID stri
return r.next.BlockMeta(ctx, blockID, tenantID)
}

func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
b, skippableErr, err := r.readOrCacheKeyToDisk(ctx, blockID, tenantID, typeBloom, r.next.Bloom)
func (r *reader) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, bloomShard int) ([]byte, error) {
b, skippableErr, err := r.readOrCacheBloom(ctx, blockID, tenantID, typeBloom, bloomShard, r.next.Bloom)

if skippableErr != nil {
metricDiskCache.WithLabelValues(typeBloom, "error").Inc()
@@ -109,7 +110,7 @@ }
}

func (r *reader) Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
b, skippableErr, err := r.readOrCacheKeyToDisk(ctx, blockID, tenantID, typeIndex, r.next.Index)
b, skippableErr, err := r.readOrCacheIndex(ctx, blockID, tenantID, typeIndex, r.next.Index)

if skippableErr != nil {
metricDiskCache.WithLabelValues(typeIndex, "error").Inc()
35 changes: 34 additions & 1 deletion tempodb/backend/diskcache/disk_cache.go
@@ -14,7 +14,40 @@ import (
"github.com/karrick/godirwalk"
)

func (r *reader) readOrCacheKeyToDisk(ctx context.Context, blockID uuid.UUID, tenantID string, t string, miss missFunc) ([]byte, error, error) {
// TODO: factor out common code with readOrCacheIndexToDisk into separate function
func (r *reader) readOrCacheBloom(ctx context.Context, blockID uuid.UUID, tenantID string, t string, shardNum int, miss bloomMissFunc) ([]byte, error, error) {
var skippableError error

k := key(blockID, tenantID, t)
filename := path.Join(r.cfg.Path, k)

bytes, err := ioutil.ReadFile(filename)

if err != nil && !os.IsNotExist(err) {
skippableError = err
}

if bytes != nil {
return bytes, nil, nil
}

metricDiskCacheMiss.WithLabelValues(t).Inc()
bytes, err = miss(ctx, blockID, tenantID, shardNum)
if err != nil {
return nil, nil, err // backend store error. need to bubble this up
}

if bytes != nil {
err = r.writeKeyToDisk(filename, bytes)
if err != nil {
skippableError = err
}
}

return bytes, skippableError, nil
}

func (r *reader) readOrCacheIndex(ctx context.Context, blockID uuid.UUID, tenantID string, t string, miss indexMissFunc) ([]byte, error, error) {
var skippableError error

k := key(blockID, tenantID, t)
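
readOrCacheBloom above is a near copy of readOrCacheIndex, which is what the TODO at the top of the hunk calls out. One possible way to factor out the shared disk-cache logic is sketched below, reusing the helpers already visible in this diff (key, r.cfg.Path, writeKeyToDisk, metricDiskCacheMiss); the helper name and the closure-based miss callback are assumptions, not code from this PR.

func (r *reader) readOrCache(ctx context.Context, blockID uuid.UUID, tenantID string, t string, fetch func(context.Context) ([]byte, error)) ([]byte, error, error) {
	var skippableError error

	k := key(blockID, tenantID, t)
	filename := path.Join(r.cfg.Path, k)

	// Serve from the on-disk cache when the file exists; any read error other
	// than "not exist" is skippable and reported to the caller separately.
	b, err := ioutil.ReadFile(filename)
	if err != nil && !os.IsNotExist(err) {
		skippableError = err
	}
	if b != nil {
		return b, nil, nil
	}

	metricDiskCacheMiss.WithLabelValues(t).Inc()
	b, err = fetch(ctx)
	if err != nil {
		return nil, nil, err // backend store error. need to bubble this up
	}
	if b != nil {
		if err := r.writeKeyToDisk(filename, b); err != nil {
			skippableError = err
		}
	}
	return b, skippableError, nil
}

The bloom path would then wrap its extra shard argument in a closure:

return r.readOrCache(ctx, blockID, tenantID, t, func(ctx context.Context) ([]byte, error) {
	return miss(ctx, blockID, tenantID, shardNum)
})
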
29 changes: 20 additions & 9 deletions tempodb/backend/diskcache/disk_cache_test.go
@@ -21,9 +21,14 @@ func TestReadOrCache(t *testing.T) {
assert.NoError(t, err, "unexpected error creating temp dir")

missBytes := []byte{0x01}
missCalled := 0
missFunc := func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
missCalled++
indexMissCalled := 0
bloomMissCalled := 0
indexMiss := func(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
indexMissCalled++
return missBytes, nil
}
bloomMiss := func(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int) ([]byte, error) {
bloomMissCalled++
return missBytes, nil
}

@@ -38,17 +43,23 @@
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, "type", indexMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, missCalled)
assert.Equal(t, 1, indexMissCalled)

bytes, skippableErr, err = cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err = cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, "type", indexMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, missCalled)
assert.Equal(t, 1, indexMissCalled)

bytes, skippableErr, err = cache.(*reader).readOrCacheBloom(context.Background(), blockID, tenantID, "type", 1, bloomMiss)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
assert.Equal(t, 1, bloomMissCalled)
}

func TestJanitor(t *testing.T) {
Expand All @@ -75,7 +86,7 @@ func TestJanitor(t *testing.T) {
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, "type", missFunc)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
@@ -96,7 +107,7 @@ func TestJanitor(t *testing.T) {
blockID := uuid.New()
tenantID := testTenantID

bytes, skippableErr, err := cache.(*reader).readOrCacheKeyToDisk(context.Background(), blockID, tenantID, "type", missFunc)
bytes, skippableErr, err := cache.(*reader).readOrCacheIndex(context.Background(), blockID, tenantID, "type", missFunc)
assert.NoError(t, err)
assert.NoError(t, skippableErr)
assert.Equal(t, missBytes, bytes)
14 changes: 5 additions & 9 deletions tempodb/backend/gcs/compactor.go
@@ -4,19 +4,19 @@ import (
"context"
"encoding/json"
"fmt"
"path"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/backend/util"
"github.com/grafana/tempo/tempodb/encoding"
"google.golang.org/api/iterator"
)

func (rw *readerWriter) MarkBlockCompacted(blockID uuid.UUID, tenantID string) error {
// move meta file to a new location
metaFilename := rw.metaFileName(blockID, tenantID)
compactedMetaFilename := rw.compactedMetaFileName(blockID, tenantID)
metaFilename := util.MetaFileName(blockID, tenantID)
compactedMetaFilename := util.CompactedMetaFileName(blockID, tenantID)

src := rw.bucket.Object(metaFilename)
dst := rw.bucket.Object(compactedMetaFilename)
@@ -41,7 +41,7 @@ func (rw *readerWriter) ClearBlock(blockID uuid.UUID, tenantID string) error {

ctx := context.TODO()
iter := rw.bucket.Objects(ctx, &storage.Query{
Prefix: rw.rootPath(blockID, tenantID),
Prefix: util.RootPath(blockID, tenantID),
Versions: false,
})

@@ -65,7 +65,7 @@ }
}

func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (*encoding.CompactedBlockMeta, error) {
name := rw.compactedMetaFileName(blockID, tenantID)
name := util.CompactedMetaFileName(blockID, tenantID)

bytes, modTime, err := rw.readAllWithModTime(context.Background(), name)
if err == storage.ErrObjectNotExist {
@@ -84,7 +84,3 @@ func (rw *readerWriter) CompactedBlockMeta(blockID uuid.UUID, tenantID string) (

return out, err
}

func (rw *readerWriter) compactedMetaFileName(blockID uuid.UUID, tenantID string) string {
return path.Join(rw.rootPath(blockID, tenantID), "meta.compacted.json")
}
54 changes: 19 additions & 35 deletions tempodb/backend/gcs/gcs.go
@@ -7,10 +7,12 @@ import (
"io"
"io/ioutil"
"os"
"path"
"strings"
"time"

"github.com/grafana/tempo/tempodb/backend/util"
"github.com/grafana/tempo/tempodb/encoding/bloom"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/grafana/tempo/tempodb/backend"
@@ -49,7 +51,7 @@ func New(cfg *Config) (backend.Reader, backend.Writer, backend.Compactor, error)
return rw, rw, rw, nil
}

func (rw *readerWriter) Write(ctx context.Context, meta *encoding.BlockMeta, bBloom []byte, bIndex []byte, objectFilePath string) error {
func (rw *readerWriter) Write(ctx context.Context, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte, objectFilePath string) error {
blockID := meta.BlockID
tenantID := meta.TenantID

@@ -64,7 +66,7 @@ func (rw *readerWriter) Write(ctx context.Context, meta *encoding.BlockMeta, bBl
}
defer src.Close()

w := rw.writer(ctx, rw.objectFileName(blockID, tenantID))
w := rw.writer(ctx, util.ObjectFileName(blockID, tenantID))
defer w.Close()
_, err = io.Copy(w, src)
if err != nil {
@@ -79,7 +81,7 @@ func (rw *readerWriter) Write(ctx context.Context, meta *encoding.BlockMeta, bBl
return nil
}

func (rw *readerWriter) WriteBlockMeta(ctx context.Context, tracker backend.AppendTracker, meta *encoding.BlockMeta, bBloom []byte, bIndex []byte) error {
func (rw *readerWriter) WriteBlockMeta(ctx context.Context, tracker backend.AppendTracker, meta *encoding.BlockMeta, bBloom [][]byte, bIndex []byte) error {
if tracker != nil {
w := tracker.(*storage.Writer)
_ = w.Close()
@@ -88,12 +90,14 @@ func (rw *readerWriter) WriteBlockMeta(ctx context.Context, tracker backend.Appe
blockID := meta.BlockID
tenantID := meta.TenantID

err := rw.writeAll(ctx, rw.bloomFileName(blockID, tenantID), bBloom)
if err != nil {
return err
for i := 0; i < bloom.GetShardNum(); i++ {
err := rw.writeAll(ctx, util.BloomFileName(blockID, tenantID, i), bBloom[i])
if err != nil {
return err
}
}

err = rw.writeAll(ctx, rw.indexFileName(blockID, tenantID), bIndex)
err := rw.writeAll(ctx, util.IndexFileName(blockID, tenantID), bIndex)
if err != nil {
return err
}
@@ -104,7 +108,7 @@ func (rw *readerWriter) WriteBlockMeta(ctx context.Context, tracker backend.Appe
}

// write meta last. this will prevent blocklist from returning a partial block
err = rw.writeAll(ctx, rw.metaFileName(blockID, tenantID), bMeta)
err = rw.writeAll(ctx, util.MetaFileName(blockID, tenantID), bMeta)
if err != nil {
return err
}
@@ -118,7 +122,7 @@ func (rw *readerWriter) AppendObject(ctx context.Context, tracker backend.Append
blockID := meta.BlockID
tenantID := meta.TenantID

w = rw.writer(ctx, rw.objectFileName(blockID, tenantID))
w = rw.writer(ctx, util.ObjectFileName(blockID, tenantID))
} else {
w = tracker.(*storage.Writer)
}
@@ -189,7 +193,7 @@ func (rw *readerWriter) Blocks(ctx context.Context, tenantID string) ([]uuid.UUI
}

func (rw *readerWriter) BlockMeta(ctx context.Context, blockID uuid.UUID, tenantID string) (*encoding.BlockMeta, error) {
name := rw.metaFileName(blockID, tenantID)
name := util.MetaFileName(blockID, tenantID)

bytes, err := rw.readAll(ctx, name)
if err == storage.ErrObjectNotExist {
@@ -208,54 +212,34 @@ func (rw *readerWriter) BlockMeta(ctx context.Context, blockID uuid.UUID, tenant
return out, nil
}

func (rw *readerWriter) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
func (rw *readerWriter) Bloom(ctx context.Context, blockID uuid.UUID, tenantID string, shardNum int) ([]byte, error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Bloom")
defer span.Finish()

name := rw.bloomFileName(blockID, tenantID)
name := util.BloomFileName(blockID, tenantID, shardNum)
return rw.readAll(derivedCtx, name)
}

func (rw *readerWriter) Index(ctx context.Context, blockID uuid.UUID, tenantID string) ([]byte, error) {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Index")
defer span.Finish()

name := rw.indexFileName(blockID, tenantID)
name := util.IndexFileName(blockID, tenantID)
return rw.readAll(derivedCtx, name)
}

func (rw *readerWriter) Object(ctx context.Context, blockID uuid.UUID, tenantID string, start uint64, buffer []byte) error {
span, derivedCtx := opentracing.StartSpanFromContext(ctx, "gcs.Object")
defer span.Finish()

name := rw.objectFileName(blockID, tenantID)
name := util.ObjectFileName(blockID, tenantID)
return rw.readRange(derivedCtx, name, int64(start), buffer)
}

func (rw *readerWriter) Shutdown() {

}

func (rw *readerWriter) metaFileName(blockID uuid.UUID, tenantID string) string {
return path.Join(rw.rootPath(blockID, tenantID), "meta.json")
}

func (rw *readerWriter) bloomFileName(blockID uuid.UUID, tenantID string) string {
return path.Join(rw.rootPath(blockID, tenantID), "bloom")
}

func (rw *readerWriter) indexFileName(blockID uuid.UUID, tenantID string) string {
return path.Join(rw.rootPath(blockID, tenantID), "index")
}

func (rw *readerWriter) objectFileName(blockID uuid.UUID, tenantID string) string {
return path.Join(rw.rootPath(blockID, tenantID), "data")
}

func (rw *readerWriter) rootPath(blockID uuid.UUID, tenantID string) string {
return path.Join(tenantID, blockID.String())
}

func (rw *readerWriter) writeAll(ctx context.Context, name string, b []byte) error {
w := rw.writer(ctx, name)
defer w.Close()
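
The per-backend file-name helpers deleted above (metaFileName, bloomFileName, indexFileName, objectFileName, rootPath) are replaced by calls into a shared tempodb/backend/util package, with BloomFileName gaining a shard index. That package is not among the files shown here, so the following is only a plausible reconstruction based on the removed GCS helpers; in particular the "bloom-<shard>" suffix is an assumption.

package util

import (
	"fmt"
	"path"

	"github.com/google/uuid"
)

func RootPath(blockID uuid.UUID, tenantID string) string {
	return path.Join(tenantID, blockID.String())
}

func MetaFileName(blockID uuid.UUID, tenantID string) string {
	return path.Join(RootPath(blockID, tenantID), "meta.json")
}

func CompactedMetaFileName(blockID uuid.UUID, tenantID string) string {
	return path.Join(RootPath(blockID, tenantID), "meta.compacted.json")
}

func IndexFileName(blockID uuid.UUID, tenantID string) string {
	return path.Join(RootPath(blockID, tenantID), "index")
}

func ObjectFileName(blockID uuid.UUID, tenantID string) string {
	return path.Join(RootPath(blockID, tenantID), "data")
}

// BloomFileName now takes the shard index: each block stores one bloom object
// per shard rather than a single "bloom" file.
func BloomFileName(blockID uuid.UUID, tenantID string, shardNum int) string {
	return path.Join(RootPath(blockID, tenantID), fmt.Sprintf("bloom-%d", shardNum))
}
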
1 change: 1 addition & 0 deletions tempodb/backend/local/compactor.go
@@ -8,6 +8,7 @@ import (
"path"

"github.com/google/uuid"

"github.com/grafana/tempo/tempodb/backend"
"github.com/grafana/tempo/tempodb/encoding"
)