Skip to content

Commit

Permalink
support array access elements
Browse files Browse the repository at this point in the history
  • Loading branch information
b41sh committed May 9, 2022
1 parent d6253aa commit ab22b1b
Show file tree
Hide file tree
Showing 11 changed files with 492 additions and 0 deletions.
7 changes: 7 additions & 0 deletions common/datavalues/src/array_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ impl ArrayValue {
pub fn new(values: Vec<DataValue>) -> Self {
Self { values }
}

pub fn inner_type(&self) -> Option<DataTypeImpl> {
if let Some(value) = self.values.get(0) {
return Some(value.max_data_type());
}
None
}
}

impl From<DataValue> for ArrayValue {
Expand Down
16 changes: 16 additions & 0 deletions common/datavalues/src/columns/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ impl SeriesFrom<Vec<Option<VariantValue>>, Vec<Option<VariantValue>>> for Series
}
}

impl SeriesFrom<Vec<ArrayValue>, Vec<ArrayValue>> for Series {
fn from_data(vals: Vec<ArrayValue>) -> ColumnRef {
let inner_data_type = match vals.iter().find(|&x| x.inner_type().is_some()) {
Some(array_value) => array_value.inner_type().unwrap(),
None => Int64Type::new_impl(),
};
let mut builder = MutableArrayColumn::with_capacity_meta(vals.len(), ColumnMeta::Array {
data_type: inner_data_type,
});
for val in vals {
builder.append_value(val);
}
builder.finish().arc()
}
}

macro_rules! impl_from_option_iterator {
([], $( { $S: ident} ),*) => {
$(
Expand Down
1 change: 1 addition & 0 deletions common/datavalues/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ macro_rules! for_all_scalar_types {
{ f64 },
{ bool },
{ Vu8 },
{ ArrayValue },
{ VariantValue }
}
};
Expand Down
215 changes: 215 additions & 0 deletions common/functions/src/scalars/semi_structureds/array_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;

use common_datavalues::prelude::*;
use common_datavalues::with_match_scalar_types_error;
use common_exception::ErrorCode;
use common_exception::Result;
use sqlparser::ast::Value;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;

use crate::scalars::Function;
use crate::scalars::FunctionContext;
use crate::scalars::FunctionDescription;
use crate::scalars::FunctionFeatures;

pub type ArrayGetFunction = ArrayGetFunctionImpl<false>;

pub type ArrayGetPathFunction = ArrayGetFunctionImpl<true>;

#[derive(Clone)]
pub struct ArrayGetFunctionImpl<const BY_PATH: bool> {
array_type: ArrayType,
display_name: String,
}

impl<const BY_PATH: bool> ArrayGetFunctionImpl<BY_PATH> {
pub fn try_create(display_name: &str, args: &[&DataTypeImpl]) -> Result<Box<dyn Function>> {
let data_type = args[0];
let path_type = args[1];

if !data_type.data_type_id().is_array()
|| (BY_PATH && !path_type.data_type_id().is_string())
|| (!BY_PATH && !path_type.data_type_id().is_integer())
{
return Err(ErrorCode::IllegalDataType(format!(
"Invalid argument types for function '{}': ({:?}, {:?})",
display_name.to_uppercase(),
data_type.data_type_id(),
path_type.data_type_id()
)));
}

let array_type: ArrayType = data_type.clone().try_into()?;
Ok(Box::new(ArrayGetFunctionImpl::<BY_PATH> {
array_type,
display_name: display_name.to_string(),
}))
}

pub fn desc() -> FunctionDescription {
FunctionDescription::creator(Box::new(Self::try_create))
.features(FunctionFeatures::default().deterministic().num_arguments(2))
}
}

impl<const BY_PATH: bool> Function for ArrayGetFunctionImpl<BY_PATH> {
fn name(&self) -> &str {
&*self.display_name
}

fn return_type(&self) -> DataTypeImpl {
// TODO(b41sh): Support multi-dimensional array access
NullableType::new_impl(self.array_type.inner_type().clone())
}

fn eval(
&self,
_func_ctx: FunctionContext,
columns: &ColumnsWithField,
input_rows: usize,
) -> Result<ColumnRef> {
let path_indexes = if BY_PATH {
parse_path_indexes(columns[1].column())?
} else {
build_path_indexes(columns[1].column())?
};

let array_column: &ArrayColumn = if columns[0].column().is_const() {
let const_column: &ConstColumn = Series::check_get(columns[0].column())?;
Series::check_get(const_column.inner())?
} else {
Series::check_get(columns[0].column())?
};

let inner_type = self.array_type.inner_type().data_type_id();
with_match_scalar_types_error!(inner_type.to_physical_type(), |$T| {
let inner_column: &<$T as Scalar>::ColumnType = Series::check_get(array_column.values())?;
let mut builder = NullableColumnBuilder::<$T>::with_capacity(input_rows);

for path_index in path_indexes.iter() {
// TODO(b41sh): Support multi-dimensional array access
if path_index.is_empty() || path_index.len() > 1 {
return Err(ErrorCode::BadArguments(format!(
"Array column don't support accessed by index: {:?}",
path_index
)));
}
let index = path_index[0];
let mut offset = 0;
for row in 0..array_column.len() {
let len = array_column.size_at_index(row);
if index >= len {
return Err(ErrorCode::BadArguments(format!(
"Index out of array column bounds: the len is {} but the index is {}",
len, index
)));
} else {
builder.append(inner_column.get_data(offset + index), true);
}
offset += len;
}
}
Ok(builder.build(input_rows))
})
}
}

impl<const BY_PATH: bool> fmt::Display for ArrayGetFunctionImpl<BY_PATH> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.display_name.to_uppercase())
}
}

