Skip to content

Commit

Permalink
Merge pull request #770 from grafana/weekly-r318-pr-751-763
Browse files Browse the repository at this point in the history
[weekly-r318] Build 24h blocks for older OOO data + EnableBiggerOOOBlockForOldSamples option
  • Loading branch information
colega authored Nov 29, 2024
2 parents 0558ce1 + 1eb3a81 commit bcbf773
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 4 deletions.
33 changes: 30 additions & 3 deletions tsdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,11 @@ type Options struct {
// OOO Native Histogram ingestion is complete.
EnableOOONativeHistograms bool

// EnableBiggerOOOBlockForOldSamples enables building 24h blocks for the OOO samples
// that belong to the previous day. This is in-line with Mimir maintaining 24h blocks
// for the previous days.
EnableBiggerOOOBlockForOldSamples bool

// OutOfOrderTimeWindow specifies how much out of order is allowed, if any.
// This can change during run-time, so this value from here should only be used
// while initialising.
Expand Down Expand Up @@ -1503,14 +1508,36 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID

meta := &BlockMeta{}
meta.Compaction.SetOutOfOrder()
for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize {
mint, maxt := t, t+blockSize
runCompaction := func(mint, maxt int64) error {
// Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes.
uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta)
if err != nil {
return nil, err
return err
}
ulids = append(ulids, uids...)
return nil
}

oooStart := oooHeadMint
if db.opts.EnableBiggerOOOBlockForOldSamples {
day := 24 * time.Hour.Milliseconds()
maxtFor24hBlock := day * (db.Head().MaxTime() / day)

// 24h blocks for data that is for the previous days
for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day {
if err := runCompaction(t, t+day); err != nil {
return nil, err
}
}

if oooStart < maxtFor24hBlock {
oooStart = maxtFor24hBlock
}
}
for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize {
if err := runCompaction(t, t+blockSize); err != nil {
return nil, err
}
}

if len(ulids) == 0 {
Expand Down
76 changes: 75 additions & 1 deletion tsdb/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7518,7 +7518,7 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) {
require.Positive(t, size)

require.Empty(t, db.Blocks())
require.NoError(t, db.compactOOOHead(ctx))
require.NoError(t, db.CompactOOOHead(ctx))
require.NotEmpty(t, db.Blocks())

// WBL is empty.
Expand Down Expand Up @@ -8970,3 +8970,77 @@ func TestGenerateCompactionDelay(t *testing.T) {
assertDelay(db.generateCompactionDelay())
}
}

func TestBiggerBlocksForOldOOOData(t *testing.T) {
var (
ctx = context.Background()
lbls = labels.FromStrings("foo", "bar")

day = 24 * time.Hour.Milliseconds()
hour = time.Hour.Milliseconds()

currTs = time.Now().UnixMilli()
currDayStart = day * (currTs / day)

expOOOSamples []chunks.Sample
)

opts := DefaultOptions()
opts.OutOfOrderTimeWindow = 10 * day
opts.EnableBiggerOOOBlockForOldSamples = true
db := openTestDB(t, opts, nil)
db.DisableCompactions()
t.Cleanup(func() {
require.NoError(t, db.Close())
})

// 1 in-order sample.
app := db.Appender(ctx)
inOrderTs := currDayStart + (6 * hour)
ref, err := app.Append(0, lbls, inOrderTs, float64(inOrderTs))
require.NoError(t, err)
require.NoError(t, app.Commit())

// OOO samples till 5 days ago.
for ts := currDayStart - (5 * day); ts < inOrderTs; ts += hour {
app = db.Appender(ctx)
_, err := app.Append(ref, lbls, ts, float64(ts))
require.NoError(t, err)
require.NoError(t, app.Commit())
expOOOSamples = append(expOOOSamples, sample{t: ts, f: float64(ts)})
}

require.Empty(t, db.Blocks())
require.NoError(t, db.CompactOOOHead(ctx))
// 5 OOO blocks from the last 5 days + 3 OOO blocks for the 6h of curr day (2h blocks)
require.Len(t, db.Blocks(), 8)

// Check that blocks are alright.
// Move all the blocks to a new DB and check for all OOO samples
// getting into the new DB and the old DB only has the in-order sample.
newDB := openTestDB(t, opts, nil)
t.Cleanup(func() {
require.NoError(t, newDB.Close())
})
for _, b := range db.Blocks() {
err := os.Rename(b.Dir(), path.Join(newDB.Dir(), b.Meta().ULID.String()))
require.NoError(t, err)
}

require.NoError(t, db.reloadBlocks())
require.NoError(t, newDB.reloadBlocks())
require.Empty(t, db.Blocks())
require.Len(t, newDB.Blocks(), 8)

// Only in-order sample in the old DB.
querier, err := db.Querier(inOrderTs-6*day, inOrderTs+1)
require.NoError(t, err)
seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: {sample{t: inOrderTs, f: float64(inOrderTs)}}}, seriesSet)

// All OOO samples in the new DB.
querier, err = newDB.Querier(inOrderTs-6*day, inOrderTs+1)
require.NoError(t, err)
seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"))
require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expOOOSamples}, seriesSet)
}

0 comments on commit bcbf773

Please sign in to comment.