diff --git a/tempodb/compactor.go b/tempodb/compactor.go
index 78f6f98d29d..342ebcaa4f0 100644
--- a/tempodb/compactor.go
+++ b/tempodb/compactor.go
@@ -32,10 +32,15 @@ var (
 		Help:      "Records the amount of time to compact a set of blocks.",
 		Buckets:   prometheus.ExponentialBuckets(30, 2, 10),
 	}, []string{"level"})
-	metricCompactionObjectsProcessed = promauto.NewCounterVec(prometheus.CounterOpts{
+	metricCompactionObjectsWritten = promauto.NewCounterVec(prometheus.CounterOpts{
 		Namespace: "tempodb",
-		Name:      "compaction_objects_processed_total",
-		Help:      "Total number of objects processed during compaction.",
+		Name:      "compaction_objects_written",
+		Help:      "Total number of objects written to backend during compaction.",
+	}, []string{"level"})
+	metricCompactionBytesWritten = promauto.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "tempodb",
+		Name:      "compaction_bytes_written",
+		Help:      "Total number of bytes written to backend during compaction.",
 	}, []string{"level"})
 	metricCompactionErrors = promauto.NewCounter(prometheus.CounterOpts{
 		Namespace: "tempodb",
@@ -199,7 +204,6 @@ func (rw *readerWriter) compact(blockMetas []*encoding.BlockMeta, tenantID strin
 			return err
 		}
 		lowestBookmark.clear()
-		metricCompactionObjectsProcessed.WithLabelValues(compactionLevelLabel).Inc()
 
 		// write partial block
 		if currentBlock.CurrentBufferLength() >= int(rw.compactorCfg.FlushSizeBytes) {
@@ -241,6 +245,11 @@ func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *wal.Com
 	if err != nil {
 		return nil, err
 	}
+
+	compactionLevelLabel := strconv.Itoa(int(block.BlockMeta().CompactionLevel - 1))
+	metricCompactionObjectsWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferedObjects()))
+	metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferLength()))
+
 	block.ResetBuffer()
 
 	return tracker, nil
diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go
index 9f854f58d11..1c45265aabc 100644
--- a/tempodb/compactor_test.go
+++ b/tempodb/compactor_test.go
@@ -94,7 +94,7 @@ func TestCompaction(t *testing.T) {
 		_, err = rand.Read(id)
 		assert.NoError(t, err, "unexpected creating random id")
 
-		req := test.MakeRequest(i*10, id)
+		req := test.MakeRequest(10, id)
 		reqs = append(reqs, req)
 		ids = append(ids, id)
 
@@ -124,12 +124,6 @@
 
 	rw.pollBlocklist()
 
-	processedStart, err := GetCounterVecValue(metricCompactionObjectsProcessed, "0")
-	assert.NoError(t, err)
-
-	blocksStart, err := GetCounterVecValue(metricCompactionBlocks, "0")
-	assert.NoError(t, err)
-
 	blocklist := rw.blocklist(testTenantID)
 	blockSelector := newTimeWindowBlockSelector(blocklist, rw.compactorCfg.MaxCompactionRange, 10000, defaultMinInputBlocks, 2)
 
@@ -160,15 +154,6 @@
 	}
 	assert.Equal(t, blockCount*recordCount, records)
 
-	// Check metric
-	processedEnd, err := GetCounterVecValue(metricCompactionObjectsProcessed, "0")
-	assert.NoError(t, err)
-	assert.Equal(t, float64(blockCount*recordCount), processedEnd-processedStart)
-
-	blocksEnd, err := GetCounterVecValue(metricCompactionBlocks, "0")
-	assert.NoError(t, err)
-	assert.Equal(t, float64(blockCount), blocksEnd-blocksStart)
-
 	// now see if we can find our ids
 	for i, id := range allIds {
 		b, _, err := rw.Find(context.Background(), testTenantID, id)
@@ -345,6 +330,77 @@ func TestCompactionUpdatesBlocklist(t *testing.T) {
 	}
 }
 
+func TestCompactionMetrics(t *testing.T) {
+	tempDir, err := ioutil.TempDir("/tmp", "")
+	defer os.RemoveAll(tempDir)
+	assert.NoError(t, err, "unexpected error creating temp dir")
+
+	r, w, c, err := New(&Config{
+		Backend: "local",
+		Pool: &pool.Config{
+			MaxWorkers: 10,
+			QueueDepth: 100,
+		},
+		Local: &local.Config{
+			Path: path.Join(tempDir, "traces"),
+		},
+		WAL: &wal.Config{
+			Filepath:        path.Join(tempDir, "wal"),
+			IndexDownsample: rand.Int()%20 + 1,
+			BloomFP:         .01,
+		},
+		BlocklistPoll: 0,
+	}, log.NewNopLogger())
+	assert.NoError(t, err)
+
+	c.EnableCompaction(&CompactorConfig{
+		ChunkSizeBytes:          10,
+		MaxCompactionRange:      24 * time.Hour,
+		BlockRetention:          0,
+		CompactedBlockRetention: 0,
+	}, &mockSharder{})
+
+	// Cut x blocks with y records each
+	blockCount := 5
+	recordCount := 1
+	cutTestBlocks(t, w, blockCount, recordCount)
+
+	rw := r.(*readerWriter)
+	rw.pollBlocklist()
+
+	// Get starting metrics
+	processedStart, err := GetCounterVecValue(metricCompactionObjectsWritten, "0")
+	assert.NoError(t, err)
+
+	blocksStart, err := GetCounterVecValue(metricCompactionBlocks, "0")
+	assert.NoError(t, err)
+
+	bytesStart, err := GetCounterVecValue(metricCompactionBytesWritten, "0")
+	assert.NoError(t, err)
+
+	// compact everything
+	err = rw.compact(rw.blocklist(testTenantID), testTenantID)
+	assert.NoError(t, err)
+
+	// Check metric
+	processedEnd, err := GetCounterVecValue(metricCompactionObjectsWritten, "0")
+	assert.NoError(t, err)
+	assert.Equal(t, float64(blockCount*recordCount), processedEnd-processedStart)
+
+	blocksEnd, err := GetCounterVecValue(metricCompactionBlocks, "0")
+	assert.NoError(t, err)
+	assert.Equal(t, float64(blockCount), blocksEnd-blocksStart)
+
+	bytesEnd, err := GetCounterVecValue(metricCompactionBytesWritten, "0")
+	assert.NoError(t, err)
+	bytesPerRecord :=
+		4 /* total length */ +
+		4 /* id length */ +
+		16 /* id */ +
+		3 /* test record length */
+	assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart)
+}
+
 func cutTestBlocks(t *testing.T, w Writer, blockCount int, recordCount int) {
 	wal := w.WAL()
 	for i := 0; i < blockCount; i++ {
diff --git a/tempodb/wal/compactor_block.go b/tempodb/wal/compactor_block.go
index bf0d9a02963..ef8a211c713 100644
--- a/tempodb/wal/compactor_block.go
+++ b/tempodb/wal/compactor_block.go
@@ -17,8 +17,9 @@ type CompactorBlock struct {
 
 	bloom *bloom.ShardedBloomFilter
 
-	appendBuffer *bytes.Buffer
-	appender     encoding.Appender
+	bufferedObjects int
+	appendBuffer    *bytes.Buffer
+	appender        encoding.Appender
 }
 
 func newCompactorBlock(id uuid.UUID, tenantID string, bloomFP float64, indexDownsample int, metas []*encoding.BlockMeta, filepath string, estimatedObjects int) (*CompactorBlock, error) {
@@ -56,6 +57,7 @@ func (c *CompactorBlock) Write(id encoding.ID, object []byte) error {
 	if err != nil {
 		return err
 	}
+	c.bufferedObjects++
 	c.meta.ObjectAdded(id)
 	c.bloom.Add(id)
 	return nil
@@ -69,8 +71,13 @@ func (c *CompactorBlock) CurrentBufferLength() int {
 	return c.appendBuffer.Len()
 }
 
+func (c *CompactorBlock) CurrentBufferedObjects() int {
+	return c.bufferedObjects
+}
+
 func (c *CompactorBlock) ResetBuffer() {
 	c.appendBuffer.Reset()
+	c.bufferedObjects = 0
 }
 
 func (c *CompactorBlock) Length() int {
diff --git a/tempodb/wal/compactor_block_test.go b/tempodb/wal/compactor_block_test.go
index c34aa4f1da0..b3b37cc35a0 100644
--- a/tempodb/wal/compactor_block_test.go
+++ b/tempodb/wal/compactor_block_test.go
@@ -95,5 +95,7 @@ func TestCompactorBlockWrite(t *testing.T) {
 	}
 
 	records := cb.Records()
-	assert.Equal(t, math.Ceil(float64(numObjects)/3), float64(len(records)))
+	assert.Equal(t, math.Ceil(float64(numObjects)/float64(walCfg.IndexDownsample)), float64(len(records)))
+
+	assert.Equal(t, numObjects, cb.CurrentBufferedObjects())
 }
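
Note: the tests read counter values through a GetCounterVecValue helper that is referenced but not defined in this patch. A minimal sketch of how such a helper can be written with client_golang, assuming the name and signature used in the tests above (the real helper in the tempodb package may differ), is:

package tempodb

import (
	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// GetCounterVecValue returns the current value of the CounterVec child
// identified by the given label value. Sketch only, for illustration; it
// reads the counter state by writing it into a dto.Metric.
func GetCounterVecValue(metric *prometheus.CounterVec, label string) (float64, error) {
	m := &dto.Metric{}
	if err := metric.WithLabelValues(label).Write(m); err != nil {
		return 0, err
	}
	return m.GetCounter().GetValue(), nil
}

With the framing spelled out in TestCompactionMetrics (4 bytes of total length, 4 bytes of id length, a 16-byte id and a 3-byte test record), each written object accounts for 27 bytes, so the 5 blocks of 1 record each are expected to add 135 to compaction_bytes_written at level "0".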