From 06dbb176b39f48d1de84f4e896701d588062b992 Mon Sep 17 00:00:00 2001
From: evenyag
Date: Sat, 28 Dec 2024 14:43:32 +0800
Subject: [PATCH 1/9] feat: impl CacheStrategy

---
 src/mito2/src/cache.rs | 179 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 179 insertions(+)

diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs
index 77577926a2a3..480a0e4cc154 100644
--- a/src/mito2/src/cache.rs
+++ b/src/mito2/src/cache.rs
@@ -55,6 +55,185 @@ const FILE_TYPE: &str = "file";
 /// Metrics type key for selector result cache.
 const SELECTOR_RESULT_TYPE: &str = "selector_result";
 
+/// Cache strategies that may only enable a subset of caches.
+#[derive(Clone)]
+pub enum CacheStrategy {
+    /// Strategy for normal operations.
+    /// Doesn't disable any cache.
+    Normal(CacheManagerRef),
+    /// Strategy for compaction.
+    /// Disables some caches that are not needed during compaction.
+    /// Enables the write cache so that the compaction can read files cached
+    /// in the write cache and write the compacted files back to the write cache.
+    Compaction(CacheManagerRef),
+}
+
+impl CacheStrategy {
+    /// Calls [CacheManager::get_parquet_meta_data()].
+    pub async fn get_parquet_meta_data(
+        &self,
+        region_id: RegionId,
+        file_id: FileId,
+    ) -> Option<Arc<ParquetMetaData>> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => {
+                cache_manager
+                    .get_parquet_meta_data(region_id, file_id)
+                    .await
+            }
+            CacheStrategy::Compaction(cache_manager) => {
+                cache_manager
+                    .get_parquet_meta_data(region_id, file_id)
+                    .await
+            }
+        }
+    }
+
+    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
+    pub fn get_parquet_meta_data_from_mem_cache(
+        &self,
+        region_id: RegionId,
+        file_id: FileId,
+    ) -> Option<Arc<ParquetMetaData>> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => {
+                cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
+            }
+            CacheStrategy::Compaction(cache_manager) => {
+                cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
+            }
+        }
+    }
+
+    /// Calls [CacheManager::put_parquet_meta_data()].
+    pub fn put_parquet_meta_data(
+        &self,
+        region_id: RegionId,
+        file_id: FileId,
+        metadata: Arc<ParquetMetaData>,
+    ) {
+        match self {
+            CacheStrategy::Normal(cache_manager) => {
+                cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
+            }
+            CacheStrategy::Compaction(cache_manager) => {
+                cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
+            }
+        }
+    }
+
+    /// Calls [CacheManager::remove_parquet_meta_data()].
+    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
+        match self {
+            CacheStrategy::Normal(cache_manager) => {
+                cache_manager.remove_parquet_meta_data(region_id, file_id);
+            }
+            CacheStrategy::Compaction(cache_manager) => {
+                cache_manager.remove_parquet_meta_data(region_id, file_id);
+            }
+        }
+    }
+
+    /// Calls [CacheManager::get_repeated_vector()].
+    /// It returns None if the strategy is [CacheStrategy::Compaction].
+    pub fn get_repeated_vector(
+        &self,
+        data_type: &ConcreteDataType,
+        value: &Value,
+    ) -> Option<VectorRef> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => {
+                cache_manager.get_repeated_vector(data_type, value)
+            }
+            CacheStrategy::Compaction(_) => None,
+        }
+    }
+
+    /// Calls [CacheManager::put_repeated_vector()].
+    /// It does nothing if the strategy is [CacheStrategy::Compaction].
+    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
+        if let CacheStrategy::Normal(cache_manager) = self {
+            cache_manager.put_repeated_vector(value, vector);
+        }
+    }
+
+    /// Calls [CacheManager::get_pages()].
+    /// It returns None if the strategy is [CacheStrategy::Compaction].
+    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => cache_manager.get_pages(page_key),
+            CacheStrategy::Compaction(_) => None,
+        }
+    }
+
+    /// Calls [CacheManager::put_pages()].
+    /// It does nothing if the strategy is [CacheStrategy::Compaction].
+    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
+        if let CacheStrategy::Normal(cache_manager) = self {
+            cache_manager.put_pages(page_key, pages);
+        }
+    }
+
+    /// Calls [CacheManager::get_selector_result()].
+    /// It returns None if the strategy is [CacheStrategy::Compaction].
+    pub fn get_selector_result(
+        &self,
+        selector_key: &SelectorResultKey,
+    ) -> Option<Arc<SelectorResultValue>> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => cache_manager.get_selector_result(selector_key),
+            CacheStrategy::Compaction(_) => None,
+        }
+    }
+
+    /// Calls [CacheManager::put_selector_result()].
+    /// It does nothing if the strategy is [CacheStrategy::Compaction].
+    pub fn put_selector_result(
+        &self,
+        selector_key: SelectorResultKey,
+        result: Arc<SelectorResultValue>,
+    ) {
+        if let CacheStrategy::Normal(cache_manager) = self {
+            cache_manager.put_selector_result(selector_key, result);
+        }
+    }
+
+    /// Calls [CacheManager::write_cache()].
+    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => cache_manager.write_cache(),
+            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
+        }
+    }
+
+    /// Calls [CacheManager::index_cache()].
+    /// It returns None if the strategy is [CacheStrategy::Compaction].
+    pub fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => cache_manager.index_cache(),
+            CacheStrategy::Compaction(_) => None,
+        }
+    }
+
+    /// Calls [CacheManager::bloom_filter_index_cache()].
+    /// It returns None if the strategy is [CacheStrategy::Compaction].
+    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => cache_manager.bloom_filter_index_cache(),
+            CacheStrategy::Compaction(_) => None,
+        }
+    }
+
+    /// Calls [CacheManager::puffin_metadata_cache()].
+    /// It returns None if the strategy is [CacheStrategy::Compaction].
+    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
+        match self {
+            CacheStrategy::Normal(cache_manager) => cache_manager.puffin_metadata_cache(),
+            CacheStrategy::Compaction(_) => None,
+        }
+    }
+}
+
 /// Manages cached data for the engine.
 ///
 /// All caches are disabled by default.
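The core of this first patch is the delegation pattern above: every read-side getter matches on the strategy and either forwards to the shared `CacheManagerRef` or returns `None`, while the write cache stays reachable from both variants. Below is a minimal, self-contained sketch of that pattern. The `CacheManager` fields here are simplified stand-ins invented for illustration (the real manager holds parquet-metadata, page, repeated-vector, and selector-result caches plus the write cache); only the enum shape and the match-arm behavior mirror the patch.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Simplified stand-ins for the patch's types; illustration only.
struct CacheManager {
    page_cache: HashMap<String, Arc<Vec<u8>>>,
    write_cache: Option<Arc<String>>,
}
type CacheManagerRef = Arc<CacheManager>;

#[derive(Clone)]
enum CacheStrategy {
    /// Normal operations: every cache is enabled.
    Normal(CacheManagerRef),
    /// Compaction: read-side caches are bypassed, the write cache is kept.
    Compaction(CacheManagerRef),
}

impl CacheStrategy {
    /// Read-side lookup: only `Normal` serves hits, mirroring `get_pages`.
    fn get_pages(&self, key: &str) -> Option<Arc<Vec<u8>>> {
        match self {
            CacheStrategy::Normal(m) => m.page_cache.get(key).cloned(),
            CacheStrategy::Compaction(_) => None,
        }
    }

    /// The write cache stays visible to both variants so compaction can read
    /// input SSTs from it and write compacted files back to it.
    fn write_cache(&self) -> Option<&Arc<String>> {
        match self {
            CacheStrategy::Normal(m) | CacheStrategy::Compaction(m) => m.write_cache.as_ref(),
        }
    }
}

fn main() {
    let mut page_cache = HashMap::new();
    page_cache.insert("file-0/rg-0".to_string(), Arc::new(vec![0u8; 4]));
    let manager: CacheManagerRef = Arc::new(CacheManager {
        page_cache,
        write_cache: Some(Arc::new("write-cache".to_string())),
    });

    let query = CacheStrategy::Normal(manager.clone());
    let compaction = CacheStrategy::Compaction(manager);

    assert!(query.get_pages("file-0/rg-0").is_some()); // hit for queries
    assert!(compaction.get_pages("file-0/rg-0").is_none()); // bypassed during compaction
    assert!(compaction.write_cache().is_some()); // write cache still reachable
}
```

Keeping a single shared `CacheManagerRef` inside each variant, rather than building a second stripped-down manager, means compaction and queries still share one set of caches; the strategy only decides which of them a given operation is allowed to touch.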
From 7941682b0b4303a824b988bc49e788cd2742716b Mon Sep 17 00:00:00 2001
From: evenyag
Date: Sat, 28 Dec 2024 18:10:26 +0800
Subject: [PATCH 2/9] refactor: replace Option<CacheManagerRef> with
 CacheStrategy

---
 src/mito2/src/cache/write_cache.rs      |  4 +-
 src/mito2/src/compaction.rs             |  6 ++-
 src/mito2/src/compaction/compactor.rs   |  1 +
 src/mito2/src/engine.rs                 |  3 +-
 src/mito2/src/read/last_row.rs          | 45 +++++++++------------
 src/mito2/src/read/projection.rs        | 37 ++++++++++--------
 src/mito2/src/read/range.rs             | 11 +++---
 src/mito2/src/read/scan_region.rs       | 50 ++++++++----------------
 src/mito2/src/read/seq_scan.rs          |  2 +-
 src/mito2/src/read/unordered_scan.rs    |  2 +-
 src/mito2/src/sst/parquet.rs            | 10 ++---
 src/mito2/src/sst/parquet/file_range.rs |  2 +-
 src/mito2/src/sst/parquet/reader.rs     | 45 ++++++++++-----------
 src/mito2/src/sst/parquet/row_group.rs  | 52 +++++++++++--------------
 14 files changed, 121 insertions(+), 149 deletions(-)

diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs
index 18fe41c5f614..de94bdb03137 100644
--- a/src/mito2/src/cache/write_cache.rs
+++ b/src/mito2/src/cache/write_cache.rs
@@ -332,7 +332,7 @@ mod tests {
     use super::*;
     use crate::access_layer::OperationType;
     use crate::cache::test_util::new_fs_store;
-    use crate::cache::CacheManager;
+    use crate::cache::{CacheManager, CacheStrategy};
     use crate::region::options::IndexOptions;
     use crate::sst::file::FileId;
     use crate::sst::location::{index_file_path, sst_file_path};
@@ -495,7 +495,7 @@ mod tests {
 
         // Read metadata from write cache
         let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
-            .cache(Some(cache_manager.clone()));
+            .cache(CacheStrategy::Normal(cache_manager.clone()));
         let reader = builder.build().await.unwrap();
 
         // Check parquet metadata
diff --git a/src/mito2/src/compaction.rs b/src/mito2/src/compaction.rs
index 7fdd32aa2721..fa6f0df184e0 100644
--- a/src/mito2/src/compaction.rs
+++ b/src/mito2/src/compaction.rs
@@ -43,7 +43,7 @@ use table::predicate::Predicate;
 use tokio::sync::mpsc::{self, Sender};
 
 use crate::access_layer::AccessLayerRef;
-use crate::cache::CacheManagerRef;
+use crate::cache::{CacheManagerRef, CacheStrategy};
 use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
 use crate::compaction::picker::{new_picker, CompactionTask};
 use crate::compaction::task::CompactionTaskImpl;
@@ -573,6 +573,7 @@ pub struct SerializedCompactionOutput {
 struct CompactionSstReaderBuilder<'a> {
     metadata: RegionMetadataRef,
     sst_layer: AccessLayerRef,
+    cache: CacheManagerRef,
    inputs: &'a [FileHandle],
     append_mode: bool,
     filter_deleted: bool,
@@ -586,7 +587,8 @@ impl<'a> CompactionSstReaderBuilder<'a> {
         let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
             .with_files(self.inputs.to_vec())
             .with_append_mode(self.append_mode)
-            .with_cache(None)
+            // We use a special cache strategy for compaction.
+            .with_cache(CacheStrategy::Compaction(self.cache))
             .with_filter_deleted(self.filter_deleted)
             // We ignore file not found error during compaction.
             .with_ignore_file_not_found(true)
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 58425f4d79e3..c07a333edae1 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -307,6 +307,7 @@ impl Compactor for DefaultCompactor {
                     let reader = CompactionSstReaderBuilder {
                         metadata: region_metadata.clone(),
                         sst_layer: sst_layer.clone(),
+                        cache: cache_manager.clone(),
                         inputs: &output.inputs,
                         append_mode,
                         filter_deleted: output.filter_deleted,
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 71caf363c02c..e6b1e501aec7 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -84,6 +84,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
 use store_api::storage::{RegionId, ScanRequest};
 use tokio::sync::{oneshot, Semaphore};
 
+use crate::cache::CacheStrategy;
 use crate::config::MitoConfig;
 use crate::error::{
     InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
@@ -428,7 +429,7 @@ impl EngineInner {
             version,
             region.access_layer.clone(),
             request,
-            Some(cache_manager),
+            CacheStrategy::Normal(cache_manager),
         )
         .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
         .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
diff --git a/src/mito2/src/read/last_row.rs b/src/mito2/src/read/last_row.rs
index de96f8881e46..92a7bfa1adc1 100644
--- a/src/mito2/src/read/last_row.rs
+++ b/src/mito2/src/read/last_row.rs
@@ -21,7 +21,7 @@ use datatypes::vectors::UInt32Vector;
 use store_api::storage::TimeSeriesRowSelector;
 
 use crate::cache::{
-    selector_result_cache_hit, selector_result_cache_miss, CacheManagerRef, SelectorResultKey,
+    selector_result_cache_hit, selector_result_cache_miss, CacheStrategy, SelectorResultKey,
     SelectorResultValue,
 };
 use crate::error::Result;
@@ -86,7 +86,7 @@ impl RowGroupLastRowCachedReader {
     pub(crate) fn new(
         file_id: FileId,
         row_group_idx: usize,
-        cache_manager: Option<CacheManagerRef>,
+        cache_strategy: CacheStrategy,
         row_group_reader: RowGroupReader,
     ) -> Self {
         let key = SelectorResultKey {
@@ -95,20 +95,17 @@ impl RowGroupLastRowCachedReader {
             selector: TimeSeriesRowSelector::LastRow,
         };
 
-        let Some(cache_manager) = cache_manager else {
-            return Self::new_miss(key, row_group_reader, None);
-        };
-        if let Some(value) = cache_manager.get_selector_result(&key) {
+        if let Some(value) = cache_strategy.get_selector_result(&key) {
             let schema_matches =
                 value.projection == row_group_reader.read_format().projection_indices();
             if schema_matches {
                 // Schema matches, use cache batches.
                 Self::new_hit(value)
             } else {
-                Self::new_miss(key, row_group_reader, Some(cache_manager))
+                Self::new_miss(key, row_group_reader, cache_strategy)
             }
         } else {
-            Self::new_miss(key, row_group_reader, Some(cache_manager))
+            Self::new_miss(key, row_group_reader, cache_strategy)
         }
     }
 
@@ -130,13 +127,13 @@ impl RowGroupLastRowCachedReader {
     fn new_miss(
         key: SelectorResultKey,
         row_group_reader: RowGroupReader,
-        cache_manager: Option<CacheManagerRef>,
+        cache_strategy: CacheStrategy,
     ) -> Self {
         selector_result_cache_miss();
         Self::Miss(RowGroupLastRowReader::new(
             key,
             row_group_reader,
-            cache_manager,
+            cache_strategy,
         ))
     }
 }
@@ -175,23 +172,19 @@ pub(crate) struct RowGroupLastRowReader {
     reader: RowGroupReader,
     selector: LastRowSelector,
     yielded_batches: Vec<Batch>,
-    cache_manager: Option<CacheManagerRef>,
+    cache_strategy: CacheStrategy,
     /// Index buffer to take a new batch from the last row.
     take_index: UInt32Vector,
 }
 
 impl RowGroupLastRowReader {
-    fn new(
-        key: SelectorResultKey,
-        reader: RowGroupReader,
-        cache_manager: Option<CacheManagerRef>,
-    ) -> Self {
+    fn new(key: SelectorResultKey, reader: RowGroupReader, cache_strategy: CacheStrategy) -> Self {
         Self {
             key,
             reader,
             selector: LastRowSelector::default(),
             yielded_batches: vec![],
-            cache_manager,
+            cache_strategy,
             take_index: UInt32Vector::from_vec(vec![0]),
         }
     }
@@ -221,17 +214,15 @@ impl RowGroupLastRowReader {
 
     /// Updates row group's last row cache if cache manager is present.
     fn maybe_update_cache(&mut self) {
-        if let Some(cache) = &self.cache_manager {
-            if self.yielded_batches.is_empty() {
-                // we always expect that row groups yields batches.
-                return;
-            }
-            let value = Arc::new(SelectorResultValue {
-                result: std::mem::take(&mut self.yielded_batches),
-                projection: self.reader.read_format().projection_indices().to_vec(),
-            });
-            cache.put_selector_result(self.key, value)
+        if self.yielded_batches.is_empty() {
+            // we always expect that row groups yield batches.
+            return;
         }
+        let value = Arc::new(SelectorResultValue {
+            result: std::mem::take(&mut self.yielded_batches),
+            projection: self.reader.read_format().projection_indices().to_vec(),
+        });
+        self.cache_strategy.put_selector_result(self.key, value);
     }
 
     fn metrics(&self) -> &ReaderMetrics {
diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs
index 9ba5f6eccf1e..a20fe7b51b81 100644
--- a/src/mito2/src/read/projection.rs
+++ b/src/mito2/src/read/projection.rs
@@ -30,7 +30,7 @@ use snafu::{OptionExt, ResultExt};
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::ColumnId;
 
-use crate::cache::CacheManager;
+use crate::cache::CacheStrategy;
 use crate::error::{InvalidRequestSnafu, Result};
 use crate::read::Batch;
 use crate::row_converter::{McmpRowCodec, RowCodec, SortField};
@@ -171,7 +171,7 @@ impl ProjectionMapper {
     pub(crate) fn convert(
         &self,
         batch: &Batch,
-        cache_manager: Option<&CacheManager>,
+        cache_strategy: &CacheStrategy,
     ) -> common_recordbatch::error::Result<RecordBatch> {
         debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
         debug_assert!(self
@@ -204,15 +204,12 @@ impl ProjectionMapper {
             match index {
                 BatchIndex::Tag(idx) => {
                     let value = &pk_values[*idx];
-                    let vector = match cache_manager {
-                        Some(cache) => repeated_vector_with_cache(
-                            &column_schema.data_type,
-                            value,
-                            num_rows,
-                            cache,
-                        )?,
-                        None => new_repeated_vector(&column_schema.data_type, value, num_rows)?,
-                    };
+                    let vector = repeated_vector_with_cache(
+                        &column_schema.data_type,
+                        value,
+                        num_rows,
+                        cache_strategy,
+                    )?;
                     columns.push(vector);
                 }
                 BatchIndex::Timestamp => {
@@ -244,9 +241,9 @@ fn repeated_vector_with_cache(
     data_type: &ConcreteDataType,
     value: &Value,
     num_rows: usize,
-    cache_manager: &CacheManager,
+    cache_strategy: &CacheStrategy,
 ) -> common_recordbatch::error::Result<VectorRef> {
-    if let Some(vector) = cache_manager.get_repeated_vector(data_type, value) {
+    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
         // Tries to get the vector from cache manager. If the vector doesn't
         // have enough length, creates a new one.
         match vector.len().cmp(&num_rows) {
@@ -260,7 +257,7 @@ fn repeated_vector_with_cache(
 
     let vector = new_repeated_vector(data_type, value, num_rows)?;
     // Updates cache.
if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE { - cache_manager.put_repeated_vector(value.clone(), vector.clone()); + cache_strategy.put_repeated_vector(value.clone(), vector.clone()); } Ok(vector) @@ -284,12 +281,15 @@ fn new_repeated_vector( #[cfg(test)] mod tests { + use std::sync::Arc; + use api::v1::OpType; use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array}; use datatypes::arrow::util::pretty; use datatypes::value::ValueRef; use super::*; + use crate::cache::CacheManager; use crate::read::BatchBuilder; use crate::test_util::meta_util::TestRegionMetadataBuilder; @@ -359,8 +359,9 @@ mod tests { // With vector cache. let cache = CacheManager::builder().vector_cache_size(1024).build(); + let cache = CacheStrategy::Normal(Arc::new(cache)); let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3); - let record_batch = mapper.convert(&batch, Some(&cache)).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +---------------------+----+----+----+----+ | ts | k0 | k1 | v0 | v1 | @@ -380,7 +381,7 @@ mod tests { assert!(cache .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3)) .is_none()); - let record_batch = mapper.convert(&batch, Some(&cache)).unwrap(); + let record_batch = mapper.convert(&batch, &cache).unwrap(); assert_eq!(expect, print_record_batch(record_batch)); } @@ -401,7 +402,9 @@ mod tests { ); let batch = new_batch(0, &[1, 2], &[(4, 4)], 3); - let record_batch = mapper.convert(&batch, None).unwrap(); + let cache = CacheManager::builder().vector_cache_size(1024).build(); + let cache = CacheStrategy::Normal(Arc::new(cache)); + let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ | v1 | k0 | diff --git a/src/mito2/src/read/range.rs b/src/mito2/src/read/range.rs index 1b29e196a2fe..4dce0625a1d0 100644 --- a/src/mito2/src/read/range.rs +++ b/src/mito2/src/read/range.rs @@ -22,7 +22,7 @@ use parquet::arrow::arrow_reader::RowSelection; use smallvec::{smallvec, SmallVec}; use store_api::region_engine::PartitionRange; -use crate::cache::CacheManager; +use crate::cache::CacheStrategy; use crate::error::Result; use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats}; use crate::read::scan_region::ScanInput; @@ -112,7 +112,7 @@ impl RangeMeta { Self::push_unordered_file_ranges( input.memtables.len(), &input.files, - input.cache_manager.as_deref(), + &input.cache_strategy, &mut ranges, ); @@ -203,16 +203,15 @@ impl RangeMeta { fn push_unordered_file_ranges( num_memtables: usize, files: &[FileHandle], - cache: Option<&CacheManager>, + cache: &CacheStrategy, ranges: &mut Vec, ) { // For append mode, we can parallelize reading row groups. for (i, file) in files.iter().enumerate() { let file_index = num_memtables + i; // Get parquet meta from the cache. - let parquet_meta = cache.and_then(|c| { - c.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id()) - }); + let parquet_meta = + cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id()); if let Some(parquet_meta) = parquet_meta { // Scans each row group. 
for row_group_index in 0..file.meta_ref().num_row_groups { diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5cd99fe3778e..f62380e49b1a 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -33,7 +33,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::FileCacheRef; -use crate::cache::CacheManagerRef; +use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::config::DEFAULT_SCAN_CHANNEL_SIZE; use crate::error::Result; use crate::memtable::MemtableRange; @@ -171,7 +171,7 @@ pub(crate) struct ScanRegion { /// Scan request. request: ScanRequest, /// Cache. - cache_manager: Option, + cache_strategy: CacheStrategy, /// Capacity of the channel to send data from parallel scan tasks to the main task. parallel_scan_channel_size: usize, /// Whether to ignore inverted index. @@ -190,13 +190,13 @@ impl ScanRegion { version: VersionRef, access_layer: AccessLayerRef, request: ScanRequest, - cache_manager: Option, + cache_strategy: CacheStrategy, ) -> ScanRegion { ScanRegion { version, access_layer, request, - cache_manager, + cache_strategy, parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, ignore_inverted_index: false, ignore_fulltext_index: false, @@ -357,7 +357,7 @@ impl ScanRegion { .with_predicate(Some(predicate)) .with_memtables(memtables) .with_files(files) - .with_cache(self.cache_manager) + .with_cache(self.cache_strategy) .with_inverted_index_applier(inverted_index_applier) .with_bloom_filter_index_applier(bloom_filter_applier) .with_fulltext_index_applier(fulltext_index_applier) @@ -421,23 +421,14 @@ impl ScanRegion { } let file_cache = || -> Option { - let cache_manager = self.cache_manager.as_ref()?; - let write_cache = cache_manager.write_cache()?; + let write_cache = self.cache_strategy.write_cache()?; let file_cache = write_cache.file_cache(); Some(file_cache) }(); - let index_cache = self - .cache_manager - .as_ref() - .and_then(|c| c.index_cache()) - .cloned(); + let index_cache = self.cache_strategy.index_cache().cloned(); - let puffin_metadata_cache = self - .cache_manager - .as_ref() - .and_then(|c| c.puffin_metadata_cache()) - .cloned(); + let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned(); InvertedIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), @@ -470,23 +461,14 @@ impl ScanRegion { } let file_cache = || -> Option { - let cache_manager = self.cache_manager.as_ref()?; - let write_cache = cache_manager.write_cache()?; + let write_cache = self.cache_strategy.write_cache()?; let file_cache = write_cache.file_cache(); Some(file_cache) }(); - let index_cache = self - .cache_manager - .as_ref() - .and_then(|c| c.bloom_filter_index_cache()) - .cloned(); + let index_cache = self.cache_strategy.bloom_filter_index_cache().cloned(); - let puffin_metadata_cache = self - .cache_manager - .as_ref() - .and_then(|c| c.puffin_metadata_cache()) - .cloned(); + let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned(); BloomFilterIndexApplierBuilder::new( self.access_layer.region_dir().to_string(), @@ -550,7 +532,7 @@ pub(crate) struct ScanInput { /// Handles to SST files to scan. pub(crate) files: Vec, /// Cache. - pub(crate) cache_manager: Option, + pub(crate) cache_strategy: CacheStrategy, /// Ignores file not found error. ignore_file_not_found: bool, /// Capacity of the channel to send data from parallel scan tasks to the main task. 
@@ -582,7 +564,7 @@ impl ScanInput { predicate: None, memtables: Vec::new(), files: Vec::new(), - cache_manager: None, + cache_strategy: CacheStrategy::Normal(CacheManagerRef::default()), ignore_file_not_found: false, parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, inverted_index_applier: None, @@ -626,8 +608,8 @@ impl ScanInput { /// Sets cache for this query. #[must_use] - pub(crate) fn with_cache(mut self, cache: Option) -> Self { - self.cache_manager = cache; + pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self { + self.cache_strategy = cache; self } @@ -760,7 +742,7 @@ impl ScanInput { .read_sst(file.clone()) .predicate(self.predicate.clone()) .projection(Some(self.mapper.column_ids().to_vec())) - .cache(self.cache_manager.clone()) + .cache(self.cache_strategy.clone()) .inverted_index_applier(self.inverted_index_applier.clone()) .bloom_filter_index_applier(self.bloom_filter_index_applier.clone()) .fulltext_index_applier(self.fulltext_index_applier.clone()) diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index ca9291c0f6ed..5ae5b6b8ecc8 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -257,7 +257,7 @@ impl SeqScan { .await .map_err(BoxedError::new) .context(ExternalSnafu)?; - let cache = stream_ctx.input.cache_manager.as_deref(); + let cache = &stream_ctx.input.cache_strategy; let mut metrics = ScannerMetrics::default(); let mut fetch_start = Instant::now(); #[cfg(debug_assertions)] diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs index 28e7d64addd8..7779e8a8f1bf 100644 --- a/src/mito2/src/read/unordered_scan.rs +++ b/src/mito2/src/read/unordered_scan.rs @@ -148,7 +148,7 @@ impl UnorderedScan { let stream = try_stream! { part_metrics.on_first_poll(); - let cache = stream_ctx.input.cache_manager.as_deref(); + let cache = &stream_ctx.input.cache_strategy; let range_builder_list = Arc::new(RangeBuilderList::new( stream_ctx.input.num_memtables(), stream_ctx.input.num_files(), diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 280d46b500df..1b29f9270201 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -95,7 +95,7 @@ mod tests { use tokio_util::compat::FuturesAsyncWriteCompatExt; use super::*; - use crate::cache::{CacheManager, PageKey}; + use crate::cache::{CacheManager, CacheStrategy, PageKey}; use crate::sst::index::Indexer; use crate::sst::parquet::format::WriteFormat; use crate::sst::parquet::reader::ParquetReaderBuilder; @@ -195,7 +195,7 @@ mod tests { .unwrap(); // Enable page cache. - let cache = Some(Arc::new( + let cache = CacheStrategy::Normal(Arc::new( CacheManager::builder() .page_cache_size(64 * 1024 * 1024) .build(), @@ -219,15 +219,15 @@ mod tests { // Doesn't have compressed page cached. let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0); - assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); + assert!(cache.get_pages(&page_key).is_none()); // Cache 4 row groups. 
for i in 0..4 { let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0); - assert!(cache.as_ref().unwrap().get_pages(&page_key).is_some()); + assert!(cache.get_pages(&page_key).is_some()); } let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0); - assert!(cache.as_ref().unwrap().get_pages(&page_key).is_none()); + assert!(cache.get_pages(&page_key).is_none()); } #[tokio::test] diff --git a/src/mito2/src/sst/parquet/file_range.rs b/src/mito2/src/sst/parquet/file_range.rs index 0976c2402f08..bc333a6df544 100644 --- a/src/mito2/src/sst/parquet/file_range.rs +++ b/src/mito2/src/sst/parquet/file_range.rs @@ -114,7 +114,7 @@ impl FileRange { let reader = RowGroupLastRowCachedReader::new( self.file_handle().file_id(), self.row_group_idx, - self.context.reader_builder.cache_manager().clone(), + self.context.reader_builder.cache_strategy().clone(), RowGroupReader::new(self.context.clone(), parquet_reader), ); PruneReader::new_with_last_row_reader(self.context.clone(), reader) diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b3fb3340aa2f..c834c4221516 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -38,7 +38,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::cache::CacheManagerRef; +use crate::cache::{CacheManagerRef, CacheStrategy}; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, @@ -77,8 +77,8 @@ pub struct ParquetReaderBuilder { /// `None` reads all columns. Due to schema change, the projection /// can contain columns not in the parquet file. projection: Option>, - /// Manager that caches SST data. - cache_manager: Option, + /// Strategy to cache SST data. + cache_strategy: CacheStrategy, /// Index appliers. inverted_index_applier: Option, bloom_filter_index_applier: Option, @@ -102,7 +102,8 @@ impl ParquetReaderBuilder { object_store, predicate: None, projection: None, - cache_manager: None, + // TODO(yingwen): Maybe add a Disabled variant. + cache_strategy: CacheStrategy::Normal(CacheManagerRef::default()), inverted_index_applier: None, bloom_filter_index_applier: None, fulltext_index_applier: None, @@ -128,8 +129,8 @@ impl ParquetReaderBuilder { /// Attaches the cache to the builder. #[must_use] - pub fn cache(mut self, cache: Option) -> ParquetReaderBuilder { - self.cache_manager = cache; + pub fn cache(mut self, cache: CacheStrategy) -> ParquetReaderBuilder { + self.cache_strategy = cache; self } @@ -234,7 +235,7 @@ impl ParquetReaderBuilder { object_store: self.object_store.clone(), projection: projection_mask, field_levels, - cache_manager: self.cache_manager.clone(), + cache_strategy: self.cache_strategy.clone(), }; let filters = if let Some(predicate) = &self.predicate { @@ -308,10 +309,12 @@ impl ParquetReaderBuilder { let region_id = self.file_handle.region_id(); let file_id = self.file_handle.file_id(); // Tries to get from global cache. - if let Some(manager) = &self.cache_manager { - if let Some(metadata) = manager.get_parquet_meta_data(region_id, file_id).await { - return Ok(metadata); - } + if let Some(metadata) = self + .cache_strategy + .get_parquet_meta_data(region_id, file_id) + .await + { + return Ok(metadata); } // Cache miss, load metadata directly. 
@@ -319,13 +322,11 @@ impl ParquetReaderBuilder { let metadata = metadata_loader.load().await?; let metadata = Arc::new(metadata); // Cache the metadata. - if let Some(cache) = &self.cache_manager { - cache.put_parquet_meta_data( - self.file_handle.region_id(), - self.file_handle.file_id(), - metadata.clone(), - ); - } + self.cache_strategy.put_parquet_meta_data( + self.file_handle.region_id(), + self.file_handle.file_id(), + metadata.clone(), + ); Ok(metadata) } @@ -857,7 +858,7 @@ pub(crate) struct RowGroupReaderBuilder { /// Field levels to read. field_levels: FieldLevels, /// Cache. - cache_manager: Option, + cache_strategy: CacheStrategy, } impl RowGroupReaderBuilder { @@ -875,8 +876,8 @@ impl RowGroupReaderBuilder { &self.parquet_meta } - pub(crate) fn cache_manager(&self) -> &Option { - &self.cache_manager + pub(crate) fn cache_strategy(&self) -> &CacheStrategy { + &self.cache_strategy } /// Builds a [ParquetRecordBatchReader] to read the row group at `row_group_idx`. @@ -890,7 +891,7 @@ impl RowGroupReaderBuilder { self.file_handle.file_id(), &self.parquet_meta, row_group_idx, - self.cache_manager.clone(), + self.cache_strategy.clone(), &self.file_path, self.object_store.clone(), ); diff --git a/src/mito2/src/sst/parquet/row_group.rs b/src/mito2/src/sst/parquet/row_group.rs index dde78f39e4ee..a3c92cb768f6 100644 --- a/src/mito2/src/sst/parquet/row_group.rs +++ b/src/mito2/src/sst/parquet/row_group.rs @@ -32,7 +32,7 @@ use store_api::storage::RegionId; use tokio::task::yield_now; use crate::cache::file_cache::{FileType, IndexKey}; -use crate::cache::{CacheManagerRef, PageKey, PageValue}; +use crate::cache::{CacheStrategy, PageKey, PageValue}; use crate::metrics::{READ_STAGE_ELAPSED, READ_STAGE_FETCH_PAGES}; use crate::sst::file::FileId; use crate::sst::parquet::helper::fetch_byte_ranges; @@ -223,7 +223,7 @@ pub struct InMemoryRowGroup<'a> { region_id: RegionId, file_id: FileId, row_group_idx: usize, - cache_manager: Option, + cache_strategy: CacheStrategy, file_path: &'a str, /// Object store. object_store: ObjectStore, @@ -240,7 +240,7 @@ impl<'a> InMemoryRowGroup<'a> { file_id: FileId, parquet_meta: &'a ParquetMetaData, row_group_idx: usize, - cache_manager: Option, + cache_strategy: CacheStrategy, file_path: &'a str, object_store: ObjectStore, ) -> Self { @@ -249,7 +249,7 @@ impl<'a> InMemoryRowGroup<'a> { region_id, file_id, row_group_idx, - cache_manager, + cache_strategy, file_path, object_store, } @@ -293,21 +293,19 @@ impl<'a> InMemoryRowGroup<'a> { let assigned_columns = self.base.assign_dense_chunk(projection, chunk_data); // Put fetched data to cache if necessary. - if let Some(cache) = &self.cache_manager { - for (col_idx, data) in assigned_columns { - let column = self.base.metadata.column(col_idx); - if !cache_uncompressed_pages(column) { - // For columns that have multiple uncompressed pages, we only cache the compressed page - // to save memory. - let page_key = PageKey::new_compressed( - self.region_id, - self.file_id, - self.row_group_idx, - col_idx, - ); - cache - .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); - } + for (col_idx, data) in assigned_columns { + let column = self.base.metadata.column(col_idx); + if !cache_uncompressed_pages(column) { + // For columns that have multiple uncompressed pages, we only cache the compressed page + // to save memory. 
+ let page_key = PageKey::new_compressed( + self.region_id, + self.file_id, + self.row_group_idx, + col_idx, + ); + self.cache_strategy + .put_pages(page_key, Arc::new(PageValue::new_compressed(data.clone()))); } } } @@ -325,9 +323,6 @@ impl<'a> InMemoryRowGroup<'a> { .enumerate() .filter(|(idx, chunk)| chunk.is_none() && projection.leaf_included(*idx)) .for_each(|(idx, chunk)| { - let Some(cache) = &self.cache_manager else { - return; - }; let column = self.base.metadata.column(idx); if cache_uncompressed_pages(column) { // Fetches uncompressed pages for the row group. @@ -337,7 +332,8 @@ impl<'a> InMemoryRowGroup<'a> { self.row_group_idx, idx, ); - self.base.column_uncompressed_pages[idx] = cache.get_pages(&page_key); + self.base.column_uncompressed_pages[idx] = + self.cache_strategy.get_pages(&page_key); } else { // Fetches the compressed page from the cache. let page_key = PageKey::new_compressed( @@ -347,7 +343,7 @@ impl<'a> InMemoryRowGroup<'a> { idx, ); - *chunk = cache.get_pages(&page_key).map(|page_value| { + *chunk = self.cache_strategy.get_pages(&page_key).map(|page_value| { Arc::new(ColumnChunkData::Dense { offset: column.byte_range().0 as usize, data: page_value.compressed.clone(), @@ -383,7 +379,7 @@ impl<'a> InMemoryRowGroup<'a> { key: IndexKey, ranges: &[Range], ) -> Option> { - if let Some(cache) = self.cache_manager.as_ref()?.write_cache() { + if let Some(cache) = self.cache_strategy.write_cache() { return cache.file_cache().read_ranges(key, ranges).await; } None @@ -399,10 +395,6 @@ impl<'a> InMemoryRowGroup<'a> { let page_reader = self.base.column_reader(i)?; - let Some(cache) = &self.cache_manager else { - return Ok(Box::new(page_reader)); - }; - let column = self.base.metadata.column(i); if cache_uncompressed_pages(column) { // This column use row group level page cache. @@ -411,7 +403,7 @@ impl<'a> InMemoryRowGroup<'a> { let page_value = Arc::new(PageValue::new_row_group(pages)); let page_key = PageKey::new_uncompressed(self.region_id, self.file_id, self.row_group_idx, i); - cache.put_pages(page_key, page_value.clone()); + self.cache_strategy.put_pages(page_key, page_value.clone()); return Ok(Box::new(RowGroupCachedReader::new(&page_value.row_group))); } From 5c667def05e974f409765ebdd9c83ffc19a1c954 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 28 Dec 2024 23:19:56 +0800 Subject: [PATCH 3/9] feat: add disabled strategy --- src/mito2/src/cache.rs | 38 +++++++++++++++++------------ src/mito2/src/sst/parquet/reader.rs | 5 ++-- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 480a0e4cc154..72b3266790d8 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -66,6 +66,8 @@ pub enum CacheStrategy { /// Enables the write cache so that the compaction can read files cached /// in the write cache and write the compacted files back to the write cache. Compaction(CacheManagerRef), + /// No cache. 
+ Disabled, } impl CacheStrategy { @@ -86,6 +88,7 @@ impl CacheStrategy { .get_parquet_meta_data(region_id, file_id) .await } + CacheStrategy::Disabled => None, } } @@ -102,6 +105,7 @@ impl CacheStrategy { CacheStrategy::Compaction(cache_manager) => { cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id) } + CacheStrategy::Disabled => None, } } @@ -119,6 +123,7 @@ impl CacheStrategy { CacheStrategy::Compaction(cache_manager) => { cache_manager.put_parquet_meta_data(region_id, file_id, metadata); } + CacheStrategy::Disabled => {} } } @@ -131,11 +136,12 @@ impl CacheStrategy { CacheStrategy::Compaction(cache_manager) => { cache_manager.remove_parquet_meta_data(region_id, file_id); } + CacheStrategy::Disabled => {} } } /// Calls [CacheManager::get_repeated_vector()]. - /// It returns None if the strategy is [CacheStrategy::Compaction]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn get_repeated_vector( &self, data_type: &ConcreteDataType, @@ -145,12 +151,12 @@ impl CacheStrategy { CacheStrategy::Normal(cache_manager) => { cache_manager.get_repeated_vector(data_type, value) } - CacheStrategy::Compaction(_) => None, + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } /// Calls [CacheManager::put_repeated_vector()]. - /// It does nothing if the strategy is [CacheStrategy::Compaction]. + /// It does nothing if the strategy isn't [CacheStrategy::Normal]. pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) { if let CacheStrategy::Normal(cache_manager) = self { cache_manager.put_repeated_vector(value, vector); @@ -158,16 +164,16 @@ impl CacheStrategy { } /// Calls [CacheManager::get_pages()]. - /// It returns None if the strategy is [CacheStrategy::Compaction]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn get_pages(&self, page_key: &PageKey) -> Option> { match self { CacheStrategy::Normal(cache_manager) => cache_manager.get_pages(page_key), - CacheStrategy::Compaction(_) => None, + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } /// Calls [CacheManager::put_pages()]. - /// It does nothing if the strategy is [CacheStrategy::Compaction]. + /// It does nothing if the strategy isn't [CacheStrategy::Normal]. pub fn put_pages(&self, page_key: PageKey, pages: Arc) { if let CacheStrategy::Normal(cache_manager) = self { cache_manager.put_pages(page_key, pages); @@ -175,19 +181,19 @@ impl CacheStrategy { } /// Calls [CacheManager::get_selector_result()]. - /// It returns None if the strategy is [CacheStrategy::Compaction]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn get_selector_result( &self, selector_key: &SelectorResultKey, ) -> Option> { match self { CacheStrategy::Normal(cache_manager) => cache_manager.get_selector_result(selector_key), - CacheStrategy::Compaction(_) => None, + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } /// Calls [CacheManager::put_selector_result()]. - /// It does nothing if the strategy is [CacheStrategy::Compaction]. + /// It does nothing if the strategy isn't [CacheStrategy::Normal]. pub fn put_selector_result( &self, selector_key: SelectorResultKey, @@ -199,37 +205,39 @@ impl CacheStrategy { } /// Calls [CacheManager::write_cache()]. + /// It returns None if the strategy is [CacheStrategy::Disabled]. 
pub fn write_cache(&self) -> Option<&WriteCacheRef> { match self { CacheStrategy::Normal(cache_manager) => cache_manager.write_cache(), CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(), + CacheStrategy::Disabled => None, } } /// Calls [CacheManager::index_cache()]. - /// It returns None if the strategy is [CacheStrategy::Compaction]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn index_cache(&self) -> Option<&InvertedIndexCacheRef> { match self { CacheStrategy::Normal(cache_manager) => cache_manager.index_cache(), - CacheStrategy::Compaction(_) => None, + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } /// Calls [CacheManager::bloom_filter_index_cache()]. - /// It returns None if the strategy is [CacheStrategy::Compaction]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> { match self { CacheStrategy::Normal(cache_manager) => cache_manager.bloom_filter_index_cache(), - CacheStrategy::Compaction(_) => None, + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } /// Calls [CacheManager::puffin_metadata_cache()]. - /// It returns None if the strategy is [CacheStrategy::Compaction]. + /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { match self { CacheStrategy::Normal(cache_manager) => cache_manager.puffin_metadata_cache(), - CacheStrategy::Compaction(_) => None, + CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } } diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index c834c4221516..579f80a433cc 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -38,7 +38,7 @@ use store_api::metadata::{RegionMetadata, RegionMetadataRef}; use store_api::storage::ColumnId; use table::predicate::Predicate; -use crate::cache::{CacheManagerRef, CacheStrategy}; +use crate::cache::CacheStrategy; use crate::error::{ ArrowReaderSnafu, InvalidMetadataSnafu, InvalidParquetSnafu, ReadDataPartSnafu, ReadParquetSnafu, Result, @@ -102,8 +102,7 @@ impl ParquetReaderBuilder { object_store, predicate: None, projection: None, - // TODO(yingwen): Maybe add a Disabled variant. 
- cache_strategy: CacheStrategy::Normal(CacheManagerRef::default()), + cache_strategy: CacheStrategy::Disabled, inverted_index_applier: None, bloom_filter_index_applier: None, fulltext_index_applier: None, From 304f657e0109070025b39ae9b115412457682353 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sat, 28 Dec 2024 23:23:20 +0800 Subject: [PATCH 4/9] ci: force update taplo --- .github/workflows/develop.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index e901dcb721bd..e01c9d08d7a8 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -84,7 +84,7 @@ jobs: # Shares across multiple jobs shared-key: "check-toml" - name: Install taplo - run: cargo +stable install taplo-cli --version ^0.9 --locked + run: cargo +stable install taplo-cli --version ^0.9 --locked --force - name: Run taplo run: taplo format --check From a91daaa2d4eca60ff2f75365cee05900c9f3acb1 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 29 Dec 2024 00:20:37 +0800 Subject: [PATCH 5/9] refactor: rename CacheStrategy::Normal to CacheStrategy::EnableAll --- src/mito2/src/cache.rs | 42 ++++++++++++++++-------------- src/mito2/src/cache/write_cache.rs | 2 +- src/mito2/src/engine.rs | 2 +- src/mito2/src/read/projection.rs | 4 +-- src/mito2/src/read/scan_region.rs | 2 +- src/mito2/src/sst/parquet.rs | 2 +- 6 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/mito2/src/cache.rs b/src/mito2/src/cache.rs index 72b3266790d8..3d8bebdfc291 100644 --- a/src/mito2/src/cache.rs +++ b/src/mito2/src/cache.rs @@ -60,13 +60,13 @@ const SELECTOR_RESULT_TYPE: &str = "selector_result"; pub enum CacheStrategy { /// Strategy for normal operations. /// Doesn't disable any cache. - Normal(CacheManagerRef), + EnableAll(CacheManagerRef), /// Strategy for compaction. - /// Disables some caches that are not needed during compaction. + /// Disables some caches during compaction to avoid affecting queries. /// Enables the write cache so that the compaction can read files cached /// in the write cache and write the compacted files back to the write cache. Compaction(CacheManagerRef), - /// No cache. + /// Do not use any cache. Disabled, } @@ -78,7 +78,7 @@ impl CacheStrategy { file_id: FileId, ) -> Option> { match self { - CacheStrategy::Normal(cache_manager) => { + CacheStrategy::EnableAll(cache_manager) => { cache_manager .get_parquet_meta_data(region_id, file_id) .await @@ -99,7 +99,7 @@ impl CacheStrategy { file_id: FileId, ) -> Option> { match self { - CacheStrategy::Normal(cache_manager) => { + CacheStrategy::EnableAll(cache_manager) => { cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id) } CacheStrategy::Compaction(cache_manager) => { @@ -117,7 +117,7 @@ impl CacheStrategy { metadata: Arc, ) { match self { - CacheStrategy::Normal(cache_manager) => { + CacheStrategy::EnableAll(cache_manager) => { cache_manager.put_parquet_meta_data(region_id, file_id, metadata); } CacheStrategy::Compaction(cache_manager) => { @@ -130,7 +130,7 @@ impl CacheStrategy { /// Calls [CacheManager::remove_parquet_meta_data()]. 
pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) { match self { - CacheStrategy::Normal(cache_manager) => { + CacheStrategy::EnableAll(cache_manager) => { cache_manager.remove_parquet_meta_data(region_id, file_id); } CacheStrategy::Compaction(cache_manager) => { @@ -148,7 +148,7 @@ impl CacheStrategy { value: &Value, ) -> Option { match self { - CacheStrategy::Normal(cache_manager) => { + CacheStrategy::EnableAll(cache_manager) => { cache_manager.get_repeated_vector(data_type, value) } CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, @@ -156,9 +156,9 @@ impl CacheStrategy { } /// Calls [CacheManager::put_repeated_vector()]. - /// It does nothing if the strategy isn't [CacheStrategy::Normal]. + /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) { - if let CacheStrategy::Normal(cache_manager) = self { + if let CacheStrategy::EnableAll(cache_manager) = self { cache_manager.put_repeated_vector(value, vector); } } @@ -167,15 +167,15 @@ impl CacheStrategy { /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn get_pages(&self, page_key: &PageKey) -> Option> { match self { - CacheStrategy::Normal(cache_manager) => cache_manager.get_pages(page_key), + CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key), CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } /// Calls [CacheManager::put_pages()]. - /// It does nothing if the strategy isn't [CacheStrategy::Normal]. + /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. pub fn put_pages(&self, page_key: PageKey, pages: Arc) { - if let CacheStrategy::Normal(cache_manager) = self { + if let CacheStrategy::EnableAll(cache_manager) = self { cache_manager.put_pages(page_key, pages); } } @@ -187,19 +187,21 @@ impl CacheStrategy { selector_key: &SelectorResultKey, ) -> Option> { match self { - CacheStrategy::Normal(cache_manager) => cache_manager.get_selector_result(selector_key), + CacheStrategy::EnableAll(cache_manager) => { + cache_manager.get_selector_result(selector_key) + } CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } /// Calls [CacheManager::put_selector_result()]. - /// It does nothing if the strategy isn't [CacheStrategy::Normal]. + /// It does nothing if the strategy isn't [CacheStrategy::EnableAll]. pub fn put_selector_result( &self, selector_key: SelectorResultKey, result: Arc, ) { - if let CacheStrategy::Normal(cache_manager) = self { + if let CacheStrategy::EnableAll(cache_manager) = self { cache_manager.put_selector_result(selector_key, result); } } @@ -208,7 +210,7 @@ impl CacheStrategy { /// It returns None if the strategy is [CacheStrategy::Disabled]. pub fn write_cache(&self) -> Option<&WriteCacheRef> { match self { - CacheStrategy::Normal(cache_manager) => cache_manager.write_cache(), + CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(), CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(), CacheStrategy::Disabled => None, } @@ -218,7 +220,7 @@ impl CacheStrategy { /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. 
pub fn index_cache(&self) -> Option<&InvertedIndexCacheRef> { match self { - CacheStrategy::Normal(cache_manager) => cache_manager.index_cache(), + CacheStrategy::EnableAll(cache_manager) => cache_manager.index_cache(), CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } @@ -227,7 +229,7 @@ impl CacheStrategy { /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> { match self { - CacheStrategy::Normal(cache_manager) => cache_manager.bloom_filter_index_cache(), + CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(), CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } @@ -236,7 +238,7 @@ impl CacheStrategy { /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled]. pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> { match self { - CacheStrategy::Normal(cache_manager) => cache_manager.puffin_metadata_cache(), + CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(), CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None, } } diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index de94bdb03137..681355b373f4 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -495,7 +495,7 @@ mod tests { // Read metadata from write cache let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone()) - .cache(CacheStrategy::Normal(cache_manager.clone())); + .cache(CacheStrategy::EnableAll(cache_manager.clone())); let reader = builder.build().await.unwrap(); // Check parquet metadata diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs index e6b1e501aec7..3809a6f2a306 100644 --- a/src/mito2/src/engine.rs +++ b/src/mito2/src/engine.rs @@ -429,7 +429,7 @@ impl EngineInner { version, region.access_layer.clone(), request, - CacheStrategy::Normal(cache_manager), + CacheStrategy::EnableAll(cache_manager), ) .with_parallel_scan_channel_size(self.config.parallel_scan_channel_size) .with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled()) diff --git a/src/mito2/src/read/projection.rs b/src/mito2/src/read/projection.rs index a20fe7b51b81..e091cf076853 100644 --- a/src/mito2/src/read/projection.rs +++ b/src/mito2/src/read/projection.rs @@ -359,7 +359,7 @@ mod tests { // With vector cache. 
let cache = CacheManager::builder().vector_cache_size(1024).build(); - let cache = CacheStrategy::Normal(Arc::new(cache)); + let cache = CacheStrategy::EnableAll(Arc::new(cache)); let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3); let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ @@ -403,7 +403,7 @@ mod tests { let batch = new_batch(0, &[1, 2], &[(4, 4)], 3); let cache = CacheManager::builder().vector_cache_size(1024).build(); - let cache = CacheStrategy::Normal(Arc::new(cache)); + let cache = CacheStrategy::EnableAll(Arc::new(cache)); let record_batch = mapper.convert(&batch, &cache).unwrap(); let expect = "\ +----+----+ diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index f62380e49b1a..bc9f8f709a7e 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -564,7 +564,7 @@ impl ScanInput { predicate: None, memtables: Vec::new(), files: Vec::new(), - cache_strategy: CacheStrategy::Normal(CacheManagerRef::default()), + cache_strategy: CacheStrategy::EnableAll(CacheManagerRef::default()), ignore_file_not_found: false, parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, inverted_index_applier: None, diff --git a/src/mito2/src/sst/parquet.rs b/src/mito2/src/sst/parquet.rs index 1b29f9270201..af677c28d1eb 100644 --- a/src/mito2/src/sst/parquet.rs +++ b/src/mito2/src/sst/parquet.rs @@ -195,7 +195,7 @@ mod tests { .unwrap(); // Enable page cache. - let cache = CacheStrategy::Normal(Arc::new( + let cache = CacheStrategy::EnableAll(Arc::new( CacheManager::builder() .page_cache_size(64 * 1024 * 1024) .build(), From 6f054abf031f2274be595f9b666e96d9de60cb68 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 29 Dec 2024 00:29:08 +0800 Subject: [PATCH 6/9] ci: force install cargo-gc-bin --- .github/workflows/develop.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index e01c9d08d7a8..61a7eda1b2b1 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -107,7 +107,7 @@ jobs: shared-key: "build-binaries" - name: Install cargo-gc-bin shell: bash - run: cargo install cargo-gc-bin + run: cargo install cargo-gc-bin --force - name: Build greptime binaries shell: bash # `cargo gc` will invoke `cargo build` with specified args @@ -268,7 +268,7 @@ jobs: shared-key: "build-greptime-ci" - name: Install cargo-gc-bin shell: bash - run: cargo install cargo-gc-bin + run: cargo install cargo-gc-bin --force - name: Build greptime bianry shell: bash # `cargo gc` will invoke `cargo build` with specified args From 3da6aa5fa05b2096a76dd37134583a6f38604742 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 29 Dec 2024 12:57:49 +0800 Subject: [PATCH 7/9] ci: force install --- .github/workflows/develop.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/develop.yml b/.github/workflows/develop.yml index 61a7eda1b2b1..a42f645d0b4d 100644 --- a/.github/workflows/develop.yml +++ b/.github/workflows/develop.yml @@ -163,7 +163,7 @@ jobs: run: | sudo apt-get install -y libfuzzer-14-dev rustup install nightly - cargo +nightly install cargo-fuzz cargo-gc-bin + cargo +nightly install cargo-fuzz cargo-gc-bin --force - name: Download pre-built binaries uses: actions/download-artifact@v4 with: @@ -220,7 +220,7 @@ jobs: shell: bash run: | sudo apt update && sudo apt install -y libfuzzer-14-dev - cargo install cargo-fuzz cargo-gc-bin + cargo install cargo-fuzz cargo-gc-bin --force - name: 
Download pre-built binariy uses: actions/download-artifact@v4 with: @@ -338,7 +338,7 @@ jobs: run: | sudo apt-get install -y libfuzzer-14-dev rustup install nightly - cargo +nightly install cargo-fuzz cargo-gc-bin + cargo +nightly install cargo-fuzz cargo-gc-bin --force # Downloads ci image - name: Download pre-built binariy uses: actions/download-artifact@v4 @@ -487,7 +487,7 @@ jobs: run: | sudo apt-get install -y libfuzzer-14-dev rustup install nightly - cargo +nightly install cargo-fuzz cargo-gc-bin + cargo +nightly install cargo-fuzz cargo-gc-bin --force # Downloads ci image - name: Download pre-built binariy uses: actions/download-artifact@v4 From 58b83032f111191c5ec17abcff4c2aef2dc1ec29 Mon Sep 17 00:00:00 2001 From: evenyag Date: Sun, 29 Dec 2024 21:38:33 +0800 Subject: [PATCH 8/9] chore: use CacheStrategy::Disabled as ScanInput default --- src/mito2/src/read/scan_region.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index bc9f8f709a7e..be5edb39af36 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -564,7 +564,7 @@ impl ScanInput { predicate: None, memtables: Vec::new(), files: Vec::new(), - cache_strategy: CacheStrategy::EnableAll(CacheManagerRef::default()), + cache_strategy: CacheStrategy::Disabled, ignore_file_not_found: false, parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE, inverted_index_applier: None, From 593013faea067f586cf5ff6574eb4aee1aa770b0 Mon Sep 17 00:00:00 2001 From: evenyag Date: Mon, 30 Dec 2024 11:27:57 +0800 Subject: [PATCH 9/9] chore: fix compiler errors --- src/mito2/src/read/scan_region.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index be5edb39af36..ad5d2e4a15cf 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -33,7 +33,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::access_layer::AccessLayerRef; use crate::cache::file_cache::FileCacheRef; -use crate::cache::{CacheManagerRef, CacheStrategy}; +use crate::cache::CacheStrategy; use crate::config::DEFAULT_SCAN_CHANNEL_SIZE; use crate::error::Result; use crate::memtable::MemtableRange;
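Taken together, the series settles on three variants: `EnableAll` for queries (patch 5 renames `Normal`), `Compaction` for compaction reads, and `Disabled` as the default for `ScanInput` and `ParquetReaderBuilder` until a caller attaches a cache (patches 3 and 8). The sketch below condenses how the final call sites pick a variant; `pick_strategy` and its arguments are hypothetical helpers for illustration, summarizing `EngineInner::scan`, `CompactionSstReaderBuilder`, and the builder defaults from the diffs above.

```rust
use std::sync::Arc;

// Stand-in for the real mito2 cache manager; illustration only.
struct CacheManager;
type CacheManagerRef = Arc<CacheManager>;

#[derive(Clone)]
enum CacheStrategy {
    /// Queries: all caches enabled.
    EnableAll(CacheManagerRef),
    /// Compaction: write cache only.
    Compaction(CacheManagerRef),
    /// Builder default until a caller attaches a cache.
    Disabled,
}

/// Hypothetical helper mirroring where each variant is constructed in this series.
fn pick_strategy(for_compaction: bool, cache: Option<CacheManagerRef>) -> CacheStrategy {
    match (cache, for_compaction) {
        (Some(c), false) => CacheStrategy::EnableAll(c), // EngineInner::scan
        (Some(c), true) => CacheStrategy::Compaction(c), // CompactionSstReaderBuilder
        (None, _) => CacheStrategy::Disabled,            // ScanInput / ParquetReaderBuilder default
    }
}

fn main() {
    let cache: CacheManagerRef = Arc::new(CacheManager);
    assert!(matches!(
        pick_strategy(false, Some(cache.clone())),
        CacheStrategy::EnableAll(_)
    ));
    assert!(matches!(
        pick_strategy(true, Some(cache)),
        CacheStrategy::Compaction(_)
    ));
    assert!(matches!(pick_strategy(false, None), CacheStrategy::Disabled));
}
```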