diff --git a/Cargo.lock b/Cargo.lock index cf187e0ab4dd..e89bcdbdbffa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9267,7 +9267,7 @@ dependencies = [ [[package]] name = "jsonb" version = "0.4.3" -source = "git+https://github.com/databendlabs/jsonb?rev=672e423#672e4234758889b8fcb79ba43ac00af8c0aef120" +source = "git+https://github.com/databendlabs/jsonb?rev=ada713c#ada713c16369cd0c0b85f3bbae22183111325ee9" dependencies = [ "byteorder", "fast-float", diff --git a/Cargo.toml b/Cargo.toml index c85f60d3ea20..679998b3c7e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -415,7 +415,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226 ] } color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" } ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" } -jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "672e423" } +jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" } openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" } opendal_compat = { git = "https://github.com/apache/opendal", rev = "f6e60f6" } orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" } diff --git a/src/query/expression/src/evaluator.rs b/src/query/expression/src/evaluator.rs index 0c5ae85032bc..80ec365430af 100644 --- a/src/query/expression/src/evaluator.rs +++ b/src/query/expression/src/evaluator.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::BTreeMap; use std::collections::HashMap; use std::ops::Not; @@ -35,6 +36,7 @@ use crate::types::array::ArrayColumn; use crate::types::boolean::BooleanDomain; use crate::types::nullable::NullableColumn; use crate::types::nullable::NullableDomain; +use crate::types::string::StringColumnBuilder; use crate::types::ArgType; use crate::types::ArrayType; use crate::types::BooleanType; @@ -507,7 +509,7 @@ impl<'a> Evaluator<'a> { }, (DataType::Variant, DataType::Array(inner_dest_ty)) => { let empty_vec = vec![]; - let temp_array: jsonb::Value; + let mut temp_array: jsonb::Value; match value { Value::Scalar(Scalar::Variant(x)) => { let array = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) { @@ -520,22 +522,16 @@ impl<'a> Evaluator<'a> { } else { &empty_vec }; - - let column = VariantType::create_column_from_variants(array.as_slice()); - - let validity = validity.map(|validity| { - Bitmap::new_constant( - validity.unset_bits() != validity.len(), - column.len(), - ) - }); - + let validity = None; + let column = Column::Variant(VariantType::create_column_from_variants( + array.as_slice(), + )); let new_array = self .run_cast( span, &DataType::Variant, inner_dest_ty, - Value::Column(Column::Variant(column)), + Value::Column(column), validity, options, )? 
@@ -547,7 +543,6 @@ impl<'a> Evaluator<'a> { let mut array_builder = ArrayType::::create_builder(col.len(), &[]); - let mut temp_array: jsonb::Value; for (idx, x) in col.iter().enumerate() { let array = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true) { @@ -597,6 +592,113 @@ impl<'a> Evaluator<'a> { other => unreachable!("source: {}", other), } } + (DataType::Variant, DataType::Map(box DataType::Tuple(fields_dest_ty))) + if fields_dest_ty.len() == 2 && fields_dest_ty[0] == DataType::String => + { + let empty_obj = BTreeMap::new(); + let mut temp_obj: jsonb::Value; + match value { + Value::Scalar(Scalar::Variant(x)) => { + let obj = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) { + temp_obj = jsonb::from_slice(&x).map_err(|e| { + ErrorCode::BadArguments(format!( + "Expect to be valid json, got err: {e:?}" + )) + })?; + temp_obj.as_object().unwrap_or(&empty_obj) + } else { + &empty_obj + }; + let validity = None; + + let mut key_builder = StringColumnBuilder::with_capacity(obj.len(), 0); + for k in obj.keys() { + key_builder.put_str(k.as_str()); + key_builder.commit_row(); + } + let key_column = Column::String(key_builder.build()); + + let values: Vec<_> = obj.values().cloned().collect(); + let value_column = Column::Variant( + VariantType::create_column_from_variants(values.as_slice()), + ); + + let new_value_column = self + .run_cast( + span, + &DataType::Variant, + &fields_dest_ty[1], + Value::Column(value_column), + validity, + options, + )? + .into_column() + .unwrap(); + Ok(Value::Scalar(Scalar::Map(Column::Tuple(vec![ + key_column, + new_value_column, + ])))) + } + Value::Column(Column::Variant(col)) => { + let mut key_builder = StringColumnBuilder::with_capacity(0, 0); + let mut value_builder = + ArrayType::::create_builder(col.len(), &[]); + + for (idx, x) in col.iter().enumerate() { + let obj = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true) { + temp_obj = jsonb::from_slice(x).map_err(|e| { + ErrorCode::BadArguments(format!( + "Expect to be valid json, got err: {e:?}" + )) + })?; + temp_obj.as_object().unwrap_or(&empty_obj) + } else { + &empty_obj + }; + + for (k, v) in obj.iter() { + key_builder.put_str(k.as_str()); + key_builder.commit_row(); + v.write_to_vec(&mut value_builder.builder.data); + value_builder.builder.commit_row(); + } + value_builder.commit_row(); + } + let key_column = Column::String(key_builder.build()); + + let value_column = value_builder.build(); + let validity = validity.map(|validity| { + let mut inner_validity = MutableBitmap::with_capacity(col.len()); + for (index, offsets) in value_column.offsets.windows(2).enumerate() { + inner_validity.extend_constant( + (offsets[1] - offsets[0]) as usize, + validity.get_bit(index), + ); + } + inner_validity.into() + }); + + let new_value_column = self + .run_cast( + span, + &DataType::Variant, + &fields_dest_ty[1], + Value::Column(Column::Variant(value_column.values)), + validity, + options, + )? 
+ .into_column() + .unwrap(); + + let kv_column = Column::Tuple(vec![key_column, new_value_column]); + Ok(Value::Column(Column::Map(Box::new(ArrayColumn { + values: kv_column, + offsets: col.offsets, + })))) + } + other => unreachable!("source: {}", other), + } + } (DataType::EmptyMap, DataType::Map(inner_dest_ty)) => match value { Value::Scalar(Scalar::EmptyMap) => { let new_column = ColumnBuilder::with_capacity(inner_dest_ty, 0).build(); diff --git a/src/query/functions/src/aggregates/aggregate_kurtosis.rs b/src/query/functions/src/aggregates/aggregate_kurtosis.rs index 84d1b64fb2b9..876564b19d21 100644 --- a/src/query/functions/src/aggregates/aggregate_kurtosis.rs +++ b/src/query/functions/src/aggregates/aggregate_kurtosis.rs @@ -32,10 +32,10 @@ use crate::aggregates::AggregateFunctionRef; #[derive(Default, BorshSerialize, BorshDeserialize)] struct KurtosisState { pub n: u64, - pub sum: f64, - pub sum_sqr: f64, - pub sum_cub: f64, - pub sum_four: f64, + pub sum: F64, + pub sum_sqr: F64, + pub sum_cub: F64, + pub sum_four: F64, } impl UnaryState for KurtosisState @@ -78,27 +78,34 @@ where builder.push(F64::from(0_f64)); return Ok(()); } - let n = self.n as f64; + + let (n, sum, sum_sqr, sum_cub, sum_four) = ( + self.n as f64, + *self.sum, + *self.sum_sqr, + *self.sum_cub, + *self.sum_four, + ); + let temp = 1.0 / n; - if self.sum_sqr - self.sum * self.sum * temp == 0.0 { + if sum_sqr - sum * sum * temp == 0.0 { builder.push(F64::from(0_f64)); return Ok(()); } let m4 = temp - * (self.sum_four - 4.0 * self.sum_cub * self.sum * temp - + 6.0 * self.sum_sqr * self.sum * self.sum * temp * temp - - 3.0 * self.sum.powi(4) * temp.powi(3)); - let m2 = temp * (self.sum_sqr - self.sum * self.sum * temp); + * (sum_four - 4.0 * sum_cub * sum * temp + 6.0 * sum_sqr * sum * sum * temp * temp + - 3.0 * sum.powi(4) * temp.powi(3)); + let m2 = temp * (sum_sqr - sum * sum * temp); if m2 <= 0.0 || (n - 2.0) * (n - 3.0) == 0.0 { builder.push(F64::from(0_f64)); return Ok(()); } let value = (n - 1.0) * ((n + 1.0) * m4 / (m2 * m2) - 3.0 * (n - 1.0)) / ((n - 2.0) * (n - 3.0)); - if value.is_infinite() || value.is_nan() { - return Err(ErrorCode::SemanticError("Kurtosis is out of range!")); - } else { + if value.is_finite() { builder.push(F64::from(value)); + } else { + builder.push(F64::from(f64::NAN)); } Ok(()) } diff --git a/src/query/functions/src/aggregates/aggregate_skewness.rs b/src/query/functions/src/aggregates/aggregate_skewness.rs index b71d5292b1fd..1a798c954104 100644 --- a/src/query/functions/src/aggregates/aggregate_skewness.rs +++ b/src/query/functions/src/aggregates/aggregate_skewness.rs @@ -32,9 +32,9 @@ use crate::aggregates::aggregate_unary::UnaryState; #[derive(Default, BorshSerialize, BorshDeserialize)] pub struct SkewnessStateV2 { pub n: u64, - pub sum: f64, - pub sum_sqr: f64, - pub sum_cub: f64, + pub sum: F64, + pub sum_sqr: F64, + pub sum_cub: F64, } impl UnaryState for SkewnessStateV2 @@ -75,25 +75,23 @@ where builder.push(F64::from(0_f64)); return Ok(()); } - let n = self.n as f64; + + let (n, sum, sum_sqr, sum_cub) = (self.n as f64, *self.sum, *self.sum_sqr, *self.sum_cub); let temp = 1.0 / n; - let div = (temp * (self.sum_sqr - self.sum * self.sum * temp)) - .powi(3) - .sqrt(); + let div = (temp * (sum_sqr - sum * sum * temp)).powi(3).sqrt(); if div == 0.0 { builder.push(F64::from(0_f64)); return Ok(()); } let temp1 = (n * (n - 1.0)).sqrt() / (n - 2.0); - let value = temp1 - * temp - * (self.sum_cub - 3.0 * self.sum_sqr * self.sum * temp - + 2.0 * self.sum.powi(3) * temp * temp) - / 
div; - if value.is_infinite() || value.is_nan() { - return Err(ErrorCode::SemanticError("Skew is out of range!")); - } else { + let value = + temp1 * temp * (sum_cub - 3.0 * sum_sqr * sum * temp + 2.0 * sum.powi(3) * temp * temp) + / div; + + if value.is_finite() { builder.push(F64::from(value)); + } else { + builder.push(F64::from(f64::NAN)); } Ok(()) } diff --git a/src/query/functions/src/scalars/array.rs b/src/query/functions/src/scalars/array.rs index 6d9366c4ddd5..d7032ec78661 100644 --- a/src/query/functions/src/scalars/array.rs +++ b/src/query/functions/src/scalars/array.rs @@ -98,7 +98,7 @@ const ARRAY_SORT_FUNCTIONS: &[(&str, (bool, bool)); 4] = &[ pub fn register(registry: &mut FunctionRegistry) { registry.register_aliases("contains", &["array_contains"]); registry.register_aliases("get", &["array_get"]); - registry.register_aliases("length", &["array_length"]); + registry.register_aliases("length", &["array_length", "array_size"]); registry.register_aliases("slice", &["array_slice"]); register_array_aggr(registry); diff --git a/src/query/functions/src/scalars/variant.rs b/src/query/functions/src/scalars/variant.rs index 0081f4b5e625..f118929c8014 100644 --- a/src/query/functions/src/scalars/variant.rs +++ b/src/query/functions/src/scalars/variant.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::borrow::Cow; +use std::collections::BTreeSet; use std::collections::HashSet; use std::iter::once; use std::sync::Arc; @@ -1679,6 +1680,72 @@ pub fn register(registry: &mut FunctionRegistry) { }, })) }); + + registry.register_function_factory("json_object_pick", |_, args_type| { + if args_type.len() < 2 { + return None; + } + if args_type[0].remove_nullable() != DataType::Variant && args_type[0] != DataType::Null { + return None; + } + for arg_type in args_type.iter().skip(1) { + if arg_type.remove_nullable() != DataType::String && *arg_type != DataType::Null { + return None; + } + } + let is_nullable = args_type[0].is_nullable_or_null(); + let return_type = if is_nullable { + DataType::Nullable(Box::new(DataType::Variant)) + } else { + DataType::Variant + }; + Some(Arc::new(Function { + signature: FunctionSignature { + name: "json_object_pick".to_string(), + args_type: args_type.to_vec(), + return_type, + }, + eval: FunctionEval::Scalar { + calc_domain: Box::new(|_, _| FunctionDomain::MayThrow), + eval: Box::new(move |args, ctx| { + json_object_pick_or_delete_fn(args, ctx, true, is_nullable) + }), + }, + })) + }); + + registry.register_function_factory("json_object_delete", |_, args_type| { + if args_type.len() < 2 { + return None; + } + if args_type[0].remove_nullable() != DataType::Variant && args_type[0] != DataType::Null { + return None; + } + for arg_type in args_type.iter().skip(1) { + if arg_type.remove_nullable() != DataType::String && *arg_type != DataType::Null { + return None; + } + } + let is_nullable = args_type[0].is_nullable_or_null(); + let return_type = if is_nullable { + DataType::Nullable(Box::new(DataType::Variant)) + } else { + DataType::Variant + }; + Some(Arc::new(Function { + signature: FunctionSignature { + name: "json_object_delete".to_string(), + args_type: args_type.to_vec(), + return_type, + }, + eval: FunctionEval::Scalar { + calc_domain: Box::new(|_, _| FunctionDomain::MayThrow), + eval: Box::new(move |args, ctx| { + json_object_pick_or_delete_fn(args, ctx, false, is_nullable) + }), + }, + })) + }); } fn json_array_fn(args: &[ValueRef], ctx: &mut EvalContext) -> Value { @@ -2149,6 +2216,84 @@ fn json_object_insert_fn( } } +fn 
json_object_pick_or_delete_fn( + args: &[ValueRef], + ctx: &mut EvalContext, + is_pick: bool, + is_nullable: bool, +) -> Value { + let len_opt = args.iter().find_map(|arg| match arg { + ValueRef::Column(col) => Some(col.len()), + _ => None, + }); + let len = len_opt.unwrap_or(1); + let mut keys = BTreeSet::new(); + let mut validity = MutableBitmap::with_capacity(len); + let mut builder = BinaryColumnBuilder::with_capacity(len, len * 50); + for idx in 0..len { + let value = match &args[0] { + ValueRef::Scalar(scalar) => scalar.clone(), + ValueRef::Column(col) => unsafe { col.index_unchecked(idx) }, + }; + if value == ScalarRef::Null { + builder.commit_row(); + validity.push(false); + continue; + } + let value = value.as_variant().unwrap(); + if !is_object(value) { + ctx.set_error(builder.len(), "Invalid json object"); + builder.commit_row(); + validity.push(false); + continue; + } + keys.clear(); + for arg in args.iter().skip(1) { + let key = match &arg { + ValueRef::Scalar(scalar) => scalar.clone(), + ValueRef::Column(col) => unsafe { col.index_unchecked(idx) }, + }; + if key == ScalarRef::Null { + continue; + } + let key = key.as_string().unwrap(); + keys.insert(*key); + } + let res = if is_pick { + jsonb::object_pick(value, &keys, &mut builder.data) + } else { + jsonb::object_delete(value, &keys, &mut builder.data) + }; + if let Err(err) = res { + validity.push(false); + ctx.set_error(builder.len(), err.to_string()); + } else { + validity.push(true); + } + builder.commit_row(); + } + if is_nullable { + let validity: Bitmap = validity.into(); + match len_opt { + Some(_) => { + Value::Column(Column::Variant(builder.build())).wrap_nullable(Some(validity)) + } + None => { + if !validity.get_bit(0) { + Value::Scalar(Scalar::Null) + } else { + Value::Scalar(Scalar::Variant(builder.build_scalar())) + } + } + } + } else { + match len_opt { + Some(_) => Value::Column(Column::Variant(builder.build())), + None => Value::Scalar(Scalar::Variant(builder.build_scalar())), + } + } +} + // Extract string for string type, other types convert to JSON string. 
fn cast_to_string(v: &[u8]) -> String { match to_str(v) { diff --git a/src/query/functions/tests/it/scalars/array.rs b/src/query/functions/tests/it/scalars/array.rs index edc1c8c3093d..216c6d24c46f 100644 --- a/src/query/functions/tests/it/scalars/array.rs +++ b/src/query/functions/tests/it/scalars/array.rs @@ -73,6 +73,8 @@ fn test_length(file: &mut impl Write) { run_ast(file, "length([1, 2, 3])", &[]); run_ast(file, "length([true, false])", &[]); run_ast(file, "length(['a', 'b', 'c', 'd'])", &[]); + run_ast(file, "array_size(['a', 'b', 'c', 'd'])", &[]); + run_ast(file, "array_length(['a', 'b', 'c', 'd'])", &[]); } fn test_range(file: &mut impl Write) { diff --git a/src/query/functions/tests/it/scalars/testdata/array.txt b/src/query/functions/tests/it/scalars/testdata/array.txt index 2d8f5c79ff65..5fe69a9a48ca 100644 --- a/src/query/functions/tests/it/scalars/testdata/array.txt +++ b/src/query/functions/tests/it/scalars/testdata/array.txt @@ -100,6 +100,24 @@ output domain : {4..=4} output : 4 +ast : array_size(['a', 'b', 'c', 'd']) +raw expr : array_size(array('a', 'b', 'c', 'd')) +checked expr : length(array("a", "b", "c", "d")) +optimized expr : 4_u64 +output type : UInt64 +output domain : {4..=4} +output : 4 + + +ast : array_length(['a', 'b', 'c', 'd']) +raw expr : array_length(array('a', 'b', 'c', 'd')) +checked expr : length(array("a", "b", "c", "d")) +optimized expr : 4_u64 +output type : UInt64 +output domain : {4..=4} +output : 4 + + ast : range(10, 20) raw expr : range(10, 20) checked expr : range(to_uint64(10_u8), to_uint64(20_u8)) diff --git a/src/query/functions/tests/it/scalars/testdata/function_list.txt b/src/query/functions/tests/it/scalars/testdata/function_list.txt index d14e8fd8697c..7d4cf1b16a97 100644 --- a/src/query/functions/tests/it/scalars/testdata/function_list.txt +++ b/src/query/functions/tests/it/scalars/testdata/function_list.txt @@ -3,6 +3,7 @@ add -> plus array_contains -> contains array_get -> get array_length -> length +array_size -> length array_slice -> slice bitmap_and_not -> bitmap_not bitmap_cardinality -> bitmap_count @@ -2312,9 +2313,11 @@ Functions overloads: 0 json_extract_path_text(String, String) :: String NULL 1 json_extract_path_text(String NULL, String NULL) :: String NULL 0 json_object FACTORY +0 json_object_delete FACTORY 0 json_object_insert FACTORY 0 json_object_keep_null FACTORY 0 json_object_keys(Variant NULL) :: Variant NULL +0 json_object_pick FACTORY 0 json_path_exists FACTORY 0 json_path_match FACTORY 0 json_path_query FACTORY diff --git a/src/query/functions/tests/it/scalars/testdata/variant.txt b/src/query/functions/tests/it/scalars/testdata/variant.txt index 00ea67f28e02..e8545ded6ada 100644 --- a/src/query/functions/tests/it/scalars/testdata/variant.txt +++ b/src/query/functions/tests/it/scalars/testdata/variant.txt @@ -5573,3 +5573,137 @@ evaluation (internal): +--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +ast : json_object_delete('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'a', 'b', 'c') +raw expr : json_object_delete(CAST('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}' AS Variant), 'a', 'b', 'c') +checked expr : json_object_delete(parse_json("{\"b\":12,\"d\":34,\"m\":[1,2],\"x\":{\"k\":\"v\"}}"), "a", "b", "c") +optimized expr : 
0x4000000310000001100000011000000120000002500000105000000e646d785022800000022000000220000002500150024000000110000001100000016b76 +output type : Variant +output domain : Undefined +output : '{"d":34,"m":[1,2],"x":{"k":"v"}}' + + +ast : json_object_delete('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'm', 'n', 'm', 'x') +raw expr : json_object_delete(CAST('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}' AS Variant), 'm', 'n', 'm', 'x') +checked expr : json_object_delete(parse_json("{\"b\":12,\"d\":34,\"m\":[1,2],\"x\":{\"k\":\"v\"}}"), "m", "n", "m", "x") +optimized expr : 0x40000002100000011000000120000002200000026264500c5022 +output type : Variant +output domain : Undefined +output : '{"b":12,"d":34}' + + +ast : json_object_delete('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'z', null) +raw expr : json_object_delete(CAST('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}' AS Variant), 'z', NULL) +checked expr : json_object_delete(parse_json("{\"b\":12,\"d\":34,\"m\":[1,2],\"x\":{\"k\":\"v\"}}"), "z", NULL) +optimized expr : 0x40000004100000011000000110000001100000012000000220000002500000105000000e62646d78500c5022800000022000000220000002500150024000000110000001100000016b76 +output type : Variant +output domain : Undefined +output : '{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}' + + +ast : json_object_delete('{}'::variant, 'v', 'vv') +raw expr : json_object_delete(CAST('{}' AS Variant), 'v', 'vv') +checked expr : json_object_delete(parse_json("{}"), "v", "vv") +optimized expr : 0x40000000 +output type : Variant +output domain : Undefined +output : '{}' + + +error: + --> SQL:1:1 + | +1 | json_object_delete('123'::variant, 'v', 'vv') + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Invalid json object while evaluating function `json_object_delete('123', 'v', 'vv')` in expr `json_object_delete(parse_json('123'), 'v', 'vv')` + + + +ast : json_object_delete(parse_json(v), 'a', 'm') +raw expr : json_object_delete(parse_json(v::String NULL), 'a', 'm') +checked expr : json_object_delete(parse_json(v), "a", "m") +evaluation: ++--------+---------------------------------+---------------------+ +| | v | Output | ++--------+---------------------------------+---------------------+ +| Type | String NULL | Variant NULL | +| Domain | {""..="{\"m\":\"n\"}"} ∪ {NULL} | Unknown | +| Row 0 | '{"k":"v"}' | '{"k":"v"}' | +| Row 1 | '{"m":"n"}' | '{}' | +| Row 2 | NULL | NULL | +| Row 3 | '{"a":"b","c":"d","y":"z"}' | '{"c":"d","y":"z"}' | ++--------+---------------------------------+---------------------+ +evaluation (internal): ++--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Column | Data | ++--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| v | NullableColumn { column: StringColumn { data: 0x7b226b223a2276227d7b226d223a226e227d7b2261223a2262222c2263223a2264222c2279223a227a227d, offsets: [0, 9, 18, 18, 43] }, validity: [0b____1011] } | +| Output | NullableColumn { column: BinaryColumn { data: 0x4000000110000001100000016b764000000040000002100000011000000110000001100000016379647a, offsets: [0, 14, 18, 18, 42] }, validity: [0b____1011] } | 
++--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + + +ast : json_object_pick('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'a', 'b', 'c') +raw expr : json_object_pick(CAST('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}' AS Variant), 'a', 'b', 'c') +checked expr : json_object_pick(parse_json("{\"b\":12,\"d\":34,\"m\":[1,2],\"x\":{\"k\":\"v\"}}"), "a", "b", "c") +optimized expr : 0x40000001100000012000000262500c +output type : Variant +output domain : Undefined +output : '{"b":12}' + + +ast : json_object_pick('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'm', 'n', 'm', 'x') +raw expr : json_object_pick(CAST('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}' AS Variant), 'm', 'n', 'm', 'x') +checked expr : json_object_pick(parse_json("{\"b\":12,\"d\":34,\"m\":[1,2],\"x\":{\"k\":\"v\"}}"), "m", "n", "m", "x") +optimized expr : 0x400000021000000110000001500000105000000e6d78800000022000000220000002500150024000000110000001100000016b76 +output type : Variant +output domain : Undefined +output : '{"m":[1,2],"x":{"k":"v"}}' + + +ast : json_object_pick('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'z', null) +raw expr : json_object_pick(CAST('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}' AS Variant), 'z', NULL) +checked expr : json_object_pick(parse_json("{\"b\":12,\"d\":34,\"m\":[1,2],\"x\":{\"k\":\"v\"}}"), "z", NULL) +optimized expr : 0x40000000 +output type : Variant +output domain : Undefined +output : '{}' + + +ast : json_object_pick('{}'::variant, 'v', 'vv') +raw expr : json_object_pick(CAST('{}' AS Variant), 'v', 'vv') +checked expr : json_object_pick(parse_json("{}"), "v", "vv") +optimized expr : 0x40000000 +output type : Variant +output domain : Undefined +output : '{}' + + +error: + --> SQL:1:1 + | +1 | json_object_pick('123'::variant, 'v', 'vv') + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Invalid json object while evaluating function `json_object_pick('123', 'v', 'vv')` in expr `json_object_pick(parse_json('123'), 'v', 'vv')` + + + +ast : json_object_pick(parse_json(v), 'a', 'm') +raw expr : json_object_pick(parse_json(v::String NULL), 'a', 'm') +checked expr : json_object_pick(parse_json(v), "a", "m") +evaluation: ++--------+---------------------------------+--------------+ +| | v | Output | ++--------+---------------------------------+--------------+ +| Type | String NULL | Variant NULL | +| Domain | {""..="{\"m\":\"n\"}"} ∪ {NULL} | Unknown | +| Row 0 | '{"k":"v"}' | '{}' | +| Row 1 | '{"m":"n"}' | '{"m":"n"}' | +| Row 2 | NULL | NULL | +| Row 3 | '{"a":"b","c":"d","y":"z"}' | '{"a":"b"}' | ++--------+---------------------------------+--------------+ +evaluation (internal): ++--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| Column | Data | ++--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| v | NullableColumn { column: StringColumn { data: 0x7b226b223a2276227d7b226d223a226e227d7b2261223a2262222c2263223a2264222c2279223a227a227d, offsets: [0, 9, 18, 18, 43] }, validity: [0b____1011] } | +| Output | NullableColumn { column: BinaryColumn { data: 0x400000004000000110000001100000016d6e4000000110000001100000016162, offsets: [0, 
4, 18, 18, 32] }, validity: [0b____1011] } | ++--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ + + diff --git a/src/query/functions/tests/it/scalars/variant.rs b/src/query/functions/tests/it/scalars/variant.rs index 5d961e312c28..3599e85a3dbc 100644 --- a/src/query/functions/tests/it/scalars/variant.rs +++ b/src/query/functions/tests/it/scalars/variant.rs @@ -70,6 +70,8 @@ fn test_variant() { test_json_array_except(file); test_json_array_overlap(file); test_json_object_insert(file); + test_json_object_delete(file); + test_json_object_pick(file); } fn test_parse_json(file: &mut impl Write) { @@ -2050,3 +2052,70 @@ fn test_json_object_insert(file: &mut impl Write) { ], ); } + +fn test_json_object_delete(file: &mut impl Write) { + run_ast( + file, + r#"json_object_delete('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'a', 'b', 'c')"#, + &[], + ); + run_ast( + file, + r#"json_object_delete('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'm', 'n', 'm', 'x')"#, + &[], + ); + run_ast( + file, + r#"json_object_delete('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'z', null)"#, + &[], + ); + run_ast(file, r#"json_object_delete('{}'::variant, 'v', 'vv')"#, &[]); + run_ast(file, r#"json_object_delete('123'::variant, 'v', 'vv')"#, &[ + ]); + + run_ast(file, "json_object_delete(parse_json(v), 'a', 'm')", &[( + "v", + StringType::from_data_with_validity( + vec![ + r#"{"k":"v"}"#, + r#"{"m":"n"}"#, + "", + r#"{"a":"b","c":"d","y":"z"}"#, + ], + vec![true, true, false, true], + ), + )]); +} + +fn test_json_object_pick(file: &mut impl Write) { + run_ast( + file, + r#"json_object_pick('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'a', 'b', 'c')"#, + &[], + ); + run_ast( + file, + r#"json_object_pick('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'm', 'n', 'm', 'x')"#, + &[], + ); + run_ast( + file, + r#"json_object_pick('{"b":12,"d":34,"m":[1,2],"x":{"k":"v"}}'::variant, 'z', null)"#, + &[], + ); + run_ast(file, r#"json_object_pick('{}'::variant, 'v', 'vv')"#, &[]); + run_ast(file, r#"json_object_pick('123'::variant, 'v', 'vv')"#, &[]); + + run_ast(file, "json_object_pick(parse_json(v), 'a', 'm')", &[( + "v", + StringType::from_data_with_validity( + vec![ + r#"{"k":"v"}"#, + r#"{"m":"n"}"#, + "", + r#"{"a":"b","c":"d","y":"z"}"#, + ], + vec![true, true, false, true], + ), + )]); +} diff --git a/src/query/pipeline/transforms/src/lib.rs b/src/query/pipeline/transforms/src/lib.rs index 23c89473f7db..a8ffc7007b32 100644 --- a/src/query/pipeline/transforms/src/lib.rs +++ b/src/query/pipeline/transforms/src/lib.rs @@ -20,3 +20,4 @@ #![feature(iter_map_windows)] pub mod processors; +pub use processors::*; diff --git a/src/query/service/src/pipelines/processors/mod.rs b/src/query/service/src/pipelines/processors/mod.rs index 1d924d948156..19c56b02d31a 100644 --- a/src/query/service/src/pipelines/processors/mod.rs +++ b/src/query/service/src/pipelines/processors/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. 
pub use databend_common_pipeline_core::processors::*; -pub(crate) mod transforms; +pub mod transforms; pub use transforms::HashJoinBuildState; pub use transforms::HashJoinDesc; diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs index 2b0579208fc0..8e91237cdf30 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/aggregate_cell.rs @@ -67,6 +67,10 @@ impl HashTableCell { self.hashtable.len() } + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + pub fn allocated_bytes(&self) -> usize { self.hashtable.bytes_len(false) + self.arena.allocated_bytes() diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs index aa0e3feba5cc..b2b65acbce7a 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_aggregate_serializer.rs @@ -163,7 +163,7 @@ impl BlockMetaTransform &self.location_prefix, payload, )?, - false => agg_spilling_aggregate_payload::( + false => agg_spilling_aggregate_payload( self.ctx.clone(), self.operator.clone(), &self.location_prefix, @@ -239,7 +239,7 @@ impl BlockMetaTransform } } -fn agg_spilling_aggregate_payload( +fn agg_spilling_aggregate_payload( ctx: Arc, operator: Operator, location_prefix: &str, diff --git a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs index a5a7777ad422..f8ed4046a520 100644 --- a/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs +++ b/src/query/service/src/pipelines/processors/transforms/aggregator/serde/transform_exchange_group_by_serializer.rs @@ -214,7 +214,7 @@ impl BlockMetaTransform &self.location_prefix, payload, )?, - false => agg_spilling_group_by_payload::( + false => agg_spilling_group_by_payload( self.ctx.clone(), self.operator.clone(), &self.location_prefix, @@ -292,7 +292,7 @@ fn get_columns(data_block: DataBlock) -> Vec { data_block.columns().to_vec() } -fn agg_spilling_group_by_payload( +fn agg_spilling_group_by_payload( ctx: Arc, operator: Operator, location_prefix: &str, diff --git a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs index d5d760478350..385cd1a40ce5 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_async_function.rs @@ -40,7 +40,7 @@ pub struct TransformAsyncFunction { } impl TransformAsyncFunction { - pub fn new( + pub(crate) fn new( ctx: Arc, async_func_descs: Vec, operators: BTreeMap>, diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index 60bfa69b9e24..ac149d5dd852 100644 --- 
a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -129,13 +129,13 @@ where } if let Some(block) = self.output_data.take() { - debug_assert!(matches!(self.state, State::MergeFinal | State::Finish)); + assert!(matches!(self.state, State::MergeFinal | State::Finish)); self.output_block(block); return Ok(Event::NeedConsume); } if matches!(self.state, State::Finish) { - debug_assert!(self.input.is_finished()); + assert!(self.input.is_finished()); self.output.finish(); return Ok(Event::Finished); } @@ -179,6 +179,7 @@ where if meta.is_none() { // It means we get the last block. // We can launch external merge sort now. + self.input.finish(); self.state = State::Merging; } self.input_data = Some(block); diff --git a/src/query/service/tests/it/pipelines/transforms/sort.rs b/src/query/service/tests/it/pipelines/transforms/sort.rs index 3cc91634ba0a..ba4e99323216 100644 --- a/src/query/service/tests/it/pipelines/transforms/sort.rs +++ b/src/query/service/tests/it/pipelines/transforms/sort.rs @@ -19,7 +19,6 @@ use databend_common_base::base::tokio; use databend_common_base::base::tokio::sync::mpsc::channel; use databend_common_base::base::tokio::sync::mpsc::Receiver; use databend_common_exception::Result; -use databend_common_expression::block_debug::pretty_format_blocks; use databend_common_expression::types::Int32Type; use databend_common_expression::DataBlock; use databend_common_expression::DataField; @@ -34,130 +33,16 @@ use databend_common_pipeline_core::PipeItem; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_sinks::SyncSenderSink; use databend_common_pipeline_sources::BlocksSource; -use databend_common_pipeline_transforms::processors::add_k_way_merge_sort; use databend_query::pipelines::executor::ExecutorSettings; use databend_query::pipelines::executor::QueryPipelineExecutor; use databend_query::sessions::QueryContext; use databend_query::test_kits::TestFixture; -use itertools::Itertools; -use parking_lot::Mutex; use rand::rngs::ThreadRng; use rand::Rng; -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn test_k_way_merge_sort() -> Result<()> { - let fixture = TestFixture::setup().await?; - let ctx = fixture.new_query_ctx().await?; - - let worker = 3; - let block_size = 4; - let limit = None; - let (data, expected) = basic_test_data(None); - let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?; - - executor.execute()?; - - let mut got: Vec = Vec::new(); - while !rx.is_empty() { - got.push(rx.recv().await.unwrap()?); - } - - check_result(got, expected); - - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 1)] -async fn test_k_way_merge_sort_fuzz() -> Result<()> { - let mut rng = rand::thread_rng(); - let fixture = TestFixture::setup().await?; - - for _ in 0..10 { - let ctx = fixture.new_query_ctx().await?; - run_fuzz(ctx, &mut rng, false).await?; - } - - for _ in 0..10 { - let ctx = fixture.new_query_ctx().await?; - run_fuzz(ctx, &mut rng, true).await?; - } - Ok(()) -} - -async fn run_fuzz(ctx: Arc, rng: &mut ThreadRng, with_limit: bool) -> Result<()> { - let worker = rng.gen_range(1..=5); - let block_size = rng.gen_range(1..=20); - let (data, expected, limit) = random_test_data(rng, with_limit); - - // println!("\nwith_limit {with_limit}"); - // for (input, blocks) in data.iter().enumerate() { - // println!("intput {input}"); - // for b in blocks { - // println!("{:?}", 
b.columns()[0].value); - // } - // } - - let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?; - executor.execute()?; - - let mut got: Vec = Vec::new(); - while !rx.is_empty() { - got.push(rx.recv().await.unwrap()?); - } - - check_result(got, expected); - - Ok(()) -} - -fn create_pipeline( - ctx: Arc, - data: Vec>, - worker: usize, - block_size: usize, - limit: Option, -) -> Result<(Arc, Receiver>)> { - let mut pipeline = Pipeline::create(); - - let data_type = data[0][0].get_by_offset(0).data_type.clone(); - let source_pipe = create_source_pipe(ctx, data)?; - pipeline.add_pipe(source_pipe); - - let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]); - let sort_desc = Arc::new(vec![SortColumnDescription { - offset: 0, - asc: true, - nulls_first: true, - is_nullable: false, - }]); - add_k_way_merge_sort( - &mut pipeline, - schema, - worker, - block_size, - limit, - sort_desc, - false, - true, - )?; - - let (mut rx, sink_pipe) = create_sink_pipe(1)?; - let rx = rx.pop().unwrap(); - pipeline.add_pipe(sink_pipe); - pipeline.set_max_threads(3); - - let settings = ExecutorSettings { - query_id: Arc::new("".to_string()), - max_execute_time_in_seconds: Default::default(), - enable_queries_executor: false, - max_threads: 8, - executor_node_id: "".to_string(), - }; - let executor = QueryPipelineExecutor::create(pipeline, settings)?; - Ok((executor, rx)) -} - fn create_source_pipe(ctx: Arc, data: Vec>) -> Result { + use parking_lot::Mutex; + let size = data.len(); let mut items = Vec::with_capacity(size); @@ -179,7 +64,7 @@ fn create_source_pipe(ctx: Arc, data: Vec>) -> Resu fn create_sink_pipe(size: usize) -> Result<(Vec>>, Pipe)> { let mut rxs = Vec::with_capacity(size); let mut items = Vec::with_capacity(size); - for _index in 0..size { + for _ in 0..size { let input = InputPort::create(); let (tx, rx) = channel(1000); rxs.push(rx); @@ -193,21 +78,11 @@ fn create_sink_pipe(size: usize) -> Result<(Vec>>, Pi Ok((rxs, Pipe::create(size, 0, items))) } -/// Returns (input, expected) -pub fn basic_test_data(limit: Option) -> (Vec>, DataBlock) { - let data = vec![ - vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]], - vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]], - vec![vec![0, 2, 4, 5]], - ]; - - prepare_input_and_result(data, limit) -} - -fn prepare_input_and_result( +fn prepare_multi_input_and_result( data: Vec>>, limit: Option, ) -> (Vec>, DataBlock) { + use itertools::Itertools; let input = data .clone() .into_iter() @@ -229,7 +104,17 @@ fn prepare_input_and_result( (input, result) } +fn prepare_single_input_and_result( + data: Vec>, + limit: Option, +) -> (Vec, DataBlock) { + let (mut input, expected) = prepare_multi_input_and_result(vec![data], limit); + (input.remove(0), expected) +} + fn check_result(result: Vec, expected: DataBlock) { + use databend_common_expression::block_debug::pretty_format_blocks; + if expected.is_empty() { if !result.is_empty() && !DataBlock::concat(&result).unwrap().is_empty() { panic!( @@ -240,46 +125,15 @@ fn check_result(result: Vec, expected: DataBlock) { return; } - let result_rows: usize = result.iter().map(|v| v.num_rows()).sum(); - let result = pretty_format_blocks(&result).unwrap(); let expected_rows = expected.num_rows(); let expected = pretty_format_blocks(&[expected]).unwrap(); + let result_rows: usize = result.iter().map(|v| v.num_rows()).sum(); + let result = pretty_format_blocks(&result).unwrap(); assert_eq!( expected, result, - "\nexpected (num_rows = {}):\n{}\nactual (num_rows = {}):\n{}", - expected_rows, 
expected, result_rows, result + "\nexpected (num_rows = {expected_rows}):\n{expected}\nactual (num_rows = {result_rows}):\n{result}", ); } -fn random_test_data( - rng: &mut ThreadRng, - with_limit: bool, -) -> (Vec>, DataBlock, Option) { - let random_batch_size = rng.gen_range(1..=10); - let random_num_streams = rng.gen_range(5..=10); - - let random_data = (0..random_num_streams) - .map(|_| { - let random_num_blocks = rng.gen_range(1..=10); - let mut data = (0..random_batch_size * random_num_blocks) - .map(|_| rng.gen_range(0..=1000)) - .collect::>(); - data.sort(); - data.chunks(random_batch_size) - .map(|v| v.to_vec()) - .collect::>() - }) - .collect::>(); - - let num_rows = random_data - .iter() - .map(|v| v.iter().map(|v| v.len()).sum::()) - .sum::(); - let limit = if with_limit { - Some(rng.gen_range(0..=num_rows)) - } else { - None - }; - let (input, expected) = prepare_input_and_result(random_data, limit); - (input, expected, limit) -} +mod k_way; +mod spill; diff --git a/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs new file mode 100644 index 000000000000..e217b1833c67 --- /dev/null +++ b/src/query/service/tests/it/pipelines/transforms/sort/k_way.rs @@ -0,0 +1,174 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use databend_common_pipeline_transforms::processors::add_k_way_merge_sort; + +use super::*; + +fn create_pipeline( + ctx: Arc, + data: Vec>, + worker: usize, + block_size: usize, + limit: Option, +) -> Result<(Arc, Receiver>)> { + let mut pipeline = Pipeline::create(); + + let data_type = data[0][0].get_by_offset(0).data_type.clone(); + let source_pipe = create_source_pipe(ctx, data)?; + pipeline.add_pipe(source_pipe); + + let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]); + let sort_desc = Arc::new(vec![SortColumnDescription { + offset: 0, + asc: true, + nulls_first: true, + is_nullable: false, + }]); + add_k_way_merge_sort( + &mut pipeline, + schema, + worker, + block_size, + limit, + sort_desc, + false, + true, + )?; + + let (mut rx, sink_pipe) = create_sink_pipe(1)?; + let rx = rx.pop().unwrap(); + pipeline.add_pipe(sink_pipe); + pipeline.set_max_threads(3); + + let settings = ExecutorSettings { + query_id: Arc::new("".to_string()), + max_execute_time_in_seconds: Default::default(), + enable_queries_executor: false, + max_threads: 8, + executor_node_id: "".to_string(), + }; + let executor = QueryPipelineExecutor::create(pipeline, settings)?; + Ok((executor, rx)) +} + +async fn run_fuzz(ctx: Arc, rng: &mut ThreadRng, with_limit: bool) -> Result<()> { + let worker = rng.gen_range(1..=5); + let block_size = rng.gen_range(1..=20); + let (data, expected, limit) = random_test_data(rng, with_limit); + + // println!("\nwith_limit {with_limit}"); + // for (input, blocks) in data.iter().enumerate() { + // println!("intput {input}"); + // for b in blocks { + // println!("{:?}", b.columns()[0].value); + // } + // } + + let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?; + executor.execute()?; + + let mut got: Vec = Vec::new(); + while !rx.is_empty() { + got.push(rx.recv().await.unwrap()?); + } + + check_result(got, expected); + + Ok(()) +} + +/// Returns (input, expected) +fn basic_test_data(limit: Option) -> (Vec>, DataBlock) { + let data = vec![ + vec![vec![1, 2, 3, 4], vec![4, 5, 6, 7]], + vec![vec![1, 1, 1, 1], vec![1, 10, 100, 2000]], + vec![vec![0, 2, 4, 5]], + ]; + + prepare_multi_input_and_result(data, limit) +} + +fn random_test_data( + rng: &mut ThreadRng, + with_limit: bool, +) -> (Vec>, DataBlock, Option) { + let random_batch_size = rng.gen_range(1..=10); + let random_num_streams = rng.gen_range(5..=10); + + let random_data = (0..random_num_streams) + .map(|_| { + let random_num_blocks = rng.gen_range(1..=10); + let mut data = (0..random_batch_size * random_num_blocks) + .map(|_| rng.gen_range(0..=1000)) + .collect::>(); + data.sort(); + data.chunks(random_batch_size) + .map(|v| v.to_vec()) + .collect::>() + }) + .collect::>(); + + let num_rows = random_data + .iter() + .map(|v| v.iter().map(|v| v.len()).sum::()) + .sum::(); + let limit = if with_limit { + Some(rng.gen_range(0..=num_rows)) + } else { + None + }; + let (input, expected) = prepare_multi_input_and_result(random_data, limit); + (input, expected, limit) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_k_way_merge_sort() -> Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + let worker = 3; + let block_size = 4; + let limit = None; + let (data, expected) = basic_test_data(None); + let (executor, mut rx) = create_pipeline(ctx, data, worker, block_size, limit)?; + + executor.execute()?; + + let mut got: Vec = Vec::new(); + while !rx.is_empty() { + got.push(rx.recv().await.unwrap()?); + 
} + + check_result(got, expected); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_k_way_merge_sort_fuzz() -> Result<()> { + let mut rng = rand::thread_rng(); + let fixture = TestFixture::setup().await?; + + for _ in 0..3 { + let ctx = fixture.new_query_ctx().await?; + run_fuzz(ctx, &mut rng, false).await?; + } + + for _ in 0..3 { + let ctx = fixture.new_query_ctx().await?; + run_fuzz(ctx, &mut rng, true).await?; + } + Ok(()) +} diff --git a/src/query/service/tests/it/pipelines/transforms/sort/spill.rs b/src/query/service/tests/it/pipelines/transforms/sort/spill.rs new file mode 100644 index 000000000000..a717c11f108f --- /dev/null +++ b/src/query/service/tests/it/pipelines/transforms/sort/spill.rs @@ -0,0 +1,207 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_pipeline_transforms::sort::SimpleRowConverter; +use databend_common_pipeline_transforms::sort::SimpleRowsAsc; +use databend_common_pipeline_transforms::TransformPipelineHelper; +use databend_common_pipeline_transforms::TransformSortMerge; +use databend_common_pipeline_transforms::TransformSortMergeBase; +use databend_common_storage::DataOperator; +use databend_query::pipelines::processors::transforms::create_transform_sort_spill; +use databend_query::spillers::Spiller; +use databend_query::spillers::SpillerConfig; +use databend_query::spillers::SpillerType; + +use super::*; + +fn create_sort_spill_pipeline( + ctx: Arc, + data: Vec, + block_size: usize, + limit: Option, +) -> Result<(Arc, Receiver>)> { + let mut pipeline = Pipeline::create(); + + let data_type = data[0].get_by_offset(0).data_type.clone(); + let source_pipe = create_source_pipe(ctx.clone(), vec![data])?; + pipeline.add_pipe(source_pipe); + + let schema = DataSchemaRefExt::create(vec![DataField::new("a", data_type)]); + let sort_desc = Arc::new(vec![SortColumnDescription { + offset: 0, + asc: true, + nulls_first: true, + is_nullable: false, + }]); + + let order_col_generated = false; + let output_order_col = false; + let max_memory_usage = 100; + let spilling_bytes_threshold_per_core = 1; + let spilling_batch_bytes = 1000; + let enable_loser_tree = true; + + pipeline.try_add_accumulating_transformer(|| { + TransformSortMergeBase::< + TransformSortMerge>, + SimpleRowsAsc, + SimpleRowConverter, + >::try_create( + schema.clone(), + sort_desc.clone(), + order_col_generated, + output_order_col, + max_memory_usage, + spilling_bytes_threshold_per_core, + spilling_batch_bytes, + TransformSortMerge::create( + schema.clone(), + sort_desc.clone(), + block_size, + enable_loser_tree, + ), + ) + })?; + + let spill_config = SpillerConfig { + spiller_type: SpillerType::OrderBy, + location_prefix: "_sort_spill".to_string(), + disk_spill: None, + use_parquet: true, + }; + let op = DataOperator::instance().operator(); + let spiller = Spiller::create(ctx.clone(), op, spill_config.clone())?; + pipeline.add_transform(|input, output| { + 
Ok(ProcessorPtr::create(create_transform_sort_spill( + input, + output, + schema.clone(), + sort_desc.clone(), + limit, + spiller.clone(), + true, + enable_loser_tree, + ))) + })?; + + let (mut rx, sink_pipe) = create_sink_pipe(1)?; + let rx = rx.pop().unwrap(); + pipeline.add_pipe(sink_pipe); + pipeline.set_max_threads(3); + + let settings = ExecutorSettings { + query_id: Arc::new("".to_string()), + max_execute_time_in_seconds: Default::default(), + enable_queries_executor: false, + max_threads: 8, + executor_node_id: "".to_string(), + }; + let executor = QueryPipelineExecutor::create(pipeline, settings)?; + Ok((executor, rx)) +} + +fn basic_test_data(limit: Option) -> (Vec, DataBlock) { + let data = vec![vec![1, 1, 1, 1], vec![1, 2, 3, 4], vec![4, 5, 6, 7]]; + + prepare_single_input_and_result(data, limit) +} + +fn random_test_data( + rng: &mut ThreadRng, + with_limit: bool, +) -> (Vec, DataBlock, Option) { + let num_rows = rng.gen_range(1..=100); + let mut data = (0..num_rows) + .map(|_| rng.gen_range(0..1000)) + .collect::>(); + data.sort(); + + let mut data = VecDeque::from(data); + let mut random_data = Vec::new(); + while !data.is_empty() { + let n = rng.gen_range(1..=10).min(data.len()); + random_data.push(data.drain(..n).collect::>()); + } + + let limit = if with_limit { + Some(rng.gen_range(1..=num_rows)) + } else { + None + }; + let (input, expected) = prepare_single_input_and_result(random_data, limit); + (input, expected, limit) +} + +async fn run_fuzz(ctx: Arc, rng: &mut ThreadRng, with_limit: bool) -> Result<()> { + let block_size = rng.gen_range(1..=20); + let (data, expected, limit) = random_test_data(rng, with_limit); + + // println!("\nwith_limit {with_limit}"); + // for (input, block) in data.iter().enumerate() { + // println!("intput {input}"); + // println!("{:?}", block.columns()[0].value); + // } + + let (executor, mut rx) = create_sort_spill_pipeline(ctx, data, block_size, limit)?; + executor.execute()?; + + let mut got: Vec = Vec::new(); + while !rx.is_empty() { + got.push(rx.recv().await.unwrap()?); + } + + check_result(got, expected); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_sort_spill() -> Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + let block_size = 4; + let limit = None; + let (data, expected) = basic_test_data(None); + let (executor, mut rx) = create_sort_spill_pipeline(ctx, data, block_size, limit)?; + executor.execute()?; + + let mut got: Vec = Vec::new(); + while !rx.is_empty() { + got.push(rx.recv().await.unwrap()?); + } + + check_result(got, expected); + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_sort_spill_fuzz() -> Result<()> { + let mut rng = rand::thread_rng(); + let fixture = TestFixture::setup().await?; + + for _ in 0..3 { + let ctx = fixture.new_query_ctx().await?; + run_fuzz(ctx, &mut rng, false).await?; + } + + for _ in 0..3 { + let ctx = fixture.new_query_ctx().await?; + run_fuzz(ctx, &mut rng, true).await?; + } + Ok(()) +} diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 4f7137cebf0b..1246d74aae24 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -955,7 +955,8 @@ impl DefaultSettings { None => std::cmp::min(num_cpus, 64), Some(conf) => match conf.storage.params.is_fs() { true => 48, - false => std::cmp::min(num_cpus, 64), + // This value is chosen based on the performance 
test of pruning phase on cloud platform. + false => 64, }, } } diff --git a/src/query/sql/src/planner/planner.rs b/src/query/sql/src/planner/planner.rs index 2df5dd3032f3..1b66b4b2fec5 100644 --- a/src/query/sql/src/planner/planner.rs +++ b/src/query/sql/src/planner/planner.rs @@ -176,26 +176,27 @@ impl Planner { } }; - let mut insert_values_stmt = false; + let mut maybe_partial_insert = false; + if is_insert_or_replace_stmt && matches!(tokenizer.peek(), Some(Ok(_))) { if let Ok(PlanExtras { statement: Statement::Insert(InsertStmt { - source: InsertSource::RawValues { .. }, + source: InsertSource::Select { .. }, .. }), .. }) = &res { - insert_values_stmt = true; + maybe_partial_insert = true; } } - if insert_values_stmt || (res.is_err() && matches!(tokenizer.peek(), Some(Ok(_)))) { + if (maybe_partial_insert || res.is_err()) && matches!(tokenizer.peek(), Some(Ok(_))) { // Remove the EOI. tokens.pop(); // Tokenize more and try again. - if !insert_values_stmt && tokens.len() < PROBE_INSERT_MAX_TOKENS { + if !maybe_partial_insert && tokens.len() < PROBE_INSERT_MAX_TOKENS { let iter = (&mut tokenizer) .take(tokens.len() * 2) .take_while(|token| token.is_ok()) diff --git a/tests/sqllogictests/suites/base/03_common/03_0016_insert_into_values.test b/tests/sqllogictests/suites/base/03_common/03_0016_insert_into_values.test index d69019cc9c77..002df922dda7 100644 --- a/tests/sqllogictests/suites/base/03_common/03_0016_insert_into_values.test +++ b/tests/sqllogictests/suites/base/03_common/03_0016_insert_into_values.test @@ -66,6 +66,52 @@ select * from ts1 ---- 2023-02-17 07:56:36.564000 +statement ok +create table my_table(column1 string, column2 string, column3 string); + +statement ok +INSERT INTO my_table (column1, column2, column3) VALUES +('value1_1', 'value1_2', 'value1_3'), +('value2_1', 'value2_2', 'value2_3'), +('value3_1', 'value3_2', 'value3_3'), +('value4_1', 'value4_2', 'value4_3'), +('value5_1', 'value5_2', 'value5_3'), +('value6_1', 'value6_2', 'value6_3'), +('value7_1', 'value7_2', 'value7_3'), +('value8_1', 'value8_2', 'value8_3'), +('value9_1', 'value9_2', 'value9_3'), +('value10_1', 'value10_2', 'value10_3'), +('value11_1', 'value11_2', 'value11_3'), +('value12_1', 'value12_2', 'value12_3'), +('value13_1', 'value13_2', 'value13_3'), +('value14_1', 'value14_2', 'value14_3'), +('value15_1', 'value15_2', 'value15_3'), +('value16_1', 'value16_2', 'value16_3'), +('value17_1', 'value17_2', 'value17_3'), +('value18_1', 'value18_2', 'value18_3'), +('value19_1', 'value19_2', 'value19_3'), +('value20_1', 'value20_2', 'value20_3'), +('value21_1', 'value21_2', 'value21_3'), +('value22_1', 'value22_2', 'value22_3'), +('value23_1', 'value23_2', 'value23_3'), +('value24_1', 'value24_2', 'value24_3'), +('value25_1', 'value25_2', 'value25_3'), +('value26_1', 'value26_2', 'value26_3'), +('value27_1', 'value27_2', 'value27_3'), +('value28_1', 'value28_2', 'value28_3'), +('value29_1', 'value29_2', 'value29_3'), +('value30_1', 'value30_2', 'value30_3'), +('value31_1', 'value31_2', 'value31_3'), +('value32_1', 'value32_2', 'value32_3'); + +query TTTI +select max(column1), max(column2), max(column3), count() from my_table +---- +value9_1 value9_2 value9_3 32 + +statement ok +drop table my_table + statement ok drop table ts1 diff --git a/tests/sqllogictests/suites/query/functions/02_0000_function_aggregate_mix.test b/tests/sqllogictests/suites/query/functions/02_0000_function_aggregate_mix.test index 9c3a5d8a7911..b170af49fb42 100644 --- 
a/tests/sqllogictests/suites/query/functions/02_0000_function_aggregate_mix.test +++ b/tests/sqllogictests/suites/query/functions/02_0000_function_aggregate_mix.test @@ -317,6 +317,16 @@ select skewness (10) from numbers(5) ---- 0.0 +query I +select skewness(number), kurtosis(number) from (select if(number > 5, number::double, 'NAN'::Double) as number from numbers(100)); +---- +NaN NaN + +query I +select skewness(number), kurtosis(number) from (select if(number > 5, number::double, 'INF'::Double) as number from numbers(100)); +---- +NaN NaN + query III select skewness(k), skewness(v), skewness(v2) from aggr ---- diff --git a/tests/sqllogictests/suites/query/functions/02_0002_function_cast.test b/tests/sqllogictests/suites/query/functions/02_0002_function_cast.test index 2a56ddac8f95..a206d7dc9d76 100644 --- a/tests/sqllogictests/suites/query/functions/02_0002_function_cast.test +++ b/tests/sqllogictests/suites/query/functions/02_0002_function_cast.test @@ -482,5 +482,15 @@ select '[1,2,"3"]'::Variant a, a::Array(Variant) b, b::Variant = a; ---- [1,2,"3"] ['1','2','"3"'] 1 +query TTT +select '{"k1":"v1","k2":"v2"}'::Variant a, a::Map(String, String) b, b::Variant = a; +---- +{"k1":"v1","k2":"v2"} {'k1':'v1','k2':'v2'} 1 + +query TTT +select '{"a":1,"b":2}'::Variant a, a::Map(String, Int) b, b::Variant = a; +---- +{"a":1,"b":2} {'a':1,'b':2} 1 + statement ok drop table t diff --git a/tests/sqllogictests/suites/query/functions/02_0061_function_array.test b/tests/sqllogictests/suites/query/functions/02_0061_function_array.test index 0b340c991168..701fd718a8ea 100644 --- a/tests/sqllogictests/suites/query/functions/02_0061_function_array.test +++ b/tests/sqllogictests/suites/query/functions/02_0061_function_array.test @@ -29,6 +29,16 @@ select length(col1), length(col2), length(col3), length(col4) from t ---- 4 4 1 1 +query II +select array_length(col1), array_length(col2) from t +---- +4 4 + +query II +select array_size(col3), array_size(col4) from t +---- +1 1 + query ITT select get(col1, index - 7), get(col2, index - 8), get(col3, index - 9) from t ---- @@ -309,6 +319,11 @@ select length(col1), length(col2), length(col3), length(col4) from t ---- 4 4 1 1 +query IIII +select array_size(col1), array_size(col2), array_length(col3), array_length(col4) from t1 +---- +3 3 3 3 + statement ok DROP TABLE IF EXISTS t2 diff --git a/tests/sqllogictests/suites/query/functions/02_0065_function_json.test b/tests/sqllogictests/suites/query/functions/02_0065_function_json.test index 28c92fc25c20..c7ccb550e572 100644 --- a/tests/sqllogictests/suites/query/functions/02_0065_function_json.test +++ b/tests/sqllogictests/suites/query/functions/02_0065_function_json.test @@ -1434,5 +1434,55 @@ SELECT id, json_object_insert(v1, 'b', '100'::variant) from t5 2 {"a":[1,2,3],"b":100,"c":{"c1":"v1","c2":"v2"},"m":true} 3 {"a":1,"b":100,"h":2,"m":3,"n":4} +query T +SELECT json_object_delete('{"a":1,"b":2,"d":4}'::variant, 'a', 'c') +---- +{"b":2,"d":4} + +query T +SELECT json_object_delete('{"a":1,"b":2,"d":4}'::variant, 'b', 'b', null, 'd') +---- +{"a":1} + +query T +SELECT json_object_delete('{"a":1,"b":2,"d":4}'::variant, 'A', 'B') +---- +{"a":1,"b":2,"d":4} + +statement error 1006 +SELECT json_object_delete('1234'::variant, 'a') + +query IT +SELECT id, json_object_delete(v1, 'a', 'k1') from t5 +---- +1 {"k2":"v2"} +2 {"c":{"c1":"v1","c2":"v2"},"m":true} +3 {"h":2,"m":3,"n":4} + +query T +SELECT json_object_pick('{"a":1,"b":2,"d":4}'::variant, 'a', 'c') +---- +{"a":1} + +query T +SELECT 
json_object_pick('{"a":1,"b":2,"d":4}'::variant, 'b', 'b', null, 'd') +---- +{"b":2,"d":4} + +query T +SELECT json_object_pick('{"a":1,"b":2,"d":4}'::variant, 'A', 'B') +---- +{} + +statement error 1006 +SELECT json_object_pick('1234'::variant, 'a') + +query IT +SELECT id, json_object_pick(v1, 'a', 'k1') from t5 +---- +1 {"k1":"v1"} +2 {"a":[1,2,3]} +3 {"a":1} + statement ok DROP TABLE IF EXISTS t5