From 6f5badf15a7211fb730583afdda6ab61e38d39ca Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Fri, 15 Nov 2024 13:58:40 -0500 Subject: [PATCH 1/6] Build 24h blocks for older OOO data Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 43 +++++++++++++++++++++++++++++++++---------- tsdb/db_test.go | 20 ++++++++++---------- 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 4d721dca0..175e28a14 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1309,7 +1309,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) { } }() - lastBlockMaxt := int64(math.MinInt64) + lastBlockMaxt, firstBlockMint := int64(math.MinInt64), int64(math.MaxInt64) defer func() { errs := tsdb_errors.NewMulti(returnErr) if err := db.head.truncateWAL(lastBlockMaxt); err != nil { @@ -1367,6 +1367,9 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) { } // Consider only successful compactions for WAL truncation. lastBlockMaxt = maxt + if firstBlockMint != math.MaxInt64 { + firstBlockMint = mint + } } // Clear some disk space before compacting blocks, especially important @@ -1386,7 +1389,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) { if lastBlockMaxt != math.MinInt64 { // The head was compacted, so we compact OOO head as well. - if err := db.compactOOOHead(ctx); err != nil { + if err := db.compactOOOHead(ctx, firstBlockMint); err != nil { return fmt.Errorf("compact ooo head: %w", err) } } @@ -1422,17 +1425,17 @@ func (db *DB) CompactHeadWithoutTruncation(head *RangeHead) error { } // CompactOOOHead compacts the OOO Head. -func (db *DB) CompactOOOHead(ctx context.Context) error { +func (db *DB) CompactOOOHead(ctx context.Context, currDayTime time.Time) error { db.cmtx.Lock() defer db.cmtx.Unlock() - return db.compactOOOHead(ctx) + return db.compactOOOHead(ctx, currDayTime.UnixMilli()) } // Callback for testing. var compactOOOHeadTestingCallback func() -func (db *DB) compactOOOHead(ctx context.Context) error { +func (db *DB) compactOOOHead(ctx context.Context, firstBlockMint int64) error { if !db.oooWasEnabled.Load() { return nil } @@ -1446,7 +1449,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error { compactOOOHeadTestingCallback = nil } - ulids, err := db.compactOOO(db.dir, oooHead) + ulids, err := db.compactOOO(db.dir, oooHead, firstBlockMint) if err != nil { return fmt.Errorf("compact ooo head: %w", err) } @@ -1486,7 +1489,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error { // compactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given. // Each ULID in the result corresponds to a block in a unique time range. -func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID, err error) { +func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead, firstBlockMint int64) (_ []ulid.ULID, err error) { start := time.Now() blockSize := oooHead.ChunkRange() @@ -1503,14 +1506,34 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID meta := &BlockMeta{} meta.Compaction.SetOutOfOrder() - for t := blockSize * (oooHeadMint / blockSize); t <= oooHeadMaxt; t += blockSize { - mint, maxt := t, t+blockSize + runCompaction := func(mint, maxt int64) error { // Block intervals are half-open: [b.MinTime, b.MaxTime). Block intervals are always +1 than the total samples it includes. uids, err := db.compactor.Write(dest, oooHead.CloneForTimeRange(mint, maxt-1), mint, maxt, meta) if err != nil { - return nil, err + return err } ulids = append(ulids, uids...) + return nil + } + + day := 24 * time.Hour.Milliseconds() + maxtFor24hBlock := day * (firstBlockMint / day) + + // 24h blocks for data that is for the previous days + for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day { + if err := runCompaction(t, t+day); err != nil { + return nil, err + } + } + + oooStart := oooHeadMint + if oooStart < maxtFor24hBlock { + oooStart = maxtFor24hBlock + } + for t := blockSize * (oooStart / blockSize); t <= oooHeadMaxt; t += blockSize { + if err := runCompaction(t, t+blockSize); err != nil { + return nil, err + } } if len(ulids) == 0 { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index ed8dfa001..928c938d2 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3772,7 +3772,7 @@ func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingQuerier(t *test go func() { defer compactionComplete.Store(true) - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) }() @@ -3870,7 +3870,7 @@ func TestQuerierShouldNotFailIfOOOCompactionOccursAfterSelecting(t *testing.T) { go func() { defer compactionComplete.Store(true) - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) }() @@ -3961,7 +3961,7 @@ func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingIterators(t *te go func() { defer compactionComplete.Store(true) - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) }() @@ -4993,7 +4993,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample } // OOO compaction happens here. - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) // 3 blocks exist now. [0, 120), [120, 240), [240, 360) require.Len(t, db.Blocks(), 3) @@ -5399,7 +5399,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa // Compaction should also work fine. require.Empty(t, db.Blocks()) - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) require.Len(t, db.Blocks(), 1) // One block from OOO data. require.Equal(t, int64(0), db.Blocks()[0].MinTime()) require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime()) @@ -6907,7 +6907,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) { require.Greater(t, f.Size(), int64(100)) // OOO compaction happens here. - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) // Check that blocks are created after compaction. require.Len(t, db.Blocks(), 5) @@ -7097,7 +7097,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { originalCompactor := db.compactor db.compactor = &mockCompactorFailing{t: t} for i := 0; i < 5; i++ { - require.Error(t, db.CompactOOOHead(ctx)) + require.Error(t, db.CompactOOOHead(ctx, time.Now())) } require.Empty(t, db.Blocks()) @@ -7108,7 +7108,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { verifyFirstWBLFileIs0(6) db.compactor = originalCompactor - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) oldBlocks := db.Blocks() require.Len(t, db.Blocks(), 3) @@ -7120,7 +7120,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { // The failed compaction should not have left the ooo Head corrupted. // Hence, expect no new blocks with another OOO compaction call. - require.NoError(t, db.CompactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) require.Len(t, db.Blocks(), 3) require.Equal(t, oldBlocks, db.Blocks()) @@ -7528,7 +7528,7 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) { require.Positive(t, size) require.Empty(t, db.Blocks()) - require.NoError(t, db.compactOOOHead(ctx)) + require.NoError(t, db.CompactOOOHead(ctx, time.Now())) require.NotEmpty(t, db.Blocks()) // WBL is empty. From 083a6a02c0e619aa4153b3b9aece84613a144b21 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Nov 2024 12:32:47 -0500 Subject: [PATCH 2/6] Fix tests Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 21 +++++++++------------ tsdb/db_test.go | 20 ++++++++++---------- 2 files changed, 19 insertions(+), 22 deletions(-) diff --git a/tsdb/db.go b/tsdb/db.go index 175e28a14..117193ff5 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1309,7 +1309,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) { } }() - lastBlockMaxt, firstBlockMint := int64(math.MinInt64), int64(math.MaxInt64) + lastBlockMaxt := int64(math.MinInt64) defer func() { errs := tsdb_errors.NewMulti(returnErr) if err := db.head.truncateWAL(lastBlockMaxt); err != nil { @@ -1367,9 +1367,6 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) { } // Consider only successful compactions for WAL truncation. lastBlockMaxt = maxt - if firstBlockMint != math.MaxInt64 { - firstBlockMint = mint - } } // Clear some disk space before compacting blocks, especially important @@ -1389,7 +1386,7 @@ func (db *DB) Compact(ctx context.Context) (returnErr error) { if lastBlockMaxt != math.MinInt64 { // The head was compacted, so we compact OOO head as well. - if err := db.compactOOOHead(ctx, firstBlockMint); err != nil { + if err := db.compactOOOHead(ctx); err != nil { return fmt.Errorf("compact ooo head: %w", err) } } @@ -1425,17 +1422,18 @@ func (db *DB) CompactHeadWithoutTruncation(head *RangeHead) error { } // CompactOOOHead compacts the OOO Head. -func (db *DB) CompactOOOHead(ctx context.Context, currDayTime time.Time) error { +func (db *DB) CompactOOOHead(ctx context.Context) error { db.cmtx.Lock() defer db.cmtx.Unlock() - return db.compactOOOHead(ctx, currDayTime.UnixMilli()) + return db.compactOOOHead(ctx) } // Callback for testing. var compactOOOHeadTestingCallback func() -func (db *DB) compactOOOHead(ctx context.Context, firstBlockMint int64) error { +// firstBlockMint is in milliseconds. +func (db *DB) compactOOOHead(ctx context.Context) error { if !db.oooWasEnabled.Load() { return nil } @@ -1449,7 +1447,7 @@ func (db *DB) compactOOOHead(ctx context.Context, firstBlockMint int64) error { compactOOOHeadTestingCallback = nil } - ulids, err := db.compactOOO(db.dir, oooHead, firstBlockMint) + ulids, err := db.compactOOO(db.dir, oooHead) if err != nil { return fmt.Errorf("compact ooo head: %w", err) } @@ -1483,13 +1481,12 @@ func (db *DB) compactOOOHead(ctx context.Context, firstBlockMint int64) error { return fmt.Errorf("truncate ooo wbl: %w", err) } } - return nil } // compactOOO creates a new block per possible block range in the compactor's directory from the OOO Head given. // Each ULID in the result corresponds to a block in a unique time range. -func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead, firstBlockMint int64) (_ []ulid.ULID, err error) { +func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead) (_ []ulid.ULID, err error) { start := time.Now() blockSize := oooHead.ChunkRange() @@ -1517,7 +1514,7 @@ func (db *DB) compactOOO(dest string, oooHead *OOOCompactionHead, firstBlockMint } day := 24 * time.Hour.Milliseconds() - maxtFor24hBlock := day * (firstBlockMint / day) + maxtFor24hBlock := day * (db.Head().MaxTime() / day) // 24h blocks for data that is for the previous days for t := day * (oooHeadMint / day); t < maxtFor24hBlock; t += day { diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 928c938d2..1ddee19b6 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -3772,7 +3772,7 @@ func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingQuerier(t *test go func() { defer compactionComplete.Store(true) - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) }() @@ -3870,7 +3870,7 @@ func TestQuerierShouldNotFailIfOOOCompactionOccursAfterSelecting(t *testing.T) { go func() { defer compactionComplete.Store(true) - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) }() @@ -3961,7 +3961,7 @@ func TestQuerierShouldNotFailIfOOOCompactionOccursAfterRetrievingIterators(t *te go func() { defer compactionComplete.Store(true) - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) require.Equal(t, float64(1), prom_testutil.ToFloat64(db.Head().metrics.chunksRemoved)) }() @@ -4993,7 +4993,7 @@ func testOOOCompaction(t *testing.T, scenario sampleTypeScenario, addExtraSample } // OOO compaction happens here. - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) // 3 blocks exist now. [0, 120), [120, 240), [240, 360) require.Len(t, db.Blocks(), 3) @@ -5399,7 +5399,7 @@ func testOOOQueryAfterRestartWithSnapshotAndRemovedWBL(t *testing.T, scenario sa // Compaction should also work fine. require.Empty(t, db.Blocks()) - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) require.Len(t, db.Blocks(), 1) // One block from OOO data. require.Equal(t, int64(0), db.Blocks()[0].MinTime()) require.Equal(t, 120*time.Minute.Milliseconds(), db.Blocks()[0].MaxTime()) @@ -6907,7 +6907,7 @@ func TestOOOHistogramCompactionWithCounterResets(t *testing.T) { require.Greater(t, f.Size(), int64(100)) // OOO compaction happens here. - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) // Check that blocks are created after compaction. require.Len(t, db.Blocks(), 5) @@ -7097,7 +7097,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { originalCompactor := db.compactor db.compactor = &mockCompactorFailing{t: t} for i := 0; i < 5; i++ { - require.Error(t, db.CompactOOOHead(ctx, time.Now())) + require.Error(t, db.CompactOOOHead(ctx)) } require.Empty(t, db.Blocks()) @@ -7108,7 +7108,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { verifyFirstWBLFileIs0(6) db.compactor = originalCompactor - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) oldBlocks := db.Blocks() require.Len(t, db.Blocks(), 3) @@ -7120,7 +7120,7 @@ func testOOOCompactionFailure(t *testing.T, scenario sampleTypeScenario) { // The failed compaction should not have left the ooo Head corrupted. // Hence, expect no new blocks with another OOO compaction call. - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) require.Len(t, db.Blocks(), 3) require.Equal(t, oldBlocks, db.Blocks()) @@ -7528,7 +7528,7 @@ func testOutOfOrderRuntimeConfig(t *testing.T, scenario sampleTypeScenario) { require.Positive(t, size) require.Empty(t, db.Blocks()) - require.NoError(t, db.CompactOOOHead(ctx, time.Now())) + require.NoError(t, db.CompactOOOHead(ctx)) require.NotEmpty(t, db.Blocks()) // WBL is empty. From 9d7c99eb867674ad9226206bdc1ff46cdea08242 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Nov 2024 13:17:35 -0500 Subject: [PATCH 3/6] Add TestBiggerBlocksForOldOOOData Signed-off-by: Ganesh Vernekar --- tsdb/db_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 1ddee19b6..29a49ee61 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -9087,3 +9087,76 @@ func TestBlockClosingBlockedDuringRemoteRead(t *testing.T) { case <-blockClosed: } } + +func TestBiggerBlocksForOldOOOData(t *testing.T) { + var ( + ctx = context.Background() + lbls = labels.FromStrings("foo", "bar") + + day = 24 * time.Hour.Milliseconds() + hour = time.Hour.Milliseconds() + + currTs = time.Now().UnixMilli() + currDayStart = day * (currTs / day) + + expOOOSamples []chunks.Sample + ) + + opts := DefaultOptions() + opts.OutOfOrderTimeWindow = 10 * day + db := openTestDB(t, opts, nil) + db.DisableCompactions() + t.Cleanup(func() { + require.NoError(t, db.Close()) + }) + + // 1 in-order sample. + app := db.Appender(ctx) + inOrderTs := currDayStart + (6 * hour) + ref, err := app.Append(0, lbls, inOrderTs, float64(inOrderTs)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + + // OOO samples till 5 days ago. + for ts := currDayStart - (5 * day); ts < inOrderTs; ts += hour { + app = db.Appender(ctx) + _, err := app.Append(ref, lbls, ts, float64(ts)) + require.NoError(t, err) + require.NoError(t, app.Commit()) + expOOOSamples = append(expOOOSamples, sample{t: ts, f: float64(ts)}) + } + + require.Len(t, db.Blocks(), 0) + require.NoError(t, db.CompactOOOHead(ctx)) + // 5 OOO blocks from the last 5 days + 3 OOO blocks for the 6h of curr day (2h blocks) + require.Len(t, db.Blocks(), 8) + + // Check that blocks are alright. + // Move all the blocks to a new DB and check for all OOO samples + // getting into the new DB and the old DB only has the in-order sample. + newDB := openTestDB(t, opts, nil) + t.Cleanup(func() { + require.NoError(t, newDB.Close()) + }) + for _, b := range db.Blocks() { + err := os.Rename(b.Dir(), path.Join(newDB.Dir(), b.Meta().ULID.String())) + require.NoError(t, err) + } + + require.NoError(t, db.reloadBlocks()) + require.NoError(t, newDB.reloadBlocks()) + require.Len(t, db.Blocks(), 0) + require.Len(t, newDB.Blocks(), 8) + + // Only in-order sample in the old DB. + querier, err := db.Querier(inOrderTs-6*day, inOrderTs+1) + require.NoError(t, err) + seriesSet := query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: {sample{t: inOrderTs, f: float64(inOrderTs)}}}, seriesSet) + + // All OOO samples in the new DB. + querier, err = newDB.Querier(inOrderTs-6*day, inOrderTs+1) + require.NoError(t, err) + seriesSet = query(t, querier, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) + require.Equal(t, map[string][]chunks.Sample{`{foo="bar"}`: expOOOSamples}, seriesSet) +} From 88158a773b886eb4afee3f36a3a243eb2b72d5a7 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Nov 2024 13:23:00 -0500 Subject: [PATCH 4/6] lint Signed-off-by: Ganesh Vernekar --- tsdb/db_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsdb/db_test.go b/tsdb/db_test.go index 29a49ee61..06344f31a 100644 --- a/tsdb/db_test.go +++ b/tsdb/db_test.go @@ -9126,7 +9126,7 @@ func TestBiggerBlocksForOldOOOData(t *testing.T) { expOOOSamples = append(expOOOSamples, sample{t: ts, f: float64(ts)}) } - require.Len(t, db.Blocks(), 0) + require.Empty(t, db.Blocks()) require.NoError(t, db.CompactOOOHead(ctx)) // 5 OOO blocks from the last 5 days + 3 OOO blocks for the 6h of curr day (2h blocks) require.Len(t, db.Blocks(), 8) @@ -9145,7 +9145,7 @@ func TestBiggerBlocksForOldOOOData(t *testing.T) { require.NoError(t, db.reloadBlocks()) require.NoError(t, newDB.reloadBlocks()) - require.Len(t, db.Blocks(), 0) + require.Empty(t, db.Blocks()) require.Len(t, newDB.Blocks(), 8) // Only in-order sample in the old DB. From 183f54fd3644c6ae1dc9de7b5573510abe96081f Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Nov 2024 15:46:42 -0500 Subject: [PATCH 5/6] Fix comments Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 1 - 1 file changed, 1 deletion(-) diff --git a/tsdb/db.go b/tsdb/db.go index 117193ff5..143e466a9 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1432,7 +1432,6 @@ func (db *DB) CompactOOOHead(ctx context.Context) error { // Callback for testing. var compactOOOHeadTestingCallback func() -// firstBlockMint is in milliseconds. func (db *DB) compactOOOHead(ctx context.Context) error { if !db.oooWasEnabled.Load() { return nil From 60cac005031af0cc066120cb0e93b6df7a029b78 Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Wed, 20 Nov 2024 17:00:39 -0500 Subject: [PATCH 6/6] Fix comment Signed-off-by: Ganesh Vernekar --- tsdb/db.go | 1 + 1 file changed, 1 insertion(+) diff --git a/tsdb/db.go b/tsdb/db.go index 143e466a9..7fb4cac93 100644 --- a/tsdb/db.go +++ b/tsdb/db.go @@ -1480,6 +1480,7 @@ func (db *DB) compactOOOHead(ctx context.Context) error { return fmt.Errorf("truncate ooo wbl: %w", err) } } + return nil }