Skip to content

Commit

Permalink
Fixed bytes flushed metric
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Elliott <number101010@gmail.com>
  • Loading branch information
joe-elliott committed Feb 4, 2021
1 parent 0e84c94 commit 018af31
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 29 deletions.
13 changes: 5 additions & 8 deletions tempodb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,28 +242,25 @@ func (rw *readerWriter) compact(blockMetas []*backend.BlockMeta, tenantID string
func appendBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.CompactorBlock) (backend.AppendTracker, error) {
compactionLevelLabel := strconv.Itoa(int(block.BlockMeta().CompactionLevel - 1))
metricCompactionObjectsWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferedObjects()))
metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(block.CurrentBufferLength()))

tracker, err := block.FlushBuffer(context.TODO(), tracker, rw.w)
tracker, bytesFlushed, err := block.FlushBuffer(context.TODO(), tracker, rw.w)
if err != nil {
return nil, err
}
metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(bytesFlushed))

return tracker, nil
}

func finishBlock(rw *readerWriter, tracker backend.AppendTracker, block *encoding.CompactorBlock) error {
level.Info(rw.logger).Log("msg", "writing compacted block", "block", fmt.Sprintf("%+v", block.BlockMeta()))

tracker, err := appendBlock(rw, tracker, block)
if err != nil {
return err
}

err = block.Complete(context.TODO(), tracker, rw.w)
bytesFlushed, err := block.Complete(context.TODO(), tracker, rw.w)
if err != nil {
return err
}
compactionLevelLabel := strconv.Itoa(int(block.BlockMeta().CompactionLevel - 1))
metricCompactionBytesWritten.WithLabelValues(compactionLevelLabel).Add(float64(bytesFlushed))

return nil
}
Expand Down
22 changes: 11 additions & 11 deletions tempodb/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func TestCompactionMetrics(t *testing.T) {
Block: &encoding.BlockConfig{
IndexDownsample: 11,
BloomFP: .01,
Encoding: backend.EncLZ4_4M,
Encoding: backend.EncNone,
},
WAL: &wal.Config{
Filepath: path.Join(tempDir, "wal"),
Expand Down Expand Up @@ -364,8 +364,8 @@ func TestCompactionMetrics(t *testing.T) {
blocksStart, err := test.GetCounterVecValue(metricCompactionBlocks, "0")
assert.NoError(t, err)

// bytesStart, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0")
// assert.NoError(t, err)
bytesStart, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0")
assert.NoError(t, err)

// compact everything
err = rw.compact(rw.blocklist(testTenantID), testTenantID)
Expand All @@ -380,14 +380,14 @@ func TestCompactionMetrics(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, float64(blockCount), blocksEnd-blocksStart)

// bytesEnd, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0") - jpe restore?
// assert.NoError(t, err)
// bytesPerRecord :=
// 4 /* total length */ +
// 4 /* id length */ +
// 16 /* id */ +
// 3 /* test record length */
// assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart)
bytesEnd, err := test.GetCounterVecValue(metricCompactionBytesWritten, "0")
assert.NoError(t, err)
bytesPerRecord :=
4 /* total length */ +
4 /* id length */ +
16 /* id */ +
3 /* test record length */
assert.Equal(t, float64(blockCount*recordCount*bytesPerRecord), bytesEnd-bytesStart)
}

func TestCompactionIteratesThroughTenants(t *testing.T) {
Expand Down
21 changes: 11 additions & 10 deletions tempodb/encoding/compactor_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,45 +74,46 @@ func (c *CompactorBlock) Length() int {
}

// FlushBuffer flushes any existing objects to the backend
func (c *CompactorBlock) FlushBuffer(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (backend.AppendTracker, error) {
func (c *CompactorBlock) FlushBuffer(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (backend.AppendTracker, int, error) {
if c.appender.Length() == 0 {
return tracker, nil
return tracker, 0, nil
}

meta := c.BlockMeta()
tracker, err := c.encoding.appendBlockData(ctx, w, meta, tracker, c.appendBuffer.Bytes())
if err != nil {
return nil, err
return nil, 0, err
}

bytesFlushed := c.appendBuffer.Len()
c.appendBuffer.Reset()
c.bufferedObjects = 0

return tracker, nil
return tracker, bytesFlushed, nil
}

// Complete finishes writes the compactor metadata and closes all buffers and appenders
func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) error {
func (c *CompactorBlock) Complete(ctx context.Context, tracker backend.AppendTracker, w backend.Writer) (int, error) {
err := c.appender.Complete()
if err != nil {
return err
return 0, err
}

// one final flush
_, err = c.FlushBuffer(ctx, tracker, w)
_, bytesFlushed, err := c.FlushBuffer(ctx, tracker, w)
if err != nil {
return err
return 0, err
}

records := c.appender.Records()
meta := c.BlockMeta()

err = c.encoding.writeBlockMeta(ctx, w, meta, records, c.bloom)
if err != nil {
return err
return 0, err
}

return w.CloseAppend(ctx, tracker)
return bytesFlushed, w.CloseAppend(ctx, tracker)
}

func (c *CompactorBlock) BlockMeta() *backend.BlockMeta {
Expand Down

0 comments on commit 018af31

Please sign in to comment.