From bde58ccbb26f2d82dfd38895d166d096e1031366 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Nov 2024 17:15:02 -0500 Subject: [PATCH 1/2] Build 24h blocks for older OOO data (#751) * Build 24h blocks for older OOO data Signed-off-by: Ganesh Vernekar * Fix tests Signed-off-by: Ganesh Vernekar * Add TestBiggerBlocksForOldOOOData Signed-off-by: Ganesh Vernekar * lint Signed-off-by: Ganesh Vernekar * Fix comments Signed-off-by: Ganesh Vernekar * Fix comment Signed-off-by: Ganesh Vernekar --------- Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 26 +++++++++++++++-- tsdb/db_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 9f40a2c49..659f7a596 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1503,14 +1503,34 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID meta := &BlockMeta{} meta.Compaction.SetOutOfOrder() - for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize { - mint, maxt := t, t+blockSize + runCompaction := func(mint, maxt int64) error { // Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta) if err != nil { - return nil, err + return err } ulids = append(ulids, uids...) 
+ return nil + } + + day := 24 * time.Hour.Milliseconds() + maxtFor24hBlock := day * (db.Head().MaxTime() / day) + + // 24h blocks for data that is for the previous days + for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day { + if err := runCompaction(t, t+day); err != nil { + return nil, err + } + } + + oooStart := oooHeadMint + if oooStart < maxtFor24hBlock { + oooStart = maxtFor24hBlock + } + for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize { + if err := runCompaction(t, t+blockSize); err != nil { + return nil, err + } } if len(ulids) == 0 { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 9a5dc0e2a..cdfcf209a 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -7518,7 +7518,7 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) { require.Positive(t, size) require.Empty(t, db.Blocks()) - require.NoError(t, db.compactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx)) require.NotEmpty(t, db.Blocks()) // WBL is empty. @@ -8970,3 +8970,76 @@ func TestGenerateCompactionDelay(t *testing.T) { assertDelay(db.generateCompactionDelay()) } } + +func TestBiggerBlocksForOldOOOData(t *testing.T) { + var ( + ctx = context.Background() + lbls = labels.FromStrings("foo", "bar") + + day = 24 * time.Hour.Milliseconds() + hour = time.Hour.Milliseconds() + + currTs = time.Now().UnixMilli() + currDayStart = day * (currTs / day) + + expOOOSamples []chunks.Sample + ) + + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 10 * day + db := openTestDB(t, opts, nil) + db.DisableCompactions() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // 1 in-order sample. + app := db.Appender(ctx) + inOrderTs := currDayStart + (6 * hour) + ref, err := app.Append(0, lbls, inOrderTs, float64(inOrderTs)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // OOO samples till 5 days ago. 
+ for ts := currDayStart - (5 * day); ts < inOrderTs; ts += hour { + app = db.Appender(ctx) + _, err := app.Append(ref, lbls, ts, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + expOOOSamples = append(expOOOSamples, sample{t: ts, f: float64(ts)}) + } + + require.Empty(t, db.Blocks()) + require.NoError(t, db.CompactOOOHead(ctx)) + // 5 OOO blocks from the last 5 days + 3 OOO blocks for the 6h of curr day (2h blocks) + require.Len(t, db.Blocks(), 8) + + // Check that blocks are alright. + // Move all the blocks to a new DB and check for all OOO samples + // getting into the new DB and the old DB only has the in-order sample. + newDB := openTestDB(t, opts, nil) + t.Cleanup(func() { + require.NoError(t, newDB.Close()) + }) + for _, b := range db.Blocks() { + err := os.Rename(b.Dir(), path.Join(newDB.Dir(), b.Meta().ULID.String())) + require.NoError(t, err) + } + + require.NoError(t, db.reloadBlocks()) + require.NoError(t, newDB.reloadBlocks()) + require.Empty(t, db.Blocks()) + require.Len(t, newDB.Blocks(), 8) + + // Only in-order sample in the old DB. + querier, err := db.Querier(inOrderTs-6*day, inOrderTs+1) + require.NoError(t, err) + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: {sample{t: inOrderTs, f: float64(inOrderTs)}}}, seriesSet) + + // All OOO samples in the new DB. 
+ querier, err = newDB.Querier(inOrderTs-6*day, inOrderTs+1) + require.NoError(t, err) + seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expOOOSamples}, seriesSet) +} From 1eb3a81e3ef56536297022965c9cc8f17aa10ebc Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Thu, 21 Nov 2024 13:32:25 -0500 Subject: [PATCH 2/2] Add a EnableBiggerOOOBlockForOldSamples option for the TSDB (#763) Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 27 +++++++++++++++++---------- tsdb/db_test.go | 1 + 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 659f7a596..d8a866e64 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -199,6 +199,11 @@ type Options struct { // OOO Native Histogram ingestion is complete. EnableOOONativeHistograms bool + // EnableBiggerOOOBlockForOldSamples enables building 24h blocks for the OOO samples + // that belong to the previous days. This is in line with Mimir maintaining 24h blocks + // for the previous days. + EnableBiggerOOOBlockForOldSamples bool + // OutOfOrderTimeWindow specifies how much out of order is allowed, if any. // This can change during run-time, so this value from here should only be used // while initialising. 
@@ -1513,19 +1518,21 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID return nil } - day := 24 * time.Hour.Milliseconds() - maxtFor24hBlock := day * (db.Head().MaxTime() / day) + oooStart := oooHeadMint + if db.opts.EnableBiggerOOOBlockForOldSamples { + day := 24 * time.Hour.Milliseconds() + maxtFor24hBlock := day * (db.Head().MaxTime() / day) - // 24h blocks for data that is for the previous days - for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day { - if err := runCompaction(t, t+day); err != nil { - return nil, err + // 24h blocks for data that is for the previous days + for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day { + if err := runCompaction(t, t+day); err != nil { + return nil, err + } } - } - oooStart := oooHeadMint - if oooStart < maxtFor24hBlock { - oooStart = maxtFor24hBlock + if oooStart < maxtFor24hBlock { + oooStart = maxtFor24hBlock + } } for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize { if err := runCompaction(t, t+blockSize); err != nil { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index cdfcf209a..64adfac51 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -8987,6 +8987,7 @@ func TestBiggerBlocksForOldOOOData(t *testing.T) { opts := DefaultOptions() opts.OutOfOrderTimeWindow = 10 * day + opts.EnableBiggerOOOBlockForOldSamples = true db := openTestDB(t, opts, nil) db.DisableCompactions() t.Cleanup(func() {