Skip to content

Commit

Permalink
Correctly pass down wal encoding config (#1037)
Browse files Browse the repository at this point in the history
* Correctly pass down wal encoding config, move tests around

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* sync version b/n wal and streamingsearchblock

Signed-off-by: Annanay <annanayagarwal@gmail.com>
  • Loading branch information
annanay25 committed Oct 18, 2021
1 parent 0662f16 commit 0dbc48f
Show file tree
Hide file tree
Showing 6 changed files with 170 additions and 213 deletions.
4 changes: 2 additions & 2 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,12 +497,12 @@ func (i *instance) resetHeadBlock() error {
i.lastBlockCut = time.Now()

// Create search data wal file
f, bufferedWriter, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir)
f, bufferedWriter, version, enc, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir)
if err != nil {
return err
}

b, err := search.NewStreamingSearchBlockForFile(f, bufferedWriter, "v2", backend.EncNone)
b, err := search.NewStreamingSearchBlockForFile(f, bufferedWriter, version, enc)
if err != nil {
return err
}
Expand Down
48 changes: 48 additions & 0 deletions tempodb/search/backend_search_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"os"
"path"
"strconv"
"sync"
"testing"
"time"

"github.com/google/uuid"
"github.com/grafana/tempo/pkg/tempofb"
Expand Down Expand Up @@ -125,3 +127,49 @@ func TestBackendSearchBlockFinalSize(t *testing.T) {
}
}
}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
pageSizesMB := []float32{0.5, 1, 2}

for _, enc := range backend.SupportedEncoding {
for _, sz := range pageSizesMB {
b.Run(fmt.Sprint(enc.String(), "/", sz, "MiB"), func(b *testing.B) {

b2 := newBackendSearchBlockWithTraces(b, b.N, enc, int(sz*1024*1024))

// Use secret tag to perform exhaustive search
p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: map[string]string{SecretExhaustiveSearchTag: "!"},
})

sr := NewResults()

b.ResetTimer()
start := time.Now()
// Search 10x10 because reading the search data is much faster than creating it, but we need
// to spend at least 1 second to satisfy go bench minimum elapsed time requirement.
loops := 10
wg := &sync.WaitGroup{}
for i := 0; i < loops; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < loops; j++ {
err := b2.Search(context.TODO(), p, sr)
require.NoError(b, err)
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("BackendSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s \t %d traces = %.2fM traces/s \n",
elapsed,
float64(sr.bytesInspected.Load())/(1024*1024),
float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024),
sr.TracesInspected(),
float64(sr.TracesInspected())/(elapsed.Seconds())/1_000_000,
)
})
}
}
}
46 changes: 0 additions & 46 deletions tempodb/search/streaming_search_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,52 +214,6 @@ func TestStreamingSearchBlockIteratorDedupes(t *testing.T) {
}
}

func BenchmarkBackendSearchBlockSearch(b *testing.B) {
pageSizesMB := []float32{0.5, 1, 2}

for _, enc := range backend.SupportedEncoding {
for _, sz := range pageSizesMB {
b.Run(fmt.Sprint(enc.String(), "/", sz, "MiB"), func(b *testing.B) {

b2 := newBackendSearchBlockWithTraces(b, b.N, enc, int(sz*1024*1024))

// Use secret tag to perform exhaustive search
p := NewSearchPipeline(&tempopb.SearchRequest{
Tags: map[string]string{SecretExhaustiveSearchTag: "!"},
})

sr := NewResults()

b.ResetTimer()
start := time.Now()
// Search 10x10 because reading the search data is much faster than creating it, but we need
// to spend at least 1 second to satisfy go bench minimum elapsed time requirement.
loops := 10
wg := &sync.WaitGroup{}
for i := 0; i < loops; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < loops; j++ {
err := b2.Search(context.TODO(), p, sr)
require.NoError(b, err)
}
}()
}
wg.Wait()
elapsed := time.Since(start)
fmt.Printf("BackendSearchBlock search throughput: %v elapsed %.2f MB = %.2f MiB/s \t %d traces = %.2fM traces/s \n",
elapsed,
float64(sr.bytesInspected.Load())/(1024*1024),
float64(sr.bytesInspected.Load())/(elapsed.Seconds())/(1024*1024),
sr.TracesInspected(),
float64(sr.TracesInspected())/(elapsed.Seconds())/1_000_000,
)
})
}
}
}

func TestStreamingSearchBlockBuffersAndFlushes(t *testing.T) {
// direct call to flush
testBufferAndFlush(t, func(s *StreamingSearchBlock) {
Expand Down
128 changes: 0 additions & 128 deletions tempodb/wal/append_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,131 +155,3 @@ func TestFullFilename(t *testing.T) {
})
}
}

