Skip to content

Commit

Permalink
refactor: make MergeTreeMemtable the default choice (#3430)
Browse files Browse the repository at this point in the history
* refactor: make MergeTreeMemtable the default choice

* refactor: reformat

* chore: add doc to config
  • Loading branch information
v0y4g3r authored Mar 5, 2024
1 parent 02b18fb commit 7183fa1
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions config/datanode.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,18 @@ mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

[region_engine.mito.memtable]
# Memtable type.
# - "experimental": experimental memtable
# - "time_series": time-series memtable (deprecated)
type = "experimental"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.
data_freeze_threshold = 32768
# Max dictionary bytes.
fork_dictionary_bytes = "1GiB"

# Log options, see `standalone.example.toml`
# [logging]
# dir = "/tmp/greptimedb/logs"
Expand Down
12 changes: 12 additions & 0 deletions config/standalone.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,18 @@ mem_threshold_on_create = "64M"
# File system path to store intermediate files for external sorting (default `{data_home}/index_intermediate`).
intermediate_path = ""

[region_engine.mito.memtable]
# Memtable type.
# - "experimental": experimental memtable
# - "time_series": time-series memtable (deprecated)
type = "experimental"
# The max number of keys in one shard.
index_max_keys_per_shard = 8192
# The max rows of data inside the actively writing buffer in one shard.
data_freeze_threshold = 32768
# Max dictionary bytes.
fork_dictionary_bytes = "1GiB"

# Log options
# [logging]
# Specify logs directory.
Expand Down
1 change: 1 addition & 0 deletions src/mito2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ common-test-util.workspace = true
criterion = "0.4"
log-store.workspace = true
rand.workspace = true
toml.workspace = true

[[bench]]
name = "bench_merge_tree"
Expand Down
30 changes: 26 additions & 4 deletions src/mito2/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use serde::{Deserialize, Serialize};
use serde_with::{serde_as, NoneAsEmptyString};

use crate::error::Result;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::memtable::MemtableConfig;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;

/// Default max running background job.
Expand Down Expand Up @@ -104,8 +104,8 @@ pub struct MitoConfig {
/// Inverted index configs.
pub inverted_index: InvertedIndexConfig,

/// Experimental memtable.
pub experimental_memtable: Option<MergeTreeConfig>,
/// Memtable config
pub memtable: MemtableConfig,
}

impl Default for MitoConfig {
Expand All @@ -131,7 +131,7 @@ impl Default for MitoConfig {
parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
allow_stale_entries: false,
inverted_index: InvertedIndexConfig::default(),
experimental_memtable: None,
memtable: MemtableConfig::default(),
};

// Adjust buffer and cache size according to system memory if we can.
Expand Down Expand Up @@ -319,3 +319,25 @@ fn divide_num_cpus(divisor: usize) -> usize {

(cores + divisor - 1) / divisor
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_deserialize_config() {
let s = r#"
[memtable]
type = "experimental"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
let config: MitoConfig = toml::from_str(s).unwrap();
let MemtableConfig::Experimental(config) = &config.memtable else {
unreachable!()
};
assert_eq!(1024, config.data_freeze_threshold);
}
}
47 changes: 42 additions & 5 deletions src/mito2/src/memtable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,47 @@

//! Memtables are write buffers for regions.
pub mod key_values;
pub mod merge_tree;
pub mod time_series;
pub(crate) mod version;

use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;

use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use table::predicate::Predicate;

use crate::error::Result;
use crate::flush::WriteBufferManagerRef;
pub use crate::memtable::key_values::KeyValues;
use crate::memtable::merge_tree::MergeTreeConfig;
use crate::metrics::WRITE_BUFFER_BYTES;
use crate::read::Batch;

pub mod key_values;
pub mod merge_tree;
pub mod time_series;
pub(crate) mod version;

/// Id for memtables.
///
/// Should be unique under the same region.
pub type MemtableId = u32;

/// Config for memtables.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum MemtableConfig {
Experimental(MergeTreeConfig),
TimeSeries,
}

impl Default for MemtableConfig {
fn default() -> Self {
Self::Experimental(MergeTreeConfig::default())
}
}

#[derive(Debug, Default)]
pub struct MemtableStats {
/// The estimated bytes allocated by this memtable from heap.
Expand Down Expand Up @@ -187,9 +203,30 @@ impl Drop for AllocTracker {

#[cfg(test)]
mod tests {
use common_base::readable_size::ReadableSize;

use super::*;
use crate::flush::{WriteBufferManager, WriteBufferManagerImpl};

#[test]
fn test_deserialize_memtable_config() {
let s = r#"
type = "experimental"
index_max_keys_per_shard = 8192
data_freeze_threshold = 1024
dedup = true
fork_dictionary_bytes = "512MiB"
"#;
let config: MemtableConfig = toml::from_str(s).unwrap();
let MemtableConfig::Experimental(merge_tree) = config else {
unreachable!()
};
assert!(merge_tree.dedup);
assert_eq!(8192, merge_tree.index_max_keys_per_shard);
assert_eq!(1024, merge_tree.data_freeze_threshold);
assert_eq!(ReadableSize::mb(512), merge_tree.fork_dictionary_bytes);
}

#[test]
fn test_alloc_tracker_without_manager() {
let tracker = AllocTracker::new(None);
Expand Down
6 changes: 3 additions & 3 deletions src/mito2/src/memtable/merge_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use crate::memtable::{
};

/// Use `1/DICTIONARY_SIZE_FACTOR` of OS memory as dictionary size.
const DICTIONARY_SIZE_FACTOR: u64 = 16;
const DICTIONARY_SIZE_FACTOR: u64 = 8;

/// Id of a shard, only unique inside a partition.
type ShardId = u32;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub struct MergeTreeConfig {

impl Default for MergeTreeConfig {
fn default() -> Self {
let mut fork_dictionary_bytes = ReadableSize::mb(512);
let mut fork_dictionary_bytes = ReadableSize::gb(1);
if let Some(sys_memory) = common_config::utils::get_sys_total_memory() {
let adjust_dictionary_bytes =
std::cmp::min(sys_memory / DICTIONARY_SIZE_FACTOR, fork_dictionary_bytes);
Expand All @@ -85,7 +85,7 @@ impl Default for MergeTreeConfig {

Self {
index_max_keys_per_shard: 8192,
data_freeze_threshold: 102400,
data_freeze_threshold: 32768,
dedup: true,
fork_dictionary_bytes,
}
Expand Down
16 changes: 8 additions & 8 deletions src/mito2/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef
use crate::manifest::action::RegionEdit;
use crate::memtable::merge_tree::MergeTreeMemtableBuilder;
use crate::memtable::time_series::TimeSeriesMemtableBuilder;
use crate::memtable::MemtableBuilderRef;
use crate::memtable::{MemtableBuilderRef, MemtableConfig};
use crate::region::{MitoRegionRef, RegionMap, RegionMapRef};
use crate::request::{
BackgroundNotify, DdlRequest, SenderDdlRequest, SenderWriteRequest, WorkerRequest,
Expand Down Expand Up @@ -320,15 +320,15 @@ impl<S: LogStore> WorkerStarter<S> {
let (sender, receiver) = mpsc::channel(self.config.worker_channel_size);

let running = Arc::new(AtomicBool::new(true));
let memtable_builder = if let Some(config) = &self.config.experimental_memtable {
Arc::new(MergeTreeMemtableBuilder::new(
config.clone(),

let memtable_builder = match &self.config.memtable {
MemtableConfig::Experimental(merge_tree) => Arc::new(MergeTreeMemtableBuilder::new(
merge_tree.clone(),
Some(self.write_buffer_manager.clone()),
)) as _
} else {
Arc::new(TimeSeriesMemtableBuilder::new(Some(
)) as _,
MemtableConfig::TimeSeries => Arc::new(TimeSeriesMemtableBuilder::new(Some(
self.write_buffer_manager.clone(),
))) as _
))) as _,
};
let mut worker_thread = RegionWorkerLoop {
id: self.id,
Expand Down
7 changes: 7 additions & 0 deletions tests-integration/tests/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,13 @@ write_buffer_size = "8MiB"
mem_threshold_on_create = "64.0MiB"
intermediate_path = ""
[datanode.region_engine.mito.memtable]
type = "experimental"
index_max_keys_per_shard = 8192
data_freeze_threshold = 32768
dedup = true
fork_dictionary_bytes = "1GiB"
[[datanode.region_engine]]
[datanode.region_engine.file]
Expand Down

0 comments on commit 7183fa1

Please sign in to comment.