Skip to content

Commit

Permalink
Merge pull request #4606 from kevinw66/check_json
Browse files Browse the repository at this point in the history
ISSUE-4558: Add `check_json` function
  • Loading branch information
BohuTANG authored Mar 30, 2022
2 parents 26b1914 + f50bc04 commit d1ce6e8
Show file tree
Hide file tree
Showing 8 changed files with 434 additions and 2 deletions.
136 changes: 136 additions & 0 deletions common/functions/src/scalars/semi_structureds/check_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::sync::Arc;

use common_arrow::arrow::bitmap::Bitmap;
use common_datavalues::prelude::*;
use common_exception::ErrorCode;
use common_exception::Result;
use serde_json::Value as JsonValue;

use crate::scalars::Function;
use crate::scalars::FunctionDescription;
use crate::scalars::FunctionFeatures;

/// Scalar function that validates JSON text.
///
/// Its `eval` yields NULL for rows holding valid JSON and a string describing
/// the problem for rows that do not.
#[derive(Clone)]
pub struct CheckJsonFunction {
/// Name this instance was created with; returned by `name()` and
/// upper-cased by the `Display` implementation.
display_name: String,
}

impl CheckJsonFunction {
    /// Builds a boxed `CheckJsonFunction` that reports `display_name` as its name.
    ///
    /// This constructor never fails; it returns `Result` so that it can be
    /// handed to `FunctionDescription::creator` in `desc`.
    pub fn try_create(display_name: &str) -> Result<Box<dyn Function>> {
        let function = Self {
            display_name: String::from(display_name),
        };
        Ok(Box::new(function))
    }

    /// Describes the function for registration: instances come from
    /// `try_create`, and the feature set is the default one with the
    /// `deterministic` and `monotonicity` flags applied and the argument
    /// count fixed to one.
    pub fn desc() -> FunctionDescription {
        let features = FunctionFeatures::default()
            .deterministic()
            .monotonicity()
            .num_arguments(1);

        FunctionDescription::creator(Box::new(Self::try_create)).features(features)
    }
}

impl Function for CheckJsonFunction {
fn name(&self) -> &str {
&*self.display_name
}

fn return_type(&self, args: &[&DataTypePtr]) -> Result<DataTypePtr> {
if args[0].data_type_id() == TypeID::Null {
return Ok(NullType::arc());
}

Ok(Arc::new(NullableType::create(StringType::arc())))
}

fn eval(&self, columns: &ColumnsWithField, input_rows: usize) -> Result<ColumnRef> {
let data_type = remove_nullable(columns[0].field().data_type());
let mut column = columns[0].column();
let mut _all_null = false;
let mut source_valids: Option<&Bitmap> = None;
if column.is_nullable() {
(_all_null, source_valids) = column.validity();
let nullable_column: &NullableColumn = Series::check_get(column)?;
column = nullable_column.inner();
}

if data_type.data_type_id() == TypeID::Null {
return NullType::arc().create_constant_column(&DataValue::Null, input_rows);
}

let mut builder = NullableColumnBuilder::<Vu8>::with_capacity(input_rows);

if data_type.data_type_id().is_numeric() || data_type.data_type_id() == TypeID::Boolean {
for _i in 0..input_rows {
builder.append_null()
}
} else if data_type.data_type_id() == TypeID::String {
let c: &StringColumn = Series::check_get(column)?;
for (i, v) in c.iter().enumerate() {
if let Some(source_valids) = source_valids {
if !source_valids.get_bit(i) {
builder.append_null();
continue;
}
}

match std::str::from_utf8(v) {
Ok(v) => match serde_json::from_str::<JsonValue>(v) {
Ok(_v) => builder.append_null(),
Err(e) => builder.append(e.to_string().as_bytes(), true),
},
Err(e) => builder.append(e.to_string().as_bytes(), true),
}
}
} else if data_type.data_type_id() == TypeID::Variant {
let c: &ObjectColumn<JsonValue> = Series::check_get(column)?;
for v in c.iter() {
if let JsonValue::String(s) = v {
match serde_json::from_str::<JsonValue>(s.as_str()) {
Ok(_v) => builder.append_null(),
Err(e) => builder.append(e.to_string().as_bytes(), true),
}
} else {
builder.append_null()
}
}
} else if data_type.data_type_id().is_date_or_date_time() {
for _i in 0..input_rows {
builder.append(
format!("{:?} is not a valid JSON", data_type.data_type_id()).as_bytes(),
true,
)
}
} else {
return Err(ErrorCode::BadDataValueType(format!(
"Invalid argument types for function 'CHECK_JSON': {:?}",
column.data_type_id()
)));
}

Ok(builder.build(input_rows))
}
}