fn parse_path_indexes(column: &ColumnRef) -> Result<Vec<Vec<usize>>> {
let column: &StringColumn = if column.is_const() {
let const_column: &ConstColumn = Series::check_get(column)?;
Series::check_get(const_column.inner())?
} else {
Series::check_get(column)?
};

let dialect = &GenericDialect {};
let mut path_indexes: Vec<Vec<usize>> = vec![];
for v in column.iter() {
if v.is_empty() {
return Err(ErrorCode::SyntaxException(
"Bad compound object's field path name: '' in GET_PATH",
));
}
let definition = std::str::from_utf8(v).unwrap();
let mut tokenizer = Tokenizer::new(dialect, definition);
match tokenizer.tokenize() {
Ok((tokens, position_map)) => {
match Parser::new(tokens, position_map, dialect).parse_map_keys() {
Ok(values) => {
let mut path_index: Vec<usize> = Vec::with_capacity(values.len());
for value in values.iter() {
if let Value::Number(val, _) = value {
let data_value = DataValue::try_from_literal(val, None).unwrap();
match data_value.as_u64() {
Ok(v) => path_index.push(v as usize),
Err(_) => return Err(ErrorCode::IllegalDataType(format!(
"Array column only support accessed by index, but got {}",
val
))),
}
} else {
return Err(ErrorCode::IllegalDataType(format!(
"Array column only support accessed by index, but got {}",
value
)));
}
}
path_indexes.push(path_index);
}
Err(parse_error) => return Err(ErrorCode::from(parse_error)),
}
}
Err(tokenize_error) => {
return Err(ErrorCode::SyntaxException(format!(
"Can not tokenize definition: {}, Error: {:?}",
definition, tokenize_error
)))
}
}
}
Ok(path_indexes)
}

fn build_path_indexes(column: &ColumnRef) -> Result<Vec<Vec<usize>>> {
if column.is_const() {
let const_column: &ConstColumn = Series::check_get(column)?;
return build_path_indexes(const_column.inner());
}

let mut path_indexes: Vec<Vec<usize>> = vec![];
for i in 0..column.len() {
let val = column.get(i);
match val.as_u64() {
Ok(index) => path_indexes.push(vec![index as usize]),
Err(_) => {
return Err(ErrorCode::IllegalDataType(format!(
"Array column only support accessed by index, but got {}",
val
)))
}
}
}
Ok(path_indexes)
}
10 changes: 10 additions & 0 deletions common/functions/src/scalars/semi_structureds/get.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;

use crate::scalars::semi_structureds::array_get::ArrayGetFunction;
use crate::scalars::semi_structureds::array_get::ArrayGetPathFunction;
use crate::scalars::Function;
use crate::scalars::FunctionContext;
use crate::scalars::FunctionDescription;
Expand All @@ -43,6 +45,14 @@ impl<const BY_PATH: bool, const IGNORE_CASE: bool> GetFunctionImpl<BY_PATH, IGNO
let data_type = args[0];
let path_type = args[1];

if data_type.data_type_id().is_array() {
if BY_PATH {
return ArrayGetPathFunction::try_create(display_name, args);
} else {
return ArrayGetFunction::try_create(display_name, args);
}
}

if (IGNORE_CASE
&& (!data_type.data_type_id().is_variant_or_object()
|| !path_type.data_type_id().is_string()))
Expand Down
3 changes: 3 additions & 0 deletions common/functions/src/scalars/semi_structureds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod array_get;
mod check_json;
mod get;
mod json_extract_path_text;
mod parse_json;
mod semi_structured;

pub use array_get::ArrayGetFunction;
pub use array_get::ArrayGetPathFunction;
pub use check_json::CheckJsonFunction;
pub use get::GetFunction;
pub use get::GetIgnoreCaseFunction;
Expand Down
Loading

0 comments on commit ab22b1b

Please sign in to comment.