Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compact more than 2 blocks at a time #348

Merged
merged 4 commits into from
Nov 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

* [CHANGE] From path.Join to filepath.Join [#338](https://github.com/grafana/tempo/pull/338)
* [CHANGE] Upgrade Cortex from v1.3.0 to v1.4.0 [#341](https://github.com/grafana/tempo/pull/341)
* [CHANGE] Compact more than 2 blocks at a time [#348](https://github.com/grafana/tempo/pull/348)
* [ENHANCEMENT] Add tempodb_compaction_objects_combined metric. [#339](https://github.com/grafana/tempo/pull/339)
* [BUGFIX] Frequent errors logged by compactor regarding meta not found [#327](https://github.com/grafana/tempo/pull/327)
* [BUGFIX] Fix distributors panicking on rollout [#343](https://github.com/grafana/tempo/pull/343)
Expand Down
44 changes: 30 additions & 14 deletions tempodb/compaction_block_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ type CompactionBlockSelector interface {
}

const (
activeWindowDuration = 24 * time.Hour
activeWindowDuration = 24 * time.Hour
defaultMinInputBlocks = 2
defaultMaxInputBlocks = 8
)

/*************************** Simple Block Selector **************************/
Expand Down Expand Up @@ -57,15 +59,19 @@ func (sbs *simpleBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, string

// timeWindowBlockSelector carries a private copy of a blocklist together with
// the limits that BlocksToCompact applies when picking blocks to compact.
type timeWindowBlockSelector struct {
// blocklist is the set of block metas the selector chooses from.
blocklist []*encoding.BlockMeta
// MinInputBlocks is the fewest blocks a window must hold, and the fewest
// blocks a selection must contain, before a compaction is proposed.
MinInputBlocks int
// MaxInputBlocks bounds how far a run of same-level blocks is extended
// when gathering blocks in the active window.
MaxInputBlocks int
MaxCompactionRange time.Duration // Size of the time window - say 6 hours
MaxCompactionObjects int // upper bound on the summed TotalObjects of the blocks selected for one compaction
}

var _ (CompactionBlockSelector) = (*timeWindowBlockSelector)(nil)

func newTimeWindowBlockSelector(blocklist []*encoding.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int) CompactionBlockSelector {
func newTimeWindowBlockSelector(blocklist []*encoding.BlockMeta, maxCompactionRange time.Duration, maxCompactionObjects int, minInputBlocks int, maxInputBlocks int) CompactionBlockSelector {
twbs := &timeWindowBlockSelector{
blocklist: append([]*encoding.BlockMeta(nil), blocklist...),
MinInputBlocks: minInputBlocks,
MaxInputBlocks: maxInputBlocks,
MaxCompactionRange: maxCompactionRange,
MaxCompactionObjects: maxCompactionObjects,
}
Expand Down Expand Up @@ -126,7 +132,7 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s
}

// did we find enough blocks?
if len(windowBlocks) >= inputBlocks {
if len(windowBlocks) >= twbs.MinInputBlocks {
var compactBlocks []*encoding.BlockMeta

// blocks in the currently active window
Expand All @@ -140,29 +146,31 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s
// the active window should be compacted by level
if activeWindow <= blockWindow {
// search forward for inputBlocks in a row that have the same compaction level
mdisibio marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i+inputBlocks-1 < len(windowBlocks); i++ {
if windowBlocks[i].CompactionLevel == windowBlocks[i+inputBlocks-1].CompactionLevel {
compactBlocks = windowBlocks[i : i+inputBlocks]
// Gather as many as possible while staying within limits
for i := 0; i <= len(windowBlocks)-twbs.MinInputBlocks+1; i++ {
for j := i + 1; j <= len(windowBlocks)-1 &&
windowBlocks[i].CompactionLevel == windowBlocks[j].CompactionLevel &&
len(compactBlocks)+1 <= twbs.MaxInputBlocks &&
totalObjects(compactBlocks)+windowBlocks[j].TotalObjects <= twbs.MaxCompactionObjects; j++ {
compactBlocks = windowBlocks[i : j+1]
}
if len(compactBlocks) > 0 {
// Found a stripe of blocks
break
}
}

compact = false
if len(compactBlocks) > 0 {
if len(compactBlocks) >= twbs.MinInputBlocks {
compact = true
hashString = fmt.Sprintf("%v-%v-%v", compactBlocks[0].TenantID, compactBlocks[0].CompactionLevel, currentWindow)
}
} else { // all other windows will be compacted using their two smallest blocks
compactBlocks = windowBlocks[:inputBlocks]
compactBlocks = windowBlocks[:twbs.MinInputBlocks]
hashString = fmt.Sprintf("%v-%v", compactBlocks[0].TenantID, currentWindow)
}

// are they small enough
totalObjects := 0
for _, block := range compactBlocks {
totalObjects += block.TotalObjects
}
if totalObjects > twbs.MaxCompactionObjects {
if totalObjects(compactBlocks) > twbs.MaxCompactionObjects {
compact = false
}

Expand Down Expand Up @@ -191,6 +199,14 @@ func (twbs *timeWindowBlockSelector) BlocksToCompact() ([]*encoding.BlockMeta, s
return nil, ""
}

// totalObjects returns the sum of the TotalObjects field over every block
// meta in blocks. A nil or empty slice yields 0.
func totalObjects(blocks []*encoding.BlockMeta) int {
	var sum int
	for i := range blocks {
		sum += blocks[i].TotalObjects
	}
	return sum
}

// windowForBlock returns the window value that windowForTime computes for
// the block's EndTime.
func (twbs *timeWindowBlockSelector) windowForBlock(meta *encoding.BlockMeta) int64 {
	endTime := meta.EndTime
	return twbs.windowForTime(endTime)
}
Expand Down
218 changes: 216 additions & 2 deletions tempodb/compaction_block_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
tests := []struct {
name string
blocklist []*encoding.BlockMeta
minInputBlocks int // optional, defaults to global const
maxInputBlocks int // optional, defaults to global const
expected []*encoding.BlockMeta
expectedHash string
expectedSecond []*encoding.BlockMeta
Expand Down Expand Up @@ -76,6 +78,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
EndTime: now,
},
},
maxInputBlocks: 2,
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
Expand Down Expand Up @@ -157,6 +160,7 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
TotalObjects: 12,
},
},
maxInputBlocks: 2,
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
Expand Down Expand Up @@ -336,11 +340,221 @@ func TestTimeWindowBlockSelectorBlocksToCompact(t *testing.T) {
},
expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Add(-timeWindow).Unix()),
},
{
name: "doesn't choose across time windows",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now.Add(-timeWindow),
},
},
expected: nil,
expectedHash: "",
expectedSecond: nil,
expectedHash2: "",
},
{
name: "doesn't exceed max compaction objects",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
TotalObjects: 99,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
TotalObjects: 2,
EndTime: now,
},
},
expected: nil,
expectedHash: "",
expectedSecond: nil,
expectedHash2: "",
},
{
name: "Returns as many blocks as possible without exceeding max compaction objects",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
TotalObjects: 50,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
TotalObjects: 50,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
TotalObjects: 50,
EndTime: now,
}},
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
TotalObjects: 50,
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
TotalObjects: 50,
EndTime: now,
},
},
expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()),
expectedSecond: nil,
expectedHash2: "",
},
{
// First compaction gets 3 blocks, second compaction gets 2 more
name: "choose more than 2 blocks",
maxInputBlocks: 3,
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
TotalObjects: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
TotalObjects: 2,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
TotalObjects: 3,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
TotalObjects: 4,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000005"),
EndTime: now,
TotalObjects: 5,
},
},
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
TotalObjects: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
TotalObjects: 2,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
TotalObjects: 3,
},
},
expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()),
expectedSecond: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
TotalObjects: 4,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000005"),
EndTime: now,
TotalObjects: 5,
},
},
expectedHash2: fmt.Sprintf("%v-%v-%v", tenantID, 0, now.Unix()),
},
{
name: "honors minimum block count",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
},
},
minInputBlocks: 3,
maxInputBlocks: 3,
expected: nil,
expectedHash: "",
expectedSecond: nil,
expectedHash2: "",
},
{
name: "can choose blocks not at the lowest compaction level",
blocklist: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000001"),
EndTime: now,
CompactionLevel: 0,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
CompactionLevel: 1,
},
},
minInputBlocks: 3,
maxInputBlocks: 3,
expected: []*encoding.BlockMeta{
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000002"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000003"),
EndTime: now,
CompactionLevel: 1,
},
{
BlockID: uuid.MustParse("00000000-0000-0000-0000-000000000004"),
EndTime: now,
CompactionLevel: 1,
},
},
expectedHash: fmt.Sprintf("%v-%v-%v", tenantID, 1, now.Unix()),
expectedSecond: nil,
expectedHash2: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selector := newTimeWindowBlockSelector(tt.blocklist, time.Second, 100)

