Skip to content

Commit

Permalink
Build 24h blocks for older OOO data (#751)
Browse files Browse the repository at this point in the history
* Build 24h blocks for older OOO data

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix tests

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Add TestBiggerBlocksForOldOOOData

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* lint

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix comments

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

* Fix comment

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>

---------

Signed-off-by: Ganesh Vernekar <ganeshvern@gmail.com>
  • Loading branch information
codesome authored and colega committed Nov 29, 2024
1 parent 7ff5bc5 commit 9e6d374
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 4 deletions.
26 changes: 23 additions & 3 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1503,14 +1503,34 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID

meta := &BlockMeta{}
meta.Compaction.SetOutOfOrder()
for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize {
mint, maxt := t, t+blockSize
runCompaction := func(mint, maxt int64) error {
// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta)
if err != nil {
return nil, err
return err
}
ulids = append(ulids, uids...)
return nil
}

day := 24 * time.Hour.Milliseconds()
maxtFor24hBlock := day * (db.Head().MaxTime() / day)

// 24h blocks for data that is for the previous days
for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day {
if err := runCompaction(t, t+day); err != nil {
return nil, err
}
}

oooStart := oooHeadMint
if oooStart < maxtFor24hBlock {
oooStart = maxtFor24hBlock
}
for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize {
if err := runCompaction(t, t+blockSize); err != nil {
return nil, err
}
}

if len(ulids) == 0 {
Expand Down
75 changes: 74 additions & 1 deletion tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7518,7 +7518,7 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) {
require.Positive(t, size)

require.Empty(t, db.Blocks())
require.NoError(t, db.compactOOOHead(ctx))
require.NoError(t, db.CompactOOOHead(ctx))
require.NotEmpty(t, db.Blocks())

// WBL is empty.
Expand Down Expand Up @@ -8970,3 +8970,76 @@ func TestGenerateCompactionDelay(t *testing.T) {
assertDelay(db.generateCompactionDelay())
}
}

func TestBiggerBlocksForOldOOOData(t *testing.T) {
var (
ctx = context.Background()
lbls = labels.FromStrings("foo", "bar")

day = 24 * time.Hour.Milliseconds()
hour = time.Hour.Milliseconds()

currTs = time.Now().UnixMilli()
currDayStart = day * (currTs / day)

expOOOSamples []chunks.Sample
)

opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 10 * day
db := openTestDB(t, opts, nil)
db.DisableCompactions()
t.Cleanup(func() {
require.NoError(t, db.Close())
})

// 1 in-order sample.
app := db.Appender(ctx)
inOrderTs := currDayStart + (6 * hour)
ref, err := app.Append(0, lbls, inOrderTs, float64(inOrderTs))
require.NoError(t, err)
require.NoError(t, app.Commit())

// OOO samples till 5 days ago.
for ts := currDayStart - (5 * day); ts < inOrderTs; ts += hour {
app = db.Appender(ctx)
_, err := app.Append(ref, lbls, ts, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
expOOOSamples = append(expOOOSamples, sample{t: ts, f: float64(ts)})
}

require.Empty(t, db.Blocks())
require.NoError(t, db.CompactOOOHead(ctx))
// 5 OOO blocks from the last 5 days + 3 OOO blocks for the 6h of curr day (2h blocks)
require.Len(t, db.Blocks(), 8)

// Check that blocks are alright.
// Move all the blocks to a new DB and check for all OOO samples
// getting into the new DB and the old DB only has the in-order sample.
newDB := openTestDB(t, opts, nil)
t.Cleanup(func() {
require.NoError(t, newDB.Close())
})
for _, b := range db.Blocks() {
err := os.Rename(b.Dir(), path.Join(newDB.Dir(), b.Meta().ULID.String()))
require.NoError(t, err)
}

require.NoError(t, db.reloadBlocks())
require.NoError(t, newDB.reloadBlocks())
require.Empty(t, db.Blocks())
require.Len(t, newDB.Blocks(), 8)

// Only in-order sample in the old DB.
querier, err := db.Querier(inOrderTs-6*day, inOrderTs+1)
require.NoError(t, err)
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: {sample{t: inOrderTs, f: float64(inOrderTs)}}}, seriesSet)

// All OOO samples in the new DB.
querier, err = newDB.Querier(inOrderTs-6*day, inOrderTs+1)
require.NoError(t, err)
seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expOOOSamples}, seriesSet)
}

0 comments on commit 9e6d374

Please sign in to comment.