From 2a31d3feeff6ee369d1090b63faa6aa19f3eef46 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 18 Nov 2020 13:20:22 -0500 Subject: [PATCH 1/4] Update timeWindowBlockSelector to compact more than 2 blocks at a time, configurable min/max limits --- tempodb/compaction_block_selector.go | 25 ++++-- tempodb/compaction_block_selector_test.go | 99 ++++++++++++++++++++++- tempodb/compactor.go | 2 +- tempodb/compactor_test.go | 4 +- 4 files changed, 118 insertions(+), 12 deletions(-) diff --git a/tempodb/compaction_block_selector.go b/tempodb/compaction_block_selector.go index 9b6e78d6926..61e7281acab 100644 --- a/tempodb/compaction_block_selector.go +++ b/tempodb/compaction_block_selector.go @@ -14,7 +14,9 @@ type CompactionBlockSelector interface { } const ( - activeWindowDuration = 24 * time.Hour + activeWindowDuration = 24 * time.Hour + defaultMinInputBlocks = 2 + defaultMaxInputBlocks = 8 ) /*************************** Simple Block Selector **************************/ @@ -57,15 +59,19 @@ func (sbs *simpleBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, string type timeWindowBlockSelector struct { blocklist []*encoding.BlockMeta + MinInputBlocks int + MaxInputBlocks int MaxCompactionRange time.Duration // Size of the time window - say 6 hours MaxCompactionObjects int // maximum size of compacted objects } var _ (CompactionBlockSelector) = (*timeWindowBlockSelector)(nil) -func newTimeWindowBlockSelector(blocklist []*encoding.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int) CompactionBlockSelector { +func newTimeWindowBlockSelector(blocklist []*encoding.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector { twbs := &timeWindowBlockSelector{ blocklist: append([]*encoding.BlockMeta(nil), blocklist...), + MinInputBlocks: minInputBlocks, + MaxInputBlocks: maxInputBlocks, MaxCompactionRange: maxCompactionRange, MaxCompactionObjects: maxCompactionObjects, } @@ -126,7 +132,7 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s } // did we find enough blocks? - if len(windowBlocks) >= inputBlocks { + if len(windowBlocks) >= twbs.MinInputBlocks { var compactBlocks []*encoding.BlockMeta // blocks in the currently active window @@ -140,9 +146,14 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s // the active window should be compacted by level if activeWindow <= blockWindow { // search forward for inputBlocks in a row that have the same compaction level - for i := 0; i+inputBlocks-1 < len(windowBlocks); i++ { - if windowBlocks[i].CompactionLevel == windowBlocks[i+inputBlocks-1].CompactionLevel { - compactBlocks = windowBlocks[i : i+inputBlocks] + maxOffset := len(windowBlocks) - (twbs.MinInputBlocks - 1) + for i := 0; i <= maxOffset; i++ { + for j := i + 1; j <= maxOffset && j-i+1 <= twbs.MaxInputBlocks; j++ { + if windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel { + compactBlocks = windowBlocks[i : j+1] + } + } + if len(compactBlocks) > 0 { break } } @@ -153,7 +164,7 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s hashString = fmt.Sprintf("%v-%v-%v", compactBlocks[0].TenantID, compactBlocks[0].CompactionLevel, currentWindow) } } else { // all other windows will be compacted using their two smallest blocks - compactBlocks = windowBlocks[:inputBlocks] + compactBlocks = windowBlocks[:twbs.MinInputBlocks] hashString = fmt.Sprintf("%v-%v", compactBlocks[0].TenantID, currentWindow) } diff --git a/tempodb/compaction_block_selector_test.go b/tempodb/compaction_block_selector_test.go index 0659342a9cf..e1268c01b02 100644 --- a/tempodb/compaction_block_selector_test.go +++ b/tempodb/compaction_block_selector_test.go @@ -18,6 +18,8 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { tests := []struct { name string blocklist []*encoding.BlockMeta + minInputBlocks int // optional, defaults to global const + maxInputBlocks int // optional, defaults to global const expected []*encoding.BlockMeta expectedHash string expectedSecond []*encoding.BlockMeta @@ -76,6 +78,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { EndTime: now, }, }, + maxInputBlocks: 2, expected: []*encoding.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), @@ -157,6 +160,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { TotalObjects: 12, }, }, + maxInputBlocks: 2, expected: []*encoding.BlockMeta{ { BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), @@ -336,11 +340,102 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { }, expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Add(-timeWindow).Unix()), }, + { + name: "doesn't choose across time windows", + blocklist: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + EndTime: now, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now.Add(-timeWindow), + }, + }, + expected: nil, + expectedHash: "", + expectedSecond: nil, + expectedHash2: "", + }, + { + // First compaction gets 3 blocks, second compaction gets 2 more + name: "choose more than 2 blocks", + maxInputBlocks: 3, + blocklist: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + EndTime: now, + TotalObjects: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now, + TotalObjects: 2, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"), + EndTime: now, + TotalObjects: 3, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), + EndTime: now, + TotalObjects: 4, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000005"), + EndTime: now, + TotalObjects: 5, + }, + }, + expected: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + EndTime: now, + TotalObjects: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now, + TotalObjects: 2, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"), + EndTime: now, + TotalObjects: 3, + }, + }, + expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedSecond: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), + EndTime: now, + TotalObjects: 4, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000005"), + EndTime: now, + TotalObjects: 5, + }, + }, + expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - selector := newTimeWindowBlockSelector(tt.blocklist, time.Second, 100) + + min := defaultMinInputBlocks + if tt.minInputBlocks > 0 { + min = tt.minInputBlocks + } + + max := defaultMaxInputBlocks + if tt.maxInputBlocks > 0 { + max = tt.maxInputBlocks + } + + selector := newTimeWindowBlockSelector(tt.blocklist, time.Second, 100, min, max) actual, hash := selector.BlocksToCompact() assert.Equal(t, tt.expected, actual) @@ -530,7 +625,7 @@ func TestTimeWindowBlockSelectorSort(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - selector := newTimeWindowBlockSelector(tt.blocklist, timeWindow, 100) + selector := newTimeWindowBlockSelector(tt.blocklist, timeWindow, 100, defaultMinInputBlocks, defaultMaxInputBlocks) actual := selector.(*timeWindowBlockSelector).blocklist assert.Equal(t, tt.expected, actual) }) diff --git a/tempodb/compactor.go b/tempodb/compactor.go index 5493577bfbd..369fe20798f 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -65,7 +65,7 @@ func (rw *readerWriter) doCompaction() { tenantID := tenants[rand.Intn(len(tenants))].(string) blocklist := rw.blocklist(tenantID) - blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects) + blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects, defaultMinInputBlocks, defaultMaxInputBlocks) start := time.Now() diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 556766abef6..f0f0a4bc8b9 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -125,7 +125,7 @@ func TestCompaction(t *testing.T) { rw.pollBlocklist() blocklist := rw.blocklist(testTenantID) - blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000) + blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2) expectedCompactions := len(blocklist) / inputBlocks compactions := 0 @@ -230,7 +230,7 @@ func TestSameIDCompaction(t *testing.T) { var blocks []*encoding.BlockMeta blocklist := rw.blocklist(testTenantID) - blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000) + blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2) blocks, _ = blockSelector.BlocksToCompact() assert.Len(t, blocks, inputBlocks) From 772976e0b923ce85f8082e534463c960da9f7680 Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 18 Nov 2020 13:33:40 -0500 Subject: [PATCH 2/4] Update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 749dfa16087..705af7f1bcd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * [CHANGE] From path.Join to filepath.Join [#338](https://github.com/grafana/tempo/pull/338) * [CHANGE] Upgrade Cortex from v1.3.0 to v.1.4.0 [#341](https://github.com/grafana/tempo/pull/341) +* [CHANGE] Compact more than 2 blocks at a time [#348](https://github.com/grafana/tempo/pull/348) * [ENHANCEMENT] Add tempodb_compaction_objects_combined metric. [#339](https://github.com/grafana/tempo/pull/339) * [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327) * [BUGFIX] Fix distributors panicking on rollout [#343](https://github.com/grafana/tempo/pull/343) From bb05cbe16be5a3d407a5bcca2bb4ed6673a114df Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Wed, 18 Nov 2020 18:19:34 -0500 Subject: [PATCH 3/4] Fix block selector to return any number of blocks between min and max --- tempodb/compaction_block_selector.go | 26 ++++++----- tempodb/compaction_block_selector_test.go | 53 +++++++++++++++++++++++ 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/tempodb/compaction_block_selector.go b/tempodb/compaction_block_selector.go index 61e7281acab..c0f791b229c 100644 --- a/tempodb/compaction_block_selector.go +++ b/tempodb/compaction_block_selector.go @@ -146,14 +146,17 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s // the active window should be compacted by level if activeWindow <= blockWindow { // search forward for inputBlocks in a row that have the same compaction level + // Gather as many as possible while staying within limits maxOffset := len(windowBlocks) - (twbs.MinInputBlocks - 1) for i := 0; i <= maxOffset; i++ { - for j := i + 1; j <= maxOffset && j-i+1 <= twbs.MaxInputBlocks; j++ { - if windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel { - compactBlocks = windowBlocks[i : j+1] - } + for j := i + 1; j <= maxOffset && + windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel && + len(compactBlocks)+1 <= twbs.MaxInputBlocks && + totalObjects(compactBlocks)+windowBlocks[j].TotalObjects <= twbs.MaxCompactionObjects; j++ { + compactBlocks = windowBlocks[i : j+1] } if len(compactBlocks) > 0 { + // Found a stripe of blocks break } } @@ -168,12 +171,7 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s hashString = fmt.Sprintf("%v-%v", compactBlocks[0].TenantID, currentWindow) } - // are they small enough - totalObjects := 0 - for _, block := range compactBlocks { - totalObjects += block.TotalObjects - } - if totalObjects > twbs.MaxCompactionObjects { + if totalObjects(compactBlocks) > twbs.MaxCompactionObjects { compact = false } @@ -202,6 +200,14 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s return nil, "" } +func totalObjects(blocks []*encoding.BlockMeta) int { + totalObjects := 0 + for _, b := range blocks { + totalObjects += b.TotalObjects + } + return totalObjects +} + func (twbs *timeWindowBlockSelector) windowForBlock(meta *encoding.BlockMeta) int64 { return twbs.windowForTime(meta.EndTime) } diff --git a/tempodb/compaction_block_selector_test.go b/tempodb/compaction_block_selector_test.go index e1268c01b02..3605b53bbd6 100644 --- a/tempodb/compaction_block_selector_test.go +++ b/tempodb/compaction_block_selector_test.go @@ -357,6 +357,59 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { expectedSecond: nil, expectedHash2: "", }, + { + name: "doesn't exceed max compaction objects", + blocklist: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + TotalObjects: 99, + EndTime: now, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + TotalObjects: 2, + EndTime: now, + }, + }, + expected: nil, + expectedHash: "", + expectedSecond: nil, + expectedHash2: "", + }, + { + name: "Returns as many blocks as possible without exceeding max compaction objects", + blocklist: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + TotalObjects: 50, + EndTime: now, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + TotalObjects: 50, + EndTime: now, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"), + TotalObjects: 50, + EndTime: now, + }}, + expected: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + TotalObjects: 50, + EndTime: now, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + TotalObjects: 50, + EndTime: now, + }, + }, + expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), + expectedSecond: nil, + expectedHash2: "", + }, { // First compaction gets 3 blocks, second compaction gets 2 more name: "choose more than 2 blocks", From 0171fbbc5a1a8bdcae2e90e2e12dc743296f4e0e Mon Sep 17 00:00:00 2001 From: Martin Disibio Date: Thu, 19 Nov 2020 09:13:19 -0500 Subject: [PATCH 4/4] Fix block selector to evaluate all window blocks and case where minimum block count was not honored --- tempodb/compaction_block_selector.go | 7 ++- tempodb/compaction_block_selector_test.go | 66 +++++++++++++++++++++++ 2 files changed, 69 insertions(+), 4 deletions(-) diff --git a/tempodb/compaction_block_selector.go b/tempodb/compaction_block_selector.go index c0f791b229c..15e06a33585 100644 --- a/tempodb/compaction_block_selector.go +++ b/tempodb/compaction_block_selector.go @@ -147,9 +147,8 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s if activeWindow <= blockWindow { // search forward for inputBlocks in a row that have the same compaction level // Gather as many as possible while staying within limits - maxOffset := len(windowBlocks) - (twbs.MinInputBlocks - 1) - for i := 0; i <= maxOffset; i++ { - for j := i + 1; j <= maxOffset && + for i := 0; i <= len(windowBlocks)-twbs.MinInputBlocks+1; i++ { + for j := i + 1; j <= len(windowBlocks)-1 && windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel && len(compactBlocks)+1 <= twbs.MaxInputBlocks && totalObjects(compactBlocks)+windowBlocks[j].TotalObjects <= twbs.MaxCompactionObjects; j++ { @@ -162,7 +161,7 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s } compact = false - if len(compactBlocks) > 0 { + if len(compactBlocks) >= twbs.MinInputBlocks { compact = true hashString = fmt.Sprintf("%v-%v-%v", compactBlocks[0].TenantID, compactBlocks[0].CompactionLevel, currentWindow) } diff --git a/tempodb/compaction_block_selector_test.go b/tempodb/compaction_block_selector_test.go index 3605b53bbd6..703cd0a168e 100644 --- a/tempodb/compaction_block_selector_test.go +++ b/tempodb/compaction_block_selector_test.go @@ -473,6 +473,72 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) { }, expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()), }, + { + name: "honors minimum block count", + blocklist: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + EndTime: now, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now, + }, + }, + minInputBlocks: 3, + maxInputBlocks: 3, + expected: nil, + expectedHash: "", + expectedSecond: nil, + expectedHash2: "", + }, + { + name: "can choose blocks not at the lowest compaction level", + blocklist: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"), + EndTime: now, + CompactionLevel: 0, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now, + CompactionLevel: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"), + EndTime: now, + CompactionLevel: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), + EndTime: now, + CompactionLevel: 1, + }, + }, + minInputBlocks: 3, + maxInputBlocks: 3, + expected: []*encoding.BlockMeta{ + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"), + EndTime: now, + CompactionLevel: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"), + EndTime: now, + CompactionLevel: 1, + }, + { + BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"), + EndTime: now, + CompactionLevel: 1, + }, + }, + expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 1, now.Unix()), + expectedSecond: nil, + expectedHash2: "", + }, } for _, tt := range tests {