Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): Array lambda function support outer scope columns #15957

Merged
merged 4 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 151 additions & 105 deletions src/query/expression/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::FunctionDomain;
use crate::FunctionEval;
use crate::FunctionRegistry;
use crate::RemoteExpr;
use crate::ScalarRef;

#[derive(Default)]
pub struct EvaluateOptions<'a> {
Expand Down Expand Up @@ -235,6 +236,7 @@ impl<'a> Evaluator<'a> {
return_type,
..
} => {
let data_types = args.iter().map(|arg| arg.data_type().clone()).collect();
let args = args
.iter()
.map(|expr| self.partial_run(expr, validity.clone(), options))
Expand All @@ -248,7 +250,7 @@ impl<'a> Evaluator<'a> {
.all_equal()
);

self.run_lambda(name, args, lambda_expr, return_type)
self.run_lambda(name, args, data_types, lambda_expr, return_type)
}
};

Expand Down Expand Up @@ -1034,21 +1036,26 @@ impl<'a> Evaluator<'a> {
unreachable!("expr is not a set returning function: {expr}")
}

fn run_array_reduce(&self, column: &Column, expr: &Expr) -> Result<Scalar> {
fn run_array_reduce(
&self,
col_entries: Vec<BlockEntry>,
column: &Column,
expr: &Expr,
) -> Result<Scalar> {
let col_type = column.data_type();
if col_type.is_null() || column.len() < 1 {
return Ok(Scalar::Null);
}
let mut arg0 = column.index(0).unwrap().to_owned();
let mut arg0 = unsafe { column.index_unchecked(0).to_owned() };
let mut eval_options = EvaluateOptions::default();
for i in 1..column.len() {
let arg1 = column.index(i).unwrap().to_owned();
let entries = {
vec![
BlockEntry::new(col_type.clone(), Value::Scalar(arg0.clone())),
BlockEntry::new(col_type.clone(), Value::Scalar(arg1)),
]
};
let arg1 = unsafe { column.index_unchecked(i).to_owned() };
let mut entries = col_entries.clone();
entries.push(BlockEntry::new(
col_type.clone(),
Value::Scalar(arg0.clone()),
));
entries.push(BlockEntry::new(col_type.clone(), Value::Scalar(arg1)));
let block = DataBlock::new(entries, 1);
let evaluator = Evaluator::new(&block, self.func_ctx, self.fn_registry);
let result = evaluator.run(expr)?;
Expand All @@ -1072,124 +1079,162 @@ impl<'a> Evaluator<'a> {
&self,
func_name: &str,
args: Vec<Value<AnyType>>,
data_types: Vec<DataType>,
lambda_expr: &RemoteExpr,
return_type: &DataType,
) -> Result<Value<AnyType>> {
let expr = lambda_expr.as_expr(self.fn_registry);
// array_reduce differs
if func_name == "array_reduce" {
match &args[0] {
Value::Scalar(s) => match s {
Scalar::Array(c) => {
let result = self.run_array_reduce(c, &expr)?;
return Ok(Value::Scalar(result));
let len = args.iter().find_map(|arg| match arg {
Value::Column(col) => Some(col.len()),
_ => None,
});

let lambda_idx = args.len() - 1;
let mut builder = ColumnBuilder::with_capacity(return_type, len.unwrap_or(1));
for idx in 0..(len.unwrap_or(1)) {
let mut entries = Vec::with_capacity(args.len() - 1);
for i in 0..lambda_idx {
let scalar = unsafe { args[i].index_unchecked(idx) };
let entry =
BlockEntry::new(data_types[i].clone(), Value::Scalar(scalar.to_owned()));
entries.push(entry);
}
let scalar = unsafe { args[lambda_idx].index_unchecked(idx) };
match scalar {
ScalarRef::Array(col) => {
let result = self.run_array_reduce(entries, &col, &expr)?;
builder.push(result.as_ref());
}
_ => unreachable!(),
},
Value::Column(c) => {
let mut builder = ColumnBuilder::with_capacity(return_type, c.len());
for val in c.iter() {
match &val.to_owned() {
Scalar::Array(c) => {
let result = self.run_array_reduce(c, &expr)?;
let item = result.as_ref();
builder.push(item);
}
Scalar::Null => {
builder.push_default();
}
_ => unreachable!(),
}
ScalarRef::Null => {
builder.push_default();
}
return Ok(Value::Column(builder.build()));
_ => unreachable!(),
}
}
let res = match len {
Some(_) => Value::Column(builder.build()),
None => Value::Scalar(builder.build_scalar()),
};
return Ok(res);
}
// TODO: Support multi args
match &args[0] {
Value::Scalar(s) => match s {
Scalar::Array(c) => {
let entry = BlockEntry::new(c.data_type(), Value::Column(c.clone()));
let block = DataBlock::new(vec![entry], c.len());

// If there is only one column, we can extract the inner column and execute on all rows at once
if args.len() == 1 && matches!(args[0], Value::Column(_)) {
let (inner_col, inner_ty, offsets, validity) = match &args[0] {
Value::Column(Column::Array(box array_col)) => (
array_col.values.clone(),
array_col.values.data_type(),
array_col.offsets.clone(),
None,
),
Value::Column(Column::Nullable(box nullable_col)) => match &nullable_col.column {
Column::Array(box array_col) => (
array_col.values.clone(),
array_col.values.data_type(),
array_col.offsets.clone(),
Some(nullable_col.validity.clone()),
),
_ => unreachable!(),
},
_ => unreachable!(),
};
let entry = BlockEntry::new(inner_ty, Value::Column(inner_col.clone()));
let block = DataBlock::new(vec![entry], inner_col.len());

let evaluator = Evaluator::new(&block, self.func_ctx, self.fn_registry);
let result = evaluator.run(&expr)?;
let result_col = result.convert_to_full_column(expr.data_type(), inner_col.len());

let array_col = if func_name == "array_filter" {
let result_col = result_col.remove_nullable();
let bitmap = result_col.as_boolean().unwrap();
let filtered_inner_col = inner_col.filter(bitmap);
// generate new offsets after filter.
let mut new_offset = 0;
let mut filtered_offsets = Vec::with_capacity(offsets.len());
filtered_offsets.push(0);
for offset in offsets.windows(2) {
let off = offset[0] as usize;
let len = (offset[1] - offset[0]) as usize;
let unset_count = bitmap.null_count_range(off, len);
new_offset += (len - unset_count) as u64;
filtered_offsets.push(new_offset);
}

Column::Array(Box::new(ArrayColumn {
values: filtered_inner_col,
offsets: filtered_offsets.into(),
}))
} else {
Column::Array(Box::new(ArrayColumn {
values: result_col,
offsets,
}))
};
let col = match validity {
Some(validity) => Value::Column(Column::Nullable(Box::new(NullableColumn {
column: array_col,
validity,
}))),
None => Value::Column(array_col),
};
return Ok(col);
}

let len = args.iter().find_map(|arg| match arg {
Value::Column(col) => Some(col.len()),
_ => None,
});
let lambda_idx = args.len() - 1;
let mut builder = ColumnBuilder::with_capacity(return_type, len.unwrap_or(1));
for idx in 0..(len.unwrap_or(1)) {
let mut entries = Vec::with_capacity(args.len());
for i in 0..lambda_idx {
let scalar = unsafe { args[i].index_unchecked(idx) };
let entry =
BlockEntry::new(data_types[i].clone(), Value::Scalar(scalar.to_owned()));
entries.push(entry);
}
let scalar = unsafe { args[lambda_idx].index_unchecked(idx) };
match scalar {
ScalarRef::Array(col) => {
// add lambda array scalar value as a column
let col_len = col.len();
let entry =
BlockEntry::new(col.data_type().clone(), Value::Column(col.clone()));
entries.push(entry);
let block = DataBlock::new(entries, col_len);

let evaluator = Evaluator::new(&block, self.func_ctx, self.fn_registry);
let result = evaluator.run(&expr)?;
let result_col = result.convert_to_full_column(expr.data_type(), c.len());
let result_col = result.convert_to_full_column(expr.data_type(), col_len);

let val = if func_name == "array_filter" {
let result_col = result_col.remove_nullable();
let bitmap = result_col.as_boolean().unwrap();
let filtered_inner_col = c.filter(bitmap);
Value::Scalar(Scalar::Array(filtered_inner_col))

let src_entry = block.get_by_offset(lambda_idx);
let src_col = src_entry.value.as_column().unwrap();
let filtered_col = src_col.filter(bitmap);
Scalar::Array(filtered_col)
} else {
Value::Scalar(Scalar::Array(result_col))
Scalar::Array(result_col)
};
Ok(val)
builder.push(val.as_ref());
}
ScalarRef::Null => {
builder.push_default();
}
_ => unreachable!(),
},
Value::Column(c) => {
let (inner_col, inner_ty, offsets, validity) = match c {
Column::Array(box array_col) => (
array_col.values.clone(),
array_col.values.data_type(),
array_col.offsets.clone(),
None,
),
Column::Nullable(box nullable_col) => match &nullable_col.column {
Column::Array(box array_col) => (
array_col.values.clone(),
array_col.values.data_type(),
array_col.offsets.clone(),
Some(nullable_col.validity.clone()),
),
_ => unreachable!(),
},
_ => unreachable!(),
};
let entry = BlockEntry::new(inner_ty, Value::Column(inner_col.clone()));
let block = DataBlock::new(vec![entry], inner_col.len());

let evaluator = Evaluator::new(&block, self.func_ctx, self.fn_registry);
let result = evaluator.run(&expr)?;
let result_col = result.convert_to_full_column(expr.data_type(), inner_col.len());

let array_col = if func_name == "array_filter" {
let result_col = result_col.remove_nullable();
let bitmap = result_col.as_boolean().unwrap();
let filtered_inner_col = inner_col.filter(bitmap);
// generate new offsets after filter.
let mut new_offset = 0;
let mut filtered_offsets = Vec::with_capacity(offsets.len());
filtered_offsets.push(0);
for offset in offsets.windows(2) {
let off = offset[0] as usize;
let len = (offset[1] - offset[0]) as usize;
let unset_count = bitmap.null_count_range(off, len);
new_offset += (len - unset_count) as u64;
filtered_offsets.push(new_offset);
}

Column::Array(Box::new(ArrayColumn {
values: filtered_inner_col,
offsets: filtered_offsets.into(),
}))
} else {
Column::Array(Box::new(ArrayColumn {
values: result_col,
offsets,
}))
};
let col = match validity {
Some(validity) => Value::Column(Column::Nullable(Box::new(NullableColumn {
column: array_col,
validity,
}))),
None => Value::Column(array_col),
};
Ok(col)
}
}
let res = match len {
Some(_) => Value::Column(builder.build()),
None => Value::Scalar(builder.build_scalar()),
};
Ok(res)
}

pub fn get_children(
Expand Down Expand Up @@ -1354,6 +1399,7 @@ impl<'a> Evaluator<'a> {
return_type,
..
} => {
let data_types = args.iter().map(|arg| arg.data_type().clone()).collect();
let args = args
.iter()
.map(|expr| self.partial_run(expr, None, &mut EvaluateOptions::default()))
Expand All @@ -1368,7 +1414,7 @@ impl<'a> Evaluator<'a> {
);

Ok((
self.run_lambda(name, args, lambda_expr, return_type)?,
self.run_lambda(name, args, data_types, lambda_expr, return_type)?,
return_type.clone(),
))
}
Expand Down
7 changes: 4 additions & 3 deletions src/query/expression/src/filter/selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,7 @@ impl<'a> Selector<'a> {
);
let mut eval_options = EvaluateOptions::new(selection);

let data_types = args.iter().map(|arg| arg.data_type().clone()).collect();
let args = args
.iter()
.map(|expr| self.evaluator.partial_run(expr, None, &mut eval_options))
Expand All @@ -596,9 +597,9 @@ impl<'a> Selector<'a> {
})
.all_equal()
);
let result = self
.evaluator
.run_lambda(name, args, lambda_expr, return_type)?;
let result =
self.evaluator
.run_lambda(name, args, data_types, lambda_expr, return_type)?;
(result, return_type.clone())
}
_ => {
Expand Down
12 changes: 4 additions & 8 deletions src/query/sql/src/planner/expression_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_expression::infer_schema_type;
use databend_common_expression::infer_table_schema;
use databend_common_expression::type_check::check_cast;
use databend_common_expression::type_check::check_function;
Expand Down Expand Up @@ -348,28 +347,25 @@ pub fn parse_computed_expr_to_string(

pub fn parse_lambda_expr(
ctx: Arc<dyn TableContext>,
mut bind_context: BindContext,
columns: &[(String, DataType)],
ast: &AExpr,
) -> Result<Box<(ScalarExpr, DataType)>> {
let settings = Settings::create(Tenant::new_literal("dummy"));
let mut bind_context = BindContext::new();
let mut metadata = Metadata::default();

let metadata = Metadata::default();
bind_context.set_expr_context(ExprContext::InLambdaFunction);

let column_len = bind_context.all_column_bindings().len();
for (idx, column) in columns.iter().enumerate() {
bind_context.add_column_binding(
ColumnBindingBuilder::new(
column.0.clone(),
idx,
column_len + idx,
Box::new(column.1.clone()),
Visibility::Visible,
)
.build(),
);

let table_type = infer_schema_type(&column.1)?;
metadata.add_base_table_column(column.0.to_string(), table_type, 0, None, None, None, None);
}

let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
Expand Down
Loading
Loading