From f9ce2708d3d0d924c2f8bad37568424e82d83b0f Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Fri, 16 Feb 2024 16:50:41 +0800 Subject: [PATCH] feat(mito): add options to ignore building index for specific column ids (#3295) Signed-off-by: Zhenchi --- src/mito2/src/access_layer.rs | 4 ++ src/mito2/src/cache/write_cache.rs | 4 ++ src/mito2/src/compaction/twcs.rs | 6 ++ src/mito2/src/flush.rs | 6 ++ src/mito2/src/read/scan_region.rs | 8 +++ src/mito2/src/region/options.rs | 64 ++++++++++++++++++- src/mito2/src/sst/index.rs | 16 ++++- src/mito2/src/sst/index/applier/builder.rs | 32 ++++++++-- .../src/sst/index/applier/builder/between.rs | 47 +++++++++++--- .../sst/index/applier/builder/comparison.rs | 38 ++++++++--- .../src/sst/index/applier/builder/eq_list.rs | 63 ++++++++++++++---- .../src/sst/index/applier/builder/in_list.rs | 45 ++++++++++--- .../sst/index/applier/builder/regex_match.rs | 38 ++++++++--- src/mito2/src/sst/index/codec.rs | 2 +- src/mito2/src/sst/index/creator.rs | 19 +++++- src/mito2/src/worker/handle_flush.rs | 1 + 16 files changed, 332 insertions(+), 61 deletions(-) diff --git a/src/mito2/src/access_layer.rs b/src/mito2/src/access_layer.rs index eb049c6f3a5b..6c69e8459d74 100644 --- a/src/mito2/src/access_layer.rs +++ b/src/mito2/src/access_layer.rs @@ -24,6 +24,7 @@ use crate::cache::write_cache::SstUploadRequest; use crate::cache::CacheManagerRef; use crate::error::{CleanDirSnafu, DeleteIndexSnafu, DeleteSstSnafu, OpenDalSnafu, Result}; use crate::read::Source; +use crate::region::options::IndexOptions; use crate::sst::file::{FileHandle, FileId, FileMeta}; use crate::sst::index::intermediate::IntermediateManager; use crate::sst::index::IndexerBuilder; @@ -143,6 +144,7 @@ impl AccessLayer { row_group_size: write_opts.row_group_size, object_store: self.object_store.clone(), intermediate_manager: self.intermediate_manager.clone(), + index_options: request.index_options, } .build(); let mut writer = ParquetWriter::new( @@ -187,6 +189,8 @@ pub(crate) struct SstWriteRequest { pub(crate) mem_threshold_index_create: Option, /// The size of write buffer for index. pub(crate) index_write_buffer_size: Option, + /// The options of the index for the region. + pub(crate) index_options: IndexOptions, } /// Creates a fs object store with atomic write dir. diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 3e66e4bf90e9..edf89a47c689 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -118,6 +118,7 @@ impl WriteCache { row_group_size: write_opts.row_group_size, object_store: self.file_cache.local_store(), intermediate_manager: self.intermediate_manager.clone(), + index_options: write_request.index_options, } .build(); @@ -235,6 +236,7 @@ mod tests { use super::*; use crate::cache::test_util::new_fs_store; use crate::cache::CacheManager; + use crate::region::options::IndexOptions; use crate::sst::file::FileId; use crate::sst::location::{index_file_path, sst_file_path}; use crate::sst::parquet::reader::ParquetReaderBuilder; @@ -279,6 +281,7 @@ mod tests { mem_threshold_index_create: None, index_write_buffer_size: None, cache_manager: Default::default(), + index_options: IndexOptions::default(), }; let upload_request = SstUploadRequest { @@ -363,6 +366,7 @@ mod tests { mem_threshold_index_create: None, index_write_buffer_size: None, cache_manager: cache_manager.clone(), + index_options: IndexOptions::default(), }; let write_opts = WriteOptions { row_group_size: 512, diff --git a/src/mito2/src/compaction/twcs.rs b/src/mito2/src/compaction/twcs.rs index 0a7b727bb4e8..fbb07b71e1c7 100644 --- a/src/mito2/src/compaction/twcs.rs +++ b/src/mito2/src/compaction/twcs.rs @@ -37,6 +37,7 @@ use crate::metrics::{COMPACTION_FAILURE_COUNT, COMPACTION_STAGE_ELAPSED}; use crate::read::projection::ProjectionMapper; use crate::read::seq_scan::SeqScan; use crate::read::{BoxedBatchReader, Source}; +use crate::region::options::IndexOptions; use crate::request::{ BackgroundNotify, CompactionFailed, CompactionFinished, OutputTx, WorkerRequest, }; @@ -186,6 +187,7 @@ impl Picker for TwcsPicker { start_time, cache_manager, storage: current_version.options.storage.clone(), + index_options: current_version.options.index_options.clone(), }; Some(Box::new(task)) } @@ -251,6 +253,8 @@ pub(crate) struct TwcsCompactionTask { pub(crate) cache_manager: CacheManagerRef, /// Target storage of the region. pub(crate) storage: Option, + /// Index options of the region. + pub(crate) index_options: IndexOptions, } impl Debug for TwcsCompactionTask { @@ -327,6 +331,7 @@ impl TwcsCompactionTask { let file_id = output.output_file_id; let cache_manager = self.cache_manager.clone(); let storage = self.storage.clone(); + let index_options = self.index_options.clone(); futs.push(async move { let reader = build_sst_reader(metadata.clone(), sst_layer.clone(), &output.inputs).await?; @@ -341,6 +346,7 @@ impl TwcsCompactionTask { create_inverted_index, mem_threshold_index_create, index_write_buffer_size, + index_options, }, &write_opts, ) diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs index 150b2fb1b689..3848ec0726fe 100644 --- a/src/mito2/src/flush.rs +++ b/src/mito2/src/flush.rs @@ -34,6 +34,7 @@ use crate::error::{ use crate::memtable::MemtableBuilderRef; use crate::metrics::{FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_ERRORS_TOTAL, FLUSH_REQUESTS_TOTAL}; use crate::read::Source; +use crate::region::options::IndexOptions; use crate::region::version::{VersionControlData, VersionControlRef, VersionRef}; use crate::request::{ BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderDdlRequest, @@ -203,6 +204,9 @@ pub(crate) struct RegionFlushTask { pub(crate) engine_config: Arc, pub(crate) row_group_size: Option, pub(crate) cache_manager: CacheManagerRef, + + /// Index options for the region. + pub(crate) index_options: IndexOptions, } impl RegionFlushTask { @@ -338,6 +342,7 @@ impl RegionFlushTask { create_inverted_index, mem_threshold_index_create, index_write_buffer_size, + index_options: self.index_options.clone(), }; let Some(sst_info) = self .access_layer @@ -766,6 +771,7 @@ mod tests { engine_config: Arc::new(MitoConfig::default()), row_group_size: None, cache_manager: Arc::new(CacheManager::default()), + index_options: IndexOptions::default(), }; task.push_sender(OptionOutputTx::from(output_tx)); scheduler diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index d5f7dbe10023..564344b1ab4f 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -259,6 +259,14 @@ impl ScanRegion { self.access_layer.object_store().clone(), file_cache, self.version.metadata.as_ref(), + self.version + .options + .index_options + .inverted_index + .ignore_column_ids + .iter() + .copied() + .collect(), ) .build(&self.request.filters) .inspect_err(|err| warn!(err; "Failed to build index applier")) diff --git a/src/mito2/src/region/options.rs b/src/mito2/src/region/options.rs index e8a7f6631944..5811d8b37988 100644 --- a/src/mito2/src/region/options.rs +++ b/src/mito2/src/region/options.rs @@ -18,10 +18,12 @@ use std::collections::HashMap; use std::time::Duration; use common_wal::options::{WalOptions, WAL_OPTIONS_KEY}; -use serde::Deserialize; +use serde::de::Error as _; +use serde::{Deserialize, Deserializer}; use serde_json::Value; use serde_with::{serde_as, with_prefix, DisplayFromStr}; use snafu::ResultExt; +use store_api::storage::ColumnId; use crate::error::{Error, JsonOptionsSnafu, Result}; @@ -40,6 +42,8 @@ pub struct RegionOptions { pub storage: Option, /// Wal options. pub wal_options: WalOptions, + /// Index options. + pub index_options: IndexOptions, } impl TryFrom<&HashMap> for RegionOptions { @@ -64,11 +68,14 @@ impl TryFrom<&HashMap> for RegionOptions { }, )?; + let index_options: IndexOptions = serde_json::from_str(&json).context(JsonOptionsSnafu)?; + Ok(RegionOptions { ttl: options.ttl, compaction, storage: options.storage, wal_options, + index_options, }) } } @@ -152,6 +159,40 @@ impl Default for RegionOptionsWithoutEnum { } } +with_prefix!(prefix_inverted_index "index.inverted_index."); + +/// Options for index. +#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize)] +#[serde(default)] +pub struct IndexOptions { + /// Options for the inverted index. + #[serde(flatten, with = "prefix_inverted_index")] + pub inverted_index: InvertedIndexOptions, +} + +/// Options for the inverted index. +#[derive(Debug, Clone, PartialEq, Eq, Default, Deserialize)] +#[serde(default)] +pub struct InvertedIndexOptions { + /// The column ids that should be ignored when building the inverted index. + /// The column ids are separated by commas. For example, "1,2,3". + #[serde(deserialize_with = "deserialize_ignore_column_ids")] + pub ignore_column_ids: Vec, +} + +fn deserialize_ignore_column_ids<'de, D>(deserializer: D) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + let s: String = Deserialize::deserialize(deserializer)?; + let mut column_ids = Vec::new(); + for item in s.split(',') { + let column_id = item.parse().map_err(D::Error::custom)?; + column_ids.push(column_id); + } + Ok(column_ids) +} + /// Converts the `options` map to a json object. /// /// Converts all key-values to lowercase and replaces "null" strings by `null` json values. @@ -257,6 +298,21 @@ mod tests { expect == got } + #[test] + fn test_with_index() { + let map = make_map(&[("index.inverted_index.ignore_column_ids", "1,2,3")]); + let options = RegionOptions::try_from(&map).unwrap(); + let expect = RegionOptions { + index_options: IndexOptions { + inverted_index: InvertedIndexOptions { + ignore_column_ids: vec![1, 2, 3], + }, + }, + ..Default::default() + }; + assert_eq!(expect, options); + } + // No need to add compatible tests for RegionOptions since the above tests already check for compatibility. #[test] fn test_with_any_wal_options() { @@ -281,6 +337,7 @@ mod tests { ("compaction.twcs.time_window", "2h"), ("compaction.type", "twcs"), ("storage", "S3"), + ("index.inverted_index.ignore_column_ids", "1,2,3"), ( WAL_OPTIONS_KEY, &serde_json::to_string(&wal_options).unwrap(), @@ -296,6 +353,11 @@ mod tests { }), storage: Some("s3".to_string()), wal_options, + index_options: IndexOptions { + inverted_index: InvertedIndexOptions { + ignore_column_ids: vec![1, 2, 3], + }, + }, }; assert_eq!(expect, options); } diff --git a/src/mito2/src/sst/index.rs b/src/mito2/src/sst/index.rs index 00f432efac6f..34bca9f39dc7 100644 --- a/src/mito2/src/sst/index.rs +++ b/src/mito2/src/sst/index.rs @@ -27,6 +27,7 @@ use store_api::metadata::RegionMetadataRef; use store_api::storage::RegionId; use crate::read::Batch; +use crate::region::options::IndexOptions; use crate::sst::file::FileId; use crate::sst::index::intermediate::IntermediateManager; @@ -132,6 +133,7 @@ pub(crate) struct IndexerBuilder<'a> { pub(crate) segment_row_count: usize, pub(crate) object_store: ObjectStore, pub(crate) intermediate_manager: IntermediateManager, + pub(crate) index_options: IndexOptions, } impl<'a> IndexerBuilder<'a> { @@ -184,7 +186,15 @@ impl<'a> IndexerBuilder<'a> { self.mem_threshold_index_create, segment_row_count, ) - .with_buffer_size(self.write_buffer_size); + .with_buffer_size(self.write_buffer_size) + .with_ignore_column_ids( + self.index_options + .inverted_index + .ignore_column_ids + .iter() + .map(|i| i.to_string()) + .collect(), + ); Indexer { file_id: self.file_id, @@ -281,6 +291,7 @@ mod tests { row_group_size: 1024, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), + index_options: IndexOptions::default(), } .build(); @@ -301,6 +312,7 @@ mod tests { row_group_size: 1024, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), + index_options: IndexOptions::default(), } .build(); @@ -321,6 +333,7 @@ mod tests { row_group_size: 1024, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), + index_options: IndexOptions::default(), } .build(); @@ -341,6 +354,7 @@ mod tests { row_group_size: 0, object_store: mock_object_store(), intermediate_manager: mock_intm_mgr(), + index_options: IndexOptions::default(), } .build(); diff --git a/src/mito2/src/sst/index/applier/builder.rs b/src/mito2/src/sst/index/applier/builder.rs index 166c490eb36c..070842544f04 100644 --- a/src/mito2/src/sst/index/applier/builder.rs +++ b/src/mito2/src/sst/index/applier/builder.rs @@ -18,7 +18,7 @@ mod eq_list; mod in_list; mod regex_match; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use api::v1::SemanticType; use common_query::logical_plan::Expr; @@ -54,6 +54,9 @@ pub(crate) struct SstIndexApplierBuilder<'a> { /// Metadata of the region, used to get metadata like column type. metadata: &'a RegionMetadata, + /// Column ids to ignore. + ignore_column_ids: HashSet, + /// Stores predicates during traversal on the Expr tree. output: HashMap>, } @@ -65,12 +68,14 @@ impl<'a> SstIndexApplierBuilder<'a> { object_store: ObjectStore, file_cache: Option, metadata: &'a RegionMetadata, + ignore_column_ids: HashSet, ) -> Self { Self { region_dir, object_store, file_cache, metadata, + ignore_column_ids, output: HashMap::default(), } } @@ -138,7 +143,7 @@ impl<'a> SstIndexApplierBuilder<'a> { } /// Helper function to get the column id and the column type of a tag column. - /// Returns `None` if the column is not a tag column. + /// Returns `None` if the column is not a tag column or if the column is ignored. fn tag_column_id_and_type( &self, column_name: &str, @@ -150,8 +155,18 @@ impl<'a> SstIndexApplierBuilder<'a> { column: column_name, })?; - Ok((column.semantic_type == SemanticType::Tag) - .then(|| (column.column_id, column.column_schema.data_type.clone()))) + if self.ignore_column_ids.contains(&column.column_id) { + return Ok(None); + } + + if column.semantic_type != SemanticType::Tag { + return Ok(None); + } + + Ok(Some(( + column.column_id, + column.column_schema.data_type.clone(), + ))) } /// Helper function to get a non-null literal. @@ -293,8 +308,13 @@ mod tests { #[test] fn test_collect_and_basic() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(DfExpr::BinaryExpr(BinaryExpr { diff --git a/src/mito2/src/sst/index/applier/builder/between.rs b/src/mito2/src/sst/index/applier/builder/between.rs index 6edeaf689b9f..9f761328f350 100644 --- a/src/mito2/src/sst/index/applier/builder/between.rs +++ b/src/mito2/src/sst/index/applier/builder/between.rs @@ -58,6 +58,8 @@ impl<'a> SstIndexApplierBuilder<'a> { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; use crate::error::Error; use crate::sst::index::applier::builder::tests::{ @@ -68,8 +70,13 @@ mod tests { #[test] fn test_collect_between_basic() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let between = Between { negated: false, @@ -102,8 +109,13 @@ mod tests { #[test] fn test_collect_between_negated() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let between = Between { negated: true, @@ -119,8 +131,13 @@ mod tests { #[test] fn test_collect_between_field_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let between = Between { negated: false, @@ -136,8 +153,13 @@ mod tests { #[test] fn test_collect_between_type_mismatch() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let between = Between { negated: false, @@ -154,8 +176,13 @@ mod tests { #[test] fn test_collect_between_nonexistent_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let between = Between { negated: false, diff --git a/src/mito2/src/sst/index/applier/builder/comparison.rs b/src/mito2/src/sst/index/applier/builder/comparison.rs index 76973620aec6..4914a7578cb5 100644 --- a/src/mito2/src/sst/index/applier/builder/comparison.rs +++ b/src/mito2/src/sst/index/applier/builder/comparison.rs @@ -130,6 +130,8 @@ impl<'a> SstIndexApplierBuilder<'a> { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; use crate::error::Error; use crate::sst::index::applier::builder::tests::{ @@ -223,8 +225,13 @@ mod tests { ]; let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); for ((left, op, right), _) in &cases { builder.collect_comparison_expr(left, op, right).unwrap(); @@ -243,8 +250,13 @@ mod tests { #[test] fn test_collect_comparison_type_mismatch() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let res = builder.collect_comparison_expr(&tag_column(), &Operator::Lt, &int64_lit(10)); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); @@ -254,8 +266,13 @@ mod tests { #[test] fn test_collect_comparison_field_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); builder .collect_comparison_expr(&field_column(), &Operator::Lt, &string_lit("abc")) @@ -266,8 +283,13 @@ mod tests { #[test] fn test_collect_comparison_nonexistent_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let res = builder.collect_comparison_expr( &nonexistent_column(), diff --git a/src/mito2/src/sst/index/applier/builder/eq_list.rs b/src/mito2/src/sst/index/applier/builder/eq_list.rs index d67c048ad105..23a4d7516da3 100644 --- a/src/mito2/src/sst/index/applier/builder/eq_list.rs +++ b/src/mito2/src/sst/index/applier/builder/eq_list.rs @@ -132,8 +132,13 @@ mod tests { #[test] fn test_collect_eq_basic() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); builder .collect_eq(&tag_column(), &string_lit("foo")) @@ -161,8 +166,13 @@ mod tests { #[test] fn test_collect_eq_field_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); builder .collect_eq(&field_column(), &string_lit("abc")) @@ -173,8 +183,13 @@ mod tests { #[test] fn test_collect_eq_nonexistent_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc")); assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); @@ -184,8 +199,13 @@ mod tests { #[test] fn test_collect_eq_type_mismatch() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let res = builder.collect_eq(&tag_column(), &int64_lit(1)); assert!(matches!(res, Err(Error::FieldTypeMismatch { .. }))); @@ -195,8 +215,13 @@ mod tests { #[test] fn test_collect_or_eq_list_basic() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), @@ -245,8 +270,13 @@ mod tests { #[test] fn test_collect_or_eq_list_invalid_op() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), @@ -274,8 +304,13 @@ mod tests { #[test] fn test_collect_or_eq_list_multiple_columns() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let eq_expr = DfExpr::BinaryExpr(BinaryExpr { left: Box::new(tag_column()), diff --git a/src/mito2/src/sst/index/applier/builder/in_list.rs b/src/mito2/src/sst/index/applier/builder/in_list.rs index 294d3ab0b317..ead08943fa39 100644 --- a/src/mito2/src/sst/index/applier/builder/in_list.rs +++ b/src/mito2/src/sst/index/applier/builder/in_list.rs @@ -63,8 +63,13 @@ mod tests { #[test] fn test_collect_in_list_basic() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let in_list = InList { expr: Box::new(tag_column()), @@ -87,8 +92,13 @@ mod tests { #[test] fn test_collect_in_list_negated() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let in_list = InList { expr: Box::new(tag_column()), @@ -103,8 +113,13 @@ mod tests { #[test] fn test_collect_in_list_field_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let in_list = InList { expr: Box::new(field_column()), @@ -119,8 +134,13 @@ mod tests { #[test] fn test_collect_in_list_type_mismatch() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let in_list = InList { expr: Box::new(tag_column()), @@ -136,8 +156,13 @@ mod tests { #[test] fn test_collect_in_list_nonexistent_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let in_list = InList { expr: Box::new(nonexistent_column()), diff --git a/src/mito2/src/sst/index/applier/builder/regex_match.rs b/src/mito2/src/sst/index/applier/builder/regex_match.rs index 806da6d516f6..b318fd6308e8 100644 --- a/src/mito2/src/sst/index/applier/builder/regex_match.rs +++ b/src/mito2/src/sst/index/applier/builder/regex_match.rs @@ -45,6 +45,8 @@ impl<'a> SstIndexApplierBuilder<'a> { #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::*; use crate::error::Error; use crate::sst::index::applier::builder::tests::{ @@ -55,8 +57,13 @@ mod tests { #[test] fn test_regex_match_basic() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); builder .collect_regex_match(&tag_column(), &string_lit("abc")) @@ -75,8 +82,13 @@ mod tests { #[test] fn test_regex_match_field_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); builder .collect_regex_match(&field_column(), &string_lit("abc")) @@ -88,8 +100,13 @@ mod tests { #[test] fn test_regex_match_type_mismatch() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); builder .collect_regex_match(&tag_column(), &int64_lit(123)) @@ -101,8 +118,13 @@ mod tests { #[test] fn test_regex_match_type_nonexist_column() { let metadata = test_region_metadata(); - let mut builder = - SstIndexApplierBuilder::new("test".to_string(), test_object_store(), None, &metadata); + let mut builder = SstIndexApplierBuilder::new( + "test".to_string(), + test_object_store(), + None, + &metadata, + HashSet::default(), + ); let res = builder.collect_regex_match(&nonexistent_column(), &string_lit("abc")); assert!(matches!(res, Err(Error::ColumnNotFound { .. }))); diff --git a/src/mito2/src/sst/index/codec.rs b/src/mito2/src/sst/index/codec.rs index 5962c185f44a..855cad82ac23 100644 --- a/src/mito2/src/sst/index/codec.rs +++ b/src/mito2/src/sst/index/codec.rs @@ -37,7 +37,7 @@ impl IndexValueCodec { } } -type ColumnId = String; +pub(crate) type ColumnId = String; /// Decodes primary key values into their corresponding column ids, data types and values. pub struct IndexValuesCodec { diff --git a/src/mito2/src/sst/index/creator.rs b/src/mito2/src/sst/index/creator.rs index cb4f3433f080..6b85df954139 100644 --- a/src/mito2/src/sst/index/creator.rs +++ b/src/mito2/src/sst/index/creator.rs @@ -15,7 +15,7 @@ mod statistics; mod temp_provider; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::num::NonZeroUsize; use std::sync::Arc; @@ -40,7 +40,7 @@ use crate::metrics::{ }; use crate::read::Batch; use crate::sst::file::FileId; -use crate::sst::index::codec::{IndexValueCodec, IndexValuesCodec}; +use crate::sst::index::codec::{ColumnId, IndexValueCodec, IndexValuesCodec}; use crate::sst::index::creator::statistics::Statistics; use crate::sst::index::creator::temp_provider::TempFileProvider; use crate::sst::index::intermediate::{IntermediateLocation, IntermediateManager}; @@ -72,6 +72,9 @@ pub struct SstIndexCreator { stats: Statistics, /// Whether the index creation is aborted. aborted: bool, + + /// Ignore column IDs for index creation. + ignore_column_ids: HashSet, } impl SstIndexCreator { @@ -110,6 +113,8 @@ impl SstIndexCreator { stats: Statistics::default(), aborted: false, + + ignore_column_ids: HashSet::default(), } } @@ -119,6 +124,12 @@ impl SstIndexCreator { self } + /// Sets the ignore column IDs for index creation. + pub fn with_ignore_column_ids(mut self, ignore_column_ids: HashSet) -> Self { + self.ignore_column_ids = ignore_column_ids; + self + } + /// Updates index with a batch of rows. /// Garbage will be cleaned up if failed to update. pub async fn update(&mut self, batch: &Batch) -> Result<()> { @@ -189,6 +200,10 @@ impl SstIndexCreator { guard.inc_row_count(n); for (column_id, field, value) in self.codec.decode(batch.primary_key())? { + if self.ignore_column_ids.contains(column_id) { + continue; + } + if let Some(value) = value.as_ref() { self.value_buf.clear(); IndexValueCodec::encode_value(value.as_value_ref(), field, &mut self.value_buf)?; diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs index c93bd6ef7d16..56b14563678f 100644 --- a/src/mito2/src/worker/handle_flush.rs +++ b/src/mito2/src/worker/handle_flush.rs @@ -149,6 +149,7 @@ impl RegionWorkerLoop { engine_config, row_group_size, cache_manager: self.cache_manager.clone(), + index_options: region.version().options.index_options.clone(), } } }