diff --git a/Cargo.lock b/Cargo.lock index e031e23632a7..fb58d5b2b996 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5380,6 +5380,7 @@ dependencies = [ "tokio", "tokio-stream", "tokio-util", + "toml 0.8.8", "uuid", ] diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 3aa7eb4334f1..41f939d158d0 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -138,6 +138,18 @@ mem_threshold_on_create = "64M" # File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). intermediate_path = "" +[region_engine.mito.memtable] +# Memtable type. +# - "experimental": experimental memtable +# - "time_series": time-series memtable (deprecated) +type = "experimental" +# The max number of keys in one shard. +index_max_keys_per_shard = 8192 +# The max rows of data inside the actively writing buffer in one shard. +data_freeze_threshold = 32768 +# Max dictionary bytes. +fork_dictionary_bytes = "1GiB" + # Log options, see `standalone.example.toml` # [logging] # dir = "/tmp/greptimedb/logs" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 710e79c0a62e..4483f481b8c3 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -244,6 +244,18 @@ mem_threshold_on_create = "64M" # File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`). intermediate_path = "" +[region_engine.mito.memtable] +# Memtable type. +# - "experimental": experimental memtable +# - "time_series": time-series memtable (deprecated) +type = "experimental" +# The max number of keys in one shard. +index_max_keys_per_shard = 8192 +# The max rows of data inside the actively writing buffer in one shard. +data_freeze_threshold = 32768 +# Max dictionary bytes. +fork_dictionary_bytes = "1GiB" + # Log options # [logging] # Specify logs directory. diff --git a/src/mito2/Cargo.toml b/src/mito2/Cargo.toml index 27605bc3fcfa..5a9a117fe2e3 100644 --- a/src/mito2/Cargo.toml +++ b/src/mito2/Cargo.toml @@ -76,6 +76,7 @@ common-test-util.workspace = true criterion = "0.4" log-store.workspace = true rand.workspace = true +toml.workspace = true [[bench]] name = "bench_merge_tree" diff --git a/src/mito2/src/config.rs b/src/mito2/src/config.rs index 2f97b5b7f697..ed0b1b214563 100644 --- a/src/mito2/src/config.rs +++ b/src/mito2/src/config.rs @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize}; use serde_with::{serde_as, NoneAsEmptyString}; use crate::error::Result; -use crate::memtable::merge_tree::MergeTreeConfig; +use crate::memtable::MemtableConfig; use crate::sst::DEFAULT_WRITE_BUFFER_SIZE; /// Default max running background job. @@ -104,8 +104,8 @@ pub struct MitoConfig { /// Inverted index configs. pub inverted_index: InvertedIndexConfig, - /// Experimental memtable. - pub experimental_memtable: Option, + /// Memtable config + pub memtable: MemtableConfig, } impl Default for MitoConfig { @@ -131,7 +131,7 @@ impl Default for MitoConfig { parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, allow_stale_entries: false, inverted_index: InvertedIndexConfig::default(), - experimental_memtable: None, + memtable: MemtableConfig::default(), }; // Adjust buffer and cache size according to system memory if we can. @@ -319,3 +319,25 @@ fn divide_num_cpus(divisor: usize) -> usize { (cores + divisor - 1) / divisor } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_deserialize_config() { + let s = r#" +[memtable] +type = "experimental" +index_max_keys_per_shard = 8192 +data_freeze_threshold = 1024 +dedup = true +fork_dictionary_bytes = "512MiB" +"#; + let config: MitoConfig = toml::from_str(s).unwrap(); + let MemtableConfig::Experimental(config) = &config.memtable else { + unreachable!() + }; + assert_eq!(1024, config.data_freeze_threshold); + } +} diff --git a/src/mito2/src/memtable.rs b/src/mito2/src/memtable.rs index c5cd32bf0ec5..aa3d7e2bed71 100644 --- a/src/mito2/src/memtable.rs +++ b/src/mito2/src/memtable.rs @@ -14,16 +14,12 @@ //! Memtables are write buffers for regions. -pub mod key_values; -pub mod merge_tree; -pub mod time_series; -pub(crate) mod version; - use std::fmt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use common_time::Timestamp; +use serde::{Deserialize, Serialize}; use store_api::metadata::RegionMetadataRef; use store_api::storage::ColumnId; use table::predicate::Predicate; @@ -31,14 +27,34 @@ use table::predicate::Predicate; use crate::error::Result; use crate::flush::WriteBufferManagerRef; pub use crate::memtable::key_values::KeyValues; +use crate::memtable::merge_tree::MergeTreeConfig; use crate::metrics::WRITE_BUFFER_BYTES; use crate::read::Batch; +pub mod key_values; +pub mod merge_tree; +pub mod time_series; +pub(crate) mod version; + /// Id for memtables. /// /// Should be unique under the same region. pub type MemtableId = u32; +/// Config for memtables. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum MemtableConfig { + Experimental(MergeTreeConfig), + TimeSeries, +} + +impl Default for MemtableConfig { + fn default() -> Self { + Self::Experimental(MergeTreeConfig::default()) + } +} + #[derive(Debug, Default)] pub struct MemtableStats { /// The estimated bytes allocated by this memtable from heap. @@ -187,9 +203,30 @@ impl Drop for AllocTracker { #[cfg(test)] mod tests { + use common_base::readable_size::ReadableSize; + use super::*; use crate::flush::{WriteBufferManager, WriteBufferManagerImpl}; + #[test] + fn test_deserialize_memtable_config() { + let s = r#" +type = "experimental" +index_max_keys_per_shard = 8192 +data_freeze_threshold = 1024 +dedup = true +fork_dictionary_bytes = "512MiB" +"#; + let config: MemtableConfig = toml::from_str(s).unwrap(); + let MemtableConfig::Experimental(merge_tree) = config else { + unreachable!() + }; + assert!(merge_tree.dedup); + assert_eq!(8192, merge_tree.index_max_keys_per_shard); + assert_eq!(1024, merge_tree.data_freeze_threshold); + assert_eq!(ReadableSize::mb(512), merge_tree.fork_dictionary_bytes); + } + #[test] fn test_alloc_tracker_without_manager() { let tracker = AllocTracker::new(None); diff --git a/src/mito2/src/memtable/merge_tree.rs b/src/mito2/src/memtable/merge_tree.rs index 2a7799524342..ff56cb2010e7 100644 --- a/src/mito2/src/memtable/merge_tree.rs +++ b/src/mito2/src/memtable/merge_tree.rs @@ -44,7 +44,7 @@ use crate::memtable::{ }; /// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size. -const DICTIONARY_SIZE_FACTOR: u64 = 16; +const DICTIONARY_SIZE_FACTOR: u64 = 8; /// Id of a shard, only unique inside a partition. type ShardId = u32; @@ -74,7 +74,7 @@ pub struct MergeTreeConfig { impl Default for MergeTreeConfig { fn default() -> Self { - let mut fork_dictionary_bytes = ReadableSize::mb(512); + let mut fork_dictionary_bytes = ReadableSize::gb(1); if let Some(sys_memory) = common_config::utils::get_sys_total_memory() { let adjust_dictionary_bytes = std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes); @@ -85,7 +85,7 @@ impl Default for MergeTreeConfig { Self { index_max_keys_per_shard: 8192, - data_freeze_threshold: 102400, + data_freeze_threshold: 32768, dedup: true, fork_dictionary_bytes, } diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs index 84590e5ba382..08db2002ac22 100644 --- a/src/mito2/src/worker.rs +++ b/src/mito2/src/worker.rs @@ -49,7 +49,7 @@ use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef use crate::manifest::action::RegionEdit; use crate::memtable::merge_tree::MergeTreeMemtableBuilder; use crate::memtable::time_series::TimeSeriesMemtableBuilder; -use crate::memtable::MemtableBuilderRef; +use crate::memtable::{MemtableBuilderRef, MemtableConfig}; use crate::region::{MitoRegionRef, RegionMap, RegionMapRef}; use crate::request::{ BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest, @@ -320,15 +320,15 @@ impl WorkerStarter { let (sender, receiver) = mpsc::channel(self.config.worker_channel_size); let running = Arc::new(AtomicBool::new(true)); - let memtable_builder = if let Some(config) = &self.config.experimental_memtable { - Arc::new(MergeTreeMemtableBuilder::new( - config.clone(), + + let memtable_builder = match &self.config.memtable { + MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new( + merge_tree.clone(), Some(self.write_buffer_manager.clone()), - )) as _ - } else { - Arc::new(TimeSeriesMemtableBuilder::new(Some( + )) as _, + MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(Some( self.write_buffer_manager.clone(), - ))) as _ + ))) as _, }; let mut worker_thread = RegionWorkerLoop { id: self.id, diff --git a/tests-integration/tests/http.rs b/tests-integration/tests/http.rs index 1d62ed4a07d3..7f59e1dfc63c 100644 --- a/tests-integration/tests/http.rs +++ b/tests-integration/tests/http.rs @@ -785,6 +785,13 @@ write_buffer_size = "8MiB" mem_threshold_on_create = "64.0MiB" intermediate_path = "" +[datanode.region_engine.mito.memtable] +type = "experimental" +index_max_keys_per_shard = 8192 +data_freeze_threshold = 32768 +dedup = true +fork_dictionary_bytes = "1GiB" + [[datanode.region_engine]] [datanode.region_engine.file]