Skip to content

Commit

Permalink
refactor: sync create inverted index (#15379)
Browse files Browse the repository at this point in the history
* refactor: sync create inverted index

* fix

* fix

* fix

* fix

* add metrics

* fix prune inverted index tests

* fix
  • Loading branch information
b41sh authored Apr 30, 2024
1 parent aa7960a commit 2783378
Show file tree
Hide file tree
Showing 54 changed files with 338 additions and 692 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-share-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ table_bloom_index_meta_count = 3000
table_bloom_index_filter_count = 1048576

### table inverted index caches ###
# Max number of cached inverted index info objects. Set it to 0 to disable it.
inverted_index_info_count = 3000
# Max number of cached inverted index meta objects. Set it to 0 to disable it.
inverted_index_meta_count = 3000
# Max bytes of cached inverted index filters. Set it to 0 to disable it.
inverted_index_filter_size = 2147483648

Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-share-2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ table_bloom_index_meta_count = 3000
table_bloom_index_filter_count = 1048576

### table inverted index caches ###
# Max number of cached inverted index info objects. Set it to 0 to disable it.
inverted_index_info_count = 3000
# Max number of cached inverted index meta objects. Set it to 0 to disable it.
inverted_index_meta_count = 3000
# Max bytes of cached inverted index filters. Set it to 0 to disable it.
inverted_index_filter_size = 2147483648

Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-share-3.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ table_bloom_index_meta_count = 3000
table_bloom_index_filter_count = 1048576

### table inverted index caches ###
# Max number of cached inverted index info objects. Set it to 0 to disable it.
inverted_index_info_count = 3000
# Max number of cached inverted index meta objects. Set it to 0 to disable it.
inverted_index_meta_count = 3000
# Max bytes of cached inverted index filters. Set it to 0 to disable it.
inverted_index_filter_size = 2147483648

Expand Down
40 changes: 40 additions & 0 deletions src/common/metrics/src/metrics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,22 @@ static BLOCK_INDEX_WRITE_MILLISECONDS: LazyLock<Histogram> =
LazyLock::new(|| register_histogram_in_milliseconds("fuse_block_index_write_milliseconds"));
static BLOCK_INDEX_READ_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_index_read_bytes"));
static BLOCK_INVERTED_INDEX_WRITE_NUMS: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_inverted_index_write_nums"));
static BLOCK_INVERTED_INDEX_WRITE_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_inverted_index_write_bytes"));
static BLOCK_INVERTED_INDEX_WRITE_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_write_milliseconds")
});
static BLOCK_INVERTED_INDEX_GENERATE_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_generate_milliseconds")
});
static BLOCK_INVERTED_INDEX_READ_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_read_milliseconds")
});
static BLOCK_INVERTED_INDEX_SEARCH_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_search_milliseconds")
});
static COMPACT_BLOCK_READ_NUMS: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_compact_block_read_nums"));
static COMPACT_BLOCK_READ_BYTES: LazyLock<Counter> =
Expand Down Expand Up @@ -496,6 +512,30 @@ pub fn metrics_inc_block_index_write_milliseconds(c: u64) {
BLOCK_INDEX_WRITE_MILLISECONDS.observe(c as f64);
}

pub fn metrics_inc_block_inverted_index_write_nums(c: u64) {
BLOCK_INVERTED_INDEX_WRITE_NUMS.inc_by(c);
}

pub fn metrics_inc_block_inverted_index_write_bytes(c: u64) {
BLOCK_INVERTED_INDEX_WRITE_BYTES.inc_by(c);
}

pub fn metrics_inc_block_inverted_index_write_milliseconds(c: u64) {
BLOCK_INVERTED_INDEX_WRITE_MILLISECONDS.observe(c as f64);
}

pub fn metrics_inc_block_inverted_index_generate_milliseconds(c: u64) {
BLOCK_INVERTED_INDEX_GENERATE_MILLISECONDS.observe(c as f64);
}

pub fn metrics_inc_block_inverted_index_read_milliseconds(c: u64) {
BLOCK_INVERTED_INDEX_READ_MILLISECONDS.observe(c as f64);
}

pub fn metrics_inc_block_inverted_index_search_milliseconds(c: u64) {
BLOCK_INVERTED_INDEX_SEARCH_MILLISECONDS.observe(c as f64);
}

pub fn metrics_inc_block_index_read_bytes(c: u64) {
BLOCK_INDEX_READ_BYTES.inc_by(c);
}
Expand Down
10 changes: 5 additions & 5 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2766,13 +2766,13 @@ pub struct CacheConfig {
)]
pub table_bloom_index_filter_size: u64,

/// Max number of cached inverted index info objects. Set it to 0 to disable it.
/// Max number of cached inverted index meta objects. Set it to 0 to disable it.
#[clap(
long = "cache-inverted-index-info-count",
long = "cache-inverted-index-meta-count",
value_name = "VALUE",
default_value = "3000"
)]
pub inverted_index_info_count: u64,
pub inverted_index_meta_count: u64,

