From 76e8c5063a584a2e9b5f8016e601d04f9b5dd7aa Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Mon, 22 May 2023 19:16:15 +0800 Subject: [PATCH 1/2] chore: simplify compact logs --- analytic_engine/src/compaction/mod.rs | 28 +++++++++++++++++-- analytic_engine/src/compaction/picker.rs | 7 +++-- .../src/instance/flush_compaction.rs | 2 +- analytic_engine/src/sst/file.rs | 1 - 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/analytic_engine/src/compaction/mod.rs b/analytic_engine/src/compaction/mod.rs index 79a9708b40..754a6a28a8 100644 --- a/analytic_engine/src/compaction/mod.rs +++ b/analytic_engine/src/compaction/mod.rs @@ -2,7 +2,7 @@ //! Compaction. -use std::{collections::HashMap, str::FromStr, sync::Arc}; +use std::{collections::HashMap, fmt, str::FromStr, sync::Arc}; use common_util::config::{ReadableSize, TimeUnit}; use serde::{Deserialize, Serialize}; @@ -316,7 +316,7 @@ pub struct ExpiredFiles { pub files: Vec, } -#[derive(Debug, Default, Clone)] +#[derive(Default, Clone)] pub struct CompactionTask { pub compaction_inputs: Vec, pub expired: Vec, @@ -352,6 +352,30 @@ impl CompactionTask { } } +impl fmt::Debug for CompactionTask { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("CompactionTask") + .field("inputs", &self.compaction_inputs) + .field( + "expired", + &self + .expired + .iter() + .map(|expired| { + format!( + "level:{}, files:{:?}", + expired.level, + expired.files.iter().map(|f| f.id()) + ) + }) + // only print first 10 files + .take(10) + .collect::>(), + ) + .finish() + } +} + pub struct PickerManager { default_picker: CompactionPickerRef, time_window_picker: CompactionPickerRef, diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index c2b5b96002..b1ab07efe7 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -484,6 +484,7 @@ impl TimeWindowPicker { let all_keys: BTreeSet<_> = buckets.keys().collect(); + // First compact latest buckets for key in all_keys.into_iter().rev() { if let Some(bucket) = buckets.get(key) { debug!("Key {}, now {}", key, now); @@ -577,7 +578,7 @@ impl LevelPicker for TimeWindowPicker { debug!("TWCS compaction options: {:?}", opts); - let (buckets, ts) = Self::get_buckets( + let (buckets, max_bucket_ts) = Self::get_buckets( &uncompact_files, &ctx.segment_duration, opts.timestamp_resolution, @@ -589,8 +590,8 @@ impl LevelPicker for TimeWindowPicker { &ctx.segment_duration, opts.timestamp_resolution, ); - debug!("now {}, max_ts: {}", now, ts); - assert!(now >= ts); + debug!("now {}, max_ts: {}", now, max_bucket_ts); + assert!(now >= max_bucket_ts); Self::newest_bucket(buckets, opts.size_tiered, now) } diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs index eb460c60f9..64cd91e014 100644 --- a/analytic_engine/src/instance/flush_compaction.rs +++ b/analytic_engine/src/instance/flush_compaction.rs @@ -704,7 +704,7 @@ impl SpaceStore { } info!( - "try do compaction for table:{}#{}, estimated input files size:{}, input files number:{}", + "Try do compaction for table:{}#{}, estimated input files size:{}, input files number:{}", table_data.name, table_data.id, task.estimated_total_input_file_size(), diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs index 13bdfbcae1..1b7d5a160c 100644 --- a/analytic_engine/src/sst/file.rs +++ b/analytic_engine/src/sst/file.rs @@ -253,7 +253,6 @@ impl fmt::Debug for FileHandle { f.debug_struct("FileHandle") .field("meta", &self.inner.meta) .field("being_compacted", &self.being_compacted()) - .field("metrics", &self.inner.metrics) .finish() } } From da6899c4c97089d4fbd8aa8a01db98ab8366d5bc Mon Sep 17 00:00:00 2001 From: jiacai2050 Date: Tue, 23 May 2023 10:37:30 +0800 Subject: [PATCH 2/2] limit sst input table size for old bucket --- analytic_engine/src/compaction/picker.rs | 61 +++++++++++++----------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs index b1ab07efe7..314b4d17aa 100644 --- a/analytic_engine/src/compaction/picker.rs +++ b/analytic_engine/src/compaction/picker.rs @@ -160,6 +160,24 @@ fn find_uncompact_files( .collect() } +// Trim the largest sstables off the end to meet the `max_threshold` and +// `max_input_sstable_size` +fn trim_to_threshold( + input_files: Vec, + max_threshold: usize, + max_input_sstable_size: u64, +) -> Vec { + let mut input_size = 0; + input_files + .into_iter() + .take(max_threshold) + .take_while(|f| { + input_size += f.size(); + input_size <= max_input_sstable_size + }) + .collect() +} + /// Size tiered compaction strategy /// See https://github.com/jeffjirsa/twcs/blob/master/src/main/java/com/jeffjirsa/cassandra/db/compaction/SizeTieredCompactionStrategy.java #[derive(Default)] @@ -394,18 +412,7 @@ impl SizeTieredPicker { .reverse() }); - // and then trim the coldest sstables off the end to meet the max_threshold - let len = sorted_files.len(); - let mut input_size = 0; - let pruned_bucket: Vec = sorted_files - .into_iter() - .take(std::cmp::min(max_threshold, len)) - .take_while(|f| { - input_size += f.size(); - input_size <= max_input_sstable_size - }) - .collect(); - + let pruned_bucket = trim_to_threshold(sorted_files, max_threshold, max_input_sstable_size); // bucket hotness is the sum of the hotness of all sstable members let bucket_hotness = pruned_bucket.iter().map(Bucket::hotness).sum(); @@ -489,6 +496,7 @@ impl TimeWindowPicker { if let Some(bucket) = buckets.get(key) { debug!("Key {}, now {}", key, now); + let max_input_sstable_size = size_tiered_opts.max_input_sstable_size.as_byte(); if bucket.len() >= size_tiered_opts.min_threshold && *key >= now { // If we're in the newest bucket, we'll use STCS to prioritize sstables let buckets = SizeTieredPicker::get_buckets( @@ -501,7 +509,7 @@ impl TimeWindowPicker { buckets, size_tiered_opts.min_threshold, size_tiered_opts.max_threshold, - size_tiered_opts.max_input_sstable_size.as_byte(), + max_input_sstable_size, ); if files.is_some() { @@ -509,9 +517,14 @@ impl TimeWindowPicker { } } else if bucket.len() >= 2 && *key < now { debug!("Bucket size {} >= 2 and not in current bucket, compacting what's here: {:?}", bucket.len(), bucket); - return Some(Self::trim_to_threshold( - bucket, + // Sort by sstable file size + let mut sorted_files = bucket.to_vec(); + sorted_files.sort_unstable_by_key(FileHandle::size); + + return Some(trim_to_threshold( + sorted_files, size_tiered_opts.max_threshold, + max_input_sstable_size, )); } else { debug!( @@ -527,19 +540,6 @@ impl TimeWindowPicker { None } - fn trim_to_threshold(files: &[FileHandle], max_threshold: usize) -> Vec { - // Sort by sstable file size - let mut sorted_files = files.to_vec(); - sorted_files.sort_unstable_by_key(FileHandle::size); - - // Trim the largest sstables off the end to meet the maxThreshold - let len = sorted_files.len(); - sorted_files - .into_iter() - .take(std::cmp::min(max_threshold, len)) - .collect() - } - /// Get current window timestamp, the caller MUST ensure the level has ssts, /// panic otherwise. fn get_current_window( @@ -590,7 +590,10 @@ impl LevelPicker for TimeWindowPicker { &ctx.segment_duration, opts.timestamp_resolution, ); - debug!("now {}, max_ts: {}", now, max_bucket_ts); + debug!( + "TWCS current window is {}, max_bucket_ts: {}", + now, max_bucket_ts + ); assert!(now >= max_bucket_ts); Self::newest_bucket(buckets, opts.size_tiered, now)