func TestParseFilename(t *testing.T) {
tests := []struct {
name string
filename string
expectUUID uuid.UUID
expectTenant string
expectedVersion string
expectedEncoding backend.Encoding
expectedDataEncoding string
expectError bool
}{
{
name: "version, enc snappy and dataencoding",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:dataencoding",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncSnappy,
expectedDataEncoding: "dataencoding",
},
{
name: "version, enc none and dataencoding",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:none:dataencoding",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncNone,
expectedDataEncoding: "dataencoding",
},
{
name: "empty dataencoding",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncSnappy,
expectedDataEncoding: "",
},
{
name: "empty dataencoding with semicolon",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v2:snappy:",
expectUUID: uuid.MustParse("123e4567-e89b-12d3-a456-426614174000"),
expectTenant: "foo",
expectedVersion: "v2",
expectedEncoding: backend.EncSnappy,
expectedDataEncoding: "",
},
{
name: "path fails",
filename: "/blerg/123e4567-e89b-12d3-a456-426614174000:foo",
expectError: true,
},
{
name: "no :",
filename: "123e4567-e89b-12d3-a456-426614174000",
expectError: true,
},
{
name: "empty string",
filename: "",
expectError: true,
},
{
name: "bad uuid",
filename: "123e4:foo",
expectError: true,
},
{
name: "no tenant",
filename: "123e4567-e89b-12d3-a456-426614174000:",
expectError: true,
},
{
name: "no version",
filename: "123e4567-e89b-12d3-a456-426614174000:test::none",
expectError: true,
},
{
name: "wrong splits - 6",
filename: "123e4567-e89b-12d3-a456-426614174000:test:test:test:test:test",
expectError: true,
},
{
name: "wrong splits - 3",
filename: "123e4567-e89b-12d3-a456-426614174000:test:test",
expectError: true,
},
{
name: "wrong splits - 1",
filename: "123e4567-e89b-12d3-a456-426614174000",
expectError: true,
},
{
name: "bad encoding",
filename: "123e4567-e89b-12d3-a456-426614174000:test:v1:asdf",
expectError: true,
},
{
name: "ez-mode old format",
filename: "123e4567-e89b-12d3-a456-426614174000:foo",
expectError: true,
},
{
name: "deprecated version",
filename: "123e4567-e89b-12d3-a456-426614174000:foo:v1:snappy",
expectError: true,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actualUUID, actualTenant, actualVersion, actualEncoding, actualDataEncoding, err := ParseFilename(tc.filename)

if tc.expectError {
assert.Error(t, err)
return
}

assert.NoError(t, err)
assert.Equal(t, tc.expectUUID, actualUUID)
assert.Equal(t, tc.expectTenant, actualTenant)
assert.Equal(t, tc.expectedEncoding, actualEncoding)
assert.Equal(t, tc.expectedVersion, actualVersion)
assert.Equal(t, tc.expectedDataEncoding, actualDataEncoding)
})
}
}
13 changes: 8 additions & 5 deletions tempodb/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,24 @@ func (w *WAL) NewBlock(id uuid.UUID, tenantID string, dataEncoding string) (*App
return newAppendBlock(id, tenantID, w.c.Filepath, w.c.Encoding, dataEncoding, w.c.WriteBufferSize)
}

func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, *bufio.Writer, error) {
func (w *WAL) NewFile(blockid uuid.UUID, tenantid string, dir string) (*os.File, *bufio.Writer, string, backend.Encoding, error) {
// search WAL pinned to v2 for now
walFileVersion := "v2"

p := filepath.Join(w.c.Filepath, dir)
err := os.MkdirAll(p, os.ModePerm)
if err != nil {
return nil, nil, err
return nil, nil, "", backend.EncNone, err
}

// blockID, tenantID, version, encoding (compression), dataEncoding
filename := fmt.Sprintf("%v:%v:%v:%v:%v", blockid, tenantid, "v2", backend.EncNone, "")
filename := fmt.Sprintf("%v:%v:%v:%v:%v", blockid, tenantid, walFileVersion, w.c.SearchEncoding, "")
file, err := os.OpenFile(filepath.Join(p, filename), os.O_CREATE|os.O_RDWR, 0644)
if err != nil {
return nil, nil, err
return nil, nil, "", backend.EncNone, err
}

return file, bufio.NewWriterSize(file, w.c.WriteBufferSize), nil
return file, bufio.NewWriterSize(file, w.c.WriteBufferSize), walFileVersion, w.c.SearchEncoding, nil
}

// ParseFilename returns (blockID, tenant, version, encoding, dataEncoding, error).
Expand Down
Loading

0 comments on commit 0dbc48f

Please sign in to comment.