Skip to content

Commit

Permalink
Add tempodb_compaction_bytes_written metric, update compaction_bytes/…
Browse files Browse the repository at this point in the history
…objects_written metrics when flushing for efficiency
  • Loading branch information
mdisibio committed Nov 23, 2020
1 parent 9a82020 commit 47d7f97
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 23 deletions.
17 changes: 13 additions & 4 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,15 @@ var (
Help: "Records the amount of time to compact a set of blocks.",
Buckets: prometheus.ExponentialBuckets(30, 2, 10),
}, []string{"level"})
metricCompactionObjectsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
metricCompactionObjectsWritten = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempodb",
Name: "compaction_objects_processed_total",
Help: "Total number of objects processed during compaction.",
Name: "compaction_objects_written",
Help: "Total number of objects written to backend during compaction.",
}, []string{"level"})
metricCompactionBytesWritten = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: "tempodb",
Name: "compaction_bytes_written",
Help: "Total number of bytes written to backend during compaction.",
}, []string{"level"})
metricCompactionErrors = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "tempodb",
Expand Down Expand Up @@ -199,7 +204,6 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID strin
return err
}
lowestBookmark.clear()
metricCompactionObjectsProcessed.WithLabelValues(compactionLevelLabel).Inc()

// write partial block
if currentBlock.CurrentBufferLength() >= int(rw.compactorCfg.FlushSizeBytes) {
Expand Down Expand Up @@ -241,6 +245,11 @@ func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *wal.Com
if err != nil {
return nil, err
}

compactionLevelLabel := strconv.Itoa(int(block.BlockMeta().CompactionLevel - 1))
metricCompactionObjectsWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferedObjects()))
metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferLength()))

block.ResetBuffer()

return tracker, nil
Expand Down
88 changes: 72 additions & 16 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestCompaction(t *testing.T) {
_, err = rand.Read(id)
assert.NoError(t, err, "unexpected creating random id")

req := test.MakeRequest(i*10, id)
req := test.MakeRequest(10, id)
reqs = append(reqs, req)
ids = append(ids, id)

Expand Down Expand Up @@ -124,12 +124,6 @@ func TestCompaction(t *testing.T) {

rw.pollBlocklist()

processedStart, err := GetCounterVecValue(metricCompactionObjectsProcessed, "0")
assert.NoError(t, err)

blocksStart, err := GetCounterVecValue(metricCompactionBlocks, "0")
assert.NoError(t, err)

blocklist := rw.blocklist(testTenantID)
blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2)

Expand Down Expand Up @@ -160,15 +154,6 @@ func TestCompaction(t *testing.T) {
}
assert.Equal(t, blockCount*recordCount, records)

// Check metric
processedEnd, err := GetCounterVecValue(metricCompactionObjectsProcessed, "0")
assert.NoError(t, err)
assert.Equal(t, float64(blockCount*recordCount), processedEnd-processedStart)

blocksEnd, err := GetCounterVecValue(metricCompactionBlocks, "0")
assert.NoError(t, err)
assert.Equal(t, float64(blockCount), blocksEnd-blocksStart)

// now see if we can find our ids
for i, id := range allIds {
b, _, err := rw.Find(context.Background(), testTenantID, id)
Expand Down Expand Up @@ -345,6 +330,77 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
}
}

