Skip to content

Commit

Permalink
Merge branch 'main' into bump-opendal
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Oct 25, 2024
2 parents dc7eea8 + 4a80ca2 commit 93332af
Show file tree
Hide file tree
Showing 29 changed files with 1,076 additions and 224 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ backtrace = { git = "https://github.com/rust-lang/backtrace-rs.git", rev = "7226
] }
color-eyre = { git = "https://github.com/eyre-rs/eyre.git", rev = "e5d92c3" }
ethnum = { git = "https://github.com/datafuse-extras/ethnum-rs", rev = "4cb05f1" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "672e423" }
jsonb = { git = "https://github.com/databendlabs/jsonb", rev = "ada713c" }
openai_api_rust = { git = "https://github.com/datafuse-extras/openai-api", rev = "819a0ed" }
opendal_compat = { git = "https://github.com/apache/opendal", rev = "f6e60f6" }
orc-rust = { git = "https://github.com/datafuse-extras/datafusion-orc", rev = "03372b97" }
Expand Down
128 changes: 115 additions & 13 deletions src/query/expression/src/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;
use std::collections::HashMap;
use std::ops::Not;

Expand All @@ -35,6 +36,7 @@ use crate::types::array::ArrayColumn;
use crate::types::boolean::BooleanDomain;
use crate::types::nullable::NullableColumn;
use crate::types::nullable::NullableDomain;
use crate::types::string::StringColumnBuilder;
use crate::types::ArgType;
use crate::types::ArrayType;
use crate::types::BooleanType;
Expand Down Expand Up @@ -507,7 +509,7 @@ impl<'a> Evaluator<'a> {
},
(DataType::Variant, DataType::Array(inner_dest_ty)) => {
let empty_vec = vec![];
let temp_array: jsonb::Value;
let mut temp_array: jsonb::Value;
match value {
Value::Scalar(Scalar::Variant(x)) => {
let array = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) {
Expand All @@ -520,22 +522,16 @@ impl<'a> Evaluator<'a> {
} else {
&empty_vec
};

let column = VariantType::create_column_from_variants(array.as_slice());

let validity = validity.map(|validity| {
Bitmap::new_constant(
validity.unset_bits() != validity.len(),
column.len(),
)
});

let validity = None;
let column = Column::Variant(VariantType::create_column_from_variants(
array.as_slice(),
));
let new_array = self
.run_cast(
span,
&DataType::Variant,
inner_dest_ty,
Value::Column(Column::Variant(column)),
Value::Column(column),
validity,
options,
)?
Expand All @@ -547,7 +543,6 @@ impl<'a> Evaluator<'a> {
let mut array_builder =
ArrayType::<VariantType>::create_builder(col.len(), &[]);

let mut temp_array: jsonb::Value;
for (idx, x) in col.iter().enumerate() {
let array = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true)
{
Expand Down Expand Up @@ -597,6 +592,113 @@ impl<'a> Evaluator<'a> {
other => unreachable!("source: {}", other),
}
}
// Cast `Variant` -> `Map(String, T)`.
//
// The variant bytes are parsed as jsonb. When the parsed value is an object, each member
// becomes one map entry: the member name is the (String) key and the member value is cast
// recursively from `Variant` to the destination value type `fields_dest_ty[1]`.
// A value that is not an object, or a row whose validity bit is unset, produces an empty map.
//
// Errors: `ErrorCode::BadArguments` if the bytes cannot be parsed as jsonb; any error from
// the recursive value cast is propagated.
(DataType::Variant, DataType::Map(box DataType::Tuple(fields_dest_ty)))
    if fields_dest_ty.len() == 2 && fields_dest_ty[0] == DataType::String =>
{
    // Shared fallback so that "not an object" and "null row" can both borrow an empty map.
    let empty_obj = BTreeMap::new();
    // Declared outside the branches so the borrowed object outlives the parse expression.
    let mut temp_obj: jsonb::Value;
    match value {
        Value::Scalar(Scalar::Variant(x)) => {
            let obj = if validity.as_ref().map(|v| v.get_bit(0)).unwrap_or(true) {
                temp_obj = jsonb::from_slice(&x).map_err(|e| {
                    ErrorCode::BadArguments(format!(
                        "Expect to be valid json, got err: {e:?}"
                    ))
                })?;
                temp_obj.as_object().unwrap_or(&empty_obj)
            } else {
                &empty_obj
            };
            // The outer validity describes the scalar itself, not its entries, so the
            // inner value cast runs without a validity mask.
            let validity = None;

            let mut key_builder = StringColumnBuilder::with_capacity(obj.len(), 0);
            for k in obj.keys() {
                key_builder.put_str(k.as_str());
                key_builder.commit_row();
            }
            let key_column = Column::String(key_builder.build());

            let values: Vec<_> = obj.values().cloned().collect();
            let value_column = Column::Variant(
                VariantType::create_column_from_variants(values.as_slice()),
            );

            let new_value_column = self
                .run_cast(
                    span,
                    &DataType::Variant,
                    &fields_dest_ty[1],
                    Value::Column(value_column),
                    validity,
                    options,
                )?
                .into_column()
                .unwrap();
            Ok(Value::Scalar(Scalar::Map(Column::Tuple(vec![
                key_column,
                new_value_column,
            ]))))
        }
        Value::Column(Column::Variant(col)) => {
            // Keys and values of all rows are flattened into two parallel columns;
            // `value_builder` additionally records where each row's entries start and end.
            let mut key_builder = StringColumnBuilder::with_capacity(0, 0);
            let mut value_builder =
                ArrayType::<VariantType>::create_builder(col.len(), &[]);

            for (idx, x) in col.iter().enumerate() {
                let obj = if validity.as_ref().map(|v| v.get_bit(idx)).unwrap_or(true) {
                    temp_obj = jsonb::from_slice(x).map_err(|e| {
                        ErrorCode::BadArguments(format!(
                            "Expect to be valid json, got err: {e:?}"
                        ))
                    })?;
                    temp_obj.as_object().unwrap_or(&empty_obj)
                } else {
                    &empty_obj
                };

                for (k, v) in obj.iter() {
                    key_builder.put_str(k.as_str());
                    key_builder.commit_row();
                    v.write_to_vec(&mut value_builder.builder.data);
                    value_builder.builder.commit_row();
                }
                // Close the current row: one map per input row, possibly empty.
                value_builder.commit_row();
            }
            let key_column = Column::String(key_builder.build());

            let value_column = value_builder.build();
            // Expand the per-row validity to one bit per flattened entry so the inner cast
            // sees the same nullability as the row the entry came from.
            let validity = validity.map(|validity| {
                let mut inner_validity =
                    MutableBitmap::with_capacity(value_column.values.len());
                for (index, offsets) in value_column.offsets.windows(2).enumerate() {
                    inner_validity.extend_constant(
                        (offsets[1] - offsets[0]) as usize,
                        validity.get_bit(index),
                    );
                }
                inner_validity.into()
            });

            let new_value_column = self
                .run_cast(
                    span,
                    &DataType::Variant,
                    &fields_dest_ty[1],
                    Value::Column(Column::Variant(value_column.values)),
                    validity,
                    options,
                )?
                .into_column()
                .unwrap();

            let kv_column = Column::Tuple(vec![key_column, new_value_column]);
            // The map offsets must delimit the flattened key/value entries per row, which
            // is what `value_builder` recorded. The offsets of the source variant column
            // (`col.offsets`) describe the input column, not the entries.
            Ok(Value::Column(Column::Map(Box::new(ArrayColumn {
                values: kv_column,
                offsets: value_column.offsets,
            }))))
        }
        other => unreachable!("source: {}", other),
    }
}
(DataType::EmptyMap, DataType::Map(inner_dest_ty)) => match value {
Value::Scalar(Scalar::EmptyMap) => {
let new_column = ColumnBuilder::with_capacity(inner_dest_ty, 0).build();
Expand Down
33 changes: 20 additions & 13 deletions src/query/functions/src/aggregates/aggregate_kurtosis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ use crate::aggregates::AggregateFunctionRef;
#[derive(Default, BorshSerialize, BorshDeserialize)]
struct KurtosisState {
pub n: u64,
pub sum: f64,
pub sum_sqr: f64,
pub sum_cub: f64,
pub sum_four: f64,
pub sum: F64,
pub sum_sqr: F64,
pub sum_cub: F64,
pub sum_four: F64,
}

impl<T> UnaryState<T, Float64Type> for KurtosisState
Expand Down Expand Up @@ -78,27 +78,34 @@ where
builder.push(F64::from(0_f64));
return Ok(());
}
let n = self.n as f64;

let (n, sum, sum_sqr, sum_cub, sum_four) = (
self.n as f64,
*self.sum,
*self.sum_sqr,
*self.sum_cub,
*self.sum_four,
);

let temp = 1.0 / n;
if self.sum_sqr - self.sum * self.sum * temp == 0.0 {
if sum_sqr - sum * sum * temp == 0.0 {
builder.push(F64::from(0_f64));
return Ok(());
}
let m4 = temp
* (self.sum_four - 4.0 * self.sum_cub * self.sum * temp
+ 6.0 * self.sum_sqr * self.sum * self.sum * temp * temp
- 3.0 * self.sum.powi(4) * temp.powi(3));
let m2 = temp * (self.sum_sqr - self.sum * self.sum * temp);
* (sum_four - 4.0 * sum_cub * sum * temp + 6.0 * sum_sqr * sum * sum * temp * temp
- 3.0 * sum.powi(4) * temp.powi(3));
let m2 = temp * (sum_sqr - sum * sum * temp);
if m2 <= 0.0 || (n - 2.0) * (n - 3.0) == 0.0 {
builder.push(F64::from(0_f64));
return Ok(());
}
let value =
(n - 1.0) * ((n + 1.0) * m4 / (m2 * m2) - 3.0 * (n - 1.0)) / ((n - 2.0) * (n - 3.0));
if value.is_infinite() || value.is_nan() {
return Err(ErrorCode::SemanticError("Kurtosis is out of range!"));
} else {
if value.is_finite() {
builder.push(F64::from(value));
} else {
builder.push(F64::from(f64::NAN));
}
Ok(())
}
Expand Down
28 changes: 13 additions & 15 deletions src/query/functions/src/aggregates/aggregate_skewness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ use crate::aggregates::aggregate_unary::UnaryState;
#[derive(Default, BorshSerialize, BorshDeserialize)]
pub struct SkewnessStateV2 {
pub n: u64,
pub sum: f64,
pub sum_sqr: f64,
pub sum_cub: f64,
pub sum: F64,
pub sum_sqr: F64,
pub sum_cub: F64,
}

impl<T> UnaryState<T, Float64Type> for SkewnessStateV2
Expand Down Expand Up @@ -75,25 +75,23 @@ where
builder.push(F64::from(0_f64));
return Ok(());
}
let n = self.n as f64;

let (n, sum, sum_sqr, sum_cub) = (self.n as f64, *self.sum, *self.sum_sqr, *self.sum_cub);
let temp = 1.0 / n;
let div = (temp * (self.sum_sqr - self.sum * self.sum * temp))
.powi(3)
.sqrt();
let div = (temp * (sum_sqr - sum * sum * temp)).powi(3).sqrt();
if div == 0.0 {
builder.push(F64::from(0_f64));
return Ok(());
}
let temp1 = (n * (n - 1.0)).sqrt() / (n - 2.0);
let value = temp1
* temp
* (self.sum_cub - 3.0 * self.sum_sqr * self.sum * temp
+ 2.0 * self.sum.powi(3) * temp * temp)
/ div;
if value.is_infinite() || value.is_nan() {
return Err(ErrorCode::SemanticError("Skew is out of range!"));
} else {
let value =
temp1 * temp * (sum_cub - 3.0 * sum_sqr * sum * temp + 2.0 * sum.powi(3) * temp * temp)
/ div;

if value.is_finite() {
builder.push(F64::from(value));
} else {
builder.push(F64::from(f64::NAN));
}
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion src/query/functions/src/scalars/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ const ARRAY_SORT_FUNCTIONS: &[(&str, (bool, bool)); 4] = &[
pub fn register(registry: &mut FunctionRegistry) {
registry.register_aliases("contains", &["array_contains"]);
registry.register_aliases("get", &["array_get"]);
registry.register_aliases("length", &["array_length"]);
registry.register_aliases("length", &["array_length", "array_size"]);
registry.register_aliases("slice", &["array_slice"]);

register_array_aggr(registry);
Expand Down
Loading

0 comments on commit 93332af

Please sign in to comment.