Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correctly pass down wal encoding config #1037

Merged
merged 3 commits into from
Oct 18, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,12 @@ func (i *instance) resetHeadBlock() error {
i.lastBlockCut = time.Now()

// Create search data wal file
f, bufferedWriter, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir)
f, bufferedWriter, version, enc, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir)
if err != nil {
return err
}

b, err := search.NewStreamingSearchBlockForFile(f, bufferedWriter, "v2", backend.EncNone)
b, err := search.NewStreamingSearchBlockForFile(f, bufferedWriter, version, enc)
if err != nil {
return err
}
Expand Down
48 changes: 48 additions & 0 deletions tempodb/search/backend_search_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"os"
"path"
"strconv"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/tempofb"
Expand Down Expand Up @@ -125,3 +127,49 @@ func TestBackendSearchBlockFinalSize(t *testing.T) {
}
}
}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
pageSizesMB := []float32{0.5, 1, 2}

for _, enc := range backend.SupportedEncoding {
for _, sz := range pageSizesMB {
b.Run(fmt.Sprint(enc.String(), "/", sz, "MiB"), func(b *testing.B) {

b2 := newBackendSearchBlockWithTraces(b, b.N, enc, int(sz*1024*1024))

// Use secret tag to perform exhaustive search
p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: map[string]string{SecretExhaustiveSearchTag: "!"},
})

sr := NewResults()

b.ResetTimer()
start := time.Now()
// Search 10x10 because reading the search data is much faster than creating it, but we need
// to spend at least 1 second to satisfy go bench minimum elapsed time requirement.
loops := 10
wg := &sync.WaitGroup{}
for i := 0; i < loops; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < loops; j++ {
err := b2.Search(context.TODO(), p, sr)
require.NoError(b, err)
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("BackendSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s \t %d traces = %.2fM traces/s \n",
elapsed,
float64(sr.bytesInspected.Load())/(1024*1024),
float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024),
sr.TracesInspected(),
float64(sr.TracesInspected())/(elapsed.Seconds())/1_000_000,
)
})
}
}
}
46 changes: 0 additions & 46 deletions tempodb/search/streaming_search_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,52 +214,6 @@ func TestStreamingSearchBlockIteratorDedupes(t *testing.T) {
}
}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
pageSizesMB := []float32{0.5, 1, 2}

for _, enc := range backend.SupportedEncoding {
for _, sz := range pageSizesMB {
b.Run(fmt.Sprint(enc.String(), "/", sz, "MiB"), func(b *testing.B) {

b2 := newBackendSearchBlockWithTraces(b, b.N, enc, int(sz*1024*1024))

// Use secret tag to perform exhaustive search
p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: map[string]string{SecretExhaustiveSearchTag: "!"},
})

sr := NewResults()

b.ResetTimer()
start := time.Now()
// Search 10x10 because reading the search data is much faster than creating it, but we need
// to spend at least 1 second to satisfy go bench minimum elapsed time requirement.
loops := 10
wg := &sync.WaitGroup{}
for i := 0; i < loops; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < loops; j++ {
err := b2.Search(context.TODO(), p, sr)
require.NoError(b, err)
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("BackendSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s \t %d traces = %.2fM traces/s \n",
elapsed,
float64(sr.bytesInspected.Load())/(1024*1024),
float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024),
sr.TracesInspected(),
float64(sr.TracesInspected())/(elapsed.Seconds())/1_000_000,
)
})
}
}
}

func TestStreamingSearchBlockBuffersAndFlushes(t *testing.T) {
// direct call to flush
testBufferAndFlush(t, func(s *StreamingSearchBlock) {
Expand Down
128 changes: 0 additions & 128 deletions tempodb/wal/append_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,131 +155,3 @@ func TestFullFilename(t *testing.T) {
})
}
}

