Skip to content

Commit

Permalink
feat(query): support json_object_delete and json_object_pick func…
Browse files Browse the repository at this point in the history
…tion (#16682)

* feat(query): support `json_object_delete`, `json_object_pick` function

* add tests

* fix
  • Loading branch information
b41sh authored Oct 25, 2024
1 parent ab4bcd5 commit 78629a5
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
deltalake = { git = "https://github.com/delta-io/delta-rs", rev = "57795da" }
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "672e423" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" }
recursive = { git = "https://github.com/datafuse-extras/recursive.git", rev = "6af35a1" }
Expand Down
128 changes: 115 additions & 13 deletions src/query/expression/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::Not;

Expand All @@ -35,6 +36,7 @@ use crate::types::array::ArrayColumn;
use crate::types::boolean::BooleanDomain;
use crate::types::nullable::NullableColumn;
use crate::types::nullable::NullableDomain;
use crate::types::string::StringColumnBuilder;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BooleanType;
Expand Down Expand Up @@ -507,7 +509,7 @@ impl<'a> Evaluator<'a> {
},
(DataType::Variant, DataType::Array(inner_dest_ty)) => {
let empty_vec = vec![];
let temp_array: jsonb::Value;
let mut temp_array: jsonb::Value;
match value {
Value::Scalar(Scalar::Variant(x)) => {
let array = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) {
Expand All @@ -520,22 +522,16 @@ impl<'a> Evaluator<'a> {
} else {
&empty_vec
};

let column = VariantType::create_column_from_variants(array.as_slice());

let validity = validity.map(|validity| {
Bitmap::new_constant(
validity.unset_bits() != validity.len(),
column.len(),
)
});

let validity = None;
let column = Column::Variant(VariantType::create_column_from_variants(
array.as_slice(),
));
let new_array = self
.run_cast(
span,
&DataType::Variant,
inner_dest_ty,
Value::Column(Column::Variant(column)),
Value::Column(column),
validity,
options,
)?
Expand All @@ -547,7 +543,6 @@ impl<'a> Evaluator<'a> {
let mut array_builder =
ArrayType::<VariantType>::create_builder(col.len(), &[]);

let mut temp_array: jsonb::Value;
for (idx, x) in col.iter().enumerate() {
let array = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true)
{
Expand Down Expand Up @@ -597,6 +592,113 @@ impl<'a> Evaluator<'a> {
other => unreachable!("source: {}", other),
}
}
// Cast a Variant holding a JSON object into `Map(String, T)`.
//
// The object's member names become the map keys and every member value is cast
// recursively from Variant to the map's value type (`fields_dest_ty[1]`).
// A NULL input, or a Variant that is not a JSON object, yields an empty map.
// Invalid JSONB bytes produce a `BadArguments` error.
(DataType::Variant, DataType::Map(box DataType::Tuple(fields_dest_ty)))
    if fields_dest_ty.len() == 2 && fields_dest_ty[0] == DataType::String =>
{
    let empty_obj = BTreeMap::new();
    // Owns the decoded JSON value so that `obj` below can borrow from it.
    let mut temp_obj: jsonb::Value;
    match value {
        Value::Scalar(Scalar::Variant(x)) => {
            let obj = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) {
                temp_obj = jsonb::from_slice(&x).map_err(|e| {
                    ErrorCode::BadArguments(format!(
                        "Expect to be valid json, got err: {e:?}"
                    ))
                })?;
                temp_obj.as_object().unwrap_or(&empty_obj)
            } else {
                &empty_obj
            };
            // The member values are cast without an outer validity mask.
            let validity = None;

            let mut key_builder = StringColumnBuilder::with_capacity(obj.len(), 0);
            for k in obj.keys() {
                key_builder.put_str(k.as_str());
                key_builder.commit_row();
            }
            let key_column = Column::String(key_builder.build());

            let values: Vec<_> = obj.values().cloned().collect();
            let value_column = Column::Variant(
                VariantType::create_column_from_variants(values.as_slice()),
            );

            let new_value_column = self
                .run_cast(
                    span,
                    &DataType::Variant,
                    &fields_dest_ty[1],
                    Value::Column(value_column),
                    validity,
                    options,
                )?
                .into_column()
                .unwrap();
            Ok(Value::Scalar(Scalar::Map(Column::Tuple(vec![
                key_column,
                new_value_column,
            ]))))
        }
        Value::Column(Column::Variant(col)) => {
            // Keys of all rows are flattened into one string column; the values are
            // collected into an array builder whose offsets delimit each row's entries.
            let mut key_builder = StringColumnBuilder::with_capacity(0, 0);
            let mut value_builder =
                ArrayType::<VariantType>::create_builder(col.len(), &[]);

            for (idx, x) in col.iter().enumerate() {
                let obj = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true) {
                    temp_obj = jsonb::from_slice(x).map_err(|e| {
                        ErrorCode::BadArguments(format!(
                            "Expect to be valid json, got err: {e:?}"
                        ))
                    })?;
                    temp_obj.as_object().unwrap_or(&empty_obj)
                } else {
                    &empty_obj
                };

                for (k, v) in obj.iter() {
                    key_builder.put_str(k.as_str());
                    key_builder.commit_row();
                    v.write_to_vec(&mut value_builder.builder.data);
                    value_builder.builder.commit_row();
                }
                // Close the current row: one offset per input row.
                value_builder.commit_row();
            }
            let key_column = Column::String(key_builder.build());

            let value_column = value_builder.build();
            // Expand the per-row validity to one bit per flattened map entry.
            let validity = validity.map(|validity| {
                let mut inner_validity = MutableBitmap::with_capacity(col.len());
                for (index, offsets) in value_column.offsets.windows(2).enumerate() {
                    inner_validity.extend_constant(
                        (offsets[1] - offsets[0]) as usize,
                        validity.get_bit(index),
                    );
                }
                inner_validity.into()
            });

            let new_value_column = self
                .run_cast(
                    span,
                    &DataType::Variant,
                    &fields_dest_ty[1],
                    Value::Column(Column::Variant(value_column.values)),
                    validity,
                    options,
                )?
                .into_column()
                .unwrap();

            let kv_column = Column::Tuple(vec![key_column, new_value_column]);
            Ok(Value::Column(Column::Map(Box::new(ArrayColumn {
                values: kv_column,
                // Use the entry offsets produced while flattening the objects.
                // `col.offsets` would be the byte offsets of the input Variant
                // column and does not describe the map entries.
                offsets: value_column.offsets,
            }))))
        }
        other => unreachable!("source: {}", other),
    }
}
(DataType::EmptyMap, DataType::Map(inner_dest_ty)) => match value {
Value::Scalar(Scalar::EmptyMap) => {
let new_column = ColumnBuilder::with_capacity(inner_dest_ty, 0).build();
Expand Down
145 changes: 145 additions & 0 deletions src/query/functions/src/scalars/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::borrow::Cow;
use std::collections::BTreeSet;
use std::collections::HashSet;
use std::iter::once;
use std::sync::Arc;
Expand Down Expand Up @@ -1679,6 +1680,72 @@ pub fn register(registry: &mut FunctionRegistry) {
},
}))
});

