diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs
index 937865d226a2..3fffcb2dc4c1 100644
--- a/src/mito/src/engine.rs
+++ b/src/mito/src/engine.rs
@@ -393,6 +393,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
             .id(region_id)
             .name(&region_name)
             .row_key(row_key.clone())
+            .compaction_time_window(request.table_options.compaction_time_window)
             .default_cf(default_cf.clone())
             .build()
             .context(BuildRegionDescriptorSnafu {
@@ -406,6 +407,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
                 .write_buffer_size
                 .map(|size| size.0 as usize),
             ttl: request.table_options.ttl,
+            compaction_time_window: request.table_options.compaction_time_window,
         };

         let region = self
@@ -512,6 +514,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
                 .write_buffer_size
                 .map(|s| s.0 as usize),
             ttl: table_info.meta.options.ttl,
+            compaction_time_window: table_info.meta.options.compaction_time_window,
         };

         debug!(
diff --git a/src/mito/src/engine/procedure/create.rs b/src/mito/src/engine/procedure/create.rs
index 174a45ef88b7..31eda68a510a 100644
--- a/src/mito/src/engine/procedure/create.rs
+++ b/src/mito/src/engine/procedure/create.rs
@@ -163,15 +163,18 @@ impl<S: StorageEngine> CreateMitoTable<S> {
         let table_options = &self.data.request.table_options;
         let write_buffer_size = table_options.write_buffer_size.map(|size| size.0 as usize);
         let ttl = table_options.ttl;
+        let compaction_time_window = table_options.compaction_time_window;
         let open_opts = OpenOptions {
             parent_dir: table_dir.clone(),
             write_buffer_size,
             ttl,
+            compaction_time_window,
         };
         let create_opts = CreateOptions {
             parent_dir: table_dir,
             write_buffer_size,
             ttl,
+            compaction_time_window,
         };

         let primary_key_indices = &self.data.request.primary_key_indices;
@@ -216,6 +219,7 @@ impl<S: StorageEngine> CreateMitoTable<S> {
             .name(region_name.clone())
             .row_key(row_key.clone())
             .default_cf(default_cf.clone())
+            .compaction_time_window(compaction_time_window)
             .build()
             .context(BuildRegionDescriptorSnafu {
                 table_name: &self.data.request.table_name,
diff --git a/src/storage/benches/memtable/util/regiondesc_util.rs b/src/storage/benches/memtable/util/regiondesc_util.rs
index d881f4aaf0e0..a378f3842c20 100644
--- a/src/storage/benches/memtable/util/regiondesc_util.rs
+++ b/src/storage/benches/memtable/util/regiondesc_util.rs
@@ -67,6 +67,7 @@ impl RegionDescBuilder {
             row_key: self.key_builder.build().unwrap(),
             default_cf: self.default_cf_builder.build().unwrap(),
             extra_cfs: Vec::new(),
+            compaction_time_window: None,
         }
     }

diff --git a/src/storage/src/compaction/noop.rs b/src/storage/src/compaction/noop.rs
index ebb6ecda1ff7..50ea24a5908c 100644
--- a/src/storage/src/compaction/noop.rs
+++ b/src/storage/src/compaction/noop.rs
@@ -17,7 +17,7 @@ use std::marker::PhantomData;

 use store_api::storage::RegionId;

-use crate::compaction::{CompactionTask, Picker, PickerContext};
+use crate::compaction::{CompactionTask, Picker};
 use crate::error::Result;
 use crate::scheduler::{Request, Scheduler};

@@ -49,7 +49,7 @@ impl Picker for NoopCompactionPicker {
     type Request = NoopCompactionRequest;
     type Task = NoopCompactionTask;

-    fn pick(&self, _ctx: &PickerContext, _req: &Self::Request) -> Result<Option<Self::Task>> {
+    fn pick(&self, _req: &Self::Request) -> Result<Option<Self::Task>> {
         Ok(None)
     }
 }
diff --git a/src/storage/src/compaction/picker.rs b/src/storage/src/compaction/picker.rs
index 3cba9e2a3667..96f25eb3a8ab 100644
--- a/src/storage/src/compaction/picker.rs
+++ b/src/storage/src/compaction/picker.rs
@@ -35,14 +35,24 @@ pub trait Picker: Send + 'static {
     type Request: Request;
     type Task: CompactionTask;

-    fn pick(
-        &self,
-        ctx: &PickerContext,
-        req: &Self::Request,
-    ) -> crate::error::Result<Option<Self::Task>>;
+    fn pick(&self, req: &Self::Request) -> crate::error::Result<Option<Self::Task>>;
 }

-pub struct PickerContext {}
+pub struct PickerContext {
+    compaction_time_window: Option<i64>,
+}
+
+impl PickerContext {
+    pub fn with(compaction_time_window: Option<i64>) -> Self {
+        Self {
+            compaction_time_window,
+        }
+    }
+
+    pub fn compaction_time_window(&self) -> Option<i64> {
+        self.compaction_time_window
+    }
+}

 /// L0 -> L1 compaction based on time windows.
 pub struct SimplePicker<S> {
@@ -89,7 +99,6 @@ impl<S: LogStore> Picker for SimplePicker<S> {

     fn pick(
         &self,
-        ctx: &PickerContext,
         req: &CompactionRequestImpl<S>,
     ) -> crate::error::Result<Option<CompactionTaskImpl<S>>> {
         let levels = &req.levels();
@@ -110,9 +119,10 @@ impl<S: LogStore> Picker for SimplePicker<S> {
             expired_ssts.iter().for_each(|f| f.mark_compacting(true));
         }

+        let ctx = &PickerContext::with(req.compaction_time_window);
         for level_num in 0..levels.level_num() {
             let level = levels.level(level_num as u8);
-            let outputs = self.strategy.pick(ctx, level);
+            let (compaction_time_window, outputs) = self.strategy.pick(ctx, level);

             if outputs.is_empty() {
                 debug!("No SST file can be compacted at level {}", level_num);
@@ -133,6 +143,7 @@ impl<S: LogStore> Picker for SimplePicker<S> {
                 manifest: req.manifest.clone(),
                 expired_ssts,
                 sst_write_buffer_size: req.sst_write_buffer_size,
+                compaction_time_window,
             }));
         }

diff --git a/src/storage/src/compaction/scheduler.rs b/src/storage/src/compaction/scheduler.rs
index 2e20102dea47..d5fe26245142 100644
--- a/src/storage/src/compaction/scheduler.rs
+++ b/src/storage/src/compaction/scheduler.rs
@@ -22,7 +22,7 @@ use store_api::storage::RegionId;
 use tokio::sync::oneshot::Sender;
 use tokio::sync::Notify;

-use crate::compaction::picker::{Picker, PickerContext};
+use crate::compaction::picker::Picker;
 use crate::compaction::task::CompactionTask;
 use crate::error::Result;
 use crate::manifest::region::RegionManifest;
@@ -59,6 +59,7 @@ pub struct CompactionRequestImpl<S: LogStore> {
     pub manifest: RegionManifest,
     pub wal: Wal<S>,
     pub ttl: Option<Duration>,
+    pub compaction_time_window: Option<i64>,

     /// Compaction result sender.
     pub sender: Option<Sender<Result<()>>>,
@@ -101,7 +102,7 @@ where
         finish_notifier: Arc<Notify>,
     ) -> Result<()> {
         let region_id = req.key();
-        let Some(task) = self.picker.pick(&PickerContext {}, &req)? else {
+        let Some(task) = self.picker.pick(&req)? else {
             info!("No file needs compaction in region: {:?}", region_id);
             req.complete(Ok(()));
             return Ok(());
diff --git a/src/storage/src/compaction/strategy.rs b/src/storage/src/compaction/strategy.rs
index 4679cf4489bb..48ecce1fc635 100644
--- a/src/storage/src/compaction/strategy.rs
+++ b/src/storage/src/compaction/strategy.rs
@@ -26,7 +26,7 @@ use crate::sst::{FileHandle, LevelMeta};

 /// Compaction strategy that defines which SSTs need to be compacted at given level.
 pub trait Strategy {
-    fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput>;
+    fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>);
 }

 pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
@@ -37,29 +37,33 @@ pub type StrategyRef = Arc<dyn Strategy + Send + Sync>;
 pub struct SimpleTimeWindowStrategy {}

 impl Strategy for SimpleTimeWindowStrategy {
-    fn pick(&self, _ctx: &PickerContext, level: &LevelMeta) -> Vec<CompactionOutput> {
+    fn pick(&self, ctx: &PickerContext, level: &LevelMeta) -> (Option<i64>, Vec<CompactionOutput>) {
         // SimpleTimeWindowStrategy only handles level 0 to level 1 compaction.
         if level.level() != 0 {
-            return vec![];
+            return (None, vec![]);
         }

         let files = find_compactable_files(level);
         debug!("Compactable files found: {:?}", files);
         if files.is_empty() {
-            return vec![];
+            return (None, vec![]);
         }
-
-        let time_bucket = infer_time_bucket(&files);
+        let time_bucket = ctx
+            .compaction_time_window()
+            .unwrap_or_else(|| infer_time_bucket(&files));
         let buckets = calculate_time_buckets(time_bucket, &files);
         debug!("File bucket:{}, file groups: {:?}", time_bucket, buckets);
-        buckets
-            .into_iter()
-            .map(|(bound, files)| CompactionOutput {
-                output_level: 1,
-                bucket_bound: bound,
-                bucket: time_bucket,
-                inputs: files,
-            })
-            .collect()
+        (
+            Some(time_bucket),
+            buckets
+                .into_iter()
+                .map(|(bound, files)| CompactionOutput {
+                    output_level: 1,
+                    bucket_bound: bound,
+                    bucket: time_bucket,
+                    inputs: files,
+                })
+                .collect(),
+        )
     }
 }
diff --git a/src/storage/src/compaction/task.rs b/src/storage/src/compaction/task.rs
index 9257f01cecd9..87b077321cc8 100644
--- a/src/storage/src/compaction/task.rs
+++ b/src/storage/src/compaction/task.rs
@@ -48,6 +48,7 @@ pub struct CompactionTaskImpl<S: LogStore> {
     pub manifest: RegionManifest,
     pub expired_ssts: Vec<FileHandle>,
     pub sst_write_buffer_size: ReadableSize,
+    pub compaction_time_window: Option<i64>,
 }

 impl<S: LogStore> Debug for CompactionTaskImpl<S> {
@@ -101,6 +102,7 @@ impl<S: LogStore> CompactionTaskImpl<S> {
     }

     /// Writes updated SST info into manifest.
+    // TODO(etolbakov): we are not persisting inferred compaction_time_window [#1083](https://github.com/GreptimeTeam/greptimedb/pull/1083)
     async fn write_manifest_and_apply(
         &self,
         output: HashSet<FileMeta>,
diff --git a/src/storage/src/engine.rs b/src/storage/src/engine.rs
index 142ec5b806d5..0a80bcc1b7dc 100644
--- a/src/storage/src/engine.rs
+++ b/src/storage/src/engine.rs
@@ -298,6 +298,7 @@ impl<S: LogStore> EngineInner<S> {
                 name,
                 &self.config,
                 opts.ttl,
+                opts.compaction_time_window,
             )
             .await?;

@@ -340,6 +341,7 @@ impl<S: LogStore> EngineInner<S> {
                 &region_name,
                 &self.config,
                 opts.ttl,
+                opts.compaction_time_window,
             )
             .await?;

@@ -368,6 +370,7 @@ impl<S: LogStore> EngineInner<S> {
         region_name: &str,
         config: &EngineConfig,
         ttl: Option<Duration>,
+        compaction_time_window: Option<i64>,
     ) -> Result<StoreConfig<S>> {
         let parent_dir = util::normalize_dir(parent_dir);

@@ -397,6 +400,7 @@ impl<S: LogStore> EngineInner<S> {
             engine_config: self.config.clone(),
             file_purger: self.file_purger.clone(),
             ttl,
+            compaction_time_window,
         })
     }
 }
diff --git a/src/storage/src/manifest/action.rs b/src/storage/src/manifest/action.rs
index ae3a366ee440..2d0b2eab663b 100644
--- a/src/storage/src/manifest/action.rs
+++ b/src/storage/src/manifest/action.rs
@@ -38,6 +38,8 @@ pub struct RawRegionMetadata {
     pub columns: RawColumnsMetadata,
     pub column_families: RawColumnFamiliesMetadata,
     pub version: VersionNumber,
+    /// Time window for compaction
+    pub compaction_time_window: Option<i64>,
 }

 /// Minimal data that could be used to persist and recover [ColumnsMetadata](crate::metadata::ColumnsMetadata).
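As an aside on the storage-side changes above: the per-table `compaction_time_window` is now threaded from the request into a `PickerContext`, and `SimpleTimeWindowStrategy` prefers that explicit window over the bucket it would otherwise infer from the level-0 files. The following is a minimal standalone sketch of that fallback rule only; `effective_time_window` and the constant returned by the stand-in `infer_time_bucket` are illustrative names, not part of the diff.

```rust
// Sketch of the "explicit window wins, otherwise infer a bucket" rule
// introduced above. PickerContext mirrors the struct added in picker.rs.
struct PickerContext {
    compaction_time_window: Option<i64>,
}

impl PickerContext {
    fn with(compaction_time_window: Option<i64>) -> Self {
        Self { compaction_time_window }
    }

    fn compaction_time_window(&self) -> Option<i64> {
        self.compaction_time_window
    }
}

// Stand-in for the real `infer_time_bucket(&files)` in strategy.rs,
// which derives a bucket from the SSTs' time ranges.
fn infer_time_bucket() -> i64 {
    3600 // assumed hourly fallback, purely for illustration
}

// Hypothetical helper showing the precedence used by SimpleTimeWindowStrategy::pick.
fn effective_time_window(ctx: &PickerContext) -> i64 {
    ctx.compaction_time_window().unwrap_or_else(infer_time_bucket)
}

fn main() {
    assert_eq!(effective_time_window(&PickerContext::with(Some(7200))), 7200);
    assert_eq!(effective_time_window(&PickerContext::with(None)), 3600);
}
```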
diff --git a/src/storage/src/metadata.rs b/src/storage/src/metadata.rs
index 81d9a333b07d..f4f172f2d2a0 100644
--- a/src/storage/src/metadata.rs
+++ b/src/storage/src/metadata.rs
@@ -198,6 +198,8 @@ pub struct RegionMetadata {
     pub columns: ColumnsMetadataRef,
     column_families: ColumnFamiliesMetadata,
     version: VersionNumber,
+    /// Time window for compaction
+    compaction_time_window: Option<i64>,
 }

 impl RegionMetadata {
@@ -216,6 +218,11 @@ impl RegionMetadata {
         &self.schema
     }

+    #[inline]
+    pub fn compaction_time_window(&self) -> Option<i64> {
+        self.compaction_time_window
+    }
+
     #[inline]
     pub fn user_schema(&self) -> &SchemaRef {
         self.schema.user_schema()
@@ -317,7 +324,8 @@ impl RegionMetadata {
         let mut builder = RegionDescriptorBuilder::default()
             .id(self.id)
             .name(&self.name)
-            .row_key(row_key);
+            .row_key(row_key)
+            .compaction_time_window(self.compaction_time_window);

         for (cf_id, cf) in &self.column_families.id_to_cfs {
             let mut cf_builder = ColumnFamilyDescriptorBuilder::default()
@@ -350,6 +358,7 @@ impl From<&RegionMetadata> for RawRegionMetadata {
             columns: RawColumnsMetadata::from(&*data.columns),
             column_families: RawColumnFamiliesMetadata::from(&data.column_families),
             version: data.version,
+            compaction_time_window: data.compaction_time_window,
         }
     }
 }
@@ -368,6 +377,7 @@ impl TryFrom<RawRegionMetadata> for RegionMetadata {
             columns,
             column_families: raw.column_families.into(),
             version: raw.version,
+            compaction_time_window: raw.compaction_time_window,
         })
     }
 }
@@ -635,6 +645,7 @@ impl TryFrom<RegionDescriptor> for RegionMetadataBuilder {
             .name(desc.name)
             .id(desc.id)
             .row_key(desc.row_key)?
+            .compaction_time_window(desc.compaction_time_window)
             .add_column_family(desc.default_cf)?;
         for cf in desc.extra_cfs {
             builder = builder.add_column_family(cf)?;
@@ -791,6 +802,7 @@ struct RegionMetadataBuilder {
     columns_meta_builder: ColumnsMetadataBuilder,
     cfs_meta_builder: ColumnFamiliesMetadataBuilder,
     version: VersionNumber,
+    compaction_time_window: Option<i64>,
 }

 impl Default for RegionMetadataBuilder {
@@ -807,6 +819,7 @@ impl RegionMetadataBuilder {
             columns_meta_builder: ColumnsMetadataBuilder::default(),
             cfs_meta_builder: ColumnFamiliesMetadataBuilder::default(),
             version: Schema::INITIAL_VERSION,
+            compaction_time_window: None,
         }
     }

@@ -831,6 +844,11 @@ impl RegionMetadataBuilder {
         Ok(self)
     }

+    fn compaction_time_window(mut self, compaction_time_window: Option<i64>) -> Self {
+        self.compaction_time_window = compaction_time_window;
+        self
+    }
+
     fn add_column_family(mut self, cf: ColumnFamilyDescriptor) -> Result<Self> {
         let column_index_start = self.columns_meta_builder.columns.len();
         let column_index_end = column_index_start + cf.columns.len();
@@ -861,6 +879,7 @@ impl RegionMetadataBuilder {
             columns,
             column_families: self.cfs_meta_builder.build(),
             version: self.version,
+            compaction_time_window: self.compaction_time_window,
         })
     }
 }
@@ -1065,6 +1084,7 @@ mod tests {
             .unwrap();
         RegionMetadataBuilder::new()
             .name(TEST_REGION)
+            .compaction_time_window(None)
             .row_key(row_key)
             .unwrap()
             .add_column_family(cf)
diff --git a/src/storage/src/region.rs b/src/storage/src/region.rs
index f553bbfd03b0..1b17c6c32f50 100644
--- a/src/storage/src/region.rs
+++ b/src/storage/src/region.rs
@@ -156,6 +156,7 @@ pub struct StoreConfig<S: LogStore> {
     pub engine_config: Arc<EngineConfig>,
     pub file_purger: FilePurgerRef,
     pub ttl: Option<Duration>,
+    pub compaction_time_window: Option<i64>,
 }

 pub type RecoverdMetadata = (SequenceNumber, (ManifestVersion, RawRegionMetadata));
@@ -232,6 +233,7 @@ impl<S: LogStore> RegionImpl<S> {
                 store_config.memtable_builder,
                 store_config.engine_config.clone(),
                 store_config.ttl,
+                store_config.compaction_time_window,
             )),
             wal,
             flush_strategy: store_config.flush_strategy,
@@ -250,7 +252,7 @@ impl<S: LogStore> RegionImpl<S> {
     pub async fn open(
         name: String,
         store_config: StoreConfig<S>,
-        _opts: &OpenOptions,
+        opts: &OpenOptions,
     ) -> Result<Option<RegionImpl<S>>> {
         // Load version meta data from manifest.
         let (version, mut recovered_metadata) = match Self::recover_from_manifest(
@@ -307,11 +309,14 @@ impl<S: LogStore> RegionImpl<S> {
             name,
             version_control,
         });
-
+        let compaction_time_window = store_config
+            .compaction_time_window
+            .or(opts.compaction_time_window);
         let writer = Arc::new(RegionWriter::new(
             store_config.memtable_builder,
             store_config.engine_config.clone(),
             store_config.ttl,
+            compaction_time_window,
         ));
         let writer_ctx = WriterContext {
             shared: &shared,
diff --git a/src/storage/src/region/writer.rs b/src/storage/src/region/writer.rs
index 43214928117d..dea40d251e0b 100644
--- a/src/storage/src/region/writer.rs
+++ b/src/storage/src/region/writer.rs
@@ -68,9 +68,15 @@ impl RegionWriter {
         memtable_builder: MemtableBuilderRef,
         config: Arc<EngineConfig>,
         ttl: Option<Duration>,
+        compaction_time_window: Option<i64>,
     ) -> RegionWriter {
         RegionWriter {
-            inner: Mutex::new(WriterInner::new(memtable_builder, config, ttl)),
+            inner: Mutex::new(WriterInner::new(
+                memtable_builder,
+                config,
+                ttl,
+                compaction_time_window,
+            )),
             version_mutex: Mutex::new(()),
         }
     }
@@ -361,6 +367,7 @@ struct WriterInner {
     closed: bool,
     engine_config: Arc<EngineConfig>,
     ttl: Option<Duration>,
+    compaction_time_window: Option<i64>,
 }

 impl WriterInner {
@@ -368,6 +375,7 @@ impl WriterInner {
         memtable_builder: MemtableBuilderRef,
         engine_config: Arc<EngineConfig>,
         ttl: Option<Duration>,
+        compaction_time_window: Option<i64>,
     ) -> WriterInner {
         WriterInner {
             memtable_builder,
@@ -375,6 +383,7 @@ impl WriterInner {
             engine_config,
             closed: false,
             ttl,
+            compaction_time_window,
         }
     }

@@ -638,7 +647,13 @@ impl WriterInner {
             return Ok(());
         }

-        let cb = Self::build_flush_callback(&current_version, ctx, &self.engine_config, self.ttl);
+        let cb = Self::build_flush_callback(
+            &current_version,
+            ctx,
+            &self.engine_config,
+            self.ttl,
+            self.compaction_time_window,
+        );

         let flush_req = FlushJob {
             max_memtable_id: max_memtable_id.unwrap(),
@@ -678,6 +693,7 @@ impl WriterInner {
             manifest: writer_ctx.manifest.clone(),
             wal: writer_ctx.wal.clone(),
             ttl: self.ttl,
+            compaction_time_window: self.compaction_time_window,
             sender: None,
             sst_write_buffer_size,
         };
@@ -725,6 +741,7 @@ impl WriterInner {
         ctx: &WriterContext<S>,
         config: &Arc<EngineConfig>,
         ttl: Option<Duration>,
+        compaction_time_window: Option<i64>,
     ) -> Option<FlushCallback> {
         let region_id = version.metadata().id();
         let compaction_request = CompactionRequestImpl {
@@ -735,6 +752,7 @@ impl WriterInner {
             manifest: ctx.manifest.clone(),
             wal: ctx.wal.clone(),
             ttl,
+            compaction_time_window,
             sender: None,
             sst_write_buffer_size: config.sst_write_buffer_size,
         };
diff --git a/src/storage/src/test_util/config_util.rs b/src/storage/src/test_util/config_util.rs
index e498d5869a92..4faab3af899f 100644
--- a/src/storage/src/test_util/config_util.rs
+++ b/src/storage/src/test_util/config_util.rs
@@ -83,5 +83,6 @@ pub async fn new_store_config_with_object_store(
         engine_config: Default::default(),
         file_purger,
         ttl: None,
+        compaction_time_window: None,
     }
 }
diff --git a/src/storage/src/test_util/descriptor_util.rs b/src/storage/src/test_util/descriptor_util.rs
index a7bfdd23576b..4e81b1b0da7f 100644
--- a/src/storage/src/test_util/descriptor_util.rs
+++ b/src/storage/src/test_util/descriptor_util.rs
@@ -94,6 +94,7 @@ impl RegionDescBuilder {
             row_key: self.key_builder.build().unwrap(),
             default_cf: self.default_cf_builder.build().unwrap(),
             extra_cfs: Vec::new(),
+            compaction_time_window: None,
         }
     }

diff --git a/src/store-api/src/storage/descriptors.rs b/src/store-api/src/storage/descriptors.rs
index 86427a22e0dc..b373c5cd4c3d 100644
--- a/src/store-api/src/storage/descriptors.rs
+++ b/src/store-api/src/storage/descriptors.rs
@@ -149,6 +149,8 @@ pub struct RegionDescriptor {
     /// Extra column families defined by user.
     #[builder(default, setter(each(name = "push_extra_column_family")))]
     pub extra_cfs: Vec<ColumnFamilyDescriptor>,
+    /// Time window for compaction
+    pub compaction_time_window: Option<i64>,
 }

 impl RowKeyDescriptorBuilder {
diff --git a/src/store-api/src/storage/engine.rs b/src/store-api/src/storage/engine.rs
index 8801dcaf810e..78bc5fc6310c 100644
--- a/src/store-api/src/storage/engine.rs
+++ b/src/store-api/src/storage/engine.rs
@@ -88,6 +88,7 @@ pub struct CreateOptions {
     pub write_buffer_size: Option<usize>,
     /// Region SST files TTL
     pub ttl: Option<Duration>,
+    pub compaction_time_window: Option<i64>,
 }

 /// Options to open a region.
@@ -99,4 +100,5 @@ pub struct OpenOptions {
     pub write_buffer_size: Option<usize>,
     /// Region SST files TTL
     pub ttl: Option<Duration>,
+    pub compaction_time_window: Option<i64>,
 }
diff --git a/src/store-api/src/storage/requests.rs b/src/store-api/src/storage/requests.rs
index 94c56f537849..9338ff88d8d2 100644
--- a/src/store-api/src/storage/requests.rs
+++ b/src/store-api/src/storage/requests.rs
@@ -162,6 +162,7 @@ mod tests {
             .name("test")
             .row_key(row_key)
             .default_cf(default_cf)
+            .compaction_time_window(Some(1677652502))
             .build()
             .unwrap()
     }
diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs
index ac36c374dfc4..18c92b7a047b 100644
--- a/src/table/src/requests.rs
+++ b/src/table/src/requests.rs
@@ -60,10 +60,13 @@ pub struct TableOptions {
     pub ttl: Option<Duration>,
     /// Extra options that may not applicable to all table engines.
     pub extra_options: HashMap<String, String>,
+    /// Time window for compaction
+    pub compaction_time_window: Option<i64>,
 }

 pub const WRITE_BUFFER_SIZE_KEY: &str = "write_buffer_size";
 pub const TTL_KEY: &str = "ttl";
+pub const COMPACTION_TIME_WINDOW_KEY: &str = "compaction_time_window";

 impl TryFrom<&HashMap<String, String>> for TableOptions {
     type Error = error::Error;
@@ -94,8 +97,20 @@ impl TryFrom<&HashMap<String, String>> for TableOptions {
                 .into();
             options.ttl = Some(ttl_value);
         }
+        if let Some(compaction_time_window) = value.get(COMPACTION_TIME_WINDOW_KEY) {
+            options.compaction_time_window = match compaction_time_window.parse::<i64>() {
+                Ok(t) => Some(t),
+                Err(_) => {
+                    return ParseTableOptionSnafu {
+                        key: COMPACTION_TIME_WINDOW_KEY,
+                        value: compaction_time_window,
+                    }
+                    .fail()
+                }
+            };
+        }
         options.extra_options = HashMap::from_iter(value.iter().filter_map(|(k, v)| {
-            if k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY {
+            if k != WRITE_BUFFER_SIZE_KEY && k != TTL_KEY && k != COMPACTION_TIME_WINDOW_KEY {
                 Some((k.clone(), v.clone()))
             } else {
                 None
@@ -118,6 +133,12 @@ impl From<&TableOptions> for HashMap<String, String> {
             let ttl_str = humantime::format_duration(ttl).to_string();
             res.insert(TTL_KEY.to_string(), ttl_str);
         }
+        if let Some(compaction_time_window) = opts.compaction_time_window {
+            res.insert(
+                COMPACTION_TIME_WINDOW_KEY.to_string(),
+                compaction_time_window.to_string(),
+            );
+        }
         res.extend(
             opts.extra_options
                 .iter()
@@ -229,6 +250,7 @@ mod tests {
             write_buffer_size: None,
             ttl: Some(Duration::from_secs(1000)),
             extra_options: HashMap::new(),
+            compaction_time_window: Some(1677652502),
         };
         let serialized = serde_json::to_string(&options).unwrap();
         let deserialized: TableOptions = serde_json::from_str(&serialized).unwrap();
@@ -241,6 +263,7 @@ mod tests {
             write_buffer_size: Some(ReadableSize::mb(128)),
             ttl: Some(Duration::from_secs(1000)),
             extra_options: HashMap::new(),
+            compaction_time_window: Some(1677652502),
         };
         let serialized_map = HashMap::from(&options);
         let serialized = TableOptions::try_from(&serialized_map).unwrap();
@@ -250,6 +273,7 @@ mod tests {
             write_buffer_size: None,
             ttl: None,
             extra_options: HashMap::new(),
+            compaction_time_window: None,
         };
         let serialized_map = HashMap::from(&options);
         let serialized = TableOptions::try_from(&serialized_map).unwrap();
@@ -259,6 +283,7 @@ mod tests {
             write_buffer_size: Some(ReadableSize::mb(128)),
             ttl: Some(Duration::from_secs(1000)),
             extra_options: HashMap::from([("a".to_string(), "A".to_string())]),
+            compaction_time_window: Some(1677652502),
         };
         let serialized_map = HashMap::from(&options);
         let serialized = TableOptions::try_from(&serialized_map).unwrap();
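To illustrate the new table option end to end: the `table/src/requests.rs` changes above parse a `compaction_time_window` key (seconds, as an `i64`) out of the string option map, keep it out of `extra_options`, and write it back when converting to a map. The snippet below is a minimal standalone sketch of that round trip, not the `table` crate itself; `TableOptionsSketch` and `parse` are hypothetical stand-ins, and error handling is simplified to `Option` instead of `ParseTableOptionSnafu`.

```rust
use std::collections::HashMap;

// Mirrors the key and fields added in the diff, trimmed so the sketch compiles alone.
const COMPACTION_TIME_WINDOW_KEY: &str = "compaction_time_window";

#[derive(Debug, Default, PartialEq)]
struct TableOptionsSketch {
    compaction_time_window: Option<i64>,
    extra_options: HashMap<String, String>,
}

fn parse(value: &HashMap<String, String>) -> Option<TableOptionsSketch> {
    let mut options = TableOptionsSketch::default();
    if let Some(window) = value.get(COMPACTION_TIME_WINDOW_KEY) {
        // The real code fails with ParseTableOptionSnafu; here we just bail out on a bad value.
        options.compaction_time_window = Some(window.parse::<i64>().ok()?);
    }
    // Everything except the known key stays an extra option, as in the diff.
    options.extra_options = value
        .iter()
        .filter(|(k, _)| k.as_str() != COMPACTION_TIME_WINDOW_KEY)
        .map(|(k, v)| (k.clone(), v.clone()))
        .collect();
    Some(options)
}

fn main() {
    let map = HashMap::from([
        (COMPACTION_TIME_WINDOW_KEY.to_string(), "3600".to_string()),
        ("a".to_string(), "A".to_string()),
    ]);
    let parsed = parse(&map).unwrap();
    assert_eq!(parsed.compaction_time_window, Some(3600));
    assert_eq!(parsed.extra_options.get("a"), Some(&"A".to_string()));
}
```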