min := defaultMinInputBlocks
if tt.minInputBlocks > 0 {
min = tt.minInputBlocks
}

max := defaultMaxInputBlocks
if tt.maxInputBlocks > 0 {
max = tt.maxInputBlocks
}

selector := newTimeWindowBlockSelector(tt.blocklist, time.Second, 100, min, max)

actual, hash := selector.BlocksToCompact()
assert.Equal(t, tt.expected, actual)
Expand Down Expand Up @@ -530,7 +744,7 @@ func TestTimeWindowBlockSelectorSort(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
selector := newTimeWindowBlockSelector(tt.blocklist, timeWindow, 100)
selector := newTimeWindowBlockSelector(tt.blocklist, timeWindow, 100, defaultMinInputBlocks, defaultMaxInputBlocks)
actual := selector.(*timeWindowBlockSelector).blocklist
assert.Equal(t, tt.expected, actual)
})
Expand Down
2 changes: 1 addition & 1 deletion tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (rw *readerWriter) doCompaction() {
tenantID := tenants[rand.Intn(len(tenants))].(string)
blocklist := rw.blocklist(tenantID)

blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, rw.compactorCfg.MaxCompactionObjects, defaultMinInputBlocks, defaultMaxInputBlocks)

start := time.Now()

Expand Down
4 changes: 2 additions & 2 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestCompaction(t *testing.T) {
rw.pollBlocklist()

blocklist := rw.blocklist(testTenantID)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2)

expectedCompactions := len(blocklist) / inputBlocks
compactions := 0
Expand Down Expand Up @@ -230,7 +230,7 @@ func TestSameIDCompaction(t *testing.T) {

var blocks []*encoding.BlockMeta
blocklist := rw.blocklist(testTenantID)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2)
blocks, _ = blockSelector.BlocksToCompact()
assert.Len(t, blocks, inputBlocks)

Expand Down