From 1de52c60cce154395d1025224fbcb8b639aba282 Mon Sep 17 00:00:00 2001 From: evenyag Date: Thu, 7 Nov 2024 00:49:12 +0800 Subject: [PATCH] fix: do not pick compacting/expired files --- src/mito2/src/compaction/twcs.rs | 18 ++++++++++++++++++ src/mito2/src/compaction/window.rs | 22 ++++++++++++++++++++++ src/mito2/src/test_util/memtable_util.rs | 4 ++-- 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 8af1d63eb2fa..c28b30ac05d3 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -298,6 +298,9 @@ fn assign_to_windows<'a>( let mut windows: HashMap = HashMap::new(); // Iterates all files and assign to time windows according to max timestamp for f in files { + if f.compacting() { + continue; + } let (_, end) = f.time_range(); let time_window = end .convert_to(TimeUnit::Second) @@ -444,6 +447,21 @@ mod tests { ); } + #[test] + fn test_assign_compacting_to_windows() { + let files = [ + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + new_file_handle(FileId::random(), 0, 999, 0), + ]; + files[0].set_compacting(true); + files[2].set_compacting(true); + let windows = assign_to_windows(files.iter(), 3); + assert_eq!(3, windows.get(&0).unwrap().files.len()); + } + /// (Window value, overlapping, files' time ranges in window) type ExpectedWindowSpec = (i64, bool, Vec<(i64, i64)>); diff --git a/src/mito2/src/compaction/window.rs b/src/mito2/src/compaction/window.rs index cf5f1721555a..9d6207bba298 100644 --- a/src/mito2/src/compaction/window.rs +++ b/src/mito2/src/compaction/window.rs @@ -352,6 +352,28 @@ mod tests { ); } + #[test] + fn test_assign_compacting_files_to_windows() { + let picker = WindowedCompactionPicker::new(Some(HOUR / 1000)); + let files = vec![ + (FileId::random(), 0, 2 * HOUR - 1, 0), + (FileId::random(), HOUR, HOUR * 3 - 1, 0), + ]; + let version = build_version(&files, Some(Duration::from_millis(3 * HOUR as u64))); + version.ssts.levels()[0] + .files() + .for_each(|f| f.set_compacting(true)); + let (outputs, expired_ssts, window_seconds) = picker.pick_inner( + RegionId::new(0, 0), + &version, + Timestamp::new_millisecond(HOUR * 3), + ); + + assert!(expired_ssts.is_empty()); + assert_eq!(HOUR / 1000, window_seconds); + assert!(outputs.is_empty()); + } + #[test] fn test_file_time_bucket_span() { assert_eq!( diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs index 55b42d61dd41..f1cc57aa3b51 100644 --- a/src/mito2/src/test_util/memtable_util.rs +++ b/src/mito2/src/test_util/memtable_util.rs @@ -34,8 +34,8 @@ use crate::error::Result; use crate::memtable::key_values::KeyValue; use crate::memtable::partition_tree::data::{timestamp_array_to_i64_slice, DataBatch, DataBuffer}; use crate::memtable::{ - BoxedBatchIterator, BulkPart, IterBuilder, KeyValues, Memtable, MemtableBuilder, MemtableId, - MemtableRange, MemtableRef, MemtableStats, + BoxedBatchIterator, BulkPart, KeyValues, Memtable, MemtableBuilder, MemtableId, MemtableRange, + MemtableRef, MemtableStats, }; use crate::row_converter::{McmpRowCodec, RowCodec, SortField};