// Factory for `json_object_pick(object, key, ...)`.
//
// Accepts a Variant (or NULL) first argument followed by at least one String
// (or NULL) key; any other argument shape is rejected by returning `None`.
registry.register_function_factory("json_object_pick", |_, args_type| {
    // The object plus at least one key are required.
    if args_type.len() < 2 {
        return None;
    }
    let object_ok =
        args_type[0] == DataType::Null || args_type[0].remove_nullable() == DataType::Variant;
    if !object_ok {
        return None;
    }
    let keys_ok = args_type
        .iter()
        .skip(1)
        .all(|ty| *ty == DataType::Null || ty.remove_nullable() == DataType::String);
    if !keys_ok {
        return None;
    }

    // The result is nullable exactly when the object argument is nullable or NULL.
    let is_nullable = args_type[0].is_nullable_or_null();
    let return_type = if is_nullable {
        DataType::Nullable(Box::new(DataType::Variant))
    } else {
        DataType::Variant
    };

    let signature = FunctionSignature {
        name: "json_object_pick".to_string(),
        args_type: args_type.to_vec(),
        return_type,
    };
    let eval = FunctionEval::Scalar {
        calc_domain: Box::new(|_, _| FunctionDomain::MayThrow),
        // `true` selects the "pick" mode of the shared implementation.
        eval: Box::new(move |args, ctx| {
            json_object_pick_or_delete_fn(args, ctx, true, is_nullable)
        }),
    };
    Some(Arc::new(Function { signature, eval }))
});

// Factory for `json_object_delete(object, key, ...)`.
//
// Accepts a Variant (or NULL) first argument followed by at least one String
// (or NULL) key; any other argument shape is rejected by returning `None`.
registry.register_function_factory("json_object_delete", |_, args_type| {
    // The object plus at least one key are required.
    if args_type.len() < 2 {
        return None;
    }
    let object_ok =
        args_type[0] == DataType::Null || args_type[0].remove_nullable() == DataType::Variant;
    if !object_ok {
        return None;
    }
    let keys_ok = args_type
        .iter()
        .skip(1)
        .all(|ty| *ty == DataType::Null || ty.remove_nullable() == DataType::String);
    if !keys_ok {
        return None;
    }

    // The result is nullable exactly when the object argument is nullable or NULL.
    let is_nullable = args_type[0].is_nullable_or_null();
    let return_type = if is_nullable {
        DataType::Nullable(Box::new(DataType::Variant))
    } else {
        DataType::Variant
    };

    let signature = FunctionSignature {
        name: "json_object_delete".to_string(),
        args_type: args_type.to_vec(),
        return_type,
    };
    let eval = FunctionEval::Scalar {
        calc_domain: Box::new(|_, _| FunctionDomain::MayThrow),
        // `false` selects the "delete" mode of the shared implementation.
        eval: Box::new(move |args, ctx| {
            json_object_pick_or_delete_fn(args, ctx, false, is_nullable)
        }),
    };
    Some(Arc::new(Function { signature, eval }))
});
}

