diff --git a/Cargo.lock b/Cargo.lock index 1787afbf970b..6f7b68386950 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4098,7 +4098,6 @@ dependencies = [ "databend-common-sharing", "databend-common-sql", "databend-common-storage", - "databend-common-storages-share", "databend-storages-common-blocks", "databend-storages-common-cache", "databend-storages-common-cache-manager", diff --git a/scripts/ci/deploy/config/databend-query-node-share-1.toml b/scripts/ci/deploy/config/databend-query-node-share-1.toml index 4fbc1f46bae0..9c22a5338b0a 100644 --- a/scripts/ci/deploy/config/databend-query-node-share-1.toml +++ b/scripts/ci/deploy/config/databend-query-node-share-1.toml @@ -142,8 +142,8 @@ table_bloom_index_meta_count = 3000 table_bloom_index_filter_count = 1048576 ### table inverted index caches ### -# Max number of cached inverted index info objects. Set it to 0 to disable it. -inverted_index_info_count = 3000 +# Max number of cached inverted index meta objects. Set it to 0 to disable it. +inverted_index_meta_count = 3000 # Max bytes of cached inverted index filters. Set it to 0 to disable it. inverted_index_filter_size = 2147483648 diff --git a/scripts/ci/deploy/config/databend-query-node-share-2.toml b/scripts/ci/deploy/config/databend-query-node-share-2.toml index 2c3b73e2f0c1..9fb83e798b2b 100644 --- a/scripts/ci/deploy/config/databend-query-node-share-2.toml +++ b/scripts/ci/deploy/config/databend-query-node-share-2.toml @@ -143,8 +143,8 @@ table_bloom_index_meta_count = 3000 table_bloom_index_filter_count = 1048576 ### table inverted index caches ### -# Max number of cached inverted index info objects. Set it to 0 to disable it. -inverted_index_info_count = 3000 +# Max number of cached inverted index meta objects. Set it to 0 to disable it. +inverted_index_meta_count = 3000 # Max bytes of cached inverted index filters. Set it to 0 to disable it. inverted_index_filter_size = 2147483648 diff --git a/scripts/ci/deploy/config/databend-query-node-share-3.toml b/scripts/ci/deploy/config/databend-query-node-share-3.toml index 5bc013732302..2318bbc47d81 100644 --- a/scripts/ci/deploy/config/databend-query-node-share-3.toml +++ b/scripts/ci/deploy/config/databend-query-node-share-3.toml @@ -143,8 +143,8 @@ table_bloom_index_meta_count = 3000 table_bloom_index_filter_count = 1048576 ### table inverted index caches ### -# Max number of cached inverted index info objects. Set it to 0 to disable it. -inverted_index_info_count = 3000 +# Max number of cached inverted index meta objects. Set it to 0 to disable it. +inverted_index_meta_count = 3000 # Max bytes of cached inverted index filters. Set it to 0 to disable it. inverted_index_filter_size = 2147483648 diff --git a/src/common/metrics/src/metrics/storage.rs b/src/common/metrics/src/metrics/storage.rs index e943ea3235d0..7a435f2ec9f1 100644 --- a/src/common/metrics/src/metrics/storage.rs +++ b/src/common/metrics/src/metrics/storage.rs @@ -144,6 +144,22 @@ static BLOCK_INDEX_WRITE_MILLISECONDS: LazyLock = LazyLock::new(|| register_histogram_in_milliseconds("fuse_block_index_write_milliseconds")); static BLOCK_INDEX_READ_BYTES: LazyLock = LazyLock::new(|| register_counter("fuse_block_index_read_bytes")); +static BLOCK_INVERTED_INDEX_WRITE_NUMS: LazyLock = + LazyLock::new(|| register_counter("fuse_block_inverted_index_write_nums")); +static BLOCK_INVERTED_INDEX_WRITE_BYTES: LazyLock = + LazyLock::new(|| register_counter("fuse_block_inverted_index_write_bytes")); +static BLOCK_INVERTED_INDEX_WRITE_MILLISECONDS: LazyLock = LazyLock::new(|| { + register_histogram_in_milliseconds("fuse_block_inverted_index_write_milliseconds") +}); +static BLOCK_INVERTED_INDEX_GENERATE_MILLISECONDS: LazyLock = LazyLock::new(|| { + register_histogram_in_milliseconds("fuse_block_inverted_index_generate_milliseconds") +}); +static BLOCK_INVERTED_INDEX_READ_MILLISECONDS: LazyLock = LazyLock::new(|| { + register_histogram_in_milliseconds("fuse_block_inverted_index_read_milliseconds") +}); +static BLOCK_INVERTED_INDEX_SEARCH_MILLISECONDS: LazyLock = LazyLock::new(|| { + register_histogram_in_milliseconds("fuse_block_inverted_index_search_milliseconds") +}); static COMPACT_BLOCK_READ_NUMS: LazyLock = LazyLock::new(|| register_counter("fuse_compact_block_read_nums")); static COMPACT_BLOCK_READ_BYTES: LazyLock = @@ -496,6 +512,30 @@ pub fn metrics_inc_block_index_write_milliseconds(c: u64) { BLOCK_INDEX_WRITE_MILLISECONDS.observe(c as f64); } +pub fn metrics_inc_block_inverted_index_write_nums(c: u64) { + BLOCK_INVERTED_INDEX_WRITE_NUMS.inc_by(c); +} + +pub fn metrics_inc_block_inverted_index_write_bytes(c: u64) { + BLOCK_INVERTED_INDEX_WRITE_BYTES.inc_by(c); +} + +pub fn metrics_inc_block_inverted_index_write_milliseconds(c: u64) { + BLOCK_INVERTED_INDEX_WRITE_MILLISECONDS.observe(c as f64); +} + +pub fn metrics_inc_block_inverted_index_generate_milliseconds(c: u64) { + BLOCK_INVERTED_INDEX_GENERATE_MILLISECONDS.observe(c as f64); +} + +pub fn metrics_inc_block_inverted_index_read_milliseconds(c: u64) { + BLOCK_INVERTED_INDEX_READ_MILLISECONDS.observe(c as f64); +} + +pub fn metrics_inc_block_inverted_index_search_milliseconds(c: u64) { + BLOCK_INVERTED_INDEX_SEARCH_MILLISECONDS.observe(c as f64); +} + pub fn metrics_inc_block_index_read_bytes(c: u64) { BLOCK_INDEX_READ_BYTES.inc_by(c); } diff --git a/src/query/config/src/config.rs b/src/query/config/src/config.rs index f0aad8b6f0c1..ea442898b16a 100644 --- a/src/query/config/src/config.rs +++ b/src/query/config/src/config.rs @@ -2766,13 +2766,13 @@ pub struct CacheConfig { )] pub table_bloom_index_filter_size: u64, - /// Max number of cached inverted index info objects. Set it to 0 to disable it. + /// Max number of cached inverted index meta objects. Set it to 0 to disable it. #[clap( - long = "cache-inverted-index-info-count", + long = "cache-inverted-index-meta-count", value_name = "VALUE", default_value = "3000" )] - pub inverted_index_info_count: u64, + pub inverted_index_meta_count: u64, /// Max bytes of cached inverted index filters used. Set it to 0 to disable it. #[clap( @@ -2983,7 +2983,7 @@ mod cache_config_converters { table_bloom_index_meta_count: value.table_bloom_index_meta_count, table_bloom_index_filter_count: value.table_bloom_index_filter_count, table_bloom_index_filter_size: value.table_bloom_index_filter_size, - inverted_index_info_count: value.inverted_index_info_count, + inverted_index_meta_count: value.inverted_index_meta_count, inverted_index_filter_size: value.inverted_index_filter_size, inverted_index_filter_memory_ratio: value.inverted_index_filter_memory_ratio, table_prune_partitions_count: value.table_prune_partitions_count, @@ -3008,7 +3008,7 @@ mod cache_config_converters { table_bloom_index_meta_count: value.table_bloom_index_meta_count, table_bloom_index_filter_count: value.table_bloom_index_filter_count, table_bloom_index_filter_size: value.table_bloom_index_filter_size, - inverted_index_info_count: value.inverted_index_info_count, + inverted_index_meta_count: value.inverted_index_meta_count, inverted_index_filter_size: value.inverted_index_filter_size, inverted_index_filter_memory_ratio: value.inverted_index_filter_memory_ratio, table_prune_partitions_count: value.table_prune_partitions_count, diff --git a/src/query/config/src/inner.rs b/src/query/config/src/inner.rs index aab6ab713820..7a3d4c1d9757 100644 --- a/src/query/config/src/inner.rs +++ b/src/query/config/src/inner.rs @@ -541,8 +541,8 @@ pub struct CacheConfig { // One bloom index filter per column of data block being indexed will be generated if necessary. pub table_bloom_index_filter_size: u64, - /// Max number of cached inverted index info objects. Set it to 0 to disable it. - pub inverted_index_info_count: u64, + /// Max number of cached inverted index meta objects. Set it to 0 to disable it. + pub inverted_index_meta_count: u64, /// Max bytes of cached inverted index filters used. Set it to 0 to disable it. pub inverted_index_filter_size: u64, @@ -636,7 +636,7 @@ impl Default for CacheConfig { table_bloom_index_meta_count: 3000, table_bloom_index_filter_count: 0, table_bloom_index_filter_size: 2147483648, - inverted_index_info_count: 3000, + inverted_index_meta_count: 3000, inverted_index_filter_size: 2147483648, inverted_index_filter_memory_ratio: 0, table_prune_partitions_count: 256, diff --git a/src/query/ee/tests/it/inverted_index/index_refresh.rs b/src/query/ee/tests/it/inverted_index/index_refresh.rs index 8e8f40c0a911..02255c8ae4e1 100644 --- a/src/query/ee/tests/it/inverted_index/index_refresh.rs +++ b/src/query/ee/tests/it/inverted_index/index_refresh.rs @@ -21,7 +21,6 @@ use databend_common_exception::Result; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::schema::CreateTableIndexReq; use databend_common_sql::plans::RefreshTableIndexPlan; -use databend_common_storages_fuse::io::read::load_inverted_index_info; use databend_common_storages_fuse::io::read::InvertedIndexReader; use databend_common_storages_fuse::io::MetaReaders; use databend_common_storages_fuse::io::TableMetaLocationGenerator; @@ -74,7 +73,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { table_id, name: index_name.clone(), column_ids: vec![0, 1], - sync_creation: true, + sync_creation: false, options, }; @@ -100,18 +99,13 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { let new_snapshot = new_fuse_table.read_table_snapshot().await?; assert!(new_snapshot.is_some()); let new_snapshot = new_snapshot.unwrap(); - assert!(new_snapshot.inverted_indexes.is_some()); - - let index_info_loc = new_snapshot - .inverted_indexes - .as_ref() - .and_then(|i| i.get(&index_name)); - assert!(index_info_loc.is_some()); - let index_info = - load_inverted_index_info(new_fuse_table.get_operator(), index_info_loc).await?; - assert!(index_info.is_some()); - let index_info = index_info.unwrap(); - let index_version = index_info.index_version.clone(); + + let table_info = new_table.get_table_info(); + let table_indexes = &table_info.meta.indexes; + let table_index = table_indexes.get(&index_name); + assert!(table_index.is_some()); + let table_index = table_index.unwrap(); + let index_version = table_index.version.clone(); let segment_reader = MetaReaders::segment_info_reader(new_fuse_table.get_operator(), table_schema.clone()); diff --git a/src/query/ee/tests/it/inverted_index/pruning.rs b/src/query/ee/tests/it/inverted_index/pruning.rs index 9d005cd90ad9..722528da7fc1 100644 --- a/src/query/ee/tests/it/inverted_index/pruning.rs +++ b/src/query/ee/tests/it/inverted_index/pruning.rs @@ -38,7 +38,6 @@ use databend_common_meta_app::schema::CreateTableIndexReq; use databend_common_sql::plans::CreateTablePlan; use databend_common_sql::plans::RefreshTableIndexPlan; use databend_common_sql::BloomIndexColumns; -use databend_common_storages_fuse::io::read::load_inverted_index_info; use databend_common_storages_fuse::pruning::create_segment_location_vector; use databend_common_storages_fuse::pruning::FusePruner; use databend_common_storages_fuse::FuseTable; @@ -506,7 +505,7 @@ async fn test_block_pruner() -> Result<()> { table_id, name: index_name.clone(), column_ids: vec![1, 2, 3], - sync_creation: true, + sync_creation: false, options: index_options.clone(), }; @@ -537,15 +536,12 @@ async fn test_block_pruner() -> Result<()> { assert!(snapshot.is_some()); let snapshot = snapshot.unwrap(); - let index_info_loc = snapshot - .inverted_indexes - .as_ref() - .and_then(|i| i.get(&index_name)); - assert!(index_info_loc.is_some()); - let index_info = load_inverted_index_info(fuse_table.get_operator(), index_info_loc).await?; - assert!(index_info.is_some()); - let index_info = index_info.unwrap(); - let index_version = index_info.index_version.clone(); + let table_info = new_table.get_table_info(); + let table_indexes = &table_info.meta.indexes; + let table_index = table_indexes.get(&index_name); + assert!(table_index.is_some()); + let table_index = table_index.unwrap(); + let index_version = table_index.version.clone(); let index_schema = DataSchema::from(index_table_schema); let e1 = PushDownInfo { diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index 5377ed93b20e..603d5b01c76d 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -270,7 +270,7 @@ async fn generate_refresh_inverted_index_plan( let table_meta = &table.get_table_info().meta; for (_, index) in table_meta.indexes.iter() { - if !index.sync_creation { + if index.sync_creation { continue; } let plan = RefreshTableIndexPlan { diff --git a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs index 2de3e922424e..d2bfc6c5e8d1 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs @@ -57,7 +57,6 @@ impl Interpreter for RefreshTableIndexInterpreter { .manager .check_enterprise_enabled(self.ctx.get_license_key(), Feature::InvertedIndex)?; - let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let table = self .ctx .get_table(&self.plan.catalog, &self.plan.database, &self.plan.table) @@ -108,12 +107,11 @@ impl Interpreter for RefreshTableIndexInterpreter { if let Some(lock_guard) = lock_guard { build_res.main_pipeline.add_lock_guard(lock_guard); } + let fuse_table = FuseTable::try_from_table(table.as_ref())?; fuse_table .do_refresh_inverted_index( self.ctx.clone(), - catalog.clone(), - table.clone(), index_name, index_version, &index.options, diff --git a/src/query/service/src/test_kits/fuse.rs b/src/query/service/src/test_kits/fuse.rs index 697a495ca9a9..4bbc4279e6eb 100644 --- a/src/query/service/src/test_kits/fuse.rs +++ b/src/query/service/src/test_kits/fuse.rs @@ -212,7 +212,6 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> { locations, None, None, - None, ); snapshot_1.timestamp = Some(now - Duration::hours(12)); snapshot_1.summary = merge_statistics(&snapshot_0.summary, &segments_v3[0].1.summary, None); diff --git a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs index b7016946c74f..907c7c5690da 100644 --- a/src/query/service/tests/it/storages/fuse/meta/snapshot.rs +++ b/src/query/service/tests/it/storages/fuse/meta/snapshot.rs @@ -26,18 +26,7 @@ fn default_snapshot() -> TableSnapshot { let uuid = Uuid::new_v4(); let schema = TableSchema::empty(); let stats = Default::default(); - TableSnapshot::new( - uuid, - None, - &None, - None, - schema, - stats, - vec![], - None, - None, - None, - ) + TableSnapshot::new(uuid, None, &None, None, schema, stats, vec![], None, None) } #[test] @@ -61,7 +50,6 @@ fn snapshot_timestamp_monotonic_increase() { vec![], None, None, - None, ); let current_ts = current.timestamp.unwrap(); let prev_ts = prev.timestamp.unwrap(); @@ -87,7 +75,6 @@ fn snapshot_timestamp_time_skew_tolerance() { vec![], None, None, - None, ); let current_ts = current.timestamp.unwrap(); let prev_ts = prev.timestamp.unwrap(); diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index c9b43c8e3a67..2716d2112f66 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -278,7 +278,6 @@ async fn test_commit_to_meta_server() -> Result<()> { new_segments, None, None, - None, ); let faked_catalog = FakedCatalog { diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 4874180a58b8..ca06c33db0de 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -217,7 +217,6 @@ async fn test_safety() -> Result<()> { locations.clone(), None, None, - None, ); let limit: usize = rand.gen_range(1..15); diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index 7e0d9a20ab92..6ea7f85924cb 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -226,7 +226,6 @@ async fn test_safety_for_recluster() -> Result<()> { locations.clone(), None, None, - None, )); let mut block_ids = HashSet::new(); diff --git a/src/query/service/tests/it/storages/testdata/caches_table.txt b/src/query/service/tests/it/storages/testdata/caches_table.txt index b0dd70e46b87..5b3404c083b0 100644 --- a/src/query/service/tests/it/storages/testdata/caches_table.txt +++ b/src/query/service/tests/it/storages/testdata/caches_table.txt @@ -8,7 +8,6 @@ DB.Table: 'system'.'caches', Table: caches-table_id:1, ver:0, Engine: SystemCach | 'test-node' | 'bloom_index_meta_cache' | 0 | 0 | | 'test-node' | 'file_meta_data_cache' | 0 | 0 | | 'test-node' | 'inverted_index_file_cache' | 0 | 0 | -| 'test-node' | 'inverted_index_info_cache' | 0 | 0 | | 'test-node' | 'inverted_index_meta_cache' | 0 | 0 | | 'test-node' | 'prune_partitions_cache' | 0 | 0 | | 'test-node' | 'segment_info_cache' | 0 | 0 | diff --git a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt index 2eeaf31ef6c7..dae816e8134f 100644 --- a/src/query/service/tests/it/storages/testdata/configs_table_basic.txt +++ b/src/query/service/tests/it/storages/testdata/configs_table_basic.txt @@ -11,7 +11,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo | 'cache' | 'enable_table_meta_cache' | 'true' | '' | | 'cache' | 'inverted_index_filter_memory_ratio' | '0' | '' | | 'cache' | 'inverted_index_filter_size' | '2147483648' | '' | -| 'cache' | 'inverted_index_info_count' | '3000' | '' | +| 'cache' | 'inverted_index_meta_count' | '3000' | '' | | 'cache' | 'table_bloom_index_filter_count' | '0' | '' | | 'cache' | 'table_bloom_index_filter_size' | '2147483648' | '' | | 'cache' | 'table_bloom_index_meta_count' | '3000' | '' | diff --git a/src/query/sql/src/planner/binder/ddl/index.rs b/src/query/sql/src/planner/binder/ddl/index.rs index 8b143549fedd..6929cc764bc3 100644 --- a/src/query/sql/src/planner/binder/ddl/index.rs +++ b/src/query/sql/src/planner/binder/ddl/index.rs @@ -388,7 +388,7 @@ impl Binder { && field.data_type.remove_nullable() != TableDataType::Variant { return Err(ErrorCode::UnsupportedIndex(format!( - "Inverted index currently only support String and variant type, but the type of column {} is {}", + "Inverted index currently only support String and Variant type, but the type of column {} is {}", column, field.data_type ))); } diff --git a/src/query/storages/common/cache_manager/src/cache_manager.rs b/src/query/storages/common/cache_manager/src/cache_manager.rs index c264f08f668b..4d1b651d4e90 100644 --- a/src/query/storages/common/cache_manager/src/cache_manager.rs +++ b/src/query/storages/common/cache_manager/src/cache_manager.rs @@ -35,7 +35,6 @@ use crate::caches::ColumnArrayCache; use crate::caches::CompactSegmentInfoCache; use crate::caches::FileMetaDataCache; use crate::caches::InvertedIndexFileCache; -use crate::caches::InvertedIndexInfoCache; use crate::caches::InvertedIndexMetaCache; use crate::caches::TableSnapshotCache; use crate::caches::TableSnapshotStatisticCache; @@ -54,7 +53,6 @@ pub struct CacheManager { segment_info_cache: Option, bloom_index_filter_cache: Option, bloom_index_meta_cache: Option, - inverted_index_info_cache: Option, inverted_index_meta_cache: Option, inverted_index_file_cache: Option, prune_partitions_cache: Option, @@ -123,7 +121,6 @@ impl CacheManager { segment_info_cache: None, bloom_index_filter_cache: None, bloom_index_meta_cache: None, - inverted_index_info_cache: None, inverted_index_meta_cache: None, inverted_index_file_cache: None, prune_partitions_cache: None, @@ -151,12 +148,8 @@ impl CacheManager { config.table_bloom_index_meta_count, "bloom_index_file_meta_data", ); - let inverted_index_info_cache = Self::new_item_cache( - config.inverted_index_info_count, - "inverted_index_file_info_data", - ); let inverted_index_meta_cache = Self::new_item_cache( - config.inverted_index_info_count, + config.inverted_index_meta_count, "inverted_index_file_meta_data", ); @@ -181,7 +174,6 @@ impl CacheManager { segment_info_cache, bloom_index_filter_cache, bloom_index_meta_cache, - inverted_index_info_cache, inverted_index_meta_cache, inverted_index_file_cache, prune_partitions_cache, @@ -219,10 +211,6 @@ impl CacheManager { self.bloom_index_meta_cache.clone() } - pub fn get_inverted_index_info_cache(&self) -> Option { - self.inverted_index_info_cache.clone() - } - pub fn get_inverted_index_meta_cache(&self) -> Option { self.inverted_index_meta_cache.clone() } diff --git a/src/query/storages/common/cache_manager/src/caches.rs b/src/query/storages/common/cache_manager/src/caches.rs index 0836deb8c413..58f5ddd48d06 100644 --- a/src/query/storages/common/cache_manager/src/caches.rs +++ b/src/query/storages/common/cache_manager/src/caches.rs @@ -31,7 +31,6 @@ use databend_storages_common_index::BloomIndexMeta; use databend_storages_common_index::InvertedIndexFile; use databend_storages_common_index::InvertedIndexMeta; use databend_storages_common_table_meta::meta::CompactSegmentInfo; -use databend_storages_common_table_meta::meta::IndexInfo; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::TableSnapshotStatistics; @@ -54,7 +53,6 @@ pub type BloomIndexFilterCache = /// In memory object cache of parquet FileMetaData of bloom index data pub type BloomIndexMetaCache = NamedCache>; -pub type InvertedIndexInfoCache = NamedCache>; pub type InvertedIndexMetaCache = NamedCache>; pub type InvertedIndexFileCache = NamedCache< InMemoryItemCacheHolder, @@ -155,13 +153,6 @@ impl CachedObject } } -impl CachedObject for IndexInfo { - type Cache = InvertedIndexInfoCache; - fn cache() -> Option { - CacheManager::instance().get_inverted_index_info_cache() - } -} - impl CachedObject for InvertedIndexMeta { type Cache = InvertedIndexMetaCache; fn cache() -> Option { diff --git a/src/query/storages/common/index/src/bloom_index.rs b/src/query/storages/common/index/src/bloom_index.rs index de04a9373c57..973089bd8c6a 100644 --- a/src/query/storages/common/index/src/bloom_index.rs +++ b/src/query/storages/common/index/src/bloom_index.rs @@ -192,7 +192,7 @@ impl BloomIndex { let mut column_distinct_count = HashMap::::new(); for (index, field) in bloom_columns_map.into_iter() { let field_type = &data_blocks_tobe_indexed[0].get_by_offset(index).data_type; - let (column, data_type) = match field_type { + let (column, data_type) = match field_type.remove_nullable() { DataType::Map(box inner_ty) => { // Add bloom filter for the value of map type let val_type = match inner_ty { @@ -205,8 +205,16 @@ impl BloomIndex { let source_columns_iter = data_blocks_tobe_indexed.iter().map(|block| { let value = &block.get_by_offset(index).value; let column = value.convert_to_full_column(field_type, block.num_rows()); - let map_column = - MapType::::try_downcast_column(&column).unwrap(); + let map_column = if field_type.is_nullable() { + let nullable_column = + NullableType::>::try_downcast_column( + &column, + ) + .unwrap(); + nullable_column.column + } else { + MapType::::try_downcast_column(&column).unwrap() + }; map_column.values.values }); let column = Column::concat_columns(source_columns_iter)?; @@ -478,10 +486,11 @@ impl BloomIndex { } pub fn supported_data_type(data_type: &DataType) -> bool { - let mut data_type = data_type; - if let DataType::Map(box inner_ty) = data_type { - data_type = match inner_ty { - DataType::Tuple(kv_tys) => &kv_tys[1], + if let DataType::Map(box inner_ty) = data_type.remove_nullable() { + match inner_ty { + DataType::Tuple(kv_tys) => { + return Xor8Filter::supported_type(&kv_tys[1]); + } _ => unreachable!(), }; } diff --git a/src/query/storages/common/table_meta/src/meta/current/mod.rs b/src/query/storages/common/table_meta/src/meta/current/mod.rs index 14927abff4a1..7cf2cf890a14 100644 --- a/src/query/storages/common/table_meta/src/meta/current/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/current/mod.rs @@ -21,7 +21,6 @@ pub use v2::MetaHLL; pub use v2::Statistics; pub use v2::TableSnapshotStatistics; pub use v4::CompactSegmentInfo; -pub use v4::IndexInfo; pub use v4::SegmentInfo; pub use v4::TableSnapshot; pub use v4::TableSnapshotLite; diff --git a/src/query/storages/common/table_meta/src/meta/mod.rs b/src/query/storages/common/table_meta/src/meta/mod.rs index 987cf870d78c..4c6ef8668945 100644 --- a/src/query/storages/common/table_meta/src/meta/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/mod.rs @@ -39,7 +39,6 @@ pub use statistics::*; pub(crate) use testing::*; pub(crate) use utils::*; pub use versions::testify_version; -pub use versions::IndexInfoVersion; pub use versions::SegmentInfoVersion; pub use versions::SnapshotVersion; pub use versions::TableSnapshotStatisticsVersion; diff --git a/src/query/storages/common/table_meta/src/meta/v4/index.rs b/src/query/storages/common/table_meta/src/meta/v4/index.rs deleted file mode 100644 index 902c43bd80d7..000000000000 --- a/src/query/storages/common/table_meta/src/meta/v4/index.rs +++ /dev/null @@ -1,85 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeSet; -use std::io::Cursor; - -use databend_common_exception::Result; -use databend_common_io::prelude::BinaryRead; -use serde::Deserialize; -use serde::Serialize; - -use crate::meta::format::compress; -use crate::meta::format::encode; -use crate::meta::format::read_and_deserialize; -use crate::meta::format::MetaCompression; -use crate::meta::FormatVersion; -use crate::meta::Location; -use crate::meta::MetaEncoding; -use crate::meta::Versioned; - -/// The IndexInfo structure stores the relationship between index and segments. -#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] -pub struct IndexInfo { - pub format_version: FormatVersion, - // Index version updated every time schema changed. - pub index_version: String, - // Indexed segments locations, used to determine if the segment - // is already indexed and to prevent duplicate indexed. - pub indexed_segments: BTreeSet, -} - -impl IndexInfo { - pub fn new(index_version: String, indexed_segments: BTreeSet) -> Self { - Self { - format_version: IndexInfo::VERSION, - index_version, - indexed_segments, - } - } - - pub fn to_bytes(&self) -> Result> { - let encoding = MetaEncoding::MessagePack; - let compression = MetaCompression::default(); - - let data = encode(&encoding, &self)?; - let data_compress = compress(&compression, data)?; - - let data_size = self.format_version.to_le_bytes().len() - + 2 - + data_compress.len().to_le_bytes().len() - + data_compress.len(); - let mut buf = Vec::with_capacity(data_size); - - buf.extend_from_slice(&self.format_version.to_le_bytes()); - buf.push(encoding as u8); - buf.push(compression as u8); - buf.extend_from_slice(&data_compress.len().to_le_bytes()); - - buf.extend(data_compress); - - Ok(buf) - } - - pub fn from_slice(buffer: &[u8]) -> Result { - let mut cursor = Cursor::new(buffer); - let version = cursor.read_scalar::()?; - assert_eq!(version, IndexInfo::VERSION); - let encoding = MetaEncoding::try_from(cursor.read_scalar::()?)?; - let compression = MetaCompression::try_from(cursor.read_scalar::()?)?; - let index_info_size: u64 = cursor.read_scalar::()?; - - read_and_deserialize(&mut cursor, index_info_size, &encoding, &compression) - } -} diff --git a/src/query/storages/common/table_meta/src/meta/v4/mod.rs b/src/query/storages/common/table_meta/src/meta/v4/mod.rs index 48adfdfdd172..ca6e09db166d 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/mod.rs @@ -12,11 +12,9 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod index; mod segment; mod snapshot; -pub use index::IndexInfo; pub use segment::CompactSegmentInfo; pub use segment::SegmentInfo; pub use snapshot::TableSnapshot; diff --git a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs index c61003540d9f..4aff9a5a5cfe 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; use std::io::Cursor; use chrono::DateTime; @@ -87,10 +86,6 @@ pub struct TableSnapshot { /// The metadata of the cluster keys. pub cluster_key_meta: Option, pub table_statistics_location: Option, - - /// The inverted index infos, key is the index name, - /// value is the index info location. - pub inverted_indexes: Option>, } impl TableSnapshot { @@ -104,7 +99,6 @@ impl TableSnapshot { segments: Vec, cluster_key_meta: Option, table_statistics_location: Option, - inverted_indexes: Option>, ) -> Self { let now = Utc::now(); // make snapshot timestamp monotonically increased @@ -125,7 +119,6 @@ impl TableSnapshot { segments, cluster_key_meta, table_statistics_location, - inverted_indexes, } } @@ -140,7 +133,6 @@ impl TableSnapshot { vec![], None, None, - None, ) } @@ -158,7 +150,6 @@ impl TableSnapshot { clone.segments, clone.cluster_key_meta, clone.table_statistics_location, - clone.inverted_indexes, ) } @@ -238,7 +229,6 @@ impl From for TableSnapshot { segments: s.segments, cluster_key_meta: s.cluster_key_meta, table_statistics_location: s.table_statistics_location, - inverted_indexes: None, } } } @@ -261,7 +251,6 @@ where T: Into segments: s.segments, cluster_key_meta: s.cluster_key_meta, table_statistics_location: s.table_statistics_location, - inverted_indexes: None, } } } diff --git a/src/query/storages/common/table_meta/src/meta/versions.rs b/src/query/storages/common/table_meta/src/meta/versions.rs index 01a3bfca1bbd..42fd7ecf4225 100644 --- a/src/query/storages/common/table_meta/src/meta/versions.rs +++ b/src/query/storages/common/table_meta/src/meta/versions.rs @@ -125,24 +125,6 @@ impl TableSnapshotStatisticsVersion { } } -impl Versioned<0> for v4::IndexInfo {} - -pub enum IndexInfoVersion { - V0(PhantomData), -} - -impl IndexInfoVersion { - pub fn version(&self) -> u64 { - match self { - IndexInfoVersion::V0(a) => Self::ver(a), - } - } - - fn ver>(_v: &PhantomData) -> u64 { - V - } -} - /// Statically check that if T implements Versioned where U equals V #[inline] pub fn testify_version(t: PhantomData) -> PhantomData @@ -206,16 +188,4 @@ mod converters { } } } - - impl TryFrom for IndexInfoVersion { - type Error = ErrorCode; - fn try_from(value: u64) -> Result { - match value { - 0 => Ok(IndexInfoVersion::V0(testify_version::<_, 0>(PhantomData))), - _ => Err(ErrorCode::Internal(format!( - "unknown index info version {value}, versions supported: 0" - ))), - } - } - } } diff --git a/src/query/storages/common/table_meta/src/readers/versioned_reader.rs b/src/query/storages/common/table_meta/src/readers/versioned_reader.rs index 19cc60fe8d67..383365d197de 100644 --- a/src/query/storages/common/table_meta/src/readers/versioned_reader.rs +++ b/src/query/storages/common/table_meta/src/readers/versioned_reader.rs @@ -17,8 +17,6 @@ use futures::AsyncRead; use futures_util::AsyncReadExt; use crate::meta::load_json; -use crate::meta::IndexInfo; -use crate::meta::IndexInfoVersion; use crate::meta::TableSnapshotStatistics; use crate::meta::TableSnapshotStatisticsVersion; @@ -47,18 +45,3 @@ impl VersionedReader for TableSnapshotStatisticsVersion Ok(r) } } - -#[async_trait::async_trait] -impl VersionedReader for IndexInfoVersion { - type TargetType = IndexInfo; - #[async_backtrace::framed] - async fn read(&self, mut reader: R) -> Result - where R: AsyncRead + Unpin + Send { - let mut buffer: Vec = vec![]; - reader.read_to_end(&mut buffer).await?; - let r = match self { - IndexInfoVersion::V0(_v) => IndexInfo::from_slice(&buffer)?, - }; - Ok(r) - } -} diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index eaf25c8ee5a1..858e0043f4c0 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -31,7 +31,6 @@ databend-common-pipeline-transforms = { path = "../../pipeline/transforms" } databend-common-sharing = { path = "../../sharing" } databend-common-sql = { path = "../../sql" } databend-common-storage = { path = "../../../common/storage" } -databend-common-storages-share = { path = "../share" } jsonb = { workspace = true } databend-storages-common-blocks = { path = "../common/blocks" } diff --git a/src/query/storages/fuse/src/constants.rs b/src/query/storages/fuse/src/constants.rs index c777b52866c4..c52a757f7474 100644 --- a/src/query/storages/fuse/src/constants.rs +++ b/src/query/storages/fuse/src/constants.rs @@ -28,7 +28,6 @@ pub const FUSE_TBL_LAST_SNAPSHOT_HINT: &str = "last_snapshot_location_hint"; pub const FUSE_TBL_VIRTUAL_BLOCK_PREFIX: &str = "_vb"; pub const FUSE_TBL_AGG_INDEX_PREFIX: &str = "_i_a"; pub const FUSE_TBL_INVERTED_INDEX_PREFIX: &str = "_i_i"; -pub const FUSE_TBL_INVERTED_INDEX_INFO_PREFIX: &str = "_i_ii"; pub const DEFAULT_BLOCK_PER_SEGMENT: usize = 1000; pub const DEFAULT_ROW_PER_PAGE: usize = 131072; diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 29e8b910ec37..2ebae025cf18 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -557,7 +557,6 @@ impl Table for FuseTable { let prev_statistics_location = prev .as_ref() .and_then(|v| v.table_statistics_location.clone()); - let prev_inverted_indexes = prev.as_ref().and_then(|v| v.inverted_indexes.clone()); let (summary, segments) = if let Some(v) = prev { (v.summary.clone(), v.segments.clone()) } else { @@ -576,7 +575,6 @@ impl Table for FuseTable { segments, cluster_key_meta, prev_statistics_location, - prev_inverted_indexes, ); let mut table_info = self.table_info.clone(); @@ -612,7 +610,6 @@ impl Table for FuseTable { let prev_statistics_location = prev .as_ref() .and_then(|v| v.table_statistics_location.clone()); - let prev_inverted_indexes = prev.as_ref().and_then(|v| v.inverted_indexes.clone()); let prev_snapshot_id = prev.as_ref().map(|v| (v.snapshot_id, prev_version)); let (summary, segments) = if let Some(v) = prev { (v.summary.clone(), v.segments.clone()) @@ -632,7 +629,6 @@ impl Table for FuseTable { segments, None, prev_statistics_location, - prev_inverted_indexes, ); let mut table_info = self.table_info.clone(); diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index 355fbbec74f8..4f2eef17ffd4 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -16,7 +16,6 @@ use std::marker::PhantomData; use databend_common_exception::Result; use databend_common_expression::DataBlock; -use databend_storages_common_table_meta::meta::IndexInfo; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::SnapshotVersion; @@ -32,7 +31,6 @@ use crate::constants::FUSE_TBL_VIRTUAL_BLOCK_PREFIX; use crate::index::filters::BlockFilter; use crate::index::InvertedIndexFile; use crate::FUSE_TBL_AGG_INDEX_PREFIX; -use crate::FUSE_TBL_INVERTED_INDEX_INFO_PREFIX; use crate::FUSE_TBL_INVERTED_INDEX_PREFIX; use crate::FUSE_TBL_LAST_SNAPSHOT_HINT; use crate::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX; @@ -168,17 +166,6 @@ impl TableMetaLocationGenerator { format!("{prefix}/{FUSE_TBL_AGG_INDEX_PREFIX}/{index_id}/{block_name}") } - pub fn gen_inverted_index_info_location(&self) -> String { - let prefix = self.inverted_index_info_prefix(); - let uuid = Uuid::new_v4().simple().to_string(); - format!("{}{}_v{}.mpk", prefix, uuid, IndexInfo::VERSION,) - } - - // inverted index info path, trailing slash "/" included. - pub fn inverted_index_info_prefix(&self) -> String { - format!("{}/{}/", &self.prefix, FUSE_TBL_INVERTED_INDEX_INFO_PREFIX) - } - pub fn gen_inverted_index_location_from_block_location( loc: &str, index_name: &str, diff --git a/src/query/storages/fuse/src/io/mod.rs b/src/query/storages/fuse/src/io/mod.rs index 69b8c0f36710..24a3531b041c 100644 --- a/src/query/storages/fuse/src/io/mod.rs +++ b/src/query/storages/fuse/src/io/mod.rs @@ -40,12 +40,14 @@ pub use segments::SerializedSegment; pub use snapshots::SnapshotLiteExtended; pub use snapshots::SnapshotsIO; pub(crate) use write::create_index_schema; +pub(crate) use write::create_inverted_index_builders; pub(crate) use write::create_tokenizer_manager; pub use write::serialize_block; pub use write::write_data; pub use write::BlockBuilder; pub use write::BlockSerialization; pub use write::CachedMetaWriter; +pub use write::InvertedIndexBuilder; pub use write::InvertedIndexWriter; pub use write::MetaWriter; pub use write::SegmentWriter; diff --git a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs index 4c81514b4207..e30ffb60e208 100644 --- a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs +++ b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs @@ -14,6 +14,7 @@ use std::future::Future; use std::sync::Arc; +use std::time::Instant; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::Runtime; @@ -21,6 +22,7 @@ use databend_common_base::runtime::TrySpawn; use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use databend_common_metrics::storage::metrics_inc_block_inverted_index_read_milliseconds; use databend_storages_common_cache::CacheKey; use databend_storages_common_cache::InMemoryCacheReader; use databend_storages_common_cache::LoadParams; @@ -29,8 +31,6 @@ use databend_storages_common_cache_manager::CachedObject; use databend_storages_common_cache_manager::InvertedIndexFileMeter; use databend_storages_common_index::InvertedIndexDirectory; use databend_storages_common_index::InvertedIndexMeta; -use databend_storages_common_table_meta::meta::IndexInfo; -use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SingleColumnMeta; use futures_util::future::try_join_all; use opendal::Operator; @@ -52,28 +52,6 @@ const INDEX_COLUMN_NAMES: [&str; 8] = [ ".managed.json", ]; -/// Loads inverted index info data -/// read data from cache, or populate cache items if possible -#[minitrace::trace] -pub async fn load_inverted_index_info( - dal: Operator, - index_info_loc: Option<&Location>, -) -> Result>> { - match index_info_loc { - Some((index_info_loc, ver)) => { - let reader = MetaReaders::inverted_index_info_reader(dal); - let params = LoadParams { - location: index_info_loc.clone(), - len_hint: None, - ver: *ver, - put_cache: true, - }; - Ok(Some(reader.read(¶ms).await?)) - } - None => Ok(None), - } -} - #[async_trait::async_trait] trait InRuntime where Self: Future @@ -159,6 +137,7 @@ pub(crate) async fn load_inverted_index_directory<'a>( field_nums: usize, index_path: &'a str, ) -> Result { + let start = Instant::now(); // load inverted index meta, contains the offsets of each files. let inverted_index_meta = load_inverted_index_meta(dal.clone(), index_path).await?; @@ -183,6 +162,11 @@ pub(crate) async fn load_inverted_index_directory<'a>( // use those files to create inverted index directory let directory = InvertedIndexDirectory::try_create(field_nums, files)?; + // Perf. + { + metrics_inc_block_inverted_index_read_milliseconds(start.elapsed().as_millis() as u64); + } + Ok(directory) } diff --git a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs index 5ac79190ebda..69f451ce0a38 100644 --- a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs +++ b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_reader.rs @@ -12,8 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::time::Instant; + use databend_common_exception::Result; use databend_common_expression::types::F32; +use databend_common_metrics::storage::metrics_inc_block_inverted_index_search_milliseconds; use databend_storages_common_index::InvertedIndexDirectory; use opendal::Operator; use tantivy::collector::DocSetCollector; @@ -68,6 +71,7 @@ impl InvertedIndexReader { query: &str, row_count: u64, ) -> Result)>>> { + let start = Instant::now(); let mut index = Index::open(self.directory)?; index.set_tokenizers(self.tokenizer_manager); let reader = index.reader()?; @@ -80,33 +84,39 @@ impl InvertedIndexReader { } let query = query_parser.parse_query(query)?; - let mut matched_rows = Vec::new(); - if self.has_score { + let matched_rows = if self.has_score { let collector = TopDocs::with_limit(row_count as usize); let docs = searcher.search(&query, &collector)?; - if docs.is_empty() { - return Ok(None); - } + let mut matched_rows = Vec::with_capacity(docs.len()); for (score, doc_addr) in docs { let doc_id = doc_addr.doc_id as usize; let score = F32::from(score); matched_rows.push((doc_id, Some(score))); } + matched_rows } else { let collector = DocSetCollector; let docs = searcher.search(&query, &collector)?; - if docs.is_empty() { - return Ok(None); - } + let mut matched_rows = Vec::with_capacity(docs.len()); for doc_addr in docs { let doc_id = doc_addr.doc_id as usize; matched_rows.push((doc_id, None)); } + matched_rows + }; + + // Perf. + { + metrics_inc_block_inverted_index_search_milliseconds(start.elapsed().as_millis() as u64); } - Ok(Some(matched_rows)) + if !matched_rows.is_empty() { + Ok(Some(matched_rows)) + } else { + Ok(None) + } } // delegation of [InvertedIndexFileReader::cache_key_of_index_columns] diff --git a/src/query/storages/fuse/src/io/read/inverted_index/mod.rs b/src/query/storages/fuse/src/io/read/inverted_index/mod.rs index ed993693ac84..f606eba3f893 100644 --- a/src/query/storages/fuse/src/io/read/inverted_index/mod.rs +++ b/src/query/storages/fuse/src/io/read/inverted_index/mod.rs @@ -15,5 +15,4 @@ mod inverted_index_loader; mod inverted_index_reader; -pub use inverted_index_loader::load_inverted_index_info; pub use inverted_index_reader::InvertedIndexReader; diff --git a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs index bb614ccc97b4..02364b0b7c3d 100644 --- a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs +++ b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs @@ -28,8 +28,6 @@ use databend_storages_common_cache_manager::CompactSegmentInfoMeter; use databend_storages_common_index::BloomIndexMeta; use databend_storages_common_index::InvertedIndexMeta; use databend_storages_common_table_meta::meta::CompactSegmentInfo; -use databend_storages_common_table_meta::meta::IndexInfo; -use databend_storages_common_table_meta::meta::IndexInfoVersion; use databend_storages_common_table_meta::meta::SegmentInfoVersion; use databend_storages_common_table_meta::meta::SingleColumnMeta; use databend_storages_common_table_meta::meta::SnapshotVersion; @@ -57,7 +55,6 @@ pub type CompactSegmentInfoReader = InMemoryItemCacheReader< DefaultHashBuilder, CompactSegmentInfoMeter, >; -pub type InvertedIndexInfoReader = InMemoryItemCacheReader>; pub type InvertedIndexMetaReader = InMemoryItemCacheReader>; @@ -92,13 +89,6 @@ impl MetaReaders { ) } - pub fn inverted_index_info_reader(dal: Operator) -> InvertedIndexInfoReader { - InvertedIndexInfoReader::new( - CacheManager::instance().get_inverted_index_info_cache(), - LoaderWrapper(dal), - ) - } - pub fn inverted_index_meta_reader(dal: Operator) -> InvertedIndexMetaReader { InvertedIndexMetaReader::new( CacheManager::instance().get_inverted_index_meta_cache(), @@ -160,17 +150,6 @@ impl Loader for LoaderWrapper { } } -#[async_trait::async_trait] -impl Loader for LoaderWrapper { - #[async_backtrace::framed] - async fn load(&self, params: &LoadParams) -> Result { - let version = IndexInfoVersion::try_from(params.ver)?; - let LoaderWrapper(operator) = &self; - let reader = bytes_reader(operator, params.location.as_str(), params.len_hint).await?; - version.read(reader).await - } -} - #[async_trait::async_trait] impl Loader for LoaderWrapper { #[async_backtrace::framed] diff --git a/src/query/storages/fuse/src/io/read/mod.rs b/src/query/storages/fuse/src/io/read/mod.rs index 74f02a211fae..288220c73cc3 100644 --- a/src/query/storages/fuse/src/io/read/mod.rs +++ b/src/query/storages/fuse/src/io/read/mod.rs @@ -28,7 +28,6 @@ pub use block::MergeIOReadResult; pub use block::NativeReaderExt; pub use block::NativeSourceData; pub use bloom::BloomBlockFilterReader; -pub use inverted_index::load_inverted_index_info; pub use inverted_index::InvertedIndexReader; pub use meta::CompactSegmentInfoReader; pub use meta::MetaReaders; diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index 9ee13be51304..f56d1756293d 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -15,6 +15,7 @@ use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; +use std::time::Instant; use chrono::Utc; use databend_common_arrow::arrow::chunk::Chunk as ArrowChunk; @@ -23,11 +24,15 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; +use databend_common_expression::DataField; +use databend_common_expression::DataSchema; use databend_common_expression::FieldIndex; use databend_common_expression::TableField; use databend_common_expression::TableSchemaRef; use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE; +use databend_common_meta_app::schema::TableMeta; +use databend_common_metrics::storage::metrics_inc_block_inverted_index_generate_milliseconds; use databend_storages_common_blocks::blocks_to_parquet; use databend_storages_common_index::BloomIndex; use databend_storages_common_table_meta::meta::BlockMeta; @@ -38,6 +43,7 @@ use databend_storages_common_table_meta::table::TableCompression; use opendal::Operator; use crate::io::write::WriteSettings; +use crate::io::InvertedIndexWriter; use crate::io::TableMetaLocationGenerator; use crate::operations::column_parquet_metas; use crate::statistics::gen_columns_statistics; @@ -151,11 +157,96 @@ impl BloomIndexState { } } +#[derive(Clone)] +pub struct InvertedIndexBuilder { + pub(crate) name: String, + pub(crate) version: String, + pub(crate) schema: DataSchema, + pub(crate) options: BTreeMap, +} + +pub fn create_inverted_index_builders(table_meta: &TableMeta) -> Vec { + let mut inverted_index_builders = Vec::with_capacity(table_meta.indexes.len()); + for index in table_meta.indexes.values() { + if !index.sync_creation { + continue; + } + let mut index_fields = Vec::with_capacity(index.column_ids.len()); + for column_id in &index.column_ids { + for field in &table_meta.schema.fields { + if field.column_id() == *column_id { + index_fields.push(DataField::from(field)); + break; + } + } + } + // ignore invalid index + if index_fields.len() != index.column_ids.len() { + continue; + } + let index_schema = DataSchema::new(index_fields); + + let inverted_index_builder = InvertedIndexBuilder { + name: index.name.clone(), + version: index.version.clone(), + schema: index_schema, + options: index.options.clone(), + }; + inverted_index_builders.push(inverted_index_builder); + } + inverted_index_builders +} + +pub struct InvertedIndexState { + pub(crate) data: Vec, + pub(crate) size: u64, + pub(crate) location: Location, +} + +impl InvertedIndexState { + pub fn try_create( + source_schema: &TableSchemaRef, + block: &DataBlock, + block_location: &Location, + inverted_index_builder: &InvertedIndexBuilder, + ) -> Result { + let start = Instant::now(); + let mut writer = InvertedIndexWriter::try_create( + Arc::new(inverted_index_builder.schema.clone()), + &inverted_index_builder.options, + )?; + writer.add_block(source_schema, block)?; + let data = writer.finalize()?; + let size = data.len() as u64; + + // Perf. + { + metrics_inc_block_inverted_index_generate_milliseconds( + start.elapsed().as_millis() as u64 + ); + } + + let inverted_index_location = + TableMetaLocationGenerator::gen_inverted_index_location_from_block_location( + &block_location.0, + &inverted_index_builder.name, + &inverted_index_builder.version, + ); + + Ok(Self { + data, + size, + location: (inverted_index_location, 0), + }) + } +} + pub struct BlockSerialization { pub block_raw_data: Vec, pub size: u64, // TODO redundancy pub block_meta: BlockMeta, pub bloom_index_state: Option, + pub inverted_index_states: Vec, } #[derive(Clone)] @@ -166,6 +257,7 @@ pub struct BlockBuilder { pub write_settings: WriteSettings, pub cluster_stats_gen: ClusterStatsGenerator, pub bloom_columns_map: BTreeMap, + pub inverted_index_builders: Vec, } impl BlockBuilder { @@ -186,6 +278,17 @@ impl BlockBuilder { .as_ref() .map(|i| i.column_distinct_count.clone()); + let mut inverted_index_states = Vec::with_capacity(self.inverted_index_builders.len()); + for inverted_index_builder in &self.inverted_index_builders { + let inverted_index_state = InvertedIndexState::try_create( + &self.source_schema, + &data_block, + &block_location, + inverted_index_builder, + )?; + inverted_index_states.push(inverted_index_state); + } + let row_count = data_block.num_rows() as u64; let block_size = data_block.memory_size() as u64; let col_stats = @@ -221,6 +324,7 @@ impl BlockBuilder { size: file_size, block_meta, bloom_index_state, + inverted_index_states, }; Ok(serialized) } diff --git a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs index beda93f7fd32..4bfb40180687 100644 --- a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs @@ -21,10 +21,10 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::types::DataType; use databend_common_expression::DataBlock; -use databend_common_expression::DataSchema; +use databend_common_expression::DataSchemaRef; use databend_common_expression::ScalarRef; +use databend_common_expression::TableSchemaRef; use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE; -use opendal::Operator; use tantivy::indexer::UserOperation; use tantivy::schema::Field; use tantivy::schema::IndexRecordOption; @@ -50,20 +50,19 @@ use tantivy::SegmentComponent; use tantivy_jieba::JiebaTokenizer; use crate::index::build_tantivy_footer; -use crate::io::write_data; pub struct InvertedIndexWriter { - schema: DataSchema, + schema: DataSchemaRef, index_writer: IndexWriter, operations: Vec, } impl InvertedIndexWriter { pub fn try_create( - schema: DataSchema, + schema: DataSchemaRef, index_options: &BTreeMap, ) -> Result { - let (index_schema, _) = create_index_schema(&schema, index_options)?; + let (index_schema, _) = create_index_schema(schema.clone(), index_options)?; let index_settings = IndexSettings { sort_by_field: None, @@ -88,35 +87,19 @@ impl InvertedIndexWriter { }) } - pub fn add_block(&mut self, block: &DataBlock) -> Result<()> { - if block.num_columns() != self.schema.num_fields() { - return Err(ErrorCode::TableSchemaMismatch(format!( - "Data schema mismatched. Data columns length: {}, schema fields length: {}", - block.num_columns(), - self.schema.num_fields() - ))); - } - for (column, field) in block.columns().iter().zip(self.schema.fields().iter()) { - if &column.data_type != field.data_type() { - return Err(ErrorCode::TableSchemaMismatch(format!( - "Data schema mismatched (col name: {}). Data column type is {:?}, but schema field type is {:?}", - field.name(), - column.data_type, - field.data_type() - ))); - } - } - - let mut types = Vec::with_capacity(self.schema.num_fields()); + pub fn add_block(&mut self, source_schema: &TableSchemaRef, block: &DataBlock) -> Result<()> { + let mut field_indexes = Vec::with_capacity(self.schema.num_fields()); for field in self.schema.fields() { let ty = field.data_type().remove_nullable(); - types.push(ty); + let field_index = source_schema.index_of(field.name().as_str())?; + field_indexes.push((field_index, ty)) } + for i in 0..block.num_rows() { let mut doc = TantivyDocument::new(); - for (j, typ) in types.iter().enumerate() { + for (j, (field_index, ty)) in field_indexes.iter().enumerate() { let field = Field::from_field_id(j as u32); - let column = block.get_by_offset(j); + let column = block.get_by_offset(*field_index); match unsafe { column.value.index_unchecked(i) } { ScalarRef::String(text) => doc.add_text(field, text), ScalarRef::Variant(jsonb_val) => { @@ -132,7 +115,7 @@ impl InvertedIndexWriter { } } _ => { - if typ == &DataType::Variant { + if ty == &DataType::Variant { doc.add_object(field, BTreeMap::new()); } else { doc.add_text(field, ""); @@ -147,16 +130,15 @@ impl InvertedIndexWriter { } #[async_backtrace::framed] - pub async fn finalize(mut self, operator: &Operator, index_location: String) -> Result<()> { + pub fn finalize(mut self) -> Result> { let _ = self.index_writer.run(self.operations); let _ = self.index_writer.commit()?; let index = self.index_writer.index(); let mut buffer = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE); - Self::write_index(&mut buffer, index).await?; - write_data(buffer, operator, &index_location).await?; + Self::write_index(&mut buffer, index)?; - Ok(()) + Ok(buffer) } // The tantivy index data consists of eight files. @@ -224,7 +206,7 @@ impl InvertedIndexWriter { // We merge the data from these files into one file and // record the offset to read each part of the data. #[async_backtrace::framed] - async fn write_index(mut writer: &mut W, index: &Index) -> Result<()> { + fn write_index(mut writer: &mut W, index: &Index) -> Result<()> { let directory = index.directory(); let managed_filepath = Path::new(".managed.json"); @@ -443,7 +425,7 @@ pub(crate) fn create_tokenizer_manager( } pub(crate) fn create_index_schema( - schema: &DataSchema, + schema: DataSchemaRef, index_options: &BTreeMap, ) -> Result<(Schema, Vec)> { let tokenizer_name = index_options diff --git a/src/query/storages/fuse/src/io/write/meta_writer.rs b/src/query/storages/fuse/src/io/write/meta_writer.rs index 2f0eca268751..c6794ae56a0d 100644 --- a/src/query/storages/fuse/src/io/write/meta_writer.rs +++ b/src/query/storages/fuse/src/io/write/meta_writer.rs @@ -144,7 +144,6 @@ mod tests { vec![], None, None, - None, ); snapshot.format_version = v; let _ = snapshot.marshal(); @@ -163,7 +162,6 @@ mod tests { vec![], None, None, - None, ); snapshot.marshal().unwrap(); } diff --git a/src/query/storages/fuse/src/io/write/mod.rs b/src/query/storages/fuse/src/io/write/mod.rs index 7d273d1c40eb..1f7ce3db95eb 100644 --- a/src/query/storages/fuse/src/io/write/mod.rs +++ b/src/query/storages/fuse/src/io/write/mod.rs @@ -18,10 +18,12 @@ mod meta_writer; mod segment_writer; mod write_settings; +pub(crate) use block_writer::create_inverted_index_builders; pub use block_writer::serialize_block; pub use block_writer::write_data; pub use block_writer::BlockBuilder; pub use block_writer::BlockSerialization; +pub use block_writer::InvertedIndexBuilder; pub(crate) use inverted_index_writer::create_index_schema; pub(crate) use inverted_index_writer::create_tokenizer_manager; pub use inverted_index_writer::InvertedIndexWriter; diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index d76b9401c015..b2b71c962111 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -130,7 +130,6 @@ impl SnapshotGenerator for AppendGenerator { let mut prev_timestamp = None; let mut prev_snapshot_id = None; let mut table_statistics_location = None; - let mut inverted_indexes = None; let mut new_segments = snapshot_merged.merged_segments.clone(); let mut new_summary = snapshot_merged.merged_statistics.clone(); @@ -138,7 +137,6 @@ impl SnapshotGenerator for AppendGenerator { prev_timestamp = snapshot.timestamp; prev_snapshot_id = Some((snapshot.snapshot_id, snapshot.format_version)); table_statistics_location = snapshot.table_statistics_location.clone(); - inverted_indexes = snapshot.inverted_indexes.clone(); if !self.overwrite { let mut summary = snapshot.summary.clone(); @@ -234,7 +232,6 @@ impl SnapshotGenerator for AppendGenerator { new_segments, cluster_key_meta, table_statistics_location, - inverted_indexes, )) } } diff --git a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs index bc068500bb73..aebd6237dbc8 100644 --- a/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/mutation_generator.rs @@ -105,7 +105,6 @@ impl SnapshotGenerator for MutationGenerator { new_segments, cluster_key_meta, previous.table_statistics_location.clone(), - previous.inverted_indexes.clone(), ); if matches!(self.mutation_kind, MutationKind::Compact) { diff --git a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs index 0f5021db14a5..22be00ff6cd9 100644 --- a/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/truncate_generator.rs @@ -80,7 +80,6 @@ impl SnapshotGenerator for TruncateGenerator { vec![], cluster_key_meta, None, - None, ); Ok(new_snapshot) } diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs index 9e8ce1ad2a92..838cb82a2948 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs @@ -36,6 +36,7 @@ use databend_common_sql::executor::physical_plans::MutationKind; use databend_storages_common_index::BloomIndex; use opendal::Operator; +use crate::io::create_inverted_index_builders; use crate::io::write_data; use crate::io::BlockBuilder; use crate::io::BlockSerialization; @@ -126,6 +127,9 @@ impl TransformSerializeBlock { let bloom_columns_map = table .bloom_index_cols .bloom_index_fields(source_schema.clone(), BloomIndex::supported_type)?; + + let inverted_index_builders = create_inverted_index_builders(&table.table_info.meta); + let block_builder = BlockBuilder { ctx, meta_locations: table.meta_location_generator().clone(), @@ -133,6 +137,7 @@ impl TransformSerializeBlock { write_settings: table.get_write_settings(), cluster_stats_gen, bloom_columns_map, + inverted_index_builders, }; Ok(TransformSerializeBlock { state: State::Consume, @@ -315,7 +320,8 @@ impl Processor for TransformSerializeBlock { // write index data. let bloom_index_state = serialized.bloom_index_state; if let Some(bloom_index_state) = bloom_index_state { - let index_size = bloom_index_state.data.len(); + let start = Instant::now(); + let index_size = bloom_index_state.size; write_data( bloom_index_state.data, &self.dal, @@ -325,13 +331,33 @@ impl Processor for TransformSerializeBlock { // Perf. { metrics_inc_block_index_write_nums(1); - metrics_inc_block_index_write_bytes(index_size as u64); + metrics_inc_block_index_write_bytes(index_size); metrics_inc_block_index_write_milliseconds( start.elapsed().as_millis() as u64 ); } } + // write inverted index + for inverted_index_state in serialized.inverted_index_states { + let start = Instant::now(); + let index_size = inverted_index_state.size; + write_data( + inverted_index_state.data, + &self.dal, + &inverted_index_state.location.0, + ) + .await?; + // Perf. + { + metrics_inc_block_inverted_index_write_nums(1); + metrics_inc_block_inverted_index_write_bytes(index_size); + metrics_inc_block_inverted_index_write_milliseconds( + start.elapsed().as_millis() as u64, + ); + } + } + let data_block = if let Some(index) = index { Self::mutation_logs(MutationLogEntry::ReplacedBlock { index, diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index de5d5f768bdf..4b076c691c13 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -15,7 +15,6 @@ use std::collections::BTreeMap; use std::collections::HashSet; use std::hash::BuildHasher; -use std::hash::RandomState; use std::sync::Arc; use std::time::Instant; @@ -33,7 +32,6 @@ use databend_storages_common_index::BloomIndexMeta; use databend_storages_common_index::InvertedIndexFile; use databend_storages_common_index::InvertedIndexMeta; use databend_storages_common_table_meta::meta::CompactSegmentInfo; -use databend_storages_common_table_meta::meta::IndexInfo; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::TableSnapshot; use databend_storages_common_table_meta::meta::TableSnapshotStatistics; @@ -97,8 +95,6 @@ impl FuseTable { let root_snapshot_info = root_snapshot_info_opt.unwrap(); - let inverted_index_infos = &root_snapshot_info.inverted_indexes; - if root_snapshot_info.snapshot_lite.timestamp.is_none() { return Err(ErrorCode::StorageOther(format!( "gc: snapshot timestamp is none, snapshot location: {}", @@ -295,13 +291,6 @@ impl FuseTable { .await?; } - // 4. purge inverted index info - - if let Some(inverted_index_infos) = inverted_index_infos { - self.purge_inverted_index_info_files(ctx, inverted_index_infos, counter) - .await?; - } - Ok(None) } @@ -351,7 +340,6 @@ impl FuseTable { Ok(Some(RootSnapshotInfo { snapshot_location, referenced_locations, - inverted_indexes: root_snapshot.inverted_indexes.clone(), snapshot_lite, })) } @@ -763,59 +751,11 @@ impl FuseTable { ); SnapshotsIO::list_files(self.get_operator(), &prefix, None).await } - - async fn purge_inverted_index_info_files( - &self, - ctx: &Arc, - inverted_index_infos: &BTreeMap, - counter: &mut PurgeCounter, - ) -> Result<()> { - // 1. list all the inverted index information files - let index_info_path_prefix = self.meta_location_generator.inverted_index_info_prefix(); - - let status = format!( - "gc: listing inverted index info files, time used so far {:?}", - counter.start.elapsed(), - ); - ctx.set_status_info(&status); - - let index_infos_files = - SnapshotsIO::list_files(self.get_operator(), &index_info_path_prefix, None).await?; - - let candidate_index_infos_files: HashSet<&String, RandomState> = - HashSet::from_iter(index_infos_files.iter()); - - // 2. collect all the inverted index information files referenced (in-used) - let snapshot_info_files: HashSet<&String, RandomState> = - HashSet::from_iter(inverted_index_infos.values().map(|(path, _)| path)); - - // 3. collect the difference, those are the files no longer referenced and should be purged - let files_to_purge: HashSet = candidate_index_infos_files - .difference(&snapshot_info_files) - .map(|v| (**v).to_owned()) - .collect(); - - // 4. purge them all (and their caches) - let inverted_index_info_files_count = files_to_purge.len(); - if inverted_index_info_files_count > 0 { - counter.inverted_index_infos += inverted_index_info_files_count; - let status = format!( - "gc: purging inverted index info files {}, time used so far {:?}", - inverted_index_info_files_count, - counter.start.elapsed(), - ); - ctx.set_status_info(&status); - self.try_purge_location_files_and_cache::(ctx.clone(), files_to_purge) - .await? - } - Ok(()) - } } struct RootSnapshotInfo { snapshot_location: String, referenced_locations: LocationTuple, - inverted_indexes: Option>, snapshot_lite: Arc, } @@ -850,7 +790,6 @@ struct PurgeCounter { blocks: usize, agg_indexes: usize, inverted_indexes: usize, - inverted_index_infos: usize, blooms: usize, segments: usize, table_statistics: usize, @@ -864,7 +803,6 @@ impl PurgeCounter { blocks: 0, agg_indexes: 0, inverted_indexes: 0, - inverted_index_infos: 0, blooms: 0, segments: 0, table_statistics: 0, diff --git a/src/query/storages/fuse/src/operations/inverted_index.rs b/src/query/storages/fuse/src/operations/inverted_index.rs index bb05015819ff..fbbd51e1ac50 100644 --- a/src/query/storages/fuse/src/operations/inverted_index.rs +++ b/src/query/storages/fuse/src/operations/inverted_index.rs @@ -13,15 +13,14 @@ // limitations under the License. use std::collections::BTreeMap; -use std::collections::BTreeSet; use std::collections::VecDeque; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Instant; use async_trait::async_trait; use async_trait::unboxed_simple; -use databend_common_catalog::catalog::Catalog; use databend_common_catalog::plan::Projection; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; @@ -29,9 +28,11 @@ use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; use databend_common_expression::DataBlock; use databend_common_expression::DataSchema; +use databend_common_expression::DataSchemaRef; use databend_common_expression::TableSchemaRef; -use databend_common_meta_app::schema::UpdateTableMetaReq; -use databend_common_meta_types::MatchSeq; +use databend_common_metrics::storage::metrics_inc_block_inverted_index_write_bytes; +use databend_common_metrics::storage::metrics_inc_block_inverted_index_write_milliseconds; +use databend_common_metrics::storage::metrics_inc_block_inverted_index_write_nums; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::Processor; @@ -43,17 +44,11 @@ use databend_common_pipeline_sources::AsyncSource; use databend_common_pipeline_sources::AsyncSourcer; use databend_common_pipeline_transforms::processors::AsyncTransform; use databend_common_pipeline_transforms::processors::AsyncTransformer; -use databend_common_storages_share::save_share_table_info; use databend_storages_common_cache::LoadParams; use databend_storages_common_table_meta::meta::BlockMeta; -use databend_storages_common_table_meta::meta::IndexInfo; use databend_storages_common_table_meta::meta::Location; -use databend_storages_common_table_meta::meta::TableSnapshot; -use databend_storages_common_table_meta::meta::Versioned; -use databend_storages_common_table_meta::table::OPT_KEY_SNAPSHOT_LOCATION; use opendal::Operator; -use crate::io::read::load_inverted_index_info; use crate::io::write_data; use crate::io::BlockReader; use crate::io::InvertedIndexWriter; @@ -83,8 +78,6 @@ impl FuseTable { pub async fn do_refresh_inverted_index( &self, ctx: Arc, - catalog: Arc, - table: Arc, index_name: String, index_version: String, index_options: &BTreeMap, @@ -113,20 +106,6 @@ impl FuseTable { let segment_reader = MetaReaders::segment_info_reader(self.get_operator(), table_schema.clone()); - let mut indexed_segments = BTreeSet::new(); - if let Some(inverted_indexes) = &snapshot.inverted_indexes { - let index_info_loc = inverted_indexes.get(&index_name); - if let Some(old_index_info) = - load_inverted_index_info(self.get_operator(), index_info_loc).await? - { - // Every time the index info changed, a new index version is generated - // and the index data needs to be regenerated. - if old_index_info.index_version == index_version { - indexed_segments = old_index_info.indexed_segments.clone(); - } - } - } - // If no segment locations are specified, iterates through all segments let segment_locs = if let Some(segment_locs) = &segment_locs { segment_locs.clone() @@ -134,15 +113,10 @@ impl FuseTable { snapshot.segments.clone() }; - // Filter segments whose indexes already generated - let segment_locs: Vec<_> = segment_locs - .into_iter() - .filter(|l| !indexed_segments.contains(l)) - .collect(); - if segment_locs.is_empty() { return Ok(()); } + let operator = self.get_operator_ref(); // Read the segment infos and collect the block metas that need to generate the index. let mut block_metas = VecDeque::new(); @@ -157,18 +131,26 @@ impl FuseTable { .await?; for block_meta in segment_info.block_metas()? { - block_metas.push_back(block_meta); + let index_location = + TableMetaLocationGenerator::gen_inverted_index_location_from_block_location( + &block_meta.location.0, + &index_name, + &index_version, + ); + // only generate inverted index if it is not exist. + if (operator.stat(&index_location).await).is_err() { + block_metas.push_back(block_meta); + } } } if block_metas.is_empty() { return Ok(()); } - let data_schema = DataSchema::from(index_schema.as_ref()); + let data_schema = Arc::new(DataSchema::from(index_schema.as_ref())); let settings = ReadSettings::from_ctx(&ctx)?; let write_settings = self.get_write_settings(); let storage_format = write_settings.storage_format; - let operator = self.get_operator_ref(); pipeline.add_source( |output| { @@ -195,26 +177,13 @@ impl FuseTable { index_version.clone(), index_options.clone(), data_schema.clone(), + index_schema.clone(), operator.clone(), )?)) }); pipeline.try_resize(1)?; - pipeline.add_sink(|input| { - InvertedIndexSink::try_create( - input, - ctx.clone(), - catalog.clone(), - table.clone(), - self.clone(), - snapshot.clone(), - index_name.clone(), - index_version.clone(), - segment_locs.clone(), - indexed_segments.clone(), - block_nums, - ) - })?; + pipeline.add_sink(|input| InvertedIndexSink::try_create(input, ctx.clone(), block_nums))?; Ok(()) } @@ -264,7 +233,6 @@ impl AsyncSource for InvertedIndexSource { .read_by_meta(&self.settings, &block_meta, &self.storage_format) .await?; let block = block.add_meta(Some(Box::new(Arc::unwrap_or_clone(block_meta))))?; - Ok(Some(block)) } None => { @@ -280,7 +248,8 @@ pub struct InvertedIndexTransform { index_name: String, index_version: String, index_options: BTreeMap, - data_schema: DataSchema, + data_schema: DataSchemaRef, + source_schema: TableSchemaRef, operator: Operator, } @@ -291,7 +260,8 @@ impl InvertedIndexTransform { index_name: String, index_version: String, index_options: BTreeMap, - data_schema: DataSchema, + data_schema: DataSchemaRef, + source_schema: TableSchemaRef, operator: Operator, ) -> Result> { Ok(AsyncTransformer::create(input, output, Self { @@ -299,6 +269,7 @@ impl InvertedIndexTransform { index_version, index_options, data_schema, + source_schema, operator, })) } @@ -315,10 +286,6 @@ impl AsyncTransform for InvertedIndexTransform { .and_then(BlockMeta::downcast_ref_from) .unwrap(); - let mut index_writer = - InvertedIndexWriter::try_create(self.data_schema.clone(), &self.index_options)?; - index_writer.add_block(&data_block)?; - let index_location = TableMetaLocationGenerator::gen_inverted_index_location_from_block_location( &block_meta.location.0, @@ -326,9 +293,20 @@ impl AsyncTransform for InvertedIndexTransform { &self.index_version, ); - index_writer - .finalize(&self.operator, index_location) - .await?; + let start = Instant::now(); + let mut writer = + InvertedIndexWriter::try_create(self.data_schema.clone(), &self.index_options)?; + writer.add_block(&self.source_schema, &data_block)?; + let data = writer.finalize()?; + let index_size = data.len() as u64; + write_data(data, &self.operator, &index_location).await?; + + // Perf. + { + metrics_inc_block_inverted_index_write_nums(1); + metrics_inc_block_inverted_index_write_bytes(index_size); + metrics_inc_block_inverted_index_write_milliseconds(start.elapsed().as_millis() as u64); + } let new_block = DataBlock::new(vec![], 0); Ok(new_block) @@ -337,46 +315,17 @@ impl AsyncTransform for InvertedIndexTransform { /// `InvertedIndexSink` is used to build inverted index. pub struct InvertedIndexSink { - ctx: Arc, - catalog: Arc, - table: Arc, - fuse_table: FuseTable, - snapshot: Arc, - index_name: String, - index_version: String, - segment_locs: Vec, - indexed_segments: BTreeSet, block_nums: AtomicUsize, - new_snapshot_loc: Option, } impl InvertedIndexSink { - #[allow(clippy::too_many_arguments)] pub fn try_create( input: Arc, ctx: Arc, - catalog: Arc, - table: Arc, - fuse_table: FuseTable, - snapshot: Arc, - index_name: String, - index_version: String, - segment_locs: Vec, - indexed_segments: BTreeSet, block_nums: usize, ) -> Result { - let sinker = AsyncSinker::create(input, ctx.clone(), InvertedIndexSink { - ctx, - catalog, - table, - fuse_table, - snapshot, - index_name, - index_version, - segment_locs, - indexed_segments, + let sinker = AsyncSinker::create(input, ctx, InvertedIndexSink { block_nums: AtomicUsize::new(block_nums), - new_snapshot_loc: None, }); Ok(ProcessorPtr::create(sinker)) } @@ -386,45 +335,6 @@ impl InvertedIndexSink { impl AsyncSink for InvertedIndexSink { const NAME: &'static str = "InvertedIndexSink"; - #[async_backtrace::framed] - async fn on_finish(&mut self) -> Result<()> { - if let Some(new_snapshot_loc) = &self.new_snapshot_loc { - // generate new table meta with new snapshot location - let mut new_table_meta = self.table.get_table_info().meta.clone(); - - new_table_meta.options.insert( - OPT_KEY_SNAPSHOT_LOCATION.to_owned(), - new_snapshot_loc.clone(), - ); - - let table_info = self.table.get_table_info(); - let table_id = table_info.ident.table_id; - let table_version = table_info.ident.seq; - - let req = UpdateTableMetaReq { - table_id, - seq: MatchSeq::Exact(table_version), - new_table_meta, - copied_files: None, - deduplicated_label: None, - update_stream_meta: vec![], - }; - - let res = self.catalog.update_table_meta(table_info, req).await?; - - if let Some(share_table_info) = res.share_table_info { - save_share_table_info( - self.ctx.get_tenant().tenant_name(), - self.ctx.get_data_operator()?.operator(), - share_table_info, - ) - .await?; - } - } - - Ok(()) - } - #[unboxed_simple] #[async_backtrace::framed] async fn consume(&mut self, _data_block: DataBlock) -> Result { @@ -433,67 +343,6 @@ impl AsyncSink for InvertedIndexSink { return Ok(false); } - // All blocks has generated the inverted indexes, - // generate a new snapshot - let mut new_snapshot = TableSnapshot::from_previous( - self.snapshot.as_ref(), - Some(self.fuse_table.get_table_info().ident.seq), - ); - - let mut indexed_segments = BTreeSet::new(); - // only keep valid segments, remove not existed ones. - for indexed_segment in &self.indexed_segments { - if new_snapshot.segments.contains(indexed_segment) { - indexed_segments.insert(indexed_segment.clone()); - } - } - // add new indexed segments - for segment_loc in &self.segment_locs { - indexed_segments.insert(segment_loc.clone()); - } - - let new_index_info = IndexInfo::new(self.index_version.clone(), indexed_segments); - let index_bytes = new_index_info.to_bytes()?; - let new_index_info_loc = self - .fuse_table - .meta_location_generator() - .gen_inverted_index_info_location(); - write_data( - index_bytes, - self.fuse_table.get_operator_ref(), - &new_index_info_loc, - ) - .await?; - - let mut inverted_indexes = new_snapshot.inverted_indexes.clone().unwrap_or_default(); - inverted_indexes.insert( - self.index_name.clone(), - (new_index_info_loc, IndexInfo::VERSION), - ); - new_snapshot.inverted_indexes = Some(inverted_indexes); - - // Write new snapshot file - let new_snapshot_loc = self - .fuse_table - .meta_location_generator() - .snapshot_location_from_uuid(&new_snapshot.snapshot_id, TableSnapshot::VERSION)?; - - let data = new_snapshot.to_bytes()?; - self.fuse_table - .get_operator_ref() - .write(&new_snapshot_loc, data) - .await?; - - // Write new snapshot hint - FuseTable::write_last_snapshot_hint( - self.fuse_table.get_operator_ref(), - self.fuse_table.meta_location_generator(), - new_snapshot_loc.clone(), - ) - .await; - - self.new_snapshot_loc = Some(new_snapshot_loc); - Ok(true) } } diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs index 9220a748ee24..4fee803460c7 100644 --- a/src/query/storages/fuse/src/operations/merge.rs +++ b/src/query/storages/fuse/src/operations/merge.rs @@ -25,6 +25,7 @@ use databend_storages_common_table_meta::meta::Location; use super::merge_into::MatchedAggregator; use super::mutation::SegmentIndex; +use crate::io::create_inverted_index_builders; use crate::io::BlockBuilder; use crate::statistics::ClusterStatsGenerator; use crate::FuseTable; @@ -89,6 +90,8 @@ impl FuseTable { let bloom_columns_map = self .bloom_index_cols() .bloom_index_fields(new_schema.clone(), BloomIndex::supported_type)?; + let inverted_index_builders = create_inverted_index_builders(&self.table_info.meta); + let block_builder = BlockBuilder { ctx: ctx.clone(), meta_locations: self.meta_location_generator().clone(), @@ -96,6 +99,7 @@ impl FuseTable { write_settings: self.get_write_settings(), cluster_stats_gen, bloom_columns_map, + inverted_index_builders, }; let aggregator = MatchedAggregator::create( ctx, diff --git a/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs b/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs index a57f7eb521de..def030cb4cef 100644 --- a/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs +++ b/src/query/storages/fuse/src/pruning/inverted_index_pruner.rs @@ -79,7 +79,7 @@ impl InvertedIndexPruner { // parse query text to check whether has phrase terms need position file. let (index_schema, index_fields) = create_index_schema( - &inverted_index_info.index_schema, + Arc::new(inverted_index_info.index_schema.clone()), &inverted_index_info.index_options, )?; let tokenizer_manager = create_tokenizer_manager(&inverted_index_info.index_options); diff --git a/src/query/storages/system/src/caches_table.rs b/src/query/storages/system/src/caches_table.rs index 8b2a2e72832e..72eb9f69b08f 100644 --- a/src/query/storages/system/src/caches_table.rs +++ b/src/query/storages/system/src/caches_table.rs @@ -62,7 +62,6 @@ impl SyncSystemTable for CachesTable { let segment_info_cache = cache_manager.get_table_segment_cache(); let bloom_index_filter_cache = cache_manager.get_bloom_index_filter_cache(); let bloom_index_meta_cache = cache_manager.get_bloom_index_meta_cache(); - let inverted_index_info_cache = cache_manager.get_inverted_index_info_cache(); let inverted_index_meta_cache = cache_manager.get_inverted_index_meta_cache(); let inverted_index_file_cache = cache_manager.get_inverted_index_file_cache(); let prune_partitions_cache = cache_manager.get_prune_partitions_cache(); @@ -104,13 +103,6 @@ impl SyncSystemTable for CachesTable { size.push(bloom_index_meta_cache.size()); } - if let Some(inverted_index_info_cache) = inverted_index_info_cache { - nodes.push(local_node.clone()); - names.push("inverted_index_info_cache".to_string()); - num_items.push(inverted_index_info_cache.len() as u64); - size.push(inverted_index_info_cache.size()); - } - if let Some(inverted_index_meta_cache) = inverted_index_meta_cache { nodes.push(local_node.clone()); names.push("inverted_index_meta_cache".to_string()); diff --git a/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.result b/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.result index beff18994d90..7a6d6ef0d212 100644 --- a/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.result +++ b/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.result @@ -1,28 +1,22 @@ ################### ###1st insertion### ################### -== number of snapshots (expects 3)== -3 -== number of inverted index info files (expects 2) == -2 +== number of snapshots (expects 1)== +1 == number of invert index files (expects 2) == 2 ################### ###2nd insertion### ################### -== number of snapshots (expects 3)== -3 -== number of inverted index info files (expects 4) == -4 +== number of snapshots (expects 1)== +1 == number of invert index files (expects 4) == 4 ################### ###3nd insertion### ################### -== number of snapshots (expects 3)== -3 -== number of inverted index info files (expects 4) == -4 +== number of snapshots (expects 1)== +1 == number of invert index files (expects 6) == 6 ################### @@ -30,16 +24,12 @@ ################### == number of snapshots (expects 1)== 1 -== number of inverted index info files (expects 2) == +== number of invert index files (expects 2) == 2 -== number of invert index files (expects 0) == -0 ################### ####new insertion## ################### -== number of snapshots (expects 3) == -3 -== number of inverted index info files (expects 4) == +== number of snapshots (expects 1) == +1 +== number of invert index files (expects 4) == 4 -== number of invert index files (expects 2) == -2 diff --git a/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.sh b/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.sh index 3874cefc0f5e..aa68f1d5f716 100755 --- a/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.sh +++ b/tests/suites/5_ee/06_inverted_index/06_0000_purge_inverted_index.sh @@ -29,23 +29,12 @@ echo "###1st insertion###" echo "###################" echo "insert into ${TEST_DB}.customer_feedback values('a', 'b')" | $BENDSQL_CLIENT_CONNECT -echo "== number of snapshots (expects 3)==" -# 1 snapshot for the init insertion, other 2 snapshots are created by inverted index refreshment. -# NOTE: -# - the inverted index refreshment happens AFTER auto purge of transient table, -# - the inverted index refreshment will create one new snapshot for each index (2 snapshot in our case, since there are 2 indexes) +echo "== number of snapshots (expects 1)==" +# 1 snapshot for the init insertion echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT | wc -l # uncomment this line to show the details (for local diagnostic only) #echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT -echo "== number of inverted index info files (expects 2) ==" -# NOTE: -# - there are 2 inverted indexes -# - for each index, the inverted index refreshment will create a index info -echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT | wc -l -# uncomment this line to show the details (for local diagnostic only) -#echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT - echo "== number of invert index files (expects 2) ==" # NOTE: since there are 1 block, 2 inverted indexes. echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT | wc -l @@ -53,31 +42,20 @@ echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNE #echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT - - echo "###################" echo "###2nd insertion###" echo "###################" echo "insert into ${TEST_DB}.customer_feedback values('a', 'b')" | $BENDSQL_CLIENT_CONNECT -echo "== number of snapshots (expects 3)==" +echo "== number of snapshots (expects 1)==" # NOTE: # - since previous snapshots should be purged, -# - and 1 snapshot created for this new insertion, other 2 snapshots are created by inverted index refreshment. +# - and 1 snapshot created for this new insertion echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT | wc -l # uncomment this line to show the details (for local diagnostic only) # it will shows that the oldest snapshot is the previous_snapshot of the latest snapshot before 2nd insertion #echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT -echo "== number of inverted index info files (expects 4) ==" -# NOTE: -# - 2 of them are referenced by the newly inserted snapshot -# - inverted index info files that (only) belongs to the previous snapshots are purged as expected. -# - other 2 of them are created by the inverted index refreshment. -echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT | wc -l -# uncomment this line to show the details (for local diagnostic only) -#echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT - echo "== number of invert index files (expects 4) ==" # NOTE: since there are 2 blocks now, each of them will have 2 inverted indexes. echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT | wc -l @@ -91,19 +69,14 @@ echo "###################" echo "insert into ${TEST_DB}.customer_feedback values('a', 'b')" | $BENDSQL_CLIENT_CONNECT -echo "== number of snapshots (expects 3)==" +echo "== number of snapshots (expects 1)==" echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT | wc -l # uncomment this line to show the details (for local diagnostic only) # it will shows that the oldest snapshot is the previous_snapshot of the latest snapshot before 3nd insertion #echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT -echo "== number of inverted index info files (expects 4) ==" -echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT | wc -l -# uncomment this line to show the details (for local diagnostic only) -#echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT - echo "== number of invert index files (expects 6) ==" -# NOTE: since there are 2 blocks now, each of them will have 2 inverted indexes. +# NOTE: since there are 3 blocks now, each of them will have 2 inverted indexes. echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT | wc -l # uncomment this line to show the details (for local diagnostic only) #echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT @@ -115,18 +88,11 @@ echo "###################" echo "optimize table ${TEST_DB}.customer_feedback compact" | $BENDSQL_CLIENT_CONNECT echo "== number of snapshots (expects 1)==" -# NOTE: since previous snapshot will be purged(transient table), but inverted index refreshment is NOT executed after compaction +# NOTE: since previous snapshot will be purged(transient table), and inverted index is refreshed after compaction echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT | wc -l -echo "== number of inverted index info files (expects 2) ==" -# NOTE: the 2 inverted index info referenced by the last (one) snapshot -echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT | wc -l -# uncomment this line to show the details (for local diagnostic only) -#echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT - -echo "== number of invert index files (expects 0) ==" -# NOTE: since inverted index refreshment is NOT executed after compaction, and the inverted index files -# that associated with data blocks are purged alone with the fragmented blocks. +echo "== number of invert index files (expects 2) ==" +# NOTE: inverted index is refreshed after compaction echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT | wc -l # uncomment this line to show the details (for local diagnostic only) #echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT @@ -136,14 +102,10 @@ echo "####new insertion##" echo "###################" echo "insert into ${TEST_DB}.customer_feedback values('a', 'b')" | $BENDSQL_CLIENT_CONNECT -echo "== number of snapshots (expects 3) ==" +echo "== number of snapshots (expects 1) ==" echo "select snapshot_id, previous_snapshot_id from fuse_snapshot('db_purge_inverted_index', 'customer_feedback') limit 100" | $BENDSQL_CLIENT_CONNECT | wc -l -echo "== number of inverted index info files (expects 4) ==" -echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT | wc -l -# for local diagnostic only -#echo "list @test_purge_ii PATTERN = '.*/_i_ii/.*.mpk'" | $BENDSQL_CLIENT_CONNECT -echo "== number of invert index files (expects 2) ==" -# NOTE: the inverted index refreshment will ONLY create new indexes for the new blocks, and there is one new block and 2 indexes +echo "== number of invert index files (expects 4) ==" +# NOTE: the inverted index refreshment will create new indexes for the new blocks, and there is one new block and 2 indexes echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT | wc -l # for local diagnostic only #echo "list @test_purge_ii PATTERN = '.*/_i_i/.*.index';" | $BENDSQL_CLIENT_CONNECT