// TestCompactionMetrics verifies that a full compaction pass increments the
// compaction counters (objects written, blocks compacted, bytes written) by
// the exact amounts expected for the blocks it cut.
func TestCompactionMetrics(t *testing.T) {
	// Empty dir argument lets ioutil.TempDir fall back to os.TempDir(),
	// which is portable (hardcoding "/tmp" breaks on Windows).
	tempDir, err := ioutil.TempDir("", "")
	assert.NoError(t, err, "unexpected error creating temp dir")
	defer os.RemoveAll(tempDir)

	r, w, c, err := New(&Config{
		Backend: "local",
		Pool: &pool.Config{
			MaxWorkers: 10,
			QueueDepth: 100,
		},
		Local: &local.Config{
			Path: path.Join(tempDir, "traces"),
		},
		WAL: &wal.Config{
			Filepath:        path.Join(tempDir, "wal"),
			IndexDownsample: rand.Int()%20 + 1,
			BloomFP:         .01,
		},
		BlocklistPoll: 0,
	}, log.NewNopLogger())
	assert.NoError(t, err)

	c.EnableCompaction(&CompactorConfig{
		ChunkSizeBytes:          10,
		MaxCompactionRange:      24 * time.Hour,
		BlockRetention:          0,
		CompactedBlockRetention: 0,
	}, &mockSharder{})

	// Cut x blocks with y records each
	blockCount := 5
	recordCount := 1
	cutTestBlocks(t, w, blockCount, recordCount)

	rw := r.(*readerWriter)
	rw.pollBlocklist()

	// Capture starting metric values so the assertions below are deltas and
	// remain valid even if other tests touched the same counters.
	processedStart, err := GetCounterVecValue(metricCompactionObjectsWritten, "0")
	assert.NoError(t, err)

	blocksStart, err := GetCounterVecValue(metricCompactionBlocks, "0")
	assert.NoError(t, err)

	bytesStart, err := GetCounterVecValue(metricCompactionBytesWritten, "0")
	assert.NoError(t, err)

	// compact everything
	err = rw.compact(rw.blocklist(testTenantID), testTenantID)
	assert.NoError(t, err)

	// Check metric deltas against the known record counts.
	processedEnd, err := GetCounterVecValue(metricCompactionObjectsWritten, "0")
	assert.NoError(t, err)
	assert.Equal(t, float64(blockCount*recordCount), processedEnd-processedStart)

	blocksEnd, err := GetCounterVecValue(metricCompactionBlocks, "0")
	assert.NoError(t, err)
	assert.Equal(t, float64(blockCount), blocksEnd-blocksStart)

	bytesEnd, err := GetCounterVecValue(metricCompactionBytesWritten, "0")
	assert.NoError(t, err)
	// Expected on-disk size of a single test record as serialized by the
	// appender: framing lengths plus the 16-byte id plus the test payload.
	bytesPerRecord :=
		4 /* total length */ +
			4 /* id length */ +
			16 /* id */ +
			3 /* test record length */
	assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart)
}

func cutTestBlocks(t *testing.T, w Writer, blockCount int, recordCount int) {
wal := w.WAL()
for i := 0; i < blockCount; i++ {
Expand Down
11 changes: 9 additions & 2 deletions tempodb/wal/compactor_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ type CompactorBlock struct {

bloom *bloom.ShardedBloomFilter

appendBuffer *bytes.Buffer
appender encoding.Appender
bufferedObjects int
appendBuffer *bytes.Buffer
appender encoding.Appender
}

func newCompactorBlock(id uuid.UUID, tenantID string, bloomFP float64, indexDownsample int, metas []*encoding.BlockMeta, filepath string, estimatedObjects int) (*CompactorBlock, error) {
Expand Down Expand Up @@ -56,6 +57,7 @@ func (c *CompactorBlock) Write(id encoding.ID, object []byte) error {
if err != nil {
return err
}
c.bufferedObjects++
c.meta.ObjectAdded(id)
c.bloom.Add(id)
return nil
Expand All @@ -69,8 +71,13 @@ func (c *CompactorBlock) CurrentBufferLength() int {
return c.appendBuffer.Len()
}

// CurrentBufferedObjects returns the number of objects written to this block
// since the buffer was last reset. Incremented by Write and zeroed by
// ResetBuffer.
func (c *CompactorBlock) CurrentBufferedObjects() int {
return c.bufferedObjects
}

// ResetBuffer clears the append buffer and zeroes the buffered-object count,
// typically after the buffered contents have been flushed to the backend.
func (c *CompactorBlock) ResetBuffer() {
c.bufferedObjects = 0
c.appendBuffer.Reset()
}

func (c *CompactorBlock) Length() int {
Expand Down
4 changes: 3 additions & 1 deletion tempodb/wal/compactor_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,7 @@ func TestCompactorBlockWrite(t *testing.T) {
}

records := cb.Records()
assert.Equal(t, math.Ceil(float64(numObjects)/3), float64(len(records)))
assert.Equal(t, math.Ceil(float64(numObjects)/float64(walCfg.IndexDownsample)), float64(len(records)))

assert.Equal(t, numObjects, cb.CurrentBufferedObjects())
}

0 comments on commit 47d7f97

Please sign in to comment.