fn json_array_fn(args: &[ValueRef<AnyType>], ctx: &mut EvalContext) -> Value<AnyType> {
Expand Down Expand Up @@ -2149,6 +2216,84 @@ fn json_object_insert_fn(
}
}

/// Shared row-wise evaluator behind `json_object_pick` (`is_pick == true`) and
/// `json_object_delete` (`is_pick == false`).
///
/// `args[0]` is the JSON object (Variant, possibly NULL); every following argument is a
/// key (String, possibly NULL). NULL keys are skipped. A NULL object yields an invalid
/// row. A Variant that is not a JSON object, or an error returned by `jsonb`, records an
/// error on `ctx` for that row and also marks the row invalid.
///
/// With `is_nullable` set, the result carries the validity bitmap (a scalar result
/// collapses to `Scalar::Null` when invalid); otherwise a plain Variant value is returned.
fn json_object_pick_or_delete_fn(
    args: &[ValueRef<AnyType>],
    ctx: &mut EvalContext,
    is_pick: bool,
    is_nullable: bool,
) -> Value<AnyType> {
    // The first column argument, if any, decides how many rows are evaluated;
    // with scalar-only arguments exactly one row is produced.
    let column_len = args.iter().find_map(|arg| match arg {
        ValueRef::Column(col) => Some(col.len()),
        _ => None,
    });
    let num_rows = column_len.unwrap_or(1);

    let mut validity = MutableBitmap::with_capacity(num_rows);
    let mut builder = BinaryColumnBuilder::with_capacity(num_rows, num_rows * 50);
    // Reused across rows; refilled with the non-NULL keys of the current row.
    let mut keys = BTreeSet::new();

    for row in 0..num_rows {
        let object = match &args[0] {
            ValueRef::Scalar(scalar) => scalar.clone(),
            // SAFETY: `row < num_rows`; this presumes every column argument has
            // `num_rows` rows (NOTE(review): not checked here, confirm with caller).
            ValueRef::Column(col) => unsafe { col.index_unchecked(row) },
        };
        if object == ScalarRef::Null {
            builder.commit_row();
            validity.push(false);
            continue;
        }
        let object = object.as_variant().unwrap();
        if !is_object(object) {
            // `builder.len()` is the index of the row about to be committed.
            ctx.set_error(builder.len(), "Invalid json object");
            builder.commit_row();
            validity.push(false);
            continue;
        }

        keys.clear();
        for key_arg in &args[1..] {
            let key = match key_arg {
                ValueRef::Scalar(scalar) => scalar.clone(),
                // SAFETY: same presumption as for the object column above.
                ValueRef::Column(col) => unsafe { col.index_unchecked(row) },
            };
            if key != ScalarRef::Null {
                keys.insert(*key.as_string().unwrap());
            }
        }

        let outcome = if is_pick {
            jsonb::object_pick(object, &keys, &mut builder.data)
        } else {
            jsonb::object_delete(object, &keys, &mut builder.data)
        };
        match outcome {
            Ok(_) => {
                validity.push(true);
            }
            Err(err) => {
                validity.push(false);
                ctx.set_error(builder.len(), err.to_string());
            }
        }
        builder.commit_row();
    }

    match (is_nullable, column_len) {
        (true, Some(_)) => {
            let validity: Bitmap = validity.into();
            Value::Column(Column::Variant(builder.build())).wrap_nullable(Some(validity))
        }
        (true, None) => {
            let validity: Bitmap = validity.into();
            if validity.get_bit(0) {
                Value::Scalar(Scalar::Variant(builder.build_scalar()))
            } else {
                Value::Scalar(Scalar::Null)
            }
        }
        (false, Some(_)) => Value::Column(Column::Variant(builder.build())),
        (false, None) => Value::Scalar(Scalar::Variant(builder.build_scalar())),
    }
}

// Extract string for string type, other types convert to JSON string.
fn cast_to_string(v: &[u8]) -> String {
match to_str(v) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2313,9 +2313,11 @@ Functions overloads:
0 json_extract_path_text(String, String) :: String NULL
1 json_extract_path_text(String NULL, String NULL) :: String NULL
0 json_object FACTORY
0 json_object_delete FACTORY
0 json_object_insert FACTORY
0 json_object_keep_null FACTORY
0 json_object_keys(Variant NULL) :: Variant NULL
0 json_object_pick FACTORY
0 json_path_exists FACTORY
0 json_path_match FACTORY
0 json_path_query FACTORY
Expand Down
Loading

0 comments on commit 78629a5

Please sign in to comment.