impl fmt::Display for CheckJsonFunction {
    /// Writes the function's display name converted to upper case.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let upper_name = self.display_name.to_uppercase();
        f.write_str(&upper_name)
    }
}
2 changes: 2 additions & 0 deletions common/functions/src/scalars/semi_structureds/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod check_json;
mod parse_json;
mod semi_structured;

pub use check_json::CheckJsonFunction;
pub use parse_json::ParseJsonFunction;
pub use parse_json::TryParseJsonFunction;
pub use semi_structured::SemiStructuredFunction;
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use super::parse_json::ParseJsonFunction;
use super::parse_json::TryParseJsonFunction;
use crate::scalars::CheckJsonFunction;
use crate::scalars::FunctionFactory;

/// Field-less type whose `register` function adds the semi-structured (JSON)
/// scalar functions to a `FunctionFactory`.
pub struct SemiStructuredFunction;
Expand All @@ -22,5 +23,6 @@ impl SemiStructuredFunction {
/// Registers the semi-structured scalar functions with `factory` under the
/// names `parse_json`, `try_parse_json` and `check_json`, each with the
/// description returned by the corresponding type's `desc()`.
pub fn register(factory: &mut FunctionFactory) {
factory.register("parse_json", ParseJsonFunction::desc());
factory.register("try_parse_json", TryParseJsonFunction::desc());
factory.register("check_json", CheckJsonFunction::desc());
}
}
207 changes: 207 additions & 0 deletions common/functions/tests/it/scalars/semi_structureds/check_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_functions::scalars::CheckJsonFunction;
use serde_json::json;

use crate::scalars::scalar_function2_test::test_scalar_functions;
use crate::scalars::scalar_function2_test::ScalarFunctionTest;

