Skip to content

Commit

Permalink
Merge pull request #7725 from RinChanNOWWW/nullable_pruner
Browse files Browse the repository at this point in the history
feat(query): enable range filter to pre-filter `is_null` and `is_not_null` ops.
  • Loading branch information
mergify[bot] authored Sep 20, 2022
2 parents 254feaf + 88b0854 commit 985d6f8
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 21 deletions.
10 changes: 8 additions & 2 deletions src/query/service/tests/it/storages/index/range_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ async fn test_range_filter() -> Result<()> {
Test {
name: "a is not null",
expr: Expression::create_scalar_function("is_not_null", vec![col("a")]),
expect: false,
error: "",
},
Test {
name: "b is not null",
expr: Expression::create_scalar_function("is_not_null", vec![col("b")]),
expect: true,
error: "",
},
Expand Down Expand Up @@ -192,7 +198,7 @@ async fn test_range_filter() -> Result<()> {
for test in tests {
let prune = RangeFilter::try_create(ctx.clone(), &[test.expr], schema.clone())?;

match prune.eval(&stats) {
match prune.eval(&stats, 1) {
Ok(actual) => assert_eq!(test.expect, actual, "{:#?}", test.name),
Err(e) => assert_eq!(test.error, e.to_string(), "{}", test.name),
}
Expand Down Expand Up @@ -239,7 +245,7 @@ fn test_build_verifiable_function() -> Result<()> {
Test {
name: "a is not null",
expr: Expression::create_scalar_function("is_not_null", vec![col("a")]),
expect: "is_not_null(min_a)",
expect: "(nulls_a != row_count_a)",
},
Test {
name: "b >= 0 and c like 0xffffff",
Expand Down
9 changes: 7 additions & 2 deletions src/query/storages/fuse/src/pruning/pruning_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,19 @@ impl BlockPruner {
}
let segment_info = segment_reader.read(seg_loc, None, ver).await?;
let mut result = Vec::with_capacity(segment_info.blocks.len());
if range_filter_pruner.should_keep(&segment_info.summary.col_stats) {
if range_filter_pruner.should_keep(
&segment_info.summary.col_stats,
segment_info.summary.row_count,
) {
for block_meta in &segment_info.blocks {
// prune block using range filter
if limiter.exceeded() {
// before using bloom index to prune, check if limit already exceeded
return Ok(result);
}
if range_filter_pruner.should_keep(&block_meta.col_stats) {
if range_filter_pruner
.should_keep(&block_meta.col_stats, block_meta.row_count)
{
// prune block using bloom filter
if bloom_filter_pruner
.should_keep(
Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/fuse/src/pruning/range_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,28 @@ use common_storages_index::RangeFilter;

pub trait RangeFilterPruner {
// returns true, if target should NOT be pruned (false positive allowed)
fn should_keep(&self, input: &StatisticsOfColumns) -> bool;
fn should_keep(&self, input: &StatisticsOfColumns, row_count: u64) -> bool;
}

struct KeepTrue;

impl RangeFilterPruner for KeepTrue {
fn should_keep(&self, _input: &StatisticsOfColumns) -> bool {
fn should_keep(&self, _input: &StatisticsOfColumns, _row_count: u64) -> bool {
true
}
}

struct KeepFalse;

impl RangeFilterPruner for KeepFalse {
fn should_keep(&self, _input: &StatisticsOfColumns) -> bool {
fn should_keep(&self, _input: &StatisticsOfColumns, _row_count: u64) -> bool {
false
}
}

impl RangeFilterPruner for RangeFilter {
fn should_keep(&self, stats: &StatisticsOfColumns) -> bool {
match self.eval(stats) {
fn should_keep(&self, stats: &StatisticsOfColumns, row_count: u64) -> bool {
match self.eval(stats, row_count) {
Ok(r) => r,
Err(e) => {
// swallow exceptions intentionally, corrupted index should not prevent execution
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/hive/src/hive_partition_pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl HivePartitionPruner {
let column_stats = self.get_column_stats(&partitions)?;
let mut filted_partitions = vec![];
for (idx, stats) in column_stats.into_iter().enumerate() {
if range_filter.eval(&stats)? {
if range_filter.eval(&stats, 1)? {
filted_partitions.push(partitions[idx].clone());
}
}
Expand Down
51 changes: 40 additions & 11 deletions src/query/storages/index/src/range_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,18 @@ impl RangeFilter {
}

#[tracing::instrument(level = "debug", name = "range_filter_eval", skip_all)]
pub fn eval(&self, stats: &StatisticsOfColumns) -> Result<bool> {
pub fn eval(&self, stats: &StatisticsOfColumns, row_count: u64) -> Result<bool> {
let mut columns = Vec::with_capacity(self.stat_columns.len());
for col in self.stat_columns.iter() {
let val_opt = col.apply_stat_value(stats, self.origin.clone())?;
if val_opt.is_none() {
return Ok(true);
if col.stat_type == StatType::RowCount {
columns.push(Series::from_data(vec![row_count]));
} else {
let val_opt = col.apply_stat_value(stats, self.origin.clone())?;
if val_opt.is_none() {
return Ok(true);
}
columns.push(val_opt.unwrap());
}
columns.push(val_opt.unwrap());
}
let data_block = DataBlock::create(self.schema.clone(), columns);
let executed_data_block = self.executor.execute(&data_block)?;
Expand All @@ -135,7 +139,7 @@ pub fn build_verifiable_expr(

let (exprs, op) = match expr {
Expression::Literal { .. } => return expr.clone(),
Expression::ScalarFunction { op, args } => (args.clone(), op.clone()),
Expression::ScalarFunction { op, args } => try_convert_is_null(op, args.clone()),
Expression::BinaryExpression { left, op, right } => match op.to_lowercase().as_str() {
"and" => {
let left = build_verifiable_expr(left, schema, stat_columns);
Expand Down Expand Up @@ -173,11 +177,30 @@ fn inverse_operator(op: &str) -> Result<&str> {
}
}

/// Rewrite the parser's `not(is_not_null(x))` shape back into `is_null(x)`.
///
/// The parser lowers `x IS NULL` into `not(is_not_null(x))`; detecting that
/// shape here lets the range filter build a dedicated `is_null` verifiable
/// expression instead of a generic negation. Any other `(op, args)` pair is
/// returned unchanged.
fn try_convert_is_null(op: &str, args: Vec<Expression>) -> (Vec<Expression>, String) {
    if op == "not" {
        // A slice pattern both enforces the single-argument arity and
        // destructures the inner scalar function in one step.
        if let [Expression::ScalarFunction {
            op: inner_op,
            args: inner_args,
        }] = args.as_slice()
        {
            if inner_op == "is_not_null" {
                return (inner_args.clone(), String::from("is_null"));
            }
        }
    }
    (args, String::from(op))
}

/// Kind of per-column (or per-block) statistic a verifiable expression reads.
///
/// `Eq` and `Hash` are derived alongside `PartialEq` since equality on this
/// field-less enum is trivially total, which also allows use as a map key.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
enum StatType {
    /// Minimum value of a column within the pruned unit.
    Min,
    /// Maximum value of a column within the pruned unit.
    Max,
    /// Number of null entries in a column (u64 column in the stats block).
    Nulls,
    /// Total number of rows in the pruned unit (u64, supplied at eval time).
    RowCount,
}

impl fmt::Display for StatType {
Expand All @@ -186,6 +209,7 @@ impl fmt::Display for StatType {
StatType::Min => write!(f, "min"),
StatType::Max => write!(f, "max"),
StatType::Nulls => write!(f, "nulls"),
StatType::RowCount => write!(f, "row_count"),
}
}
}
Expand All @@ -209,7 +233,7 @@ impl StatColumn {
expr: Expression,
) -> Self {
let column_new = format!("{}_{}", stat_type, field.name());
let data_type = if matches!(stat_type, StatType::Nulls) {
let data_type = if matches!(stat_type, StatType::Nulls | StatType::RowCount) {
u64::to_data_type()
} else {
field.data_type().clone()
Expand Down Expand Up @@ -400,15 +424,16 @@ impl<'a> VerifiableExprBuilder<'a> {
// TODO: support in/not in.
match self.op {
"is_null" => {
// should_keep: col.null_count > 0
let nulls_expr = self.nulls_column_expr(0)?;
let scalar_expr = lit(0u64);
Ok(nulls_expr.gt(scalar_expr))
}
"is_not_null" => {
let left_min = self.min_column_expr(0)?;
Ok(Expression::create_scalar_function("is_not_null", vec![
left_min,
]))
// should_keep: col.null_count != col.row_count
let nulls_expr = self.nulls_column_expr(0)?;
let row_count_expr = self.row_count_column_expr(0)?;
Ok(nulls_expr.not_eq(row_count_expr))
}
"=" => {
// left = right => min_left <= max_right and max_left >= min_right
Expand Down Expand Up @@ -582,6 +607,10 @@ impl<'a> VerifiableExprBuilder<'a> {
fn nulls_column_expr(&mut self, index: usize) -> Result<Expression> {
self.stat_column_expr(StatType::Nulls, index)
}

fn row_count_column_expr(&mut self, index: usize) -> Result<Expression> {
self.stat_column_expr(StatType::RowCount, index)
}
}

fn is_like_pattern_escape(c: u8) -> bool {
Expand Down
47 changes: 47 additions & 0 deletions tests/logictest/suites/mode/standalone/explain/nullable_prune.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
statement ok
create table t_nullable_prune (a int null);

statement ok
insert into t_nullable_prune select * from numbers(3);

statement ok
insert into t_nullable_prune select null from numbers(3);

statement query T
explain select * from t_nullable_prune;

----
TableScan
├── table: default.default.t_nullable_prune
├── read rows: 6
├── read bytes: 62
├── partitions total: 2
├── partitions scanned: 2
└── push downs: [filters: [], limit: NONE]

statement query T
explain select * from t_nullable_prune where a is not null;

----
TableScan
├── table: default.default.t_nullable_prune
├── read rows: 3
├── read bytes: 37
├── partitions total: 2
├── partitions scanned: 1
└── push downs: [filters: [is_not_null(a)], limit: NONE]

statement query T
explain select * from t_nullable_prune where a is null;

----
TableScan
├── table: default.default.t_nullable_prune
├── read rows: 3
├── read bytes: 25
├── partitions total: 2
├── partitions scanned: 1
└── push downs: [filters: [not(is_not_null(a))], limit: NONE]

statement ok
DROP TABLE default.default.t_nullable_prune;

1 comment on commit 985d6f8

@vercel
Copy link

@vercel vercel bot commented on 985d6f8 Sep 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend-git-main-databend.vercel.app
databend.vercel.app
databend-databend.vercel.app
databend.rs

Please sign in to comment.