Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deletion of null values #6277

Merged
merged 3 commits into from
Jun 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions common/datablocks/src/kernels/data_block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,31 @@ impl DataBlock {

let predict_boolean_nonull = Self::cast_to_nonull_boolean(predicate)?;
// faster path for constant filter
if predict_boolean_nonull.is_const() {
let flag = predict_boolean_nonull.get_bool(0)?;
if flag {
return Ok(block);
if let Ok(Some(const_bool)) = Self::try_as_const_bool(&predict_boolean_nonull) {
return if const_bool {
Ok(block)
} else {
return Ok(DataBlock::empty_with_schema(block.schema().clone()));
}
Ok(DataBlock::empty_with_schema(block.schema().clone()))
};
}

let boolean_col: &BooleanColumn = Series::check_get(&predict_boolean_nonull)?;
let rows = boolean_col.len();
let count_zeros = boolean_col.values().null_count();
Self::filter_block_with_bool_column(block, boolean_col)
}

pub fn try_as_const_bool(column_reference: &ColumnRef) -> Result<Option<bool>> {
if column_reference.is_const() {
Ok(Some(column_reference.get_bool(0)?))
} else {
Ok(None)
}
}

pub fn filter_block_with_bool_column(
block: DataBlock,
filter: &BooleanColumn,
) -> Result<DataBlock> {
let rows = filter.len();
let count_zeros = filter.values().null_count();
match count_zeros {
0 => Ok(block),
_ => {
Expand All @@ -49,7 +62,7 @@ impl DataBlock {
}
let mut after_columns = Vec::with_capacity(block.num_columns());
for data_column in block.columns() {
after_columns.push(data_column.filter(boolean_col));
after_columns.push(data_column.filter(filter));
}

Ok(DataBlock::create(block.schema().clone(), after_columns))
Expand Down
30 changes: 30 additions & 0 deletions common/datablocks/tests/it/kernels/data_block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,33 @@ fn test_filter_all_const_data_block() -> Result<()> {

Ok(())
}

#[test]
fn test_filter_try_as_const_bool() -> Result<()> {
{
fn const_val(v: bool) {
let const_col = ConstColumn::new(Series::from_data(vec![v]), 6);
let predicate: Arc<dyn Column> = Arc::new(const_col);
let r = DataBlock::try_as_const_bool(&predicate);
assert!(matches!(r, Ok(Some(p)) if p == v));
}
// const values, should return Some(val)
const_val(true);
const_val(false);
}
{
// non-const value, should return None
let predicate = Series::from_data(vec![false]);
let r = DataBlock::try_as_const_bool(&predicate);
assert!(matches!(r, Ok(None)));
}

{
// const non-bool column , should return Err
let const_col = ConstColumn::new(Series::from_data(vec![vec![1]]), 6);
let predicate: Arc<dyn Column> = Arc::new(const_col);
let r = DataBlock::try_as_const_bool(&predicate);
assert!(matches!(r, Err(_)));
}
Ok(())
}
4 changes: 1 addition & 3 deletions query/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ impl Interpreter for DeleteInterpreter {
&self,
_input_stream: Option<SendableDataBlockStream>,
) -> Result<SendableDataBlockStream> {
// TODO
// 1. check privilege
// 2. optimize the plan, at least constant folding?
// TODO check privilege
let catalog_name = self.plan.catalog_name.as_str();
let db_name = self.plan.database_name.as_str();
let tbl_name = self.plan.table_name.as_str();
Expand Down
1 change: 0 additions & 1 deletion query/src/storages/fuse/io/read/block_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ impl BlockReader {
let idx = *col_idx[i];
let field = self.arrow_schema.fields[idx].clone();
let column_descriptor = &self.parquet_schema_descriptor.columns()[idx];
//let column_meta = &part.columns_meta[&idx];
let column_meta = &meta
.col_metas
.get(&(i as u32))
Expand Down
1 change: 1 addition & 0 deletions query/src/storages/fuse/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ mod truncate;
pub mod util;

pub use fuse_sink::FuseTableSink;
pub use mutation::delete_from_block;
pub use operation_log::AppendOperationLogEntry;
pub use operation_log::TableOperationLog;
pub use util::column_metas;
59 changes: 40 additions & 19 deletions query/src/storages/fuse/operations/mutation/block_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Not;
use std::sync::Arc;

use common_datablocks::DataBlock;
use common_datavalues::BooleanColumn;
use common_datavalues::DataSchemaRefExt;
use common_datavalues::Series;
use common_exception::Result;
use common_planners::Expression;

Expand All @@ -37,45 +40,61 @@ pub async fn delete_from_block(
// extract the columns that are going to be filtered on
let col_ids = {
if filter_column_ids.is_empty() {
// here the situation: filter_expr is not null, but filter_column_ids in not empty, which
// indicates the expr being evaluated is unrelated to the value of rows:
// e.g.
// `delete from t where 1 = 1`, `delete from t where now()`,
// or `delete from t where RANDOM()::INT::BOOLEAN`
// tobe refined:
// if the `filter_expr` is of "constant" nullary :
// for the whole block, whether all of the rows should be kept or dropped,
// we can just return from here, without accessing the block data
filtering_whole_block = true;
// To be optimized (in `interpreter_delete`, if we adhere the style of interpreter_select)
// In this case, the expr being evaluated is unrelated to the value of rows:
// - if the `filter_expr` is of "constant" function:
// for the whole block, whether all of the rows should be kept or dropped
// - but, the expr may NOT be deterministic, e.g.
// A nullary non-constant func, which returns true, false or NULL randomly
all_the_columns_ids(table)
} else {
filter_column_ids
}
};

// read the cols that we are going to filtering on
let reader = table.create_block_reader(ctx, col_ids)?;
let data_block = reader.read_with_block_meta(block_meta).await?;

// inverse the expr
let inverse_expr = Expression::UnaryExpression {
op: "not".to_string(),
expr: Box::new(filter_expr.clone()),
};

let schema = table.table_info.schema();
let expr_field = inverse_expr.to_data_field(&schema)?;
let expr_field = filter_expr.to_data_field(&schema)?;
let expr_schema = DataSchemaRefExt::create(vec![expr_field]);

// get the filter
let expr_exec = ExpressionExecutor::try_create(
ctx.clone(),
"filter expression executor (delete) ",
schema.clone(),
expr_schema,
vec![inverse_expr],
vec![filter_expr.clone()],
false,
)?;

// get the single col data block, which indicates the rows should be kept/removed
let filter_result = expr_exec.execute(&data_block)?;

// read the whole block
let predicates = DataBlock::cast_to_nonull_boolean(filter_result.column(0))?;
// shortcut, if predicates is const boolean (or can be cast to boolean)
if let Some(const_bool) = DataBlock::try_as_const_bool(&predicates)? {
return if const_bool {
// all the rows should be removed
Ok(Deletion::Remains(DataBlock::empty_with_schema(
data_block.schema().clone(),
)))
} else {
// none of the rows should be removed
Ok(Deletion::NothingDeleted)
};
}

// reverse the filter
let boolean_col: &BooleanColumn = Series::check_get(&predicates)?;
let values = boolean_col.values();
let filter = BooleanColumn::from_arrow_data(values.not());

// read the whole block if necessary
let whole_block = if filtering_whole_block {
data_block
} else {
Expand All @@ -84,9 +103,11 @@ pub async fn delete_from_block(
whole_block_reader.read_with_block_meta(block_meta).await?
};

// returns the data remains after deletion
let data_block = DataBlock::filter_block(whole_block, filter_result.column(0))?;
// filter out rows
let data_block = DataBlock::filter_block_with_bool_column(whole_block, &filter)?;

let res = if data_block.num_rows() == block_meta.row_count as usize {
// false positive, nothing removed indeed
Deletion::NothingDeleted
} else {
Deletion::Remains(data_block)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_exception::Result;
use opendal::Operator;

use crate::sessions::QueryContext;
use crate::storages::fuse::io::write_meta;
use crate::storages::fuse::io::BlockWriter;
use crate::storages::fuse::io::MetaReaders;
use crate::storages::fuse::io::SegmentWriter;
Expand Down Expand Up @@ -133,16 +134,12 @@ impl<'a> DeletionCollector<'a> {
let new_summary = reduce_statistics(&new_segment_summaries)?;
new_snapshot.summary = new_summary;

// write the new segment out (and keep it in undo log)
// write the new snapshot out (and keep it in undo log)
let snapshot_loc = self.location_generator.snapshot_location_from_uuid(
&new_snapshot.snapshot_id,
new_snapshot.format_version(),
)?;
let bytes = serde_json::to_vec(&new_snapshot)?;
self.data_accessor
.object(&snapshot_loc)
.write(bytes)
.await?;
write_meta(&self.data_accessor, &snapshot_loc, &new_snapshot).await?;
Ok((new_snapshot, snapshot_loc))
}

Expand Down
17 changes: 17 additions & 0 deletions tests/suites/0_stateless/03_dml/03_0025_delete_from.result
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,20 @@ other rows should be kept
1
deleted unconditionally
1
nullable column cases
delete with condition "const false"
1
normal deletion
expects one row deleted
1
expects null valued row kept
1
delete all null valued rows
expects no null valued row kept
1
expects 1 null valued row kept
1
deleted unconditionally (with const true)
1
deleted with nullary expr now()
1
36 changes: 36 additions & 0 deletions tests/suites/0_stateless/03_dml/03_0025_delete_from.sql
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,40 @@ select 'deleted unconditionally';
delete from t;
select count(*) = 0 from t;

drop table t all;

-- setup
select 'nullable column cases';
create table t (c Int null);
insert into t values (1),(2),(NULL);

select 'delete with condition "const false"';
delete from t where 1 = 0;
select count(*) = 3 from t;

select 'normal deletion';
delete from t where c = 1;
select 'expects one row deleted';
select count(*) = 2 from t;
select 'expects null valued row kept';
select count(*) = 1 from t where c IS NULL;

select 'delete all null valued rows';
delete from t where c IS NULL;
select 'expects no null valued row kept';
select count(*) = 0 from t where c IS NULL;
select 'expects 1 null valued row kept';
select count(*) = 1 from t where c IS NOT NULL;

select 'deleted unconditionally (with const true)';
delete from t where 1 = 1;
select count(*) = 0 from t;

insert into t values (1),(2),(NULL);
select 'deleted with nullary expr now()';
delete from t where now();
select count(*) = 0 from t;

drop table t all;

DROP DATABASE db1;
17 changes: 17 additions & 0 deletions tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.result
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,20 @@ other rows should be kept
1
deleted unconditionally
1
nullable column cases
delete with condition "const false"
1
normal deletion
expects one row deleted
1
expects null valued row kept
1
delete all null valued rows
expects no null valued row kept
1
expects 1 null valued row kept
1
deleted unconditionally (with const true)
1
deleted with nullary expr now()
1
36 changes: 36 additions & 0 deletions tests/suites/0_stateless/03_dml/03_0025_delete_from_v2.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,40 @@ select 'deleted unconditionally';
delete from t;
select count(*) = 0 from t;

drop table t all;

-- setup
select 'nullable column cases';
create table t (c Int null);
insert into t values (1),(2),(NULL);

select 'delete with condition "const false"';
delete from t where 1 = 0;
select count(*) = 3 from t;

select 'normal deletion';
delete from t where c = 1;
select 'expects one row deleted';
select count(*) = 2 from t;
select 'expects null valued row kept';
select count(*) = 1 from t where c IS NULL;

select 'delete all null valued rows';
delete from t where c IS NULL;
select 'expects no null valued row kept';
select count(*) = 0 from t where c IS NULL;
select 'expects 1 null valued row kept';
select count(*) = 1 from t where c IS NOT NULL;

select 'deleted unconditionally (with const true)';
delete from t where 1 = 1;
select count(*) = 0 from t;

insert into t values (1),(2),(NULL);
select 'deleted with nullary expr now()';
delete from t where now();
select count(*) = 0 from t;

drop table t all;

DROP DATABASE db1;