Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: sync create inverted index #15379

Merged
merged 11 commits into from
Apr 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-share-1.toml
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ table_bloom_index_meta_count = 3000
table_bloom_index_filter_count = 1048576

### table inverted index caches ###
# Max number of cached inverted index info objects. Set it to 0 to disable it.
inverted_index_info_count = 3000
# Max number of cached inverted index meta objects. Set it to 0 to disable it.
inverted_index_meta_count = 3000
# Max bytes of cached inverted index filters. Set it to 0 to disable it.
inverted_index_filter_size = 2147483648

Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-share-2.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ table_bloom_index_meta_count = 3000
table_bloom_index_filter_count = 1048576

### table inverted index caches ###
# Max number of cached inverted index info objects. Set it to 0 to disable it.
inverted_index_info_count = 3000
# Max number of cached inverted index meta objects. Set it to 0 to disable it.
inverted_index_meta_count = 3000
# Max bytes of cached inverted index filters. Set it to 0 to disable it.
inverted_index_filter_size = 2147483648

Expand Down
4 changes: 2 additions & 2 deletions scripts/ci/deploy/config/databend-query-node-share-3.toml
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ table_bloom_index_meta_count = 3000
table_bloom_index_filter_count = 1048576

### table inverted index caches ###
# Max number of cached inverted index info objects. Set it to 0 to disable it.
inverted_index_info_count = 3000
# Max number of cached inverted index meta objects. Set it to 0 to disable it.
inverted_index_meta_count = 3000
# Max bytes of cached inverted index filters. Set it to 0 to disable it.
inverted_index_filter_size = 2147483648

Expand Down
40 changes: 40 additions & 0 deletions src/common/metrics/src/metrics/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,22 @@ static BLOCK_INDEX_WRITE_MILLISECONDS: LazyLock<Histogram> =
LazyLock::new(|| register_histogram_in_milliseconds("fuse_block_index_write_milliseconds"));
static BLOCK_INDEX_READ_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_index_read_bytes"));
/// Counter registered as `fuse_block_inverted_index_write_nums`.
/// Incremented through `metrics_inc_block_inverted_index_write_nums`.
static BLOCK_INVERTED_INDEX_WRITE_NUMS: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_inverted_index_write_nums"));
/// Counter registered as `fuse_block_inverted_index_write_bytes`.
/// Incremented through `metrics_inc_block_inverted_index_write_bytes`.
static BLOCK_INVERTED_INDEX_WRITE_BYTES: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_block_inverted_index_write_bytes"));
/// Histogram registered as `fuse_block_inverted_index_write_milliseconds`.
/// NOTE(review): presumably tracks how long writing an inverted index takes — confirm at call sites.
static BLOCK_INVERTED_INDEX_WRITE_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_write_milliseconds")
});
/// Histogram registered as `fuse_block_inverted_index_generate_milliseconds`.
/// NOTE(review): presumably tracks how long building an inverted index takes — confirm at call sites.
static BLOCK_INVERTED_INDEX_GENERATE_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_generate_milliseconds")
});
/// Histogram registered as `fuse_block_inverted_index_read_milliseconds`.
/// NOTE(review): presumably tracks how long reading an inverted index takes — confirm at call sites.
static BLOCK_INVERTED_INDEX_READ_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_read_milliseconds")
});
/// Histogram registered as `fuse_block_inverted_index_search_milliseconds`.
/// NOTE(review): presumably tracks how long searching an inverted index takes — confirm at call sites.
static BLOCK_INVERTED_INDEX_SEARCH_MILLISECONDS: LazyLock<Histogram> = LazyLock::new(|| {
register_histogram_in_milliseconds("fuse_block_inverted_index_search_milliseconds")
});
static COMPACT_BLOCK_READ_NUMS: LazyLock<Counter> =
LazyLock::new(|| register_counter("fuse_compact_block_read_nums"));
static COMPACT_BLOCK_READ_BYTES: LazyLock<Counter> =
Expand Down Expand Up @@ -496,6 +512,30 @@ pub fn metrics_inc_block_index_write_milliseconds(c: u64) {
BLOCK_INDEX_WRITE_MILLISECONDS.observe(c as f64);
}

/// Adds `c` to the counter registered as `fuse_block_inverted_index_write_nums`.
pub fn metrics_inc_block_inverted_index_write_nums(c: u64) {
    let counter = &BLOCK_INVERTED_INDEX_WRITE_NUMS;
    counter.inc_by(c);
}

/// Adds `c` to the counter registered as `fuse_block_inverted_index_write_bytes`.
pub fn metrics_inc_block_inverted_index_write_bytes(c: u64) {
    let counter = &BLOCK_INVERTED_INDEX_WRITE_BYTES;
    counter.inc_by(c);
}

/// Records `c` as one observation in the histogram registered as
/// `fuse_block_inverted_index_write_milliseconds`.
///
/// The value is converted to `f64` before being observed.
pub fn metrics_inc_block_inverted_index_write_milliseconds(c: u64) {
    let observed = c as f64;
    BLOCK_INVERTED_INDEX_WRITE_MILLISECONDS.observe(observed);
}

/// Records `c` as one observation in the histogram registered as
/// `fuse_block_inverted_index_generate_milliseconds`.
///
/// The value is converted to `f64` before being observed.
pub fn metrics_inc_block_inverted_index_generate_milliseconds(c: u64) {
    let observed = c as f64;
    BLOCK_INVERTED_INDEX_GENERATE_MILLISECONDS.observe(observed);
}

/// Records `c` as one observation in the histogram registered as
/// `fuse_block_inverted_index_read_milliseconds`.
///
/// The value is converted to `f64` before being observed.
pub fn metrics_inc_block_inverted_index_read_milliseconds(c: u64) {
    let observed = c as f64;
    BLOCK_INVERTED_INDEX_READ_MILLISECONDS.observe(observed);
}

/// Records `c` as one observation in the histogram registered as
/// `fuse_block_inverted_index_search_milliseconds`.
///
/// The value is converted to `f64` before being observed.
pub fn metrics_inc_block_inverted_index_search_milliseconds(c: u64) {
    let observed = c as f64;
    BLOCK_INVERTED_INDEX_SEARCH_MILLISECONDS.observe(observed);
}

pub fn metrics_inc_block_index_read_bytes(c: u64) {
BLOCK_INDEX_READ_BYTES.inc_by(c);
}
Expand Down
10 changes: 5 additions & 5 deletions src/query/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2766,13 +2766,13 @@ pub struct CacheConfig {
)]
pub table_bloom_index_filter_size: u64,

/// Max number of cached inverted index info objects. Set it to 0 to disable it.
/// Max number of cached inverted index meta objects. Set it to 0 to disable it.
#[clap(
long = "cache-inverted-index-info-count",
long = "cache-inverted-index-meta-count",
value_name = "VALUE",
default_value = "3000"
)]
pub inverted_index_info_count: u64,
pub inverted_index_meta_count: u64,

/// Max bytes of cached inverted index filters used. Set it to 0 to disable it.
#[clap(
Expand Down Expand Up @@ -2983,7 +2983,7 @@ mod cache_config_converters {
table_bloom_index_meta_count: value.table_bloom_index_meta_count,
table_bloom_index_filter_count: value.table_bloom_index_filter_count,
table_bloom_index_filter_size: value.table_bloom_index_filter_size,
inverted_index_info_count: value.inverted_index_info_count,
inverted_index_meta_count: value.inverted_index_meta_count,
inverted_index_filter_size: value.inverted_index_filter_size,
inverted_index_filter_memory_ratio: value.inverted_index_filter_memory_ratio,
table_prune_partitions_count: value.table_prune_partitions_count,
Expand All @@ -3008,7 +3008,7 @@ mod cache_config_converters {
table_bloom_index_meta_count: value.table_bloom_index_meta_count,
table_bloom_index_filter_count: value.table_bloom_index_filter_count,
table_bloom_index_filter_size: value.table_bloom_index_filter_size,
inverted_index_info_count: value.inverted_index_info_count,
inverted_index_meta_count: value.inverted_index_meta_count,
inverted_index_filter_size: value.inverted_index_filter_size,
inverted_index_filter_memory_ratio: value.inverted_index_filter_memory_ratio,
table_prune_partitions_count: value.table_prune_partitions_count,
Expand Down
6 changes: 3 additions & 3 deletions src/query/config/src/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,8 +541,8 @@ pub struct CacheConfig {
// One bloom index filter per column of data block being indexed will be generated if necessary.
pub table_bloom_index_filter_size: u64,

/// Max number of cached inverted index info objects. Set it to 0 to disable it.
pub inverted_index_info_count: u64,
/// Max number of cached inverted index meta objects. Set it to 0 to disable it.
pub inverted_index_meta_count: u64,

/// Max bytes of cached inverted index filters used. Set it to 0 to disable it.
pub inverted_index_filter_size: u64,
Expand Down Expand Up @@ -636,7 +636,7 @@ impl Default for CacheConfig {
table_bloom_index_meta_count: 3000,
table_bloom_index_filter_count: 0,
table_bloom_index_filter_size: 2147483648,
inverted_index_info_count: 3000,
inverted_index_meta_count: 3000,
inverted_index_filter_size: 2147483648,
inverted_index_filter_memory_ratio: 0,
table_prune_partitions_count: 256,
Expand Down
22 changes: 8 additions & 14 deletions src/query/ee/tests/it/inverted_index/index_refresh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use databend_common_exception::Result;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_sql::plans::RefreshTableIndexPlan;
use databend_common_storages_fuse::io::read::load_inverted_index_info;
use databend_common_storages_fuse::io::read::InvertedIndexReader;
use databend_common_storages_fuse::io::MetaReaders;
use databend_common_storages_fuse::io::TableMetaLocationGenerator;
Expand Down Expand Up @@ -74,7 +73,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
table_id,
name: index_name.clone(),
column_ids: vec![0, 1],
sync_creation: true,
sync_creation: false,
options,
};

Expand All @@ -100,18 +99,13 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> {
let new_snapshot = new_fuse_table.read_table_snapshot().await?;
assert!(new_snapshot.is_some());
let new_snapshot = new_snapshot.unwrap();
assert!(new_snapshot.inverted_indexes.is_some());

let index_info_loc = new_snapshot
.inverted_indexes
.as_ref()
.and_then(|i| i.get(&index_name));
assert!(index_info_loc.is_some());
let index_info =
load_inverted_index_info(new_fuse_table.get_operator(), index_info_loc).await?;
assert!(index_info.is_some());
let index_info = index_info.unwrap();
let index_version = index_info.index_version.clone();

let table_info = new_table.get_table_info();
let table_indexes = &table_info.meta.indexes;
let table_index = table_indexes.get(&index_name);
assert!(table_index.is_some());
let table_index = table_index.unwrap();
let index_version = table_index.version.clone();

let segment_reader =
MetaReaders::segment_info_reader(new_fuse_table.get_operator(), table_schema.clone());
Expand Down
18 changes: 7 additions & 11 deletions src/query/ee/tests/it/inverted_index/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ use databend_common_meta_app::schema::CreateTableIndexReq;
use databend_common_sql::plans::CreateTablePlan;
use databend_common_sql::plans::RefreshTableIndexPlan;
use databend_common_sql::BloomIndexColumns;
use databend_common_storages_fuse::io::read::load_inverted_index_info;
use databend_common_storages_fuse::pruning::create_segment_location_vector;
use databend_common_storages_fuse::pruning::FusePruner;
use databend_common_storages_fuse::FuseTable;
Expand Down Expand Up @@ -506,7 +505,7 @@ async fn test_block_pruner() -> Result<()> {
table_id,
name: index_name.clone(),
column_ids: vec![1, 2, 3],
sync_creation: true,
sync_creation: false,
options: index_options.clone(),
};

Expand Down Expand Up @@ -537,15 +536,12 @@ async fn test_block_pruner() -> Result<()> {
assert!(snapshot.is_some());
let snapshot = snapshot.unwrap();

let index_info_loc = snapshot
.inverted_indexes
.as_ref()
.and_then(|i| i.get(&index_name));
assert!(index_info_loc.is_some());
let index_info = load_inverted_index_info(fuse_table.get_operator(), index_info_loc).await?;
assert!(index_info.is_some());
let index_info = index_info.unwrap();
let index_version = index_info.index_version.clone();
let table_info = new_table.get_table_info();
let table_indexes = &table_info.meta.indexes;
let table_index = table_indexes.get(&index_name);
assert!(table_index.is_some());
let table_index = table_index.unwrap();
let index_version = table_index.version.clone();

let index_schema = DataSchema::from(index_table_schema);
let e1 = PushDownInfo {
Expand Down
2 changes: 1 addition & 1 deletion src/query/service/src/interpreters/hook/refresh_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ async fn generate_refresh_inverted_index_plan(

let table_meta = &table.get_table_info().meta;
for (_, index) in table_meta.indexes.iter() {
if !index.sync_creation {
if index.sync_creation {
continue;
}
let plan = RefreshTableIndexPlan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ impl Interpreter for RefreshTableIndexInterpreter {
.manager
.check_enterprise_enabled(self.ctx.get_license_key(), Feature::InvertedIndex)?;

let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let table = self
.ctx
.get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
Expand Down Expand Up @@ -108,12 +107,11 @@ impl Interpreter for RefreshTableIndexInterpreter {
if let Some(lock_guard) = lock_guard {
build_res.main_pipeline.add_lock_guard(lock_guard);
}

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
fuse_table
.do_refresh_inverted_index(
self.ctx.clone(),
catalog.clone(),
table.clone(),
index_name,
index_version,
&index.options,
Expand Down
1 change: 0 additions & 1 deletion src/query/service/src/test_kits/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,6 @@ pub async fn generate_snapshots(fixture: &TestFixture) -> Result<()> {
locations,
None,
None,
None,
);
snapshot_1.timestamp = Some(now - Duration::hours(12));
snapshot_1.summary = merge_statistics(&snapshot_0.summary, &segments_v3[0].1.summary, None);
Expand Down
15 changes: 1 addition & 14 deletions src/query/service/tests/it/storages/fuse/meta/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,7 @@ fn default_snapshot() -> TableSnapshot {
let uuid = Uuid::new_v4();
let schema = TableSchema::empty();
let stats = Default::default();
TableSnapshot::new(
uuid,
None,
&None,
None,
schema,
stats,
vec![],
None,
None,
None,
)
TableSnapshot::new(uuid, None, &None, None, schema, stats, vec![], None, None)
}

#[test]
Expand All @@ -61,7 +50,6 @@ fn snapshot_timestamp_monotonic_increase() {
vec![],
None,
None,
None,
);
let current_ts = current.timestamp.unwrap();
let prev_ts = prev.timestamp.unwrap();
Expand All @@ -87,7 +75,6 @@ fn snapshot_timestamp_time_skew_tolerance() {
vec![],
None,
None,
None,
);
let current_ts = current.timestamp.unwrap();
let prev_ts = prev.timestamp.unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ async fn test_commit_to_meta_server() -> Result<()> {
new_segments,
None,
None,
None,
);

let faked_catalog = FakedCatalog {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ async fn test_safety() -> Result<()> {
locations.clone(),
None,
None,
None,
);

let limit: usize = rand.gen_range(1..15);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ async fn test_safety_for_recluster() -> Result<()> {
locations.clone(),
None,
None,
None,
));

let mut block_ids = HashSet::new();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ DB.Table: 'system'.'caches', Table: caches-table_id:1, ver:0, Engine: SystemCach
| 'test-node' | 'bloom_index_meta_cache' | 0 | 0 |
| 'test-node' | 'file_meta_data_cache' | 0 | 0 |
| 'test-node' | 'inverted_index_file_cache' | 0 | 0 |
| 'test-node' | 'inverted_index_info_cache' | 0 | 0 |
| 'test-node' | 'inverted_index_meta_cache' | 0 | 0 |
| 'test-node' | 'prune_partitions_cache' | 0 | 0 |
| 'test-node' | 'segment_info_cache' | 0 | 0 |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ DB.Table: 'system'.'configs', Table: configs-table_id:1, ver:0, Engine: SystemCo
| 'cache' | 'enable_table_meta_cache' | 'true' | '' |
| 'cache' | 'inverted_index_filter_memory_ratio' | '0' | '' |
| 'cache' | 'inverted_index_filter_size' | '2147483648' | '' |
| 'cache' | 'inverted_index_info_count' | '3000' | '' |
| 'cache' | 'inverted_index_meta_count' | '3000' | '' |
| 'cache' | 'table_bloom_index_filter_count' | '0' | '' |
| 'cache' | 'table_bloom_index_filter_size' | '2147483648' | '' |
| 'cache' | 'table_bloom_index_meta_count' | '3000' | '' |
Expand Down
2 changes: 1 addition & 1 deletion src/query/sql/src/planner/binder/ddl/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl Binder {
&& field.data_type.remove_nullable() != TableDataType::Variant
{
return Err(ErrorCode::UnsupportedIndex(format!(
"Inverted index currently only support String and variant type, but the type of column {} is {}",
"Inverted index currently only support String and Variant type, but the type of column {} is {}",
column, field.data_type
)));
}
Expand Down
Loading
Loading