/// Max bytes of cached inverted index filters used. Set it to 0 to disable it.
#[clap(
Expand Down Expand Up @@ -2983,7 +2983,7 @@ mod cache_config_converters {
table_bloom_index_meta_count: value.table_bloom_index_meta_count,
table_bloom_index_filter_count: value.table_bloom_index_filter_count,
table_bloom_index_filter_size: value.table_bloom_index_filter_size,
inverted_index_info_count: value.inverted_index_info_count,
inverted_index_meta_count: value.inverted_index_meta_count,
inverted_index_filter_size: value.inverted_index_filter_size,
inverted_index_filter_memory_ratio: value.inverted_index_filter_memory_ratio,
table_prune_partitions_count: value.table_prune_partitions_count,
Expand All @@ -3008,7 +3008,7 @@ mod cache_config_converters {
table_bloom_index_meta_count: value.table_bloom_index_meta_count,
table_bloom_index_filter_count: value.table_bloom_index_filter_count,
table_bloom_index_filter_size: value.table_bloom_index_filter_size,
inverted_index_info_count: value.inverted_index_info_count,
inverted_index_meta_count: value.inverted_index_meta_count,
inverted_index_filter_size: value.inverted_index_filter_size,
inverted_index_filter_memory_ratio: value.inverted_index_filter_memory_ratio,
table_prune_partitions_count: value.table_prune_partitions_count,
Expand Down
6 changes: 3 additions & 3 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@ pub struct CacheConfig {
// One bloom index filter per column of data block being indexed will be generated if necessary.
pub table_bloom_index_filter_size: u64,

/// Max number of cached inverted index info objects. Set it to 0 to disable it.
pub inverted_index_info_count: u64,
/// Max number of cached inverted index meta objects. Set it to 0 to disable it.
pub inverted_index_meta_count: u64,

/// Max bytes of cached inverted index filters used. Set it to 0 to disable it.
pub inverted_index_filter_size: u64,
Expand Down Expand Up @@ -636,7 +636,7 @@ impl Default for CacheConfig {
table_bloom_index_meta_count: 3000,
table_bloom_index_filter_count: 0,
table_bloom_index_filter_size: 2147483648,
inverted_index_info_count: 3000,
inverted_index_meta_count: 3000,
inverted_index_filter_size: 2147483648,
inverted_index_filter_memory_ratio: 0,
table_prune_partitions_count: 256,
Expand Down
22 changes: 8 additions & 14 deletions src/query/ee/tests/it/inverted_index/index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use databend_common_exception::Result;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_sql::plans::RefreshTableIndexPlan;
use databend_common_storages_fuse::io::read::load_inverted_index_info;
use databend_common_storages_fuse::io::read::InvertedIndexReader;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
Expand Down Expand Up @@ -74,7 +73,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
table_id,
name: index_name.clone(),
column_ids: vec![0, 1],
sync_creation: true,
sync_creation: false,
options,
};

Expand All @@ -100,18 +99,13 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
let new_snapshot = new_fuse_table.read_table_snapshot().await?;
assert!(new_snapshot.is_some());
let new_snapshot = new_snapshot.unwrap();
assert!(new_snapshot.inverted_indexes.is_some());

let index_info_loc = new_snapshot
.inverted_indexes
.as_ref()
.and_then(|i| i.get(&index_name));
assert!(index_info_loc.is_some());
let index_info =
load_inverted_index_info(new_fuse_table.get_operator(), index_info_loc).await?;
assert!(index_info.is_some());
let index_info = index_info.unwrap();
let index_version = index_info.index_version.clone();

let table_info = new_table.get_table_info();
let table_indexes = &table_info.meta.indexes;
let table_index = table_indexes.get(&index_name);
assert!(table_index.is_some());
let table_index = table_index.unwrap();
let index_version = table_index.version.clone();

let segment_reader =
MetaReaders::segment_info_reader(new_fuse_table.get_operator(), table_schema.clone());
Expand Down
18 changes: 7 additions & 11 deletions src/query/ee/tests/it/inverted_index/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_sql::plans::RefreshTableIndexPlan;
use databend_common_sql::BloomIndexColumns;
use databend_common_storages_fuse::io::read::load_inverted_index_info;
use databend_common_storages_fuse::pruning::create_segment_location_vector;
use databend_common_storages_fuse::pruning::FusePruner;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -506,7 +505,7 @@ async fn test_block_pruner() -> Result<()> {
table_id,
name: index_name.clone(),
column_ids: vec![1, 2, 3],
sync_creation: true,
sync_creation: false,
options: index_options.clone(),
};

Expand Down Expand Up @@ -537,15 +536,12 @@ async fn test_block_pruner() -> Result<()> {
assert!(snapshot.is_some());
let snapshot = snapshot.unwrap();

let index_info_loc = snapshot
.inverted_indexes
.as_ref()
.and_then(|i| i.get(&index_name));
assert!(index_info_loc.is_some());
let index_info = load_inverted_index_info(fuse_table.get_operator(), index_info_loc).await?;
assert!(index_info.is_some());
let index_info = index_info.unwrap();
let index_version = index_info.index_version.clone();
let table_info = new_table.get_table_info();
let table_indexes = &table_info.meta.indexes;
let table_index = table_indexes.get(&index_name);
assert!(table_index.is_some());
let table_index = table_index.unwrap();
let index_version = table_index.version.clone();

let index_schema = DataSchema::from(index_table_schema);
let e1 = PushDownInfo {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ async fn generate_refresh_inverted_index_plan(

let table_meta = &table.get_table_info().meta;
for (_, index) in table_meta.indexes.iter() {
if !index.sync_creation {
if index.sync_creation {
continue;
}
let plan = RefreshTableIndexPlan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ impl Interpreter for RefreshTableIndexInterpreter {
.manager
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::InvertedIndex)?;

let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let table = self
.ctx
.get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
Expand Down Expand Up @@ -108,12 +107,11 @@ impl Interpreter for RefreshTableIndexInterpreter {
if let Some(lock_guard) = lock_guard {
build_res.main_pipeline.add_lock_guard(lock_guard);
}

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
fuse_table
.do_refresh_inverted_index(
self.ctx.clone(),
catalog.clone(),
table.clone(),
index_name,
index_version,
&index.options,
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/test_kits/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
locations,
None,
None,
None,
);
snapshot_1.timestamp = Some(now - Duration::hours(12));
snapshot_1.summary = merge_statistics(&snapshot_0.summary, &segments_v3[0].1.summary, None);
Expand Down
15 changes: 1 addition & 14 deletions src/query/service/tests/it/storages/fuse/meta/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,7 @@ fn default_snapshot() -> TableSnapshot {
let uuid = Uuid::new_v4();
let schema = TableSchema::empty();
let stats = Default::default();
TableSnapshot::new(
uuid,
None,
&None,
None,
schema,
stats,
vec![],
None,
None,
None,
)
TableSnapshot::new(uuid, None, &None, None, schema, stats, vec![], None, None)
}

#[test]
Expand All @@ -61,7 +50,6 @@ fn snapshot_timestamp_monotonic_increase() {
vec![],
None,
None,
None,
);
let current_ts = current.timestamp.unwrap();
let prev_ts = prev.timestamp.unwrap();
Expand All @@ -87,7 +75,6 @@ fn snapshot_timestamp_time_skew_tolerance() {
vec![],
None,
None,
None,
);
let current_ts = current.timestamp.unwrap();
let prev_ts = prev.timestamp.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ async fn test_commit_to_meta_server() -> Result<()> {
new_segments,
None,
None,
None,
);

let faked_catalog = FakedCatalog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ async fn test_safety() -> Result<()> {
locations.clone(),
None,
None,
None,
);

let limit: usize = rand.gen_range(1..15);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ async fn test_safety_for_recluster() -> Result<()> {
locations.clone(),
None,
None,
None,
));

let mut block_ids = HashSet::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ DB.Table: 'system'.'caches', Table: caches-table_id:1, ver:0, Engine: SystemCach
| 'test-node' | 'bloom_index_meta_cache' | 0 | 0 |
| 'test-node' | 'file_meta_data_cache' | 0 | 0 |
| 'test-node' | 'inverted_index_file_cache' | 0 | 0 |
| 'test-node' | 'inverted_index_info_cache' | 0 | 0 |
| 'test-node' | 'inverted_index_meta_cache' | 0 | 0 |
| 'test-node' | 'prune_partitions_cache' | 0 | 0 |
| 'test-node' | 'segment_info_cache' | 0 | 0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'cache' | 'enable_table_meta_cache' | 'true' | '' |
| 'cache' | 'inverted_index_filter_memory_ratio' | '0' | '' |
| 'cache' | 'inverted_index_filter_size' | '2147483648' | '' |
| 'cache' | 'inverted_index_info_count' | '3000' | '' |
| 'cache' | 'inverted_index_meta_count' | '3000' | '' |
| 'cache' | 'table_bloom_index_filter_count' | '0' | '' |
| 'cache' | 'table_bloom_index_filter_size' | '2147483648' | '' |
| 'cache' | 'table_bloom_index_meta_count' | '3000' | '' |
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl Binder {
&& field.data_type.remove_nullable() != TableDataType::Variant
{
return Err(ErrorCode::UnsupportedIndex(format!(
"Inverted index currently only support String and variant type, but the type of column {} is {}",
"Inverted index currently only support String and Variant type, but the type of column {} is {}",
column, field.data_type
)));
}
Expand Down
Loading

0 comments on commit 2783378

Please sign in to comment.