Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: implement a CacheStrategy to ensure compaction use cache correctly #5254

Merged
merged 9 commits into from
Dec 30, 2024
Merged
14 changes: 7 additions & 7 deletions .github/workflows/develop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ jobs:
# Shares across multiple jobs
shared-key: "check-toml"
- name: Install taplo
run: cargo +stable install taplo-cli --version ^0.9 --locked
run: cargo +stable install taplo-cli --version ^0.9 --locked --force
- name: Run taplo
run: taplo format --check

Expand All @@ -107,7 +107,7 @@ jobs:
shared-key: "build-binaries"
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin
run: cargo install cargo-gc-bin --force
- name: Build greptime binaries
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
Expand Down Expand Up @@ -163,7 +163,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin
cargo +nightly install cargo-fuzz cargo-gc-bin --force
- name: Download pre-built binaries
uses: actions/download-artifact@v4
with:
Expand Down Expand Up @@ -220,7 +220,7 @@ jobs:
shell: bash
run: |
sudo apt update && sudo apt install -y libfuzzer-14-dev
cargo install cargo-fuzz cargo-gc-bin
cargo install cargo-fuzz cargo-gc-bin --force
- name: Download pre-built binary
uses: actions/download-artifact@v4
with:
Expand Down Expand Up @@ -268,7 +268,7 @@ jobs:
shared-key: "build-greptime-ci"
- name: Install cargo-gc-bin
shell: bash
run: cargo install cargo-gc-bin
run: cargo install cargo-gc-bin --force
- name: Build greptime binary
shell: bash
# `cargo gc` will invoke `cargo build` with specified args
Expand Down Expand Up @@ -338,7 +338,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin
cargo +nightly install cargo-fuzz cargo-gc-bin --force
# Downloads ci image
- name: Download pre-built binary
uses: actions/download-artifact@v4
Expand Down Expand Up @@ -487,7 +487,7 @@ jobs:
run: |
sudo apt-get install -y libfuzzer-14-dev
rustup install nightly
cargo +nightly install cargo-fuzz cargo-gc-bin
cargo +nightly install cargo-fuzz cargo-gc-bin --force
# Downloads ci image
- name: Download pre-built binary
uses: actions/download-artifact@v4
Expand Down
189 changes: 189 additions & 0 deletions src/mito2/src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,195 @@ const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";

/// Cache strategies that may only enable a subset of caches.
///
/// Wraps a [CacheManagerRef] (except for [CacheStrategy::Disabled]) and
/// decides, per operation, which of the manager's caches are consulted.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}

impl CacheStrategy {
/// Calls [CacheManager::get_parquet_meta_data()].
pub async fn get_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager
.get_parquet_meta_data(region_id, file_id)
.await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data(region_id, file_id)
.await
}
CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
pub fn get_parquet_meta_data_from_mem_cache(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
}
CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::put_parquet_meta_data()].
pub fn put_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
metadata: Arc<ParquetMetaData>,
) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
}
CacheStrategy::Disabled => {}
}
}

/// Calls [CacheManager::remove_parquet_meta_data()].
pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.remove_parquet_meta_data(region_id, file_id);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.remove_parquet_meta_data(region_id, file_id);
}
CacheStrategy::Disabled => {}
}
}

/// Calls [CacheManager::get_repeated_vector()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_repeated_vector(
&self,
data_type: &ConcreteDataType,
value: &Value,
) -> Option<VectorRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_repeated_vector(data_type, value)
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::put_repeated_vector()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_repeated_vector(value, vector);
}
}

/// Calls [CacheManager::get_pages()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::put_pages()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_pages(page_key, pages);
}
}

/// Calls [CacheManager::get_selector_result()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn get_selector_result(
&self,
selector_key: &SelectorResultKey,
) -> Option<Arc<SelectorResultValue>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_selector_result(selector_key)
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::put_selector_result()].
/// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
pub fn put_selector_result(
&self,
selector_key: SelectorResultKey,
result: Arc<SelectorResultValue>,
) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_selector_result(selector_key, result);
}
}

/// Calls [CacheManager::write_cache()].
/// It returns None if the strategy is [CacheStrategy::Disabled].
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::index_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn index_cache(&self) -> Option<&InvertedIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::bloom_filter_index_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}

/// Calls [CacheManager::puffin_metadata_cache()].
/// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
}

/// Manages cached data for the engine.
///
/// All caches are disabled by default.
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/cache/write_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ mod tests {
use super::*;
use crate::access_layer::OperationType;
use crate::cache::test_util::new_fs_store;
use crate::cache::CacheManager;
use crate::cache::{CacheManager, CacheStrategy};
use crate::region::options::IndexOptions;
use crate::sst::file::FileId;
use crate::sst::location::{index_file_path, sst_file_path};
Expand Down Expand Up @@ -495,7 +495,7 @@ mod tests {

// Read metadata from write cache
let builder = ParquetReaderBuilder::new(data_home, handle.clone(), mock_store.clone())
.cache(Some(cache_manager.clone()));
.cache(CacheStrategy::EnableAll(cache_manager.clone()));
let reader = builder.build().await.unwrap();

// Check parquet metadata
Expand Down
6 changes: 4 additions & 2 deletions src/mito2/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use table::predicate::Predicate;
use tokio::sync::mpsc::{self, Sender};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::{CacheManagerRef, CacheStrategy};
use crate::compaction::compactor::{CompactionRegion, CompactionVersion, DefaultCompactor};
use crate::compaction::picker::{new_picker, CompactionTask};
use crate::compaction::task::CompactionTaskImpl;
Expand Down Expand Up @@ -573,6 +573,7 @@ pub struct SerializedCompactionOutput {
struct CompactionSstReaderBuilder<'a> {
metadata: RegionMetadataRef,
sst_layer: AccessLayerRef,
cache: CacheManagerRef,
inputs: &'a [FileHandle],
append_mode: bool,
filter_deleted: bool,
Expand All @@ -586,7 +587,8 @@ impl<'a> CompactionSstReaderBuilder<'a> {
let mut scan_input = ScanInput::new(self.sst_layer, ProjectionMapper::all(&self.metadata)?)
.with_files(self.inputs.to_vec())
.with_append_mode(self.append_mode)
.with_cache(None)
// We use special cache strategy for compaction.
.with_cache(CacheStrategy::Compaction(self.cache))
.with_filter_deleted(self.filter_deleted)
// We ignore file not found error during compaction.
.with_ignore_file_not_found(true)
Expand Down
1 change: 1 addition & 0 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ impl Compactor for DefaultCompactor {
let reader = CompactionSstReaderBuilder {
metadata: region_metadata.clone(),
sst_layer: sst_layer.clone(),
cache: cache_manager.clone(),
inputs: &output.inputs,
append_mode,
filter_deleted: output.filter_deleted,
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{oneshot, Semaphore};

use crate::cache::CacheStrategy;
use crate::config::MitoConfig;
use crate::error::{
InvalidRequestSnafu, JoinSnafu, RecvSnafu, RegionNotFoundSnafu, Result, SerdeJsonSnafu,
Expand Down Expand Up @@ -428,7 +429,7 @@ impl EngineInner {
version,
region.access_layer.clone(),
request,
Some(cache_manager),
CacheStrategy::EnableAll(cache_manager),
)
.with_parallel_scan_channel_size(self.config.parallel_scan_channel_size)
.with_ignore_inverted_index(self.config.inverted_index.apply_on_query.disabled())
Expand Down
Loading
Loading