From 018af31a63a13d2cbfee3fe74170dc2fa8db98be Mon Sep 17 00:00:00 2001 From: Joe Elliott Date: Thu, 4 Feb 2021 15:39:39 -0500 Subject: [PATCH] Fixed bytes flushed metric Signed-off-by: Joe Elliott --- tempodb/compactor.go | 13 +++++-------- tempodb/compactor_test.go | 22 +++++++++++----------- tempodb/encoding/compactor_block.go | 21 +++++++++++---------- 3 files changed, 27 insertions(+), 29 deletions(-) diff --git a/tempodb/compactor.go b/tempodb/compactor.go index 433b8d574ff..8b5d45df033 100644 --- a/tempodb/compactor.go +++ b/tempodb/compactor.go @@ -242,12 +242,12 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.CompactorBlock) (backend.AppendTracker, error) { compactionLevelLabel := strconv.Itoa(int(block.BlockMeta().CompactionLevel - 1)) metricCompactionObjectsWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferedObjects())) - metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferLength())) - tracker, err := block.FlushBuffer(context.TODO(), tracker, rw.w) + tracker, bytesFlushed, err := block.FlushBuffer(context.TODO(), tracker, rw.w) if err != nil { return nil, err } + metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(bytesFlushed)) return tracker, nil } @@ -255,15 +255,12 @@ func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encodin func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.CompactorBlock) error { level.Info(rw.logger).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta())) - tracker, err := appendBlock(rw, tracker, block) - if err != nil { - return err - } - - err = block.Complete(context.TODO(), tracker, rw.w) + bytesFlushed, err := block.Complete(context.TODO(), tracker, rw.w) if err != nil { return err } + compactionLevelLabel := strconv.Itoa(int(block.BlockMeta().CompactionLevel - 1)) + metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(bytesFlushed)) return nil } diff --git a/tempodb/compactor_test.go b/tempodb/compactor_test.go index 9ab3c4157c4..3b08a2b823d 100644 --- a/tempodb/compactor_test.go +++ b/tempodb/compactor_test.go @@ -333,7 +333,7 @@ func TestCompactionMetrics(t *testing.T) { Block: &encoding.BlockConfig{ IndexDownsample: 11, BloomFP: .01, - Encoding: backend.EncLZ4_4M, + Encoding: backend.EncNone, }, WAL: &wal.Config{ Filepath: path.Join(tempDir, "wal"), @@ -364,8 +364,8 @@ func TestCompactionMetrics(t *testing.T) { blocksStart, err := test.GetCounterVecValue(metricCompactionBlocks, "0") assert.NoError(t, err) - // bytesStart, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0") - // assert.NoError(t, err) + bytesStart, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0") + assert.NoError(t, err) // compact everything err = rw.compact(rw.blocklist(testTenantID), testTenantID) @@ -380,14 +380,14 @@ func TestCompactionMetrics(t *testing.T) { assert.NoError(t, err) assert.Equal(t, float64(blockCount), blocksEnd-blocksStart) - // bytesEnd, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0") - jpe restore? - // assert.NoError(t, err) - // bytesPerRecord := - // 4 /* total length */ + - // 4 /* id length */ + - // 16 /* id */ + - // 3 /* test record length */ - // assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart) + bytesEnd, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0") + assert.NoError(t, err) + bytesPerRecord := + 4 /* total length */ + + 4 /* id length */ + + 16 /* id */ + + 3 /* test record length */ + assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart) } func TestCompactionIteratesThroughTenants(t *testing.T) { diff --git a/tempodb/encoding/compactor_block.go b/tempodb/encoding/compactor_block.go index 90b63d3595c..6431add9c24 100644 --- a/tempodb/encoding/compactor_block.go +++ b/tempodb/encoding/compactor_block.go @@ -74,34 +74,35 @@ func (c *CompactorBlock) Length() int { } // FlushBuffer flushes any existing objects to the backend -func (c *CompactorBlock) FlushBuffer(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (backend.AppendTracker, error) { +func (c *CompactorBlock) FlushBuffer(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (backend.AppendTracker, int, error) { if c.appender.Length() == 0 { - return tracker, nil + return tracker, 0, nil } meta := c.BlockMeta() tracker, err := c.encoding.appendBlockData(ctx, w, meta, tracker, c.appendBuffer.Bytes()) if err != nil { - return nil, err + return nil, 0, err } + bytesFlushed := c.appendBuffer.Len() c.appendBuffer.Reset() c.bufferedObjects = 0 - return tracker, nil + return tracker, bytesFlushed, nil } // Complete finishes writes the compactor metadata and closes all buffers and appenders -func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) error { +func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (int, error) { err := c.appender.Complete() if err != nil { - return err + return 0, err } // one final flush - _, err = c.FlushBuffer(ctx, tracker, w) + _, bytesFlushed, err := c.FlushBuffer(ctx, tracker, w) if err != nil { - return err + return 0, err } records := c.appender.Records() @@ -109,10 +110,10 @@ func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTra err = c.encoding.writeBlockMeta(ctx, w, meta, records, c.bloom) if err != nil { - return err + return 0, err } - return w.CloseAppend(ctx, tracker) + return bytesFlushed, w.CloseAppend(ctx, tracker) } func (c *CompactorBlock) BlockMeta() *backend.BlockMeta {