func TestParseFilename(t *testing.T) {
tests := []struct {
name string
filename string
expectUUID uuid.UUID
expectTenant string
expectedVersion string
expectedEncoding backend.Encoding
expectedDataEncoding string
expectError bool
}{
{
name: "version, enc snappy and dataencoding",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:dataencoding",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncSnappy,
expectedDataEncoding: "dataencoding",
},
{
name: "version, enc none and dataencoding",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:none:dataencoding",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncNone,
expectedDataEncoding: "dataencoding",
},
{
name: "empty dataencoding",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncSnappy,
expectedDataEncoding: "",
},
{
name: "empty dataencoding with semicolon",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncSnappy,
expectedDataEncoding: "",
},
{
name: "path fails",
filename: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo",
expectError: true,
},
{
name: "no :",
filename: "123e4567-e89b-12d3-a456-426614174000",
expectError: true,
},
{
name: "empty string",
filename: "",
expectError: true,
},
{
name: "bad uuid",
filename: "123e4:foo",
expectError: true,
},
{
name: "no tenant",
filename: "123e4567-e89b-12d3-a456-426614174000:",
expectError: true,
},
{
name: "no version",
filename: "123e4567-e89b-12d3-a456-426614174000:test::none",
expectError: true,
},
{
name: "wrong splits - 6",
filename: "123e4567-e89b-12d3-a456-426614174000:test:test:test:test:test",
expectError: true,
},
{
name: "wrong splits - 3",
filename: "123e4567-e89b-12d3-a456-426614174000:test:test",
expectError: true,
},
{
name: "wrong splits - 1",
filename: "123e4567-e89b-12d3-a456-426614174000",
expectError: true,
},
{
name: "bad encoding",
filename: "123e4567-e89b-12d3-a456-426614174000:test:v1:asdf",
expectError: true,
},
{
name: "ez-mode old format",
filename: "123e4567-e89b-12d3-a456-426614174000:foo",
expectError: true,
},
{
name: "deprecated version",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v1:snappy",
expectError: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actualUUID, actualTenant, actualVersion, actualEncoding, actualDataEncoding, err := ParseFilename(tc.filename)

if tc.expectError {
assert.Error(t, err)
return
}

assert.NoError(t, err)
assert.Equal(t, tc.expectUUID, actualUUID)
assert.Equal(t, tc.expectTenant, actualTenant)
assert.Equal(t, tc.expectedEncoding, actualEncoding)
assert.Equal(t, tc.expectedVersion, actualVersion)
assert.Equal(t, tc.expectedDataEncoding, actualDataEncoding)
})
}
}
13 changes: 8 additions & 5 deletions tempodb/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,24 @@ func (w *WAL) NewBlock(id uuid.UUID, tenantID string, dataEncoding string) (*App
return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding, w.c.WriteBufferSize)
}

func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, *bufio.Writer, error) {
func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, *bufio.Writer, string, backend.Encoding, error) {
// search WAL pinned to v2 for now
walFileVersion := "v2"

p := filepath.Join(w.c.Filepath, dir)
err := os.MkdirAll(p, os.ModePerm)
if err != nil {
return nil, nil, err
return nil, nil, "", backend.EncNone, err
}

// blockID, tenantID, version, encoding (compression), dataEncoding
filename := fmt.Sprintf("%v:%v:%v:%v:%v", blockid, tenantid, "v2", backend.EncNone, "")
filename := fmt.Sprintf("%v:%v:%v:%v:%v", blockid, tenantid, walFileVersion, w.c.SearchEncoding, "")
file, err := os.OpenFile(filepath.Join(p, filename), os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, nil, err
return nil, nil, "", backend.EncNone, err
}

return file, bufio.NewWriterSize(file, w.c.WriteBufferSize), nil
return file, bufio.NewWriterSize(file, w.c.WriteBufferSize), walFileVersion, w.c.SearchEncoding, nil
}

// ParseFilename returns (blockID, tenant, version, encoding, dataEncoding, error).
Expand Down
Loading