/// Table-driven test for `CheckJsonFunction`.
///
/// Each case feeds one input column to the function and states either the
/// expected output column (`error` empty) or the expected error message.
/// The cases cover boolean, integer, float, string (valid and invalid JSON),
/// their nullable counterparts, the unsupported Array and Struct column
/// types, Variant values and the Null column.
///
/// Case names are unique so that a failure report identifies a single case.
// NOTE(review): the trailing `false` passed to `test_scalar_functions` is
// presumably the harness's null-passthrough switch — confirm against the
// harness definition.
#[test]
fn test_check_json_function() -> Result<()> {
    use common_datavalues::prelude::*;

    let tests = vec![
        ScalarFunctionTest {
            name: "check_json_bool",
            columns: vec![Series::from_data(vec![true, false])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_int",
            columns: vec![Series::from_data(vec![1_i16, -1, 100])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_float",
            columns: vec![Series::from_data(vec![12.34_f64, 56.79, 0.12345679])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_string",
            columns: vec![Series::from_data(vec!["\"abcd\"", "true", "123"])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_array",
            columns: vec![Series::from_data(vec![
                "[\"an\", \"array\"]",
                "[\"str\", true]",
                "[1, 2, 3]",
            ])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_object",
            columns: vec![Series::from_data(vec![
                "{\"an\": \"object\"}",
                "{\"key\": true}",
                "{\"k\": 1}",
            ])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_invalid_string",
            columns: vec![Series::from_data(vec!["\"abcd\"", "[1,2", "{\"k"])],
            expect: Series::from_data(vec![
                None::<&str>,
                Some("EOF while parsing a list at line 1 column 4"),
                Some("EOF while parsing a string at line 1 column 3"),
            ]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_nullable_bool",
            columns: vec![Series::from_data(vec![None, Some(true), Some(false)])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_nullable_int",
            columns: vec![Series::from_data(vec![Some(1_i16), Some(-1), Some(100)])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_nullable_float",
            columns: vec![Series::from_data(vec![
                Some(12.34_f64),
                Some(56.79),
                Some(0.12345679),
            ])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_nullable_string",
            columns: vec![Series::from_data(vec![
                None,
                Some("\"abcd\""),
                Some("true"),
            ])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_nullable_array",
            columns: vec![Series::from_data(vec![
                Some("[\"an\", \"array\"]"),
                Some("[\"str\", true]"),
                Some("[1, 2, 3]"),
            ])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_nullable_object",
            columns: vec![Series::from_data(vec![
                Some("{\"an\": \"object\"}"),
                Some("{\"k\": true}"),
                Some("{\"k\": 1}"),
            ])],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_nullable_invalid_string",
            columns: vec![Series::from_data(vec![
                Some("\"abcd\""),
                Some("[1,2"),
                Some("{\"k"),
            ])],
            expect: Series::from_data(vec![
                None::<&str>,
                Some("EOF while parsing a list at line 1 column 4"),
                Some("EOF while parsing a string at line 1 column 3"),
            ]),
            error: "",
        },
        // Array-typed column (not a JSON array literal): must be rejected.
        ScalarFunctionTest {
            name: "check_json_array_column",
            columns: vec![Arc::new(ArrayColumn::from_data(
                Arc::new(ArrayType::create(StringType::arc())),
                vec![0, 1, 3, 6].into(),
                Series::from_data(vec!["test", "data", "bend", "hello", "world", "NULL"]),
            ))],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "Invalid argument types for function 'CHECK_JSON': Array",
        },
        ScalarFunctionTest {
            name: "check_json_struct",
            columns: vec![Arc::new(StructColumn::from_data(
                vec![
                    Series::from_data(vec![18869i32, 18948i32, 1]),
                    Series::from_data(vec![1i8, 2i8, 3]),
                ],
                Arc::new(StructType::create(
                    vec!["date".to_owned(), "integer".to_owned()],
                    vec![Date32Type::arc(), Int8Type::arc()],
                )),
            ))],
            expect: Series::from_data(vec![None::<&str>, None::<&str>, None::<&str>]),
            error: "Invalid argument types for function 'CHECK_JSON': Struct",
        },
        ScalarFunctionTest {
            name: "check_json_variant",
            columns: vec![Arc::new(JsonColumn::new_from_vec(vec![
                json!(null),
                json!(true),
                json!(false),
                json!(123),
                json!(12.34),
                json!("{\"k\": 1}"),
                json!("\"abcd\""),
                json!("[1,2"),
            ]))],
            expect: Series::from_data(vec![
                None::<&str>,
                None::<&str>,
                None::<&str>,
                None::<&str>,
                None::<&str>,
                None::<&str>,
                None::<&str>,
                Some("EOF while parsing a list at line 1 column 4"),
            ]),
            error: "",
        },
        ScalarFunctionTest {
            name: "check_json_null",
            columns: vec![Arc::new(NullColumn::new(1))],
            expect: Arc::new(NullColumn::new(1)),
            error: "",
        },
    ];

    test_scalar_functions(CheckJsonFunction::try_create("check_json")?, &tests, false)
}
16 changes: 16 additions & 0 deletions common/functions/tests/it/scalars/semi_structureds/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright 2021 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod check_json;
mod parse_json;
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use common_functions::scalars::TryParseJsonFunction;
use serde_json::json;
use serde_json::Value as JsonValue;

use super::scalar_function2_test::test_scalar_functions;
use super::scalar_function2_test::ScalarFunctionTest;
use crate::scalars::scalar_function2_test::test_scalar_functions;
use crate::scalars::scalar_function2_test::ScalarFunctionTest;

#[test]
fn test_parse_json_function() -> Result<()> {
Expand Down
Loading

0 comments on commit d1ce6e8

Please sign in to comment.