From 800eb4c5a20a294e89f1fcf12ece2e2a28065f99 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Nov 2024 17:15:02 -0500 Subject: [PATCH] Build 24h blocks for older OOO data (#751) * Build 24h blocks for older OOO data Signed-off-by: Ganesh Vernekar * Fix tests Signed-off-by: Ganesh Vernekar * Add TestBiggerBlocksForOldOOOData Signed-off-by: Ganesh Vernekar * lint Signed-off-by: Ganesh Vernekar * Fix comments Signed-off-by: Ganesh Vernekar * Fix comment Signed-off-by: Ganesh Vernekar --------- Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 26 +++++++++++++++-- tsdb/db_test.go | 75 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 9f40a2c49..659f7a596 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1503,14 +1503,34 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID meta := &BlockMeta{} meta.Compaction.SetOutOfOrder() - for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize { - mint, maxt := t, t+blockSize + runCompaction := func(mint, maxt int64) error { // Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta) if err != nil { - return nil, err + return err } ulids = append(ulids, uids...) 
+ return nil + } + + day := 24 * time.Hour.Milliseconds() + maxtFor24hBlock := day * (db.Head().MaxTime() / day) + + // 24h blocks for data that is for the previous days + for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day { + if err := runCompaction(t, t+day); err != nil { + return nil, err + } + } + + oooStart := oooHeadMint + if oooStart < maxtFor24hBlock { + oooStart = maxtFor24hBlock + } + for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize { + if err := runCompaction(t, t+blockSize); err != nil { + return nil, err + } } if len(ulids) == 0 { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 9a5dc0e2a..8773dee75 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -7518,7 +7518,7 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) { require.Positive(t, size) require.Empty(t, db.Blocks()) - require.NoError(t, db.compactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx)) require.NotEmpty(t, db.Blocks()) // WBL is empty. @@ -8970,3 +8970,76 @@ func TestGenerateCompactionDelay(t *testing.T) { assertDelay(db.generateCompactionDelay()) } } + +func TestBiggerBlocksForOldOOOData(t *testing.T) { + var ( + ctx = context.Background() + lbls = labels.FromStrings("foo", "bar") + + day = 24 * time.Hour.Milliseconds() + hour = time.Hour.Milliseconds() + + currTs = time.Now().UnixMilli() + currDayStart = day * (currTs / day) + + expOOOSamples []chunks.Sample + ) + + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 10 * day + db := openTestDB(t, opts, nil) + db.DisableCompactions() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // 1 in-order sample. + app := db.Appender(ctx) + inOrderTs := currDayStart + (6 * hour) + ref, err := app.Append(0, lbls, inOrderTs, float64(inOrderTs)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // OOO samples till 5 days ago. 
+ for ts := currDayStart - (5 * day); ts < inOrderTs; ts += hour { + app = db.Appender(ctx) + _, err := app.Append(ref, lbls, ts, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + expOOOSamples = append(expOOOSamples, sample{t: ts, f: float64(ts)}) + } + + require.Empty(t, db.Blocks()) + require.NoError(t, db.CompactOOOHead(ctx)) + // 5 OOO blocks from the last 5 days + 3 OOO blocks for the 6h of curr day (2h blocks) + require.Len(t, db.Blocks(), 8) + + // Check that blocks are alright. + // Move all the blocks to a new DB and check for all OOO samples + // getting into the new DB and the old DB only has the in-order sample. + newDB := openTestDB(t, opts, nil) + t.Cleanup(func() { + require.NoError(t, newDB.Close()) + }) + for _, b := range db.Blocks() { + err := os.Rename(b.Dir(), path.Join(newDB.Dir(), b.Meta().ULID.String())) + require.NoError(t, err) + } + + require.NoError(t, db.reloadBlocks()) + require.NoError(t, newDB.reloadBlocks()) + require.Empty(t, db.Blocks()) + require.Len(t, newDB.Blocks(), 8) + + // Only in-order sample in the old DB. + querier, err := db.Querier(inOrderTs-6*day, inOrderTs+1) + require.NoError(t, err) + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: {sample{t: inOrderTs, f: float64(inOrderTs)}}}, seriesSet) + + // All OOO samples in the new DB. + querier, err = newDB.Querier(inOrderTs-6*day, inOrderTs+1) + require.NoError(t, err) + seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expOOOSamples}, seriesSet) +}