Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Vertical query merging and compaction #370

Merged
Merged
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
d14623d
Vertical series iterator
codesome Sep 1, 2018
bea7a5d
Select overlapped blocks first in compactor Plan()
codesome Sep 2, 2018
4afa1f0
Added vertical compaction
codesome Sep 3, 2018
f11143c
Code cleanup and comments
codesome Sep 4, 2018
55b13c0
Fix review comments
codesome Oct 6, 2018
f880bc9
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Oct 6, 2018
7de67b3
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Oct 25, 2018
ba9facf
Fix tests
codesome Oct 25, 2018
0f86bb0
Add benchmark for compaction
codesome Oct 25, 2018
0aae01d
Perform vertical compaction only when blocks are overlapping.
codesome Nov 3, 2018
cb9bb62
Benchmark for vertical compaction
codesome Nov 3, 2018
7ae4941
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 3, 2018
94e5ec1
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 13, 2018
4103678
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 19, 2018
9df857d
Benchmark for query iterator and seek for non overlapping blocks
codesome Nov 30, 2018
ad4ef3f
Vertical query merge only for overlapping blocks
codesome Nov 30, 2018
5620350
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Nov 30, 2018
e9b05eb
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Dec 7, 2018
a8f4c26
Simplify logging in Compact(...)
codesome Dec 27, 2018
5e707bf
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Dec 27, 2018
d655420
Updated CHANGELOG.md
codesome Dec 27, 2018
6cb6f2a
Calculate overlapping inside populateBlock
codesome Jan 4, 2019
f53e648
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 5, 2019
6254595
MinTime and MaxTime for BlockReader.
codesome Jan 10, 2019
275eeb0
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 16, 2019
f43086f
Sort blocks w.r.t. MinTime in reload()
codesome Jan 19, 2019
48eed7c
Log about overlapping in LeveledCompactor.write() instead of returnin…
codesome Jan 19, 2019
9f288dc
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 19, 2019
b92a960
Log about overlapping inside LeveledCompactor.populateBlock()
codesome Jan 21, 2019
0d98331
Fix review comments
codesome Jan 21, 2019
acfbdb3
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 21, 2019
4d448d6
Refactor createBlock to take optional []Series
codesome Jan 21, 2019
159cbe3
review1
Jan 23, 2019
f1253d2
Merge pull request #6 from krasi-georgiev/pull/370-review
codesome Jan 23, 2019
1072f0f
Updated CHANGELOG and minor nits
codesome Jan 23, 2019
4a0e2e6
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Jan 28, 2019
8047a2a
nits
codesome Jan 28, 2019
47436d0
Updated CHANGELOG
codesome Jan 28, 2019
ad7993e
Refactor iterator and seek benchmarks for Querier.
codesome Jan 30, 2019
117cef8
Additional test case
codesome Feb 8, 2019
5f8d911
genSeries takes optional labels. Updated BenchmarkQueryIterator and B…
codesome Feb 10, 2019
a23f030
Split genSeries into genSeries and populateSeries
codesome Feb 12, 2019
260665c
Check error in benchmark
codesome Feb 12, 2019
6a1d3f4
Merge remote-tracking branch 'upstream/master' into vertical-query-me…
codesome Feb 12, 2019
d5e8479
Fix review comments
codesome Feb 14, 2019
58e534c
Warn about overlapping blocks in reload()
codesome Feb 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
## master / unreleased

- [ENHANCEMENT] Time-ovelapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370)
- Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks.
- Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas.
- Added `MinTime` and `MaxTime` method for `BlockReader`.
- [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes`

## 0.4.0
- [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed.
- [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374)
- [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change:
- added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- new public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- Added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total`
- New public interface `SizeReader: Size() int64`
- `OpenBlock` signature changed to take a logger.
- [REMOVED] `PrefixMatcher` is considered unused so was removed.
- [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere.
- [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read.
Expand All @@ -23,4 +26,4 @@
- [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)`
- [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field.
- [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset.
-[FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc.
- [FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc.
12 changes: 12 additions & 0 deletions block.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ type BlockReader interface {

// Tombstones returns a TombstoneReader over the block's deleted data.
Tombstones() (TombstoneReader, error)

// MinTime returns the min time of the block.
MinTime() int64

// MaxTime returns the max time of the block.
MaxTime() int64
}

// Appendable defines an entity to which data can be appended.
Expand Down Expand Up @@ -349,6 +355,12 @@ func (pb *Block) Dir() string { return pb.dir }
// Meta returns meta information about the block.
func (pb *Block) Meta() BlockMeta { return pb.meta }

// MinTime returns the min time of the meta.
func (pb *Block) MinTime() int64 { return pb.meta.MinTime }

// MaxTime returns the max time of the meta.
func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime }

// Size returns the number of bytes that the block takes up.
func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes }

Expand Down
72 changes: 72 additions & 0 deletions chunks/chunks.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,78 @@ func (w *Writer) write(b []byte) error {
return err
}

// MergeOverlappingChunks removes the samples whose timestamp is overlapping.
// The first appearing sample is retained in case there is overlapping.
gouthamve marked this conversation as resolved.
Show resolved Hide resolved
// This assumes that `chks []Meta` is sorted w.r.t. MinTime.
func MergeOverlappingChunks(chks []Meta) ([]Meta, error) {
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if len(chks) < 2 {
return chks, nil
}
newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks.
newChks = append(newChks, chks[0])
last := 0
for _, c := range chks[1:] {
// We need to check only the last chunk in newChks.
// Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping)
// (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime.
// So never overlaps with newChks[last-1] or anything before that.
if c.MinTime > newChks[last].MaxTime {
newChks = append(newChks, c)
continue
}
nc := &newChks[last]
if c.MaxTime > nc.MaxTime {
nc.MaxTime = c.MaxTime
}
chk, err := MergeChunks(nc.Chunk, c.Chunk)
if err != nil {
return nil, err
}
nc.Chunk = chk
}

return newChks, nil
}

// MergeChunks vertically merges a and b, i.e., if there is any sample
// with same timestamp in both a and b, the sample in a is discarded.
func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) {
newChunk := chunkenc.NewXORChunk()
app, err := newChunk.Appender()
if err != nil {
return nil, err
}
ait := a.Iterator()
bit := b.Iterator()
aok, bok := ait.Next(), bit.Next()
for aok && bok {
at, av := ait.At()
bt, bv := bit.At()
if at < bt {
app.Append(at, av)
aok = ait.Next()
} else if bt < at {
app.Append(bt, bv)
bok = bit.Next()
} else {
app.Append(bt, bv)
aok = ait.Next()
bok = bit.Next()
}
}
for aok {
at, av := ait.At()
app.Append(at, av)
aok = ait.Next()
}
for bok {
bt, bv := bit.At()
app.Append(bt, bv)
bok = bit.Next()
}
return newChunk, nil
}

func (w *Writer) WriteChunks(chks ...Meta) error {
// Calculate maximum space we need and cut a new segment in case
// we don't fit into the current one.
Expand Down
99 changes: 83 additions & 16 deletions compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tsdb
import (
"fmt"
"io"
"math"
"math/rand"
"os"
"path/filepath"
Expand Down Expand Up @@ -79,12 +80,13 @@ type LeveledCompactor struct {
}

type compactorMetrics struct {
ran prometheus.Counter
failed prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
ran prometheus.Counter
failed prometheus.Counter
overlappingBlocks prometheus.Counter
duration prometheus.Histogram
chunkSize prometheus.Histogram
chunkSamples prometheus.Histogram
chunkRange prometheus.Histogram
}

func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Expand All @@ -98,6 +100,10 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
Name: "prometheus_tsdb_compactions_failed_total",
Help: "Total number of compactions that failed for the partition.",
})
m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{
Name: "prometheus_tsdb_compactions_overlapping_total",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compactions_overlapping_total makes it feel like the number of concurrent compactions. Maybe compactions_overlapping_blocks_total?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

compactions_overlapping_blocks_total sounds like number of blocks that were overlapping during compaction. How about vertical_compactions_total?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, better

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already done :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bwplotka isn't just logging enough?
how would this metric help with troubleshooting, I am just curious as I almost always use the logs for debugging.

Help: "Total number of compactions done on overlapping blocks.",
})
m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "prometheus_tsdb_compaction_duration_seconds",
Help: "Duration of compaction runs",
Expand All @@ -123,6 +129,7 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics {
r.MustRegister(
m.ran,
m.failed,
m.overlappingBlocks,
m.duration,
m.chunkRange,
m.chunkSamples,
Expand Down Expand Up @@ -179,11 +186,15 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) {
return dms[i].meta.MinTime < dms[j].meta.MinTime
})

res := c.selectOverlappingDirs(dms)
if len(res) > 0 {
return res, nil
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
}
// No overlapping blocks, do compaction the usual way.
// We do not include a recently created block with max(minTime), so the block which was just created from WAL.
// This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap.
dms = dms[:len(dms)-1]

var res []string
for _, dm := range c.selectDirs(dms) {
res = append(res, dm.dir)
}
Expand Down Expand Up @@ -244,6 +255,28 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta {
return nil
}

// selectOverlappingDirs returns all dirs with overlaping time ranges.
// It expects sorted input by mint.
func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string {
if len(ds) < 2 {
return nil
}
var overlappingDirs []string
globalMaxt := ds[0].meta.MaxTime
for i, d := range ds[1:] {
if d.meta.MinTime < globalMaxt {
if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well.
overlappingDirs = append(overlappingDirs, ds[i].dir)
}
overlappingDirs = append(overlappingDirs, d.dir)
}
if d.meta.MaxTime > globalMaxt {
globalMaxt = d.meta.MaxTime
}
}
return overlappingDirs
}

// splitByRange splits the directories by the time range. The range sequence starts at 0.
//
// For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30
Expand Down Expand Up @@ -291,12 +324,17 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
res := &BlockMeta{
ULID: uid,
MinTime: blocks[0].MinTime,
MaxTime: blocks[len(blocks)-1].MaxTime,
}

sources := map[ulid.ULID]struct{}{}
// For overlapping blocks, the Maxt can be
// in any block so we track it globally.
maxt := int64(math.MinInt64)

for _, b := range blocks {
if b.MaxTime > maxt {
maxt = b.MaxTime
}
krasi-georgiev marked this conversation as resolved.
Show resolved Hide resolved
if b.Compaction.Level > res.Compaction.Level {
res.Compaction.Level = b.Compaction.Level
}
Expand All @@ -318,6 +356,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta {
return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0
})

res.MaxTime = maxt
return res
}

Expand Down Expand Up @@ -575,19 +614,34 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe

// populateBlock fills the index and chunk writers with new data gathered as the union
// of the provided blocks. It returns meta information for the new block.
// It expects sorted blocks input by mint.
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error {
if len(blocks) == 0 {
return errors.New("cannot populate block from no readers")
}

var (
set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
set ChunkSeriesSet
allSymbols = make(map[string]struct{}, 1<<16)
closers = []io.Closer{}
err error
overlapping bool
)
defer func() { closeAll(closers...) }()

globalMaxt := blocks[0].MaxTime()
for i, b := range blocks {
if !overlapping {
if i > 0 && b.MinTime() < globalMaxt {
c.metrics.overlappingBlocks.Inc()
overlapping = true
level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
}
if b.MaxTime() > globalMaxt {
globalMaxt = b.MaxTime()
}
}

indexr, err := b.Index()
if err != nil {
return errors.Wrapf(err, "open index reader for block %s", b)
Expand Down Expand Up @@ -645,6 +699,12 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,

for set.Next() {
lset, chks, dranges := set.At() // The chunks here are not fully deleted.
if overlapping {
// If blocks are overlapping, it is possible to have unsorted chunks.
sort.Slice(chks, func(i, j int) bool {
return chks[i].MinTime < chks[j].MinTime
})
}

// Skip the series with all deleted chunks.
if len(chks) == 0 {
Expand Down Expand Up @@ -678,21 +738,28 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
}
}

if err := chunkw.WriteChunks(chks...); err != nil {
mergedChks := chks
if overlapping {
mergedChks, err = chunks.MergeOverlappingChunks(chks)
if err != nil {
return errors.Wrap(err, "merge overlapping chunks")
}
}
if err := chunkw.WriteChunks(mergedChks...); err != nil {
return errors.Wrap(err, "write chunks")
}

if err := indexw.AddSeries(i, lset, chks...); err != nil {
if err := indexw.AddSeries(i, lset, mergedChks...); err != nil {
return errors.Wrap(err, "add series")
}

meta.Stats.NumChunks += uint64(len(chks))
meta.Stats.NumChunks += uint64(len(mergedChks))
meta.Stats.NumSeries++
for _, chk := range chks {
for _, chk := range mergedChks {
meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples())
}

for _, chk := range chks {
for _, chk := range mergedChks {
if err := c.chunkPool.Put(chk.Chunk); err != nil {
return errors.Wrap(err, "put chunk")
}
Expand Down
Loading