Skip to content

Commit

Permalink
Reload search blocks and replay search WAL (#1000)
Browse files Browse the repository at this point in the history
* Checkpoint: Initial implementation of v2 search WAL

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* better handling of reload of blocks without search data

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Checkpoint

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* another commit another wal replay

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* wip: ingester search test

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Fix Rescan search blocks, move ParseFilename into wal folder

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Append block uses new ParseFilename

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Add tests, benchmarks, pass encoding along correctly

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Changelog

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Post merge cleanup

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Err handling for search disabled

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Use the right level package, reload backend search blocks

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* never refactor variables using an ide

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Address comments, fix test

Signed-off-by: Annanay <annanayagarwal@gmail.com>

* Reuse StreamingSearchBlock iterator in search, relocate dedupe test

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Make wal search encoding configurable, default to gzip like backend block encoding

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Make wal search encoding configurable, default to gzip like backend block encoding

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Simplify some search tests which were doing more work than seemed necessary

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Comment out flaky test as discussed

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Code review suggestions

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Code review suggestions, add tests for ParseFileName

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

* Code review suggestions

Signed-off-by: Martin Disibio <mdisibio@gmail.com>

Co-authored-by: Martin Disibio <mdisibio@gmail.com>
  • Loading branch information
annanay25 and mdisibio authored Oct 6, 2021
1 parent 323b280 commit 98b6f73
Show file tree
Hide file tree
Showing 19 changed files with 732 additions and 361 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [CHANGE] **BREAKING CHANGE** Change ingester metric `ingester_bytes_metric_total` in favor of `ingester_bytes_received_total` [#979](https://github.com/grafana/tempo/pull/979) (@mapno)
* [FEATURE] Add ability to search ingesters for traces [#806](https://github.com/grafana/tempo/pull/806) (@mdisibio)
* [FEATURE] Add runtime config handler [#936](https://github.com/grafana/tempo/pull/936) (@mapno)
* [FEATURE] Search WAL reload and compression (versioned encoding) support [#1000](https://github.com/grafana/tempo/pull/1000) (@annanay25)
* [ENHANCEMENT] Added "query blocks" cli option. [#876](https://github.com/grafana/tempo/pull/876) (@joe-elliott)
* [ENHANCEMENT] Added "search blocks" cli option. [#972](https://github.com/grafana/tempo/pull/972) (@joe-elliott)
* [ENHANCEMENT] Added traceid to `trace too large message`. [#888](https://github.com/grafana/tempo/pull/888) (@mritunjaysharma394)
Expand Down
6 changes: 5 additions & 1 deletion docs/tempo/website/configuration/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,10 @@ storage:
# (default: snappy)
[encoding: <string>]
# search data encoding/compression. same options as wal encoding.
# (default: gzip)
[search_encoding: <string>]
# block configuration
block:
Expand All @@ -557,7 +561,7 @@ storage:
# block encoding/compression. options: none, gzip, lz4-64k, lz4-256k, lz4-1M, lz4, snappy, zstd, s2
[encoding: <string>]
# search data encoding/compression. same options as blocks.
# search data encoding/compression. same options as block encoding.
# (default: gzip)
[search_encoding: <string>]
Expand Down
38 changes: 34 additions & 4 deletions modules/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"sync"
"time"

"github.com/grafana/tempo/tempodb/search"

"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
Expand Down Expand Up @@ -106,9 +108,6 @@ func (i *Ingester) starting(ctx context.Context) error {
return fmt.Errorf("failed to rediscover local blocks %w", err)
}

// Search data is considered experimental and removed on every startup.
i.clearSearchData()

// Now that user states have been created, we can start the lifecycler.
// Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context
if err := i.lifecycler.StartAsync(context.Background()); err != nil {
Expand Down Expand Up @@ -335,6 +334,30 @@ func (i *Ingester) replayWal() error {
return fmt.Errorf("fatal error replaying wal %w", err)
}

searchBlocks, err := search.RescanBlocks(i.store.WAL().GetFilepath())
if err != nil {
return fmt.Errorf("fatal error replaying search wal %w", err)
}

// clear any searchBlock that does not have a matching wal block
for j := len(searchBlocks) - 1; j >= 0; j-- {
clear := true
for _, tracesBlock := range blocks {
if searchBlocks[j].BlockID == tracesBlock.BlockID() {
clear = false
break
}
}

if clear {
err := searchBlocks[j].Clear()
if err != nil { // just log the error
level.Warn(log.Logger).Log("msg", "error clearing search WAL file", "blockID", searchBlocks[j].BlockID, "err", err)
}
searchBlocks = append(searchBlocks[:j], searchBlocks[j+1:]...)
}
}

for _, b := range blocks {
tenantID := b.Meta().TenantID

Expand All @@ -354,7 +377,14 @@ func (i *Ingester) replayWal() error {
return err
}

instance.AddCompletingBlock(b)
var searchWALBlock *search.StreamingSearchBlock
for _, s := range searchBlocks {
if b.BlockID() == s.BlockID {
searchWALBlock = s
break
}
}
instance.AddCompletingBlock(b, searchWALBlock)

i.enqueue(&flushOp{
kind: opKindComplete,
Expand Down
10 changes: 0 additions & 10 deletions modules/ingester/ingester_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package ingester
import (
"context"

"github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/grafana/tempo/pkg/tempopb"
"github.com/weaveworks/common/user"
)
Expand Down Expand Up @@ -66,11 +64,3 @@ func (i *Ingester) SearchTagValues(ctx context.Context, req *tempopb.SearchTagVa

return resp, nil
}

// clearSearchData removes the search directory from the WAL folder.
// A failure is logged, together with the underlying error, and is not
// returned to the caller.
func (i *Ingester) clearSearchData() {
	// clear wal
	if err := i.store.WAL().ClearFolder(searchDir); err != nil {
		// Include the error itself; without it the log line gives no hint of the cause.
		level.Error(log.Logger).Log("msg", "error clearing search data from wal", "err", err)
	}
}
66 changes: 61 additions & 5 deletions modules/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/tempo/modules/overrides"
"github.com/grafana/tempo/modules/storage"
"github.com/grafana/tempo/pkg/model"
"github.com/grafana/tempo/pkg/tempofb"
"github.com/grafana/tempo/pkg/tempopb"
v1 "github.com/grafana/tempo/pkg/tempopb/trace/v1"
"github.com/grafana/tempo/pkg/util/test"
Expand Down Expand Up @@ -211,10 +212,58 @@ func TestWal(t *testing.T) {
}
}

// TestSearchWAL pushes a single trace with search data to an instance, cuts it
// to the WAL, and checks that a tag search inspects exactly one trace. It then
// stops the ingester, starts a new one on the same directory, and checks that
// the same search still inspects one trace.
func TestSearchWAL(t *testing.T) {
	tmpDir := t.TempDir()

	i := defaultIngesterModule(t, tmpDir)
	inst, err := i.getOrCreateInstance("test")
	require.NoError(t, err)
	// require (not assert): inst is dereferenced below, so stop here if it is nil.
	require.NotNil(t, inst)

	// create some search data
	id := make([]byte, 16)
	_, err = rand.Read(id)
	require.NoError(t, err)
	trace := test.MakeTrace(10, id)
	traceBytes, err := trace.Marshal()
	require.NoError(t, err)
	entry := &tempofb.SearchEntryMutable{}
	entry.TraceID = id
	entry.AddTag("foo", "bar")
	searchBytes := entry.ToBytes()

	// push to instance; nothing below is meaningful if the push failed
	require.NoError(t, inst.PushBytes(context.Background(), id, traceBytes, searchBytes))

	// Write wal
	require.NoError(t, inst.CutCompleteTraces(0, true))

	// search WAL
	ctx := user.InjectOrgID(context.Background(), "test")
	searchReq := &tempopb.SearchRequest{Tags: map[string]string{
		"foo": "bar",
	}}
	results, err := inst.Search(ctx, searchReq)
	// require (not assert): results is dereferenced on the next line.
	require.NoError(t, err)
	assert.Equal(t, uint32(1), results.Metrics.InspectedTraces)

	// Shutdown
	require.NoError(t, i.stopping(nil))

	// replay wal
	i = defaultIngesterModule(t, tmpDir)
	inst, ok := i.getInstanceByID("test")
	require.True(t, ok)

	results, err = inst.Search(ctx, searchReq)
	require.NoError(t, err)
	assert.Equal(t, uint32(1), results.Metrics.InspectedTraces)
}

// TODO - This test is flaky and commented out until it's fixed
// TestWalReplayDeletesLocalBlocks simulates the condition where an ingester restarts after a wal is completed
// to the local disk, but before the wal is deleted. On startup both blocks exist, and the ingester now errs
// on the side of caution and chooses to replay the wal instead of rediscovering the local block.
func TestWalReplayDeletesLocalBlocks(t *testing.T) {
/*func TestWalReplayDeletesLocalBlocks(t *testing.T) {
tmpDir := t.TempDir()
i, _, _ := defaultIngester(t, tmpDir)
Expand Down Expand Up @@ -249,6 +298,7 @@ func TestWalReplayDeletesLocalBlocks(t *testing.T) {
require.True(t, ok)
// After restart we only have the 1 wal block
// TODO - fix race conditions here around access inst fields outside of mutex
require.Len(t, inst.completingBlocks, 1)
require.Len(t, inst.completeBlocks, 0)
require.Equal(t, blockID, inst.completingBlocks[0].BlockID())
Expand All @@ -257,6 +307,7 @@ func TestWalReplayDeletesLocalBlocks(t *testing.T) {
err = i.stopping(nil)
require.NoError(t, err)
}
*/

func TestFlush(t *testing.T) {
tmpDir, err := os.MkdirTemp("/tmp", "")
Expand All @@ -275,8 +326,7 @@ func TestFlush(t *testing.T) {
}

// stopping the ingester should force cut all live traces to disk
err = ingester.stopping(nil)
require.NoError(t, err)
require.NoError(t, ingester.stopping(nil))

// create new ingester. this should replay wal!
ingester, _, _ = defaultIngester(t, tmpDir)
Expand All @@ -292,7 +342,7 @@ func TestFlush(t *testing.T) {
}
}

func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, [][]byte) {
func defaultIngesterModule(t *testing.T, tmpDir string) *Ingester {
ingesterConfig := defaultIngesterTestConfig()
limits, err := overrides.NewOverrides(defaultLimitsTestConfig())
require.NoError(t, err, "unexpected error creating overrides")
Expand Down Expand Up @@ -324,13 +374,19 @@ func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace,
err = ingester.starting(context.Background())
require.NoError(t, err, "unexpected error starting ingester")

return ingester
}

func defaultIngester(t *testing.T, tmpDir string) (*Ingester, []*tempopb.Trace, [][]byte) {
ingester := defaultIngesterModule(t, tmpDir)

// make some fake traceIDs/requests
traces := make([]*tempopb.Trace, 0)

traceIDs := make([][]byte, 0)
for i := 0; i < 10; i++ {
id := make([]byte, 16)
_, err = rand.Read(id)
_, err := rand.Read(id)
require.NoError(t, err)

trace := test.MakeTrace(10, id)
Expand Down
16 changes: 11 additions & 5 deletions modules/ingester/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,17 @@ func (i *instance) FindTraceByID(ctx context.Context, id []byte) (*tempopb.Trace
// AddCompletingBlock adds an AppendBlock directly to the slice of completing blocks.
// This is used during wal replay. It is expected that calling code will add the appropriate
// jobs to the queue to eventually flush these.
func (i *instance) AddCompletingBlock(b *wal.AppendBlock) {
func (i *instance) AddCompletingBlock(b *wal.AppendBlock, s *search.StreamingSearchBlock) {
	// The completing-block slice and the search map are both updated while
	// holding blocksMtx.
	i.blocksMtx.Lock()
	defer i.blocksMtx.Unlock()

	i.completingBlocks = append(i.completingBlocks, b)

	// Register the search WAL block only when one was supplied for b.
	if s != nil {
		i.searchAppendBlocks[b] = &searchStreamingBlockEntry{b: s}
	}
}

// getOrCreateTrace will return a new trace object for the given request
Expand Down Expand Up @@ -491,12 +497,12 @@ func (i *instance) resetHeadBlock() error {
i.lastBlockCut = time.Now()

// Create search data wal file
f, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir, "searchdata")
f, err := i.writer.WAL().NewFile(i.headBlock.BlockID(), i.instanceID, searchDir)
if err != nil {
return err
}

b, err := search.NewStreamingSearchBlockForFile(f)
b, err := search.NewStreamingSearchBlockForFile(f, "v2", backend.EncNone)
if err != nil {
return err
}
Expand Down Expand Up @@ -617,11 +623,11 @@ func (i *instance) rediscoverLocalBlocks(ctx context.Context) error {
return err
}

//sb := search.OpenBackendSearchBlock(i.local, b.BlockMeta().BlockID, b.BlockMeta().TenantID)
sb := search.OpenBackendSearchBlock(b.BlockMeta().BlockID, b.BlockMeta().TenantID, i.localReader)

i.blocksMtx.Lock()
i.completeBlocks = append(i.completeBlocks, ib)
//i.searchCompleteBlocks[ib] = sb
i.searchCompleteBlocks[ib] = &searchLocalBlockEntry{b: sb}
i.blocksMtx.Unlock()

level.Info(log.Logger).Log("msg", "reloaded local block", "tenantID", i.instanceID, "block", id.String(), "flushed", ib.FlushedTime())
Expand Down
4 changes: 2 additions & 2 deletions modules/ingester/instance_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,8 @@ func TestInstanceSearch(t *testing.T) {

sr, err = i.Search(context.Background(), req)
assert.NoError(t, err)
// note: search is experimental and removed on every startup. Verify no search results now
assert.Len(t, sr.Traces, 0)
assert.Len(t, sr.Traces, numTraces/searchAnnotatedFractionDenominator)
checkEqual(t, ids, sr)
}

func TestInstanceSearchNoData(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions modules/storage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func (cfg *Config) RegisterFlagsAndApplyDefaults(prefix string, f *flag.FlagSet)
cfg.Trace.WAL = &wal.Config{}
f.StringVar(&cfg.Trace.WAL.Filepath, util.PrefixConfig(prefix, "trace.wal.path"), "/var/tempo/wal", "Path at which store WAL blocks.")
cfg.Trace.WAL.Encoding = backend.EncSnappy
cfg.Trace.WAL.SearchEncoding = backend.EncGZIP

cfg.Trace.Block = &encoding.BlockConfig{}
f.Float64Var(&cfg.Trace.Block.BloomFP, util.PrefixConfig(prefix, "trace.block.bloom-filter-false-positive"), .01, "Bloom Filter False Positive.")
Expand Down
2 changes: 1 addition & 1 deletion tempodb/backend/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (e Encoding) MarshalJSON() ([]byte, error) {
return buffer.Bytes(), nil
}

// ParseEncoding parses an chunk encoding (compression algorithm) by its name.
// ParseEncoding parses a chunk encoding (compression algorithm) by its name.
func ParseEncoding(enc string) (Encoding, error) {
for _, e := range SupportedEncoding {
if strings.EqualFold(e.String(), enc) {
Expand Down
4 changes: 4 additions & 0 deletions tempodb/search/backend_search_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ func (s *BackendSearchBlock) Search(ctx context.Context, p Pipeline, sr *Results

meta, err := ReadSearchBlockMeta(ctx, s.r, s.id, s.tenantID)
if err != nil {
// we create BackendSearchBlocks even if search files are missing, return nil here if meta does not exist
if err == backend.ErrDoesNotExist {
return nil
}
return err
}

Expand Down
Loading

0 comments on commit 98b6f73

Please sign in to comment.