From becf6b04dc0064f1c2fb1274408b592b1a74e7f1 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Mon, 19 Sep 2022 16:44:02 +0800 Subject: [PATCH 1/2] Enable range filter to pre-filter `is_null` and `is_not_null` ops. --- .../fuse/src/pruning/pruning_executor.rs | 9 ++- .../storages/fuse/src/pruning/range_pruner.rs | 10 ++-- src/query/storages/index/src/range_filter.rs | 51 ++++++++++++---- .../09_fuse_engine/09_0018_nullable_prune | 59 +++++++++++++++++++ 4 files changed, 111 insertions(+), 18 deletions(-) create mode 100644 tests/logictest/suites/base/09_fuse_engine/09_0018_nullable_prune diff --git a/src/query/storages/fuse/src/pruning/pruning_executor.rs b/src/query/storages/fuse/src/pruning/pruning_executor.rs index 688f38713bfc..614d31116656 100644 --- a/src/query/storages/fuse/src/pruning/pruning_executor.rs +++ b/src/query/storages/fuse/src/pruning/pruning_executor.rs @@ -118,14 +118,19 @@ impl BlockPruner { } let segment_info = segment_reader.read(seg_loc, None, ver).await?; let mut result = Vec::with_capacity(segment_info.blocks.len()); - if range_filter_pruner.should_keep(&segment_info.summary.col_stats) { + if range_filter_pruner.should_keep( + &segment_info.summary.col_stats, + segment_info.summary.row_count, + ) { for block_meta in &segment_info.blocks { // prune block using range filter if limiter.exceeded() { // before using bloom index to prune, check if limit already exceeded return Ok(result); } - if range_filter_pruner.should_keep(&block_meta.col_stats) { + if range_filter_pruner + .should_keep(&block_meta.col_stats, block_meta.row_count) + { // prune block using bloom filter if bloom_filter_pruner .should_keep( diff --git a/src/query/storages/fuse/src/pruning/range_pruner.rs b/src/query/storages/fuse/src/pruning/range_pruner.rs index 242770fcd9b6..5f4e76e0f4fd 100644 --- a/src/query/storages/fuse/src/pruning/range_pruner.rs +++ b/src/query/storages/fuse/src/pruning/range_pruner.rs @@ -23,13 +23,13 @@ use common_storages_index::RangeFilter; pub trait RangeFilterPruner { // returns ture, if target should NOT be pruned (false positive allowed) - fn should_keep(&self, input: &StatisticsOfColumns) -> bool; + fn should_keep(&self, input: &StatisticsOfColumns, row_count: u64) -> bool; } struct KeepTrue; impl RangeFilterPruner for KeepTrue { - fn should_keep(&self, _input: &StatisticsOfColumns) -> bool { + fn should_keep(&self, _input: &StatisticsOfColumns, _row_count: u64) -> bool { true } } @@ -37,14 +37,14 @@ impl RangeFilterPruner for KeepTrue { struct KeepFalse; impl RangeFilterPruner for KeepFalse { - fn should_keep(&self, _input: &StatisticsOfColumns) -> bool { + fn should_keep(&self, _input: &StatisticsOfColumns, _row_count: u64) -> bool { false } } impl RangeFilterPruner for RangeFilter { - fn should_keep(&self, stats: &StatisticsOfColumns) -> bool { - match self.eval(stats) { + fn should_keep(&self, stats: &StatisticsOfColumns, row_count: u64) -> bool { + match self.eval(stats, row_count) { Ok(r) => r, Err(e) => { // swallow exceptions intentionally, corrupted index should not prevent execution diff --git a/src/query/storages/index/src/range_filter.rs b/src/query/storages/index/src/range_filter.rs index a3cc4a036bf6..2ce1c1c522aa 100644 --- a/src/query/storages/index/src/range_filter.rs +++ b/src/query/storages/index/src/range_filter.rs @@ -105,14 +105,18 @@ impl RangeFilter { } #[tracing::instrument(level = "debug", name = "range_filter_eval", skip_all)] - pub fn eval(&self, stats: &StatisticsOfColumns) -> Result { + pub fn eval(&self, stats: &StatisticsOfColumns, row_count: u64) -> Result { let mut columns = Vec::with_capacity(self.stat_columns.len()); for col in self.stat_columns.iter() { - let val_opt = col.apply_stat_value(stats, self.origin.clone())?; - if val_opt.is_none() { - return Ok(true); + if col.stat_type == StatType::RowCount { + columns.push(Series::from_data(vec![row_count])); + } else { + let val_opt = col.apply_stat_value(stats, self.origin.clone())?; + if val_opt.is_none() { + return Ok(true); + } + columns.push(val_opt.unwrap()); } - columns.push(val_opt.unwrap()); } let data_block = DataBlock::create(self.schema.clone(), columns); let executed_data_block = self.executor.execute(&data_block)?; @@ -135,7 +139,7 @@ pub fn build_verifiable_expr( let (exprs, op) = match expr { Expression::Literal { .. } => return expr.clone(), - Expression::ScalarFunction { op, args } => (args.clone(), op.clone()), + Expression::ScalarFunction { op, args } => try_convert_is_null(op, args.clone()), Expression::BinaryExpression { left, op, right } => match op.to_lowercase().as_str() { "and" => { let left = build_verifiable_expr(left, schema, stat_columns); @@ -173,11 +177,30 @@ fn inverse_operator(op: &str) -> Result<&str> { } } +/// Try to convert `not(is_not_null)` to `is_null`. +fn try_convert_is_null(op: &str, args: Vec) -> (Vec, String) { + // `is null` will be converted to `not(is not null)` in the parser. + // we should convert it back to `is null` here. + if op == "not" && args.len() == 1 { + if let Expression::ScalarFunction { + op: inner_op, + args: inner_args, + } = &args[0] + { + if inner_op == "is_not_null" { + return (inner_args.clone(), String::from("is_null")); + } + } + } + (args, String::from(op)) +} + #[derive(Debug, Copy, Clone, PartialEq)] enum StatType { Min, Max, Nulls, + RowCount, } impl fmt::Display for StatType { @@ -186,6 +209,7 @@ impl fmt::Display for StatType { StatType::Min => write!(f, "min"), StatType::Max => write!(f, "max"), StatType::Nulls => write!(f, "nulls"), + StatType::RowCount => write!(f, "row_count"), } } } @@ -209,7 +233,7 @@ impl StatColumn { expr: Expression, ) -> Self { let column_new = format!("{}_{}", stat_type, field.name()); - let data_type = if matches!(stat_type, StatType::Nulls) { + let data_type = if matches!(stat_type, StatType::Nulls | StatType::RowCount) { u64::to_data_type() } else { field.data_type().clone() @@ -400,15 +424,16 @@ impl<'a> VerifiableExprBuilder<'a> { // TODO: support in/not in. match self.op { "is_null" => { + // should_keep: col.null_count > 0 let nulls_expr = self.nulls_column_expr(0)?; let scalar_expr = lit(0u64); Ok(nulls_expr.gt(scalar_expr)) } "is_not_null" => { - let left_min = self.min_column_expr(0)?; - Ok(Expression::create_scalar_function("is_not_null", vec![ - left_min, - ])) + // should_keep: col.null_count != col.row_count + let nulls_expr = self.nulls_column_expr(0)?; + let row_count_expr = self.row_count_column_expr(0)?; + Ok(nulls_expr.not_eq(row_count_expr)) } "=" => { // left = right => min_left <= max_right and max_left >= min_right @@ -582,6 +607,10 @@ impl<'a> VerifiableExprBuilder<'a> { fn nulls_column_expr(&mut self, index: usize) -> Result { self.stat_column_expr(StatType::Nulls, index) } + + fn row_count_column_expr(&mut self, index: usize) -> Result { + self.stat_column_expr(StatType::RowCount, index) + } } fn is_like_pattern_escape(c: u8) -> bool { diff --git a/tests/logictest/suites/base/09_fuse_engine/09_0018_nullable_prune b/tests/logictest/suites/base/09_fuse_engine/09_0018_nullable_prune new file mode 100644 index 000000000000..54a5ac6e385a --- /dev/null +++ b/tests/logictest/suites/base/09_fuse_engine/09_0018_nullable_prune @@ -0,0 +1,59 @@ +statement ok +DROP DATABASE IF EXISTS db_09_0018; + +statement ok +CREATE DATABASE db_09_0018; + +statement ok +USE db_09_0018; + +statement ok +create table t (a int null); + +statement ok +insert into t select * from numbers(3); + +statement ok +insert into t select null from numbers(3); + +statement query T +explain select * from t; + +---- +TableScan +├── table: default.db_09_0018.t +├── read rows: 6 +├── read bytes: 62 +├── partitions total: 2 +├── partitions scanned: 2 +└── push downs: [filters: [], limit: NONE] + +statement query T +explain select * from t where a is not null; + +---- +TableScan +├── table: default.db_09_0018.t +├── read rows: 3 +├── read bytes: 37 +├── partitions total: 2 +├── partitions scanned: 1 +└── push downs: [filters: [is_not_null(a)], limit: NONE] + +statement query T +explain select * from t where a is null; + +---- +TableScan +├── table: default.db_09_0018.t +├── read rows: 3 +├── read bytes: 25 +├── partitions total: 2 +├── partitions scanned: 1 +└── push downs: [filters: [not(is_not_null(a))], limit: NONE] + +statement ok +DROP TABLE t; + +statement ok +DROP DATABASE db_09_0018; \ No newline at end of file From 6b0f955a724828987a04a8e240dca820581ca030 Mon Sep 17 00:00:00 2001 From: RinChanNOWWW Date: Mon, 19 Sep 2022 17:17:21 +0800 Subject: [PATCH 2/2] fix clippy and unit test. --- .../tests/it/storages/index/range_filter.rs | 10 ++++-- .../hive/src/hive_partition_pruner.rs | 2 +- .../standalone/explain/nullable_prune.test} | 32 ++++++------------- 3 files changed, 19 insertions(+), 25 deletions(-) rename tests/logictest/suites/{base/09_fuse_engine/09_0018_nullable_prune => mode/standalone/explain/nullable_prune.test} (56%) diff --git a/src/query/service/tests/it/storages/index/range_filter.rs b/src/query/service/tests/it/storages/index/range_filter.rs index 4e034c93aa54..8a425d32499a 100644 --- a/src/query/service/tests/it/storages/index/range_filter.rs +++ b/src/query/service/tests/it/storages/index/range_filter.rs @@ -91,6 +91,12 @@ async fn test_range_filter() -> Result<()> { Test { name: "a is not null", expr: Expression::create_scalar_function("is_not_null", vec![col("a")]), + expect: false, + error: "", + }, + Test { + name: "b is not null", + expr: Expression::create_scalar_function("is_not_null", vec![col("b")]), expect: true, error: "", }, @@ -192,7 +198,7 @@ async fn test_range_filter() -> Result<()> { for test in tests { let prune = RangeFilter::try_create(ctx.clone(), &[test.expr], schema.clone())?; - match prune.eval(&stats) { + match prune.eval(&stats, 1) { Ok(actual) => assert_eq!(test.expect, actual, "{:#?}", test.name), Err(e) => assert_eq!(test.error, e.to_string(), "{}", test.name), } @@ -239,7 +245,7 @@ fn test_build_verifiable_function() -> Result<()> { Test { name: "a is not null", expr: Expression::create_scalar_function("is_not_null", vec![col("a")]), - expect: "is_not_null(min_a)", + expect: "(nulls_a != row_count_a)", }, Test { name: "b >= 0 and c like 0xffffff", diff --git a/src/query/storages/hive/src/hive_partition_pruner.rs b/src/query/storages/hive/src/hive_partition_pruner.rs index 06b7915e6a1c..21dd364d073d 100644 --- a/src/query/storages/hive/src/hive_partition_pruner.rs +++ b/src/query/storages/hive/src/hive_partition_pruner.rs @@ -99,7 +99,7 @@ impl HivePartitionPruner { let column_stats = self.get_column_stats(&partitions)?; let mut filted_partitions = vec![]; for (idx, stats) in column_stats.into_iter().enumerate() { - if range_filter.eval(&stats)? { + if range_filter.eval(&stats, 1)? { filted_partitions.push(partitions[idx].clone()); } } diff --git a/tests/logictest/suites/base/09_fuse_engine/09_0018_nullable_prune b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test similarity index 56% rename from tests/logictest/suites/base/09_fuse_engine/09_0018_nullable_prune rename to tests/logictest/suites/mode/standalone/explain/nullable_prune.test index 54a5ac6e385a..83620ee15b37 100644 --- a/tests/logictest/suites/base/09_fuse_engine/09_0018_nullable_prune +++ b/tests/logictest/suites/mode/standalone/explain/nullable_prune.test @@ -1,27 +1,18 @@ statement ok -DROP DATABASE IF EXISTS db_09_0018; +create table t_nullable_prune (a int null); statement ok -CREATE DATABASE db_09_0018; +insert into t_nullable_prune select * from numbers(3); statement ok -USE db_09_0018; - -statement ok -create table t (a int null); - -statement ok -insert into t select * from numbers(3); - -statement ok -insert into t select null from numbers(3); +insert into t_nullable_prune select null from numbers(3); statement query T -explain select * from t; +explain select * from t_nullable_prune; ---- TableScan -├── table: default.db_09_0018.t +├── table: default.default.t_nullable_prune ├── read rows: 6 ├── read bytes: 62 ├── partitions total: 2 @@ -29,11 +20,11 @@ TableScan └── push downs: [filters: [], limit: NONE] statement query T -explain select * from t where a is not null; +explain select * from t_nullable_prune where a is not null; ---- TableScan -├── table: default.db_09_0018.t +├── table: default.default.t_nullable_prune ├── read rows: 3 ├── read bytes: 37 ├── partitions total: 2 @@ -41,11 +32,11 @@ TableScan └── push downs: [filters: [is_not_null(a)], limit: NONE] statement query T -explain select * from t where a is null; +explain select * from t_nullable_prune where a is null; ---- TableScan -├── table: default.db_09_0018.t +├── table: default.default.t_nullable_prune ├── read rows: 3 ├── read bytes: 25 ├── partitions total: 2 @@ -53,7 +44,4 @@ TableScan └── push downs: [filters: [not(is_not_null(a))], limit: NONE] statement ok -DROP TABLE t; - -statement ok -DROP DATABASE db_09_0018; \ No newline at end of file +DROP TABLE default.default.t_nullable_prune;