diff --git a/common/datablocks/src/data_block.rs b/common/datablocks/src/data_block.rs index a19d90c5c2b5a..d435af3fdda4c 100644 --- a/common/datablocks/src/data_block.rs +++ b/common/datablocks/src/data_block.rs @@ -185,8 +185,8 @@ impl DataBlock { .iter() .zip(schema.fields().iter()) .map(|(col, f)| match f.is_nullable() { - true => col.into_nullable_column(), - false => col.into_column(), + true => col.into_nullable_column_with_field(&f.to_arrow()), + false => col.into_column_with_field(&f.to_arrow()), }) .collect(); diff --git a/common/datavalues/src/columns/column.rs b/common/datavalues/src/columns/column.rs index d04aa688ae2ae..402e9b7e96438 100644 --- a/common/datavalues/src/columns/column.rs +++ b/common/datavalues/src/columns/column.rs @@ -19,6 +19,7 @@ use common_arrow::arrow::array::Array; use common_arrow::arrow::array::ArrayRef; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::bitmap::MutableBitmap; +use common_arrow::arrow::datatypes::Field as ArrowField; use common_exception::ErrorCode; use common_exception::Result; @@ -150,6 +151,9 @@ pub trait Column: Send + Sync { pub trait IntoColumn { fn into_column(self) -> ColumnRef; fn into_nullable_column(self) -> ColumnRef; + + fn into_column_with_field(self, f: &ArrowField) -> ColumnRef; + fn into_nullable_column_with_field(self, f: &ArrowField) -> ColumnRef; } impl IntoColumn for A @@ -176,6 +180,7 @@ where A: AsRef Array => Arc::new(ArrayColumn::from_arrow_array(self.as_ref())), Struct => Arc::new(StructColumn::from_arrow_array(self.as_ref())), String => Arc::new(StringColumn::from_arrow_array(self.as_ref())), + Variant => Arc::new(JsonColumn::from_arrow_array(self.as_ref())), } } @@ -192,6 +197,51 @@ where A: AsRef }), )) } + + fn into_column_with_field(self, f: &ArrowField) -> ColumnRef { + use TypeID::*; + match from_arrow_field_meta(f) { + Some(data_type) => { + match data_type.data_type_id() { + // arrow type has no nullable type + Nullable => unimplemented!(), + Null => Arc::new(NullColumn::from_arrow_array(self.as_ref())), + Boolean => Arc::new(BooleanColumn::from_arrow_array(self.as_ref())), + UInt8 => Arc::new(UInt8Column::from_arrow_array(self.as_ref())), + UInt16 | Date16 => Arc::new(UInt16Column::from_arrow_array(self.as_ref())), + UInt32 | DateTime32 => Arc::new(UInt32Column::from_arrow_array(self.as_ref())), + UInt64 => Arc::new(UInt64Column::from_arrow_array(self.as_ref())), + Int8 => Arc::new(Int8Column::from_arrow_array(self.as_ref())), + Int16 => Arc::new(Int16Column::from_arrow_array(self.as_ref())), + Int32 | Date32 => Arc::new(Int32Column::from_arrow_array(self.as_ref())), + Int64 | Interval | DateTime64 => { + Arc::new(Int64Column::from_arrow_array(self.as_ref())) + } + Float32 => Arc::new(Float32Column::from_arrow_array(self.as_ref())), + Float64 => Arc::new(Float64Column::from_arrow_array(self.as_ref())), + Array => Arc::new(ArrayColumn::from_arrow_array(self.as_ref())), + Struct => Arc::new(StructColumn::from_arrow_array(self.as_ref())), + String => Arc::new(StringColumn::from_arrow_array(self.as_ref())), + Variant => Arc::new(JsonColumn::from_arrow_array(self.as_ref())), + } + } + None => self.as_ref().into_column(), + } + } + + fn into_nullable_column_with_field(self, f: &ArrowField) -> ColumnRef { + let size = self.as_ref().len(); + let validity = self.as_ref().validity().cloned(); + let column = self.as_ref().into_column_with_field(f); + Arc::new(NullableColumn::new( + column, + validity.unwrap_or_else(|| { + let mut bm = MutableBitmap::with_capacity(size); + bm.extend_constant(size, true); + Bitmap::from(bm) + }), + )) + } } pub fn display_helper>(iter: I) -> Vec { diff --git a/common/datavalues/src/columns/eq.rs b/common/datavalues/src/columns/eq.rs index 1955245b6dcae..b018589ce2bd8 100644 --- a/common/datavalues/src/columns/eq.rs +++ b/common/datavalues/src/columns/eq.rs @@ -96,7 +96,12 @@ pub fn equal(lhs: &dyn Column, rhs: &dyn Column) -> bool { lhs.values() == rhs.values() } + Variant => { + let lhs: &JsonColumn = lhs.as_any().downcast_ref().unwrap(); + let rhs: &JsonColumn = rhs.as_any().downcast_ref().unwrap(); + lhs.values() == rhs.values() + } other => with_match_physical_primitive_type_error!(other, |$T| { let lhs: &PrimitiveColumn<$T> = lhs.as_any().downcast_ref().unwrap(); let rhs: &PrimitiveColumn<$T> = rhs.as_any().downcast_ref().unwrap(); diff --git a/common/datavalues/src/columns/group_hash.rs b/common/datavalues/src/columns/group_hash.rs index fe136e142ce13..1487956b611fb 100644 --- a/common/datavalues/src/columns/group_hash.rs +++ b/common/datavalues/src/columns/group_hash.rs @@ -293,3 +293,6 @@ impl GroupHash for StringColumn { Ok(()) } } + +// TODO(b41sh): implement GroupHash for JsonColumn +impl GroupHash for JsonColumn {} diff --git a/common/datavalues/src/columns/mod.rs b/common/datavalues/src/columns/mod.rs index 0fa123b12dc89..fb1278ce37aa9 100644 --- a/common/datavalues/src/columns/mod.rs +++ b/common/datavalues/src/columns/mod.rs @@ -26,6 +26,7 @@ mod group_hash; mod mutable; mod null; mod nullable; +mod object; mod primitive; pub mod series; mod string; @@ -42,6 +43,7 @@ pub use group_hash::GroupHash; pub use mutable::*; pub use null::*; pub use nullable::*; +pub use object::*; pub use primitive::*; pub use series::*; pub use string::*; diff --git a/common/datavalues/src/columns/object/iterator.rs b/common/datavalues/src/columns/object/iterator.rs new file mode 100644 index 0000000000000..05e7495e4a2ed --- /dev/null +++ b/common/datavalues/src/columns/object/iterator.rs @@ -0,0 +1,68 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::iter::TrustedLen; + +use crate::prelude::*; + +#[derive(Debug, Clone)] +pub struct ObjectValueIter<'a, T: ObjectType> { + column: &'a ObjectColumn, + index: usize, +} + +impl<'a, T: ObjectType> ObjectValueIter<'a, T> { + /// Creates a new [`ObjectValueIter`] + pub fn new(column: &'a ObjectColumn) -> Self { + Self { column, index: 0 } + } +} + +impl<'a, T> Iterator for ObjectValueIter<'a, T> +where T: Scalar + ObjectType +{ + type Item = T::RefType<'a>; + + #[inline] + fn next(&mut self) -> Option { + let index = self.index; + if self.index >= self.column.len() { + return None; + } else { + self.index += 1; + } + self.column.values.get(index).map(|c| c.as_scalar_ref()) + } + + fn size_hint(&self) -> (usize, Option) { + ( + self.column.len() - self.index, + Some(self.column.len() - self.index), + ) + } +} + +impl<'a, T: ObjectType> ExactSizeIterator for ObjectValueIter<'a, T> { + fn len(&self) -> usize { + self.column.len() - self.index + } +} + +unsafe impl TrustedLen for ObjectValueIter<'_, T> {} + +impl<'a, T: ObjectType> ObjectColumn { + pub fn iter(&'a self) -> ObjectValueIter<'a, T> { + ObjectValueIter::new(self) + } +} diff --git a/common/datavalues/src/columns/object/mod.rs b/common/datavalues/src/columns/object/mod.rs new file mode 100644 index 0000000000000..edc59692c18a7 --- /dev/null +++ b/common/datavalues/src/columns/object/mod.rs @@ -0,0 +1,243 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod iterator; +mod mutable; + +use std::sync::Arc; + +use common_arrow::arrow::array::Array; +use common_arrow::arrow::bitmap::Bitmap; +use common_arrow::arrow::buffer::Buffer; +use common_arrow::arrow::datatypes::DataType as ArrowDataType; +pub use iterator::*; +pub use mutable::*; +use serde_json::Value as JsonValue; + +use crate::prelude::*; + +/// ObjectColumn is a generic struct that wrapped any structure or enumeration, +/// such as JsonValue or BitMap. +#[derive(Clone)] +pub struct ObjectColumn { + values: Vec, +} + +impl From for ObjectColumn { + fn from(array: LargeBinaryArray) -> Self { + Self::new(array) + } +} + +impl ObjectColumn { + pub fn new(array: LargeBinaryArray) -> Self { + let mut values: Vec = Vec::with_capacity(array.len()); + let offsets = array.offsets().as_slice(); + let array_values = array.values().as_slice(); + for i in 0..offsets.len() - 1 { + if let Some(validity) = array.validity() { + if let Some(is_set) = validity.get(i) { + if !is_set { + values.push(T::default()); + continue; + } + } + } + let off = offsets[i] as usize; + let len = (offsets[i + 1] - offsets[i]) as usize; + let val = std::str::from_utf8(&array_values[off..off + len]).unwrap(); + match T::from_str(val) { + Ok(v) => values.push(v), + Err(_) => values.push(T::default()), + } + } + + Self { values } + } + + pub fn from_arrow_array(array: &dyn Array) -> Self { + let array = array + .as_any() + .downcast_ref::() + .expect("object cast should be ok"); + + Self::new(array.clone()) + } + + /// # Safety + /// Assumes that the `i < self.len`. + #[inline] + pub unsafe fn value_unchecked(&self, i: usize) -> T { + // soundness: the invariant of the function + self.values.get_unchecked(i).clone() + } + + pub fn values(&self) -> &[T] { + self.values.as_slice() + } + + /// Create a new DataArray by taking ownership of the Vec. This operation is zero copy. + pub fn new_from_vec(values: Vec) -> Self { + Self { values } + } +} + +impl Column for ObjectColumn { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn data_type(&self) -> DataTypePtr { + T::data_type() + } + + fn len(&self) -> usize { + self.values.len() + } + + fn validity(&self) -> (bool, Option<&Bitmap>) { + (false, None) + } + + fn memory_size(&self) -> usize { + self.values.len() * std::mem::size_of::() + } + + fn as_arrow_array(&self) -> common_arrow::arrow::array::ArrayRef { + let mut offsets: Vec = Vec::with_capacity(self.values.len()); + let mut values: Vec = Vec::with_capacity(self.values.len()); + + let mut offset: i64 = 0; + offsets.push(offset); + for val in &self.values { + let v = val.to_string(); + values.extend(v.as_bytes().to_vec()); + offset += v.len() as i64; + offsets.push(offset); + } + + Arc::new(LargeBinaryArray::from_data( + ArrowDataType::LargeBinary, + Buffer::from_slice(offsets), + Buffer::from_slice(values), + None, + )) + } + + fn arc(&self) -> ColumnRef { + Arc::new(self.clone()) + } + + fn slice(&self, offset: usize, length: usize) -> ColumnRef { + let values = &self.values.clone()[offset..offset + length]; + Arc::new(Self { + values: values.to_vec(), + }) + } + + fn filter(&self, filter: &BooleanColumn) -> ColumnRef { + let length = filter.values().len() - filter.values().null_count(); + if length == self.len() { + return Arc::new(self.clone()); + } + let iter = self + .values() + .iter() + .zip(filter.values().iter()) + .filter(|(_, f)| *f) + .map(|(v, _)| v.clone()); + + let values: Vec = iter.collect(); + let col = ObjectColumn { values }; + + Arc::new(col) + } + + fn scatter(&self, indices: &[usize], scattered_size: usize) -> Vec { + let mut builders = Vec::with_capacity(scattered_size); + for _i in 0..scattered_size { + builders.push(MutableObjectColumn::::with_capacity(self.len())); + } + + indices + .iter() + .zip(self.values()) + .for_each(|(index, value)| { + builders[*index].append_value(value.clone()); + }); + + builders.iter_mut().map(|b| b.to_column()).collect() + } + + fn replicate(&self, offsets: &[usize]) -> ColumnRef { + debug_assert!( + offsets.len() == self.len(), + "Size of offsets must match size of column" + ); + + if offsets.is_empty() { + return self.slice(0, 0); + } + + let mut builder = + MutableObjectColumn::::with_capacity(*offsets.last().unwrap() as usize); + + let mut previous_offset: usize = 0; + + (0..self.len()).for_each(|i| { + let offset: usize = offsets[i]; + let data = unsafe { self.value_unchecked(i) }; + builder + .values + .extend(std::iter::repeat(data).take(offset - previous_offset)); + previous_offset = offset; + }); + builder.to_column() + } + + fn convert_full_column(&self) -> ColumnRef { + Arc::new(self.clone()) + } + + fn get(&self, index: usize) -> DataValue { + self.values[index].clone().into() + } +} + +impl ScalarColumn for ObjectColumn +where T: Scalar + ObjectType +{ + type Builder = MutableObjectColumn; + type OwnedItem = T; + type RefItem<'a> = ::RefType<'a>; + type Iterator<'a> = ObjectValueIter<'a, T>; + + #[inline] + fn get_data(&self, idx: usize) -> Self::RefItem<'_> { + self.values[idx].as_scalar_ref() + } + + fn scalar_iter(&self) -> Self::Iterator<'_> { + ObjectValueIter::new(self) + } +} + +pub type JsonColumn = ObjectColumn; + +impl std::fmt::Debug for ObjectColumn { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + // TODO(b41sh): implement display_fmt + write!(f, "ObjectColumn") + } +} diff --git a/common/datavalues/src/columns/object/mutable.rs b/common/datavalues/src/columns/object/mutable.rs new file mode 100644 index 0000000000000..28fe44e1ba096 --- /dev/null +++ b/common/datavalues/src/columns/object/mutable.rs @@ -0,0 +1,128 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_arrow::arrow::bitmap::MutableBitmap; +use common_exception::Result; + +use crate::columns::mutable::MutableColumn; +use crate::prelude::*; + +#[derive(Debug)] +pub struct MutableObjectColumn +where T: ObjectType +{ + pub(crate) values: Vec, +} + +impl MutableColumn for MutableObjectColumn +where T: ObjectType +{ + fn data_type(&self) -> DataTypePtr { + T::data_type() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + fn append_default(&mut self) { + self.append_value(T::default()); + } + + fn validity(&self) -> Option<&MutableBitmap> { + None + } + + fn shrink_to_fit(&mut self) { + self.values.shrink_to_fit(); + } + + fn len(&self) -> usize { + self.values.len() + } + + fn to_column(&mut self) -> crate::ColumnRef { + self.shrink_to_fit(); + Arc::new(ObjectColumn:: { + values: std::mem::take(&mut self.values), + }) + } + + fn append_data_value(&mut self, value: crate::DataValue) -> Result<()> { + let v: T = DFTryFrom::try_from(value)?; + self.append_value(v); + Ok(()) + } +} + +impl Default for MutableObjectColumn +where T: ObjectType +{ + fn default() -> Self { + Self::with_capacity(0) + } +} + +impl MutableObjectColumn +where T: ObjectType +{ + pub fn from_data(values: Vec) -> Self { + Self { values } + } + + pub fn append_value(&mut self, val: T) { + self.values.push(val); + } + + pub fn values(&self) -> &Vec { + &self.values + } + + pub fn with_capacity(capacity: usize) -> Self { + MutableObjectColumn { + values: Vec::::with_capacity(capacity), + } + } +} + +impl ScalarColumnBuilder for MutableObjectColumn +where + T: ObjectType, + T: Scalar>, +{ + type ColumnType = ObjectColumn; + + fn with_capacity(capacity: usize) -> Self { + MutableObjectColumn { + values: Vec::::with_capacity(capacity), + } + } + + fn push(&mut self, value: ::RefType<'_>) { + self.values.push(value.to_owned_scalar()); + } + + fn finish(&mut self) -> Self::ColumnType { + self.shrink_to_fit(); + ObjectColumn:: { + values: std::mem::take(&mut self.values), + } + } +} diff --git a/common/datavalues/src/columns/series.rs b/common/datavalues/src/columns/series.rs index 990c48bd61b8e..05a5fbfb5686a 100644 --- a/common/datavalues/src/columns/series.rs +++ b/common/datavalues/src/columns/series.rs @@ -19,6 +19,7 @@ use common_arrow::arrow::bitmap::MutableBitmap; use common_arrow::arrow::compute::concatenate; use common_exception::ErrorCode; use common_exception::Result; +use serde_json::Value as JsonValue; use crate::prelude::*; use crate::Column; @@ -190,6 +191,12 @@ impl SeriesFrom, Vec> for Series { } } +impl SeriesFrom, Vec> for Series { + fn from_data(v: Vec) -> ColumnRef { + JsonColumn::new_from_vec(v).arc() + } +} + macro_rules! impl_from_option_iterator { ([], $( { $S: ident} ),*) => { $( diff --git a/common/datavalues/src/data_value.rs b/common/datavalues/src/data_value.rs index 1b9f022c60623..108274970621a 100644 --- a/common/datavalues/src/data_value.rs +++ b/common/datavalues/src/data_value.rs @@ -21,6 +21,8 @@ use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; use common_macros::MallocSizeOf; +use serde_json::json; +use serde_json::Value as JsonValue; use crate::prelude::*; @@ -268,6 +270,36 @@ impl DFTryFrom for Vec { } } +impl DFTryFrom for JsonValue { + fn try_from(value: DataValue) -> Result { + match value { + DataValue::Null => Ok(JsonValue::Null), + DataValue::Boolean(v) => Ok(v.into()), + DataValue::Int64(v) => Ok(v.into()), + DataValue::UInt64(v) => Ok(v.into()), + DataValue::Float64(v) => Ok(v.into()), + DataValue::String(v) => Ok(v.into()), + DataValue::Array(v) => Ok(json!(v)), + DataValue::Struct(v) => Ok(json!(v)), + } + } +} + +impl DFTryFrom<&DataValue> for JsonValue { + fn try_from(value: &DataValue) -> Result { + match value { + DataValue::Null => Ok(JsonValue::Null), + DataValue::Boolean(v) => Ok((*v as bool).into()), + DataValue::Int64(v) => Ok((*v as i64).into()), + DataValue::UInt64(v) => Ok((*v as u64).into()), + DataValue::Float64(v) => Ok((*v as f64).into()), + DataValue::String(v) => Ok(String::from_utf8(v.to_vec()).unwrap().into()), + DataValue::Array(v) => Ok(json!(*v)), + DataValue::Struct(v) => Ok(json!(*v)), + } + } +} + try_cast_data_value_to_std!(u8, as_u64); try_cast_data_value_to_std!(u16, as_u64); try_cast_data_value_to_std!(u32, as_u64); @@ -322,6 +354,12 @@ impl From>> for DataValue { } } +impl From for DataValue { + fn from(x: JsonValue) -> Self { + DataValue::String(x.to_string().into_bytes()) + } +} + impl fmt::Display for DataValue { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { diff --git a/common/datavalues/src/macros.rs b/common/datavalues/src/macros.rs index 06092976ae5d5..e35a612f470d6 100644 --- a/common/datavalues/src/macros.rs +++ b/common/datavalues/src/macros.rs @@ -28,7 +28,8 @@ macro_rules! for_all_scalar_types { { f32 }, { f64 }, { bool }, - { Vu8 } + { Vu8 }, + { JsonValue } } }; } @@ -75,6 +76,7 @@ macro_rules! for_all_primitive_boolean_types{ #[macro_export] macro_rules! for_all_scalar_varints{ ($macro:tt $(, $x:tt)*) => { + use serde_json::Value as JsonValue; $macro! { [$($x),*], { i8, Int8 }, @@ -88,7 +90,8 @@ macro_rules! for_all_scalar_varints{ { f32, Float32 }, { f64, Float64 }, { bool, Boolean }, - { Vu8, String } + { Vu8, String }, + { JsonValue, Variant } } }; } @@ -200,9 +203,9 @@ macro_rules! with_match_scalar_types_error {( type C = Vec; match $key_type { PhysicalTypeID::Boolean => __with_ty__! { bool }, - PhysicalTypeID::String => __with_ty__! { C }, + PhysicalTypeID::String => __with_ty__! { C }, - PhysicalTypeID::Int8 => __with_ty__! { i8 }, + PhysicalTypeID::Int8 => __with_ty__! { i8 }, PhysicalTypeID::Int16 => __with_ty__! { i16 }, PhysicalTypeID::Int32 => __with_ty__! { i32 }, PhysicalTypeID::Int64 => __with_ty__! { i64 }, diff --git a/common/datavalues/src/scalars/iterator.rs b/common/datavalues/src/scalars/iterator.rs index bf39793bbb2db..5080e201d04f6 100644 --- a/common/datavalues/src/scalars/iterator.rs +++ b/common/datavalues/src/scalars/iterator.rs @@ -110,3 +110,35 @@ impl<'a> ExactSizeIterator for StringViewer<'a> { } unsafe impl<'a> TrustedLen for StringViewer<'a> {} + +impl<'a, T> Iterator for ObjectViewer<'a, T> +where T: Scalar = Self> + ObjectType +{ + type Item = T::RefType<'a>; + fn next(&mut self) -> Option { + if self.pos >= self.size { + return None; + } + + let old = self.pos; + self.pos += 1; + + let value = &self.values[old & self.non_const_mask]; + Some(value.as_scalar_ref()) + } + + fn size_hint(&self) -> (usize, Option) { + (self.size - self.pos, Some(self.size - self.pos)) + } +} + +impl<'a, T> ExactSizeIterator for ObjectViewer<'a, T> +where T: Scalar = Self> + ObjectType +{ + fn len(&self) -> usize { + self.size - self.pos + } +} + +unsafe impl<'a, T> TrustedLen for ObjectViewer<'a, T> where T: Scalar = Self> + ObjectType +{} diff --git a/common/datavalues/src/scalars/type_.rs b/common/datavalues/src/scalars/type_.rs index 335112a0df41b..7c9dc42c8c56e 100644 --- a/common/datavalues/src/scalars/type_.rs +++ b/common/datavalues/src/scalars/type_.rs @@ -15,6 +15,7 @@ use std::any::Any; use common_exception::Result; +use serde_json::Value as JsonValue; use super::column::ScalarColumn; use crate::prelude::*; @@ -148,3 +149,30 @@ impl<'a> ScalarRef<'a> for &'a [u8] { self.to_vec() } } + +impl Scalar for JsonValue { + type ColumnType = ObjectColumn; + type RefType<'a> = &'a JsonValue; + type Viewer<'a> = ObjectViewer<'a, JsonValue>; + + #[inline] + fn as_scalar_ref(&self) -> &JsonValue { + self + } + + #[allow(clippy::needless_lifetimes)] + #[inline] + fn upcast_gat<'short, 'long: 'short>(long: &'long JsonValue) -> &'short JsonValue { + long + } +} + +impl<'a> ScalarRef<'a> for &'a JsonValue { + type ColumnType = ObjectColumn; + type ScalarType = JsonValue; + + #[inline] + fn to_owned_scalar(&self) -> JsonValue { + (*self).clone() + } +} diff --git a/common/datavalues/src/scalars/viewer.rs b/common/datavalues/src/scalars/viewer.rs index 2a2607faa296b..7fa52f7bd7fce 100644 --- a/common/datavalues/src/scalars/viewer.rs +++ b/common/datavalues/src/scalars/viewer.rs @@ -220,6 +220,63 @@ impl<'a> ScalarViewer<'a> for StringViewer<'a> { } } +#[derive(Clone)] +pub struct ObjectViewer<'a, T: ObjectType> { + pub(crate) values: &'a [T], + pub(crate) null_mask: usize, + pub(crate) non_const_mask: usize, + pub(crate) size: usize, + pub(crate) pos: usize, + pub(crate) validity: Bitmap, +} + +impl<'a, T> ScalarViewer<'a> for ObjectViewer<'a, T> +where T: Scalar = Self> + ObjectType +{ + type ScalarItem = T; + type Iterator = Self; + + fn try_create(column: &'a ColumnRef) -> Result { + let (inner, validity) = try_extract_inner(column)?; + let col: &ObjectColumn = Series::check_get(inner)?; + let values = col.values(); + + let null_mask = get_null_mask(column); + let non_const_mask = non_const_mask(column); + let size = column.len(); + + Ok(Self { + values, + null_mask, + non_const_mask, + validity, + size, + pos: 0, + }) + } + + #[inline] + fn value_at(&self, index: usize) -> ::RefType<'a> { + self.values[index & self.non_const_mask].as_scalar_ref() + } + + #[inline] + fn valid_at(&self, i: usize) -> bool { + unsafe { self.validity.get_bit_unchecked(i & self.null_mask) } + } + + #[inline] + fn size(&self) -> usize { + self.size + } + + fn iter(&self) -> Self { + let mut res = self.clone(); + res.pos = 0; + res + } +} + #[inline] fn try_extract_inner(column: &ColumnRef) -> Result<(&ColumnRef, Bitmap)> { let (column, validity) = if column.is_const() { diff --git a/common/datavalues/src/types/data_type.rs b/common/datavalues/src/types/data_type.rs index adb37aa3a7f2c..825b6ee070398 100644 --- a/common/datavalues/src/types/data_type.rs +++ b/common/datavalues/src/types/data_type.rs @@ -144,25 +144,33 @@ pub fn from_arrow_type(dt: &ArrowType) -> DataTypePtr { } } -pub fn from_arrow_field(f: &ArrowField) -> DataTypePtr { +pub fn from_arrow_field_meta(f: &ArrowField) -> Option { if let Some(custom_name) = f.metadata.get(ARROW_EXTENSION_NAME) { let metadata = f.metadata.get(ARROW_EXTENSION_META).cloned(); match custom_name.as_str() { - "Date" | "Date16" => return Date16Type::arc(), - "Date32" => return Date32Type::arc(), - "DateTime" | "DateTime32" => return DateTime32Type::arc(metadata), + "Date" | "Date16" => return Some(Date16Type::arc()), + "Date32" => return Some(Date32Type::arc()), + "DateTime" | "DateTime32" => return Some(DateTime32Type::arc(metadata)), "DateTime64" => match metadata { Some(meta) => { let mut chars = meta.chars(); let precision = chars.next().unwrap().to_digit(10).unwrap(); let tz = chars.collect::(); - return DateTime64Type::arc(precision as usize, Some(tz)); + return Some(DateTime64Type::arc(precision as usize, Some(tz))); } - None => return DateTime64Type::arc(3, None), + None => return Some(DateTime64Type::arc(3, None)), }, - "Interval" => return IntervalType::arc(metadata.unwrap().into()), - _ => {} - } + "Interval" => return Some(IntervalType::arc(metadata.unwrap().into())), + "Variant" => return Some(VariantType::arc()), + _ => return None, + }; + }; + None +} + +pub fn from_arrow_field(f: &ArrowField) -> DataTypePtr { + if let Some(data_type) = from_arrow_field_meta(f) { + return data_type; } let dt = f.data_type(); diff --git a/common/datavalues/src/types/deserializations/mod.rs b/common/datavalues/src/types/deserializations/mod.rs index 911947301afb3..3ebcc7147ce97 100644 --- a/common/datavalues/src/types/deserializations/mod.rs +++ b/common/datavalues/src/types/deserializations/mod.rs @@ -23,6 +23,7 @@ mod null; mod nullable; mod number; mod string; +mod variant; pub use boolean::*; pub use date::*; @@ -31,6 +32,7 @@ pub use null::*; pub use nullable::*; pub use number::*; pub use string::*; +pub use variant::*; pub trait TypeDeserializer: Send + Sync { fn de_binary(&mut self, reader: &mut &[u8]) -> Result<()>; diff --git a/common/datavalues/src/types/deserializations/variant.rs b/common/datavalues/src/types/deserializations/variant.rs new file mode 100644 index 0000000000000..dbc0f9b6511d3 --- /dev/null +++ b/common/datavalues/src/types/deserializations/variant.rs @@ -0,0 +1,76 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Read; + +use common_exception::Result; +use common_io::prelude::BinaryRead; +use serde_json::Value as JsonValue; + +use crate::prelude::*; + +pub struct VariantDeserializer { + pub buffer: Vec, + pub builder: MutableObjectColumn, +} + +impl VariantDeserializer { + pub fn with_capacity(capacity: usize) -> Self { + Self { + buffer: Vec::new(), + builder: MutableObjectColumn::::with_capacity(capacity), + } + } +} + +impl TypeDeserializer for VariantDeserializer { + #[allow(clippy::uninit_vec)] + fn de_binary(&mut self, reader: &mut &[u8]) -> Result<()> { + let offset: u64 = reader.read_uvarint()?; + + self.buffer.clear(); + self.buffer.reserve(offset as usize); + unsafe { + self.buffer.set_len(offset as usize); + } + + reader.read_exact(&mut self.buffer)?; + let val = serde_json::from_slice(self.buffer.as_slice())?; + self.builder.append_value(val); + Ok(()) + } + + fn de_default(&mut self) { + self.builder.append_value(JsonValue::Null); + } + + fn de_fixed_binary_batch(&mut self, reader: &[u8], step: usize, rows: usize) -> Result<()> { + for row in 0..rows { + let reader = &reader[step * row..]; + let val = serde_json::from_slice(reader)?; + self.builder.append_value(val); + } + Ok(()) + } + + fn de_text(&mut self, reader: &[u8]) -> Result<()> { + let val = serde_json::from_slice(reader)?; + self.builder.append_value(val); + Ok(()) + } + + fn finish_to_column(&mut self) -> ColumnRef { + self.builder.to_column() + } +} diff --git a/common/datavalues/src/types/eq.rs b/common/datavalues/src/types/eq.rs index 8a918855f253d..9f044944ccac8 100644 --- a/common/datavalues/src/types/eq.rs +++ b/common/datavalues/src/types/eq.rs @@ -48,7 +48,7 @@ pub fn equal(lhs: &dyn DataType, rhs: &dyn DataType) -> bool { use crate::prelude::TypeID::*; match lhs.data_type_id() { Boolean | UInt8 | UInt16 | UInt32 | UInt64 | Int8 | Int16 | Int32 | Int64 | Float32 - | Float64 | String | Date16 | Date32 | Interval | DateTime32 | Null => true, + | Float64 | String | Date16 | Date32 | Interval | DateTime32 | Null | Variant => true, DateTime64 => { let lhs: &DateTime64Type = lhs.as_any().downcast_ref().unwrap(); diff --git a/common/datavalues/src/types/mod.rs b/common/datavalues/src/types/mod.rs index 8c736830dc2fd..a1b8d89a411a3 100644 --- a/common/datavalues/src/types/mod.rs +++ b/common/datavalues/src/types/mod.rs @@ -28,6 +28,7 @@ pub mod type_primitive; pub mod type_string; pub mod type_struct; pub mod type_traits; +pub mod type_variant; pub mod eq; pub mod type_id; @@ -59,3 +60,4 @@ pub use type_primitive::*; pub use type_string::*; pub use type_struct::*; pub use type_traits::*; +pub use type_variant::*; diff --git a/common/datavalues/src/types/serializations/mod.rs b/common/datavalues/src/types/serializations/mod.rs index 6219f7b0edc4b..5f7310634b7b9 100644 --- a/common/datavalues/src/types/serializations/mod.rs +++ b/common/datavalues/src/types/serializations/mod.rs @@ -26,6 +26,7 @@ mod nullable; mod number; mod string; mod struct_; +mod variant; pub use array::*; pub use boolean::*; @@ -36,6 +37,7 @@ pub use nullable::*; pub use number::*; pub use string::*; pub use struct_::*; +pub use variant::*; pub trait TypeSerializer: Send + Sync { fn serialize_value(&self, value: &DataValue) -> Result; diff --git a/common/datavalues/src/types/serializations/variant.rs b/common/datavalues/src/types/serializations/variant.rs new file mode 100644 index 0000000000000..521c35fba486b --- /dev/null +++ b/common/datavalues/src/types/serializations/variant.rs @@ -0,0 +1,57 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_clickhouse_srv::types::column::ArcColumnWrapper; +use common_clickhouse_srv::types::column::ColumnFrom; +use common_exception::ErrorCode; +use common_exception::Result; +use serde_json; +use serde_json::Value as JsonValue; + +use crate::prelude::*; + +pub struct VariantSerializer {} + +impl TypeSerializer for VariantSerializer { + fn serialize_value(&self, value: &DataValue) -> Result { + if let DataValue::String(v) = value { + let val: JsonValue = serde_json::from_str(std::str::from_utf8(v).unwrap())?; + Ok(format!("{:#}", val)) + } else { + Err(ErrorCode::BadBytes("Incorrect Variant value")) + } + } + + fn serialize_column(&self, column: &ColumnRef) -> Result> { + let column: &JsonColumn = Series::check_get(column)?; + let result: Vec = column.iter().map(|v| format!("{:#}", v)).collect(); + Ok(result) + } + + fn serialize_json(&self, column: &ColumnRef) -> Result> { + let column: &JsonColumn = Series::check_get(column)?; + let result: Vec = column.iter().map(|v| v.to_owned()).collect(); + Ok(result) + } + + fn serialize_clickhouse_format( + &self, + column: &ColumnRef, + ) -> Result { + let column: &JsonColumn = Series::check_get(column)?; + let values: Vec = column.iter().map(|v| format!("{:#}", v)).collect(); + + Ok(Vec::column_from::(values)) + } +} diff --git a/common/datavalues/src/types/type_factory.rs b/common/datavalues/src/types/type_factory.rs index 27f2b5c9f9d80..6fda4ca48a59e 100644 --- a/common/datavalues/src/types/type_factory.rs +++ b/common/datavalues/src/types/type_factory.rs @@ -49,6 +49,7 @@ static TYPE_FACTORY: Lazy> = Lazy::new(|| { type_factory.register(Date32Type::arc()); type_factory.register(DateTime32Type::arc(None)); type_factory.register(DateTime64Type::arc(3, None)); + type_factory.register(VariantType::arc()); type_factory.add_array_wrapper(); type_factory.add_nullable_wrapper(); diff --git a/common/datavalues/src/types/type_id.rs b/common/datavalues/src/types/type_id.rs index 5464a63d535af..6a28709bff803 100644 --- a/common/datavalues/src/types/type_id.rs +++ b/common/datavalues/src/types/type_id.rs @@ -67,6 +67,10 @@ pub enum TypeID { Array, Struct, + + /// Variant is a tagged universal type, which can store values of any other type, + /// including Object and Array, up to a maximum size of 16 MB. + Variant, } impl TypeID { @@ -212,6 +216,7 @@ impl TypeID { String => PhysicalTypeID::String, Array => PhysicalTypeID::Array, Struct => PhysicalTypeID::Struct, + Variant => PhysicalTypeID::Variant, } } } @@ -231,6 +236,7 @@ pub enum PhysicalTypeID { Array, Struct, + Variant, /// A signed 8-bit integer. Int8, diff --git a/common/datavalues/src/types/type_primitive.rs b/common/datavalues/src/types/type_primitive.rs index 48f73b971a5e3..dc4747aa0dda4 100644 --- a/common/datavalues/src/types/type_primitive.rs +++ b/common/datavalues/src/types/type_primitive.rs @@ -123,9 +123,9 @@ macro_rules! impl_numeric { fn typetag_deserialize(&self) {} } - paste::paste!{ - pub type [<$tname Type>] = PrimitiveDataType<$ty>; - } + paste::paste!{ + pub type [<$tname Type>] = PrimitiveDataType<$ty>; + } typetag::inventory::submit! { ::typetag_register(concat!($name, "Type"),(|deserializer|std::result::Result::Ok(std::boxed::Box::new(typetag::erased_serde::deserialize:: >(deserializer)?),))as typetag::DeserializeFn< ::Object> ,) diff --git a/common/datavalues/src/types/type_traits.rs b/common/datavalues/src/types/type_traits.rs index 12e18a67e6c9a..6ab1d2dfc14c3 100644 --- a/common/datavalues/src/types/type_traits.rs +++ b/common/datavalues/src/types/type_traits.rs @@ -16,6 +16,7 @@ use common_arrow::arrow::compute::arithmetics::basic::NativeArithmetics; use num::NumCast; use serde::de::DeserializeOwned; use serde::Serialize; +use serde_json::Value as JsonValue; use crate::DFTryFrom; use crate::DataTypePtr; @@ -25,6 +26,7 @@ use crate::Date32Type; use crate::DateTime32Type; use crate::DateTime64Type; use crate::Scalar; +use crate::VariantType; pub trait PrimitiveType: NativeArithmetics @@ -119,3 +121,25 @@ impl ToDateType for i64 { DateTime64Type::arc(0, None) } } + +pub trait ObjectType: + std::fmt::Display + + Clone + + std::marker::Sync + + std::marker::Send + + DFTryFrom + + Into + + core::str::FromStr + + DeserializeOwned + + Serialize + + Default + + Scalar +{ + fn data_type() -> DataTypePtr; +} + +impl ObjectType for JsonValue { + fn data_type() -> DataTypePtr { + VariantType::arc() + } +} diff --git a/common/datavalues/src/types/type_variant.rs b/common/datavalues/src/types/type_variant.rs new file mode 100644 index 0000000000000..5781b695bbf22 --- /dev/null +++ b/common/datavalues/src/types/type_variant.rs @@ -0,0 +1,96 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; +use std::sync::Arc; + +use common_arrow::arrow::datatypes::DataType as ArrowType; +use common_exception::Result; +use serde_json::Value as JsonValue; + +use super::data_type::DataType; +use super::type_id::TypeID; +use crate::prelude::*; + +#[derive(Default, Clone, serde::Deserialize, serde::Serialize)] +pub struct VariantType {} + +impl VariantType { + pub fn arc() -> DataTypePtr { + Arc::new(Self {}) + } +} + +#[typetag::serde] +impl DataType for VariantType { + fn data_type_id(&self) -> TypeID { + TypeID::Variant + } + + #[inline] + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + "Variant" + } + + fn default_value(&self) -> DataValue { + DataValue::String("Null".as_bytes().to_vec()) + } + + fn create_constant_column(&self, data: &DataValue, size: usize) -> Result { + let value: JsonValue = DFTryFrom::try_from(data)?; + let column = Series::from_data(vec![value]); + Ok(Arc::new(ConstColumn::new(column, size))) + } + + fn create_column(&self, data: &[DataValue]) -> Result { + let values: Vec = data + .iter() + .map(DFTryFrom::try_from) + .collect::>>()?; + + Ok(Series::from_data(values)) + } + + fn arrow_type(&self) -> ArrowType { + ArrowType::LargeBinary + } + + fn custom_arrow_meta(&self) -> Option> { + let mut mp = BTreeMap::new(); + mp.insert(ARROW_EXTENSION_NAME.to_string(), "Variant".to_string()); + Some(mp) + } + + fn create_serializer(&self) -> Box { + Box::new(VariantSerializer {}) + } + + fn create_deserializer(&self, capacity: usize) -> Box { + Box::new(VariantDeserializer::with_capacity(capacity)) + } + + fn create_mutable(&self, capacity: usize) -> Box { + Box::new(MutableObjectColumn::::with_capacity(capacity)) + } +} + +impl std::fmt::Debug for VariantType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.name()) + } +} diff --git a/common/datavalues/tests/it/columns/mod.rs b/common/datavalues/tests/it/columns/mod.rs index 094af5d95e90d..72236c3cb0085 100644 --- a/common/datavalues/tests/it/columns/mod.rs +++ b/common/datavalues/tests/it/columns/mod.rs @@ -14,5 +14,6 @@ mod boolean; mod builder; +mod object; mod primitive; mod string; diff --git a/common/datavalues/tests/it/columns/object.rs b/common/datavalues/tests/it/columns/object.rs new file mode 100644 index 0000000000000..8aa0e9b68e430 --- /dev/null +++ b/common/datavalues/tests/it/columns/object.rs @@ -0,0 +1,54 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_datavalues::prelude::*; +use serde_json::json; +use serde_json::Value as JsonValue; + +#[test] +fn test_empty_object_column() { + let mut builder = MutableObjectColumn::::with_capacity(16); + let data_column: ObjectColumn = builder.finish(); + let mut iter = data_column.iter(); + assert_eq!(None, iter.next()); + assert!(data_column.is_empty()); +} + +#[test] +fn test_new_from_slice() { + let v = vec![&JsonValue::Bool(true), &JsonValue::Bool(false)]; + let data_column: ObjectColumn = JsonColumn::from_slice(v.as_slice()); + let mut iter = data_column.iter(); + assert_eq!(Some(&JsonValue::Bool(true)), iter.next()); + assert_eq!(Some(&JsonValue::Bool(false)), iter.next()); + assert_eq!(None, iter.next()); +} + +#[test] +fn test_object_column() { + const N: usize = 1024; + let a = json!(true); + let b = json!(false); + let it = (0..N).map(|i| if i % 2 == 0 { &a } else { &b }); + let data_column: ObjectColumn = JsonColumn::from_iterator(it); + assert!(!data_column.is_empty()); + assert!(data_column.len() == N); + assert!(!data_column.null_at(1)); + + assert!(data_column.get(512) == DataValue::String("true".as_bytes().to_vec())); + assert!(data_column.get(513) == DataValue::String("false".as_bytes().to_vec())); + + let slice = data_column.slice(0, N / 2); + assert!(slice.len() == N / 2); +} diff --git a/common/datavalues/tests/it/columns/string.rs b/common/datavalues/tests/it/columns/string.rs index f4e6d824f9186..5b638ddbe408a 100644 --- a/common/datavalues/tests/it/columns/string.rs +++ b/common/datavalues/tests/it/columns/string.rs @@ -15,7 +15,7 @@ use common_datavalues::prelude::*; #[test] -fn test_empty_boolean_column() { +fn test_empty_string_column() { let mut builder = MutableStringColumn::with_values_capacity(16, 16); let data_column: StringColumn = builder.finish(); let mut iter = data_column.iter(); @@ -33,7 +33,7 @@ fn test_new_from_slice() { } #[test] -fn test_boolean_column() { +fn test_string_column() { const N: usize = 1024; let it = (0..N).map(|i| if i % 2 == 0 { "你好" } else { "hello" }); let data_column: StringColumn = NewColumn::new_from_iter(it); diff --git a/common/datavalues/tests/it/types/create_column.rs b/common/datavalues/tests/it/types/create_column.rs index f792840ac9326..739ed24bbd37d 100644 --- a/common/datavalues/tests/it/types/create_column.rs +++ b/common/datavalues/tests/it/types/create_column.rs @@ -17,6 +17,8 @@ use std::sync::Arc; use common_datavalues::prelude::*; use common_exception::Result; use pretty_assertions::assert_eq; +use serde_json::json; +use serde_json::Value as JsonValue; #[test] fn test_create_constant() -> Result<()> { @@ -85,6 +87,48 @@ fn test_create_constant() -> Result<()> { size: 2, column_expected: Series::from_data(&[None, None, Some(1i32)][0..2]), }, + Test { + name: "variant_null", + data_type: VariantType::arc(), + value: DataValue::Null, + size: 2, + column_expected: Series::from_data(vec![JsonValue::Null, JsonValue::Null]), + }, + Test { + name: "variant_boolean", + data_type: VariantType::arc(), + value: DataValue::Boolean(true), + size: 2, + column_expected: Series::from_data(vec![json!(true), json!(true)]), + }, + Test { + name: "variant_int64", + data_type: VariantType::arc(), + value: DataValue::Int64(1234), + size: 2, + column_expected: Series::from_data(vec![json!(1234i64), json!(1234i64)]), + }, + Test { + name: "variant_uint64", + data_type: VariantType::arc(), + value: DataValue::UInt64(1234), + size: 2, + column_expected: Series::from_data(vec![json!(1234u64), json!(1234u64)]), + }, + Test { + name: "variant_float64", + data_type: VariantType::arc(), + value: DataValue::Float64(12.34), + size: 2, + column_expected: Series::from_data(vec![json!(12.34f64), json!(12.34f64)]), + }, + Test { + name: "variant_string", + data_type: VariantType::arc(), + value: DataValue::String("hello".as_bytes().to_vec()), + size: 2, + column_expected: Series::from_data(vec![json!("hello"), json!("hello")]), + }, ]; for test in tests { diff --git a/common/datavalues/tests/it/types/serializations.rs b/common/datavalues/tests/it/types/serializations.rs index 42890c2256c15..03c48734838c8 100644 --- a/common/datavalues/tests/it/types/serializations.rs +++ b/common/datavalues/tests/it/types/serializations.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_datavalues::prelude::*; use common_exception::Result; use pretty_assertions::assert_eq; +use serde_json::json; #[test] fn test_serializers() -> Result<()> { @@ -121,6 +122,26 @@ fn test_serializers() -> Result<()> { "('1970-01-02', 3)".to_owned(), ], }, + Test { + name: "variant", + data_type: VariantType::arc(), + value: DataValue::String("true".as_bytes().to_vec()), + column: Arc::new(JsonColumn::new_from_vec(vec![ + json!(null), + json!(true), + json!(false), + json!(123), + json!(12.34), + ])), + val_str: "true", + col_str: vec![ + "null".to_owned(), + "true".to_owned(), + "false".to_owned(), + "123".to_owned(), + "12.34".to_owned(), + ], + }, ]; for test in tests { diff --git a/common/functions/src/scalars/expressions/cast_from_datetimes.rs b/common/functions/src/scalars/expressions/cast_from_datetimes.rs index 74dfa38953a9b..22484619ba9ab 100644 --- a/common/functions/src/scalars/expressions/cast_from_datetimes.rs +++ b/common/functions/src/scalars/expressions/cast_from_datetimes.rs @@ -29,6 +29,7 @@ const TIME_FMT: &str = "%Y-%m-%d %H:%M:%S"; pub fn cast_from_date16( column: &ColumnRef, + from_type: &DataTypePtr, data_type: &DataTypePtr, cast_options: &CastOptions, ) -> Result<(ColumnRef, Option)> { @@ -59,12 +60,13 @@ pub fn cast_from_date16( Ok((result, None)) } - _ => arrow_cast_compute(column, data_type, cast_options), + _ => arrow_cast_compute(column, from_type, data_type, cast_options), } } pub fn cast_from_date32( column: &ColumnRef, + from_type: &DataTypePtr, data_type: &DataTypePtr, cast_options: &CastOptions, ) -> Result<(ColumnRef, Option)> { @@ -95,12 +97,13 @@ pub fn cast_from_date32( Ok((result, None)) } - _ => arrow_cast_compute(column, data_type, cast_options), + _ => arrow_cast_compute(column, from_type, data_type, cast_options), } } pub fn cast_from_datetime32( column: &ColumnRef, + from_type: &DataTypePtr, data_type: &DataTypePtr, cast_options: &CastOptions, ) -> Result<(ColumnRef, Option)> { @@ -137,7 +140,7 @@ pub fn cast_from_datetime32( Ok((result, None)) } - _ => arrow_cast_compute(column, data_type, cast_options), + _ => arrow_cast_compute(column, from_type, data_type, cast_options), } } @@ -188,7 +191,7 @@ pub fn cast_from_datetime64( Ok((result, None)) } - _ => arrow_cast_compute(column, data_type, cast_options), + _ => arrow_cast_compute(column, from_type, data_type, cast_options), } } diff --git a/common/functions/src/scalars/expressions/cast_from_string.rs b/common/functions/src/scalars/expressions/cast_from_string.rs index 0c4aa55a9c1f9..56d2a22abebbf 100644 --- a/common/functions/src/scalars/expressions/cast_from_string.rs +++ b/common/functions/src/scalars/expressions/cast_from_string.rs @@ -26,6 +26,7 @@ use super::cast_with_type::CastOptions; pub fn cast_from_string( column: &ColumnRef, + from_type: &DataTypePtr, data_type: &DataTypePtr, cast_options: &CastOptions, ) -> Result<(ColumnRef, Option)> { @@ -89,7 +90,7 @@ pub fn cast_from_string( Ok((builder.build(size), Some(bitmap.into()))) } TypeID::Interval => todo!(), - _ => arrow_cast_compute(column, data_type, cast_options), + _ => arrow_cast_compute(column, from_type, data_type, cast_options), } } diff --git a/common/functions/src/scalars/expressions/cast_with_type.rs b/common/functions/src/scalars/expressions/cast_with_type.rs index d3e3c1ae8d819..7e2e6bc57ae31 100644 --- a/common/functions/src/scalars/expressions/cast_with_type.rs +++ b/common/functions/src/scalars/expressions/cast_with_type.rs @@ -20,8 +20,10 @@ use common_arrow::arrow::bitmap::MutableBitmap; use common_arrow::arrow::compute::cast; use common_arrow::arrow::compute::cast::CastOptions as ArrowOption; use common_datavalues::prelude::*; +use common_datavalues::with_match_physical_primitive_type; use common_exception::ErrorCode; use common_exception::Result; +use serde_json::Value as JsonValue; use super::cast_from_datetimes::cast_from_date16; use super::cast_from_datetimes::cast_from_date32; @@ -124,15 +126,23 @@ pub fn cast_with_type( let nonull_data_type = remove_nullable(data_type); let (result, valids) = match nonull_from_type.data_type_id() { - TypeID::String => cast_from_string(column, &nonull_data_type, cast_options), - TypeID::Date16 => cast_from_date16(column, &nonull_data_type, cast_options), - TypeID::Date32 => cast_from_date32(column, &nonull_data_type, cast_options), - TypeID::DateTime32 => cast_from_datetime32(column, &nonull_data_type, cast_options), + TypeID::String => { + cast_from_string(column, &nonull_from_type, &nonull_data_type, cast_options) + } + TypeID::Date16 => { + cast_from_date16(column, &nonull_from_type, &nonull_data_type, cast_options) + } + TypeID::Date32 => { + cast_from_date32(column, &nonull_from_type, &nonull_data_type, cast_options) + } + TypeID::DateTime32 => { + cast_from_datetime32(column, &nonull_from_type, &nonull_data_type, cast_options) + } TypeID::DateTime64 => { cast_from_datetime64(column, &nonull_from_type, &nonull_data_type, cast_options) } // TypeID::Interval => arrow_cast_compute(column, &nonull_data_type, cast_options), - _ => arrow_cast_compute(column, &nonull_data_type, cast_options), + _ => arrow_cast_compute(column, &nonull_from_type, &nonull_data_type, cast_options), }?; let (all_nulls, source_valids) = column.validity(); @@ -166,12 +176,63 @@ pub fn cast_with_type( Ok(result) } +pub fn cast_to_variant( + column: &ColumnRef, + from_type: &DataTypePtr, +) -> Result<(ColumnRef, Option)> { + let column = Series::remove_nullable(column); + let size = column.len(); + let mut builder = ColumnBuilder::::with_capacity(size); + + with_match_physical_primitive_type!(from_type.data_type_id().to_physical_type(), |$T| { + let col: &<$T as Scalar>::ColumnType = Series::check_get(&column)?; + for v in col.iter() { + let v = *v as $T; + let x: JsonValue = v.into(); + builder.append(&x); + } + return Ok((builder.build(size), None)); + + }, { + match from_type.data_type_id() { + TypeID::Null => { + for _ in 0..size { + let v = JsonValue::Null; + builder.append(&v); + } + return Ok((builder.build(size), None)); + } + TypeID::Boolean => { + let c: &BooleanColumn = Series::check_get(&column)?; + for v in c.iter() { + let v = v as bool; + let x: JsonValue = v.into(); + builder.append(&x); + } + return Ok((builder.build(size), None)); + } + _ => { + // other data types can't automatically casted to variant + return Err(ErrorCode::BadDataValueType(format!( + "Expression type does not match column data type, expecting VARIANT but got {:?}", + from_type.data_type_id() + ))); + } + } + }); +} + // cast using arrow's cast compute pub fn arrow_cast_compute( column: &ColumnRef, + from_type: &DataTypePtr, data_type: &DataTypePtr, cast_options: &CastOptions, ) -> Result<(ColumnRef, Option)> { + if data_type.data_type_id() == TypeID::Variant { + return cast_to_variant(column, from_type); + } + let arrow_array = column.as_arrow_array(); let arrow_options = cast_options.as_arrow(); let result = cast::cast(arrow_array.as_ref(), &data_type.arrow_type(), arrow_options)?; diff --git a/common/functions/tests/it/scalars/expressions.rs b/common/functions/tests/it/scalars/expressions.rs index 9f28ad68069ff..1f6b1faf0ff0d 100644 --- a/common/functions/tests/it/scalars/expressions.rs +++ b/common/functions/tests/it/scalars/expressions.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use common_datavalues::prelude::*; use common_exception::Result; use common_functions::scalars::*; +use serde_json::json; use super::scalar_function2_test::ScalarFunctionWithFieldTest; use crate::scalars::scalar_function2_test::test_scalar_functions; @@ -98,6 +99,117 @@ fn test_cast_function() -> Result<()> { error: "", }, ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-bool-to-variant-passed", + columns: vec![Series::from_data(vec![true, false])], + expect: Series::from_data(vec![json!(true), json!(false)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-int8-to-variant-passed", + columns: vec![Series::from_data(vec![-128i8, 127])], + expect: Series::from_data(vec![json!(-128i8), json!(127i8)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-int16-to-variant-passed", + columns: vec![Series::from_data(vec![-32768i16, 32767])], + expect: Series::from_data(vec![json!(-32768i16), json!(32767i16)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-int32-to-variant-passed", + columns: vec![Series::from_data(vec![-2147483648i32, 2147483647])], + expect: Series::from_data(vec![json!(-2147483648i32), json!(2147483647i32)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-int64-to-variant-passed", + columns: vec![Series::from_data(vec![ + -9223372036854775808i64, + 9223372036854775807, + ])], + expect: Series::from_data(vec![ + json!(-9223372036854775808i64), + json!(9223372036854775807i64), + ]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-uint8-to-variant-passed", + columns: vec![Series::from_data(vec![0u8, 255])], + expect: Series::from_data(vec![json!(0u8), json!(255u8)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-uint16-to-variant-passed", + columns: vec![Series::from_data(vec![0u16, 65535])], + expect: Series::from_data(vec![json!(0u16), json!(65535u16)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-uint32-to-variant-passed", + columns: vec![Series::from_data(vec![0u32, 4294967295])], + expect: Series::from_data(vec![json!(0u32), json!(4294967295u32)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-uint64-to-variant-passed", + columns: vec![Series::from_data(vec![0u64, 18446744073709551615])], + expect: Series::from_data(vec![json!(0u64), json!(18446744073709551615u64)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-float32-to-variant-passed", + columns: vec![Series::from_data(vec![0.12345679f32, 12.34])], + expect: Series::from_data(vec![json!(0.12345679f32), json!(12.34f32)]), + error: "", + }, + ), + ( + CastFunction::create("cast", "variant")?, + ScalarFunctionTest { + name: "cast-float64-to-variant-passed", + columns: vec![Series::from_data(vec![ + 0.12345678912121212f64, + 12.345678912, + ])], + expect: Series::from_data(vec![ + json!(0.12345678912121212f64), + json!(12.345678912f64), + ]), + error: "", + }, + ), ]; for (test_func, test) in tests { diff --git a/docs/doc/03-reference/01-data-types/data-type-semi-structured-types.md b/docs/doc/03-reference/01-data-types/data-type-semi-structured-types.md new file mode 100644 index 0000000000000..acad98e6a6364 --- /dev/null +++ b/docs/doc/03-reference/01-data-types/data-type-semi-structured-types.md @@ -0,0 +1,8 @@ +--- +title: Semi-structured Types +description: Semi-structured Types can hold any other data types +--- + +| Data Type | Syntax | +| -----------------| -------- | +| Variant | Variant diff --git a/query/src/servers/mysql/writers/query_result_writer.rs b/query/src/servers/mysql/writers/query_result_writer.rs index 1fa16ad35969c..673e4095d1d55 100644 --- a/query/src/servers/mysql/writers/query_result_writer.rs +++ b/query/src/servers/mysql/writers/query_result_writer.rs @@ -84,6 +84,7 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> { TypeID::Null => Ok(ColumnType::MYSQL_TYPE_NULL), TypeID::Interval => Ok(ColumnType::MYSQL_TYPE_LONG), TypeID::Struct => Ok(ColumnType::MYSQL_TYPE_VARCHAR), + TypeID::Variant => Ok(ColumnType::MYSQL_TYPE_VARCHAR), _ => Err(ErrorCode::UnImplement(format!( "Unsupported column type:{:?}", field.data_type() @@ -163,6 +164,10 @@ impl<'a, W: std::io::Write> DFQueryResultWriter<'a, W> { let serializer = data_type.create_serializer(); row_writer.write_col(serializer.serialize_value(&val)?)? } + (TypeID::Variant, DataValue::String(_)) => { + let serializer = data_type.create_serializer(); + row_writer.write_col(serializer.serialize_value(&val)?)? + } (_, DataValue::Int64(v)) => row_writer.write_col(v)?, (_, DataValue::UInt64(v)) => row_writer.write_col(v)?, diff --git a/query/src/storages/fuse/statistics/accumulator.rs b/query/src/storages/fuse/statistics/accumulator.rs index 3d04904a9c4d8..6627e68c60a51 100644 --- a/query/src/storages/fuse/statistics/accumulator.rs +++ b/query/src/storages/fuse/statistics/accumulator.rs @@ -75,16 +75,19 @@ impl StatisticsAccumulator { let mut min = DataValue::Null; let mut max = DataValue::Null; - let mins = eval_aggr("min", vec![], &[column_field.clone()], rows)?; - let maxs = eval_aggr("max", vec![], &[column_field], rows)?; - - if mins.len() > 0 { - min = mins.get(0); - } - if maxs.len() > 0 { - max = maxs.get(0); + // TODO(b41sh): support max/min aggregate functions for variant + let nonull_data_type = remove_nullable(field.data_type()); + if nonull_data_type.data_type_id() != TypeID::Variant { + let mins = eval_aggr("min", vec![], &[column_field.clone()], rows)?; + let maxs = eval_aggr("max", vec![], &[column_field], rows)?; + + if mins.len() > 0 { + min = mins.get(0); + } + if maxs.len() > 0 { + max = maxs.get(0); + } } - let (is_all_null, bitmap) = col.validity(); let null_count = match (is_all_null, bitmap) { (true, _) => rows, diff --git a/query/src/storages/fuse/statistics/reducers.rs b/query/src/storages/fuse/statistics/reducers.rs index 38295c833fe86..a200906cbbdb9 100644 --- a/query/src/storages/fuse/statistics/reducers.rs +++ b/query/src/storages/fuse/statistics/reducers.rs @@ -17,6 +17,7 @@ use std::borrow::Borrow; use std::collections::hash_map::Entry; use std::collections::HashMap; +use common_datavalues::prelude::*; use common_datavalues::ColumnWithField; use common_datavalues::DataSchema; use common_datavalues::DataValue; @@ -74,34 +75,38 @@ pub fn reduce_block_stats>( // TODO panic let data_type = schema.field((*id) as usize).data_type(); - let field = schema.field((*id) as usize); - // TODO - // for some data types, we shall balance the accuracy and the length - // e.g. for a string col, which max value is "abcdef....", we record the max as something like "b" - let min = data_type.create_column(&min_stats)?; - let max = data_type.create_column(&max_stats)?; - - let mins = eval_aggr( - "min", - vec![], - &[ColumnWithField::new(min.clone(), field.clone())], - min.len(), - )?; - let maxs = eval_aggr( - "max", - vec![], - &[ColumnWithField::new(max.clone(), field.clone())], - max.len(), - )?; - let mut min = DataValue::Null; let mut max = DataValue::Null; - if mins.len() > 0 { - min = mins.get(0); - } - if maxs.len() > 0 { - max = maxs.get(0); + // TODO(b41sh): support max/min aggregate functions for variant + let nonull_data_type = remove_nullable(data_type); + if nonull_data_type.data_type_id() != TypeID::Variant { + let field = schema.field((*id) as usize); + // TODO + // for some data types, we shall balance the accuracy and the length + // e.g. for a string col, which max value is "abcdef....", we record the max as something like "b" + let min_column = data_type.create_column(&min_stats)?; + let max_column = data_type.create_column(&max_stats)?; + + let mins = eval_aggr( + "min", + vec![], + &[ColumnWithField::new(min_column.clone(), field.clone())], + min_column.len(), + )?; + let maxs = eval_aggr( + "max", + vec![], + &[ColumnWithField::new(max_column.clone(), field.clone())], + max_column.len(), + )?; + + if mins.len() > 0 { + min = mins.get(0); + } + if maxs.len() > 0 { + max = maxs.get(0); + } } acc.insert(*id, ColumnStatistics { diff --git a/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.result b/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.result index a47232124af5f..b7a8d5ae7b355 100644 --- a/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.result +++ b/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.result @@ -1,3 +1,15 @@ -1 33 2021-08-15 10:00:00 string1234 101 67 2021-11-15 10:00:00 string5678 100 100 +1 NULL +2 true +3 false +4 1 +5 -1 +6 1000 +7 -1000 +8 9223372036854775807 +9 -9223372036854775808 +10 18446744073709551615 +11 0.12345679 +12 0.12345678912121212 diff --git a/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.sql b/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.sql index d29ffe899f699..c68ad2bd8d4bc 100644 --- a/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.sql +++ b/tests/suites/0_stateless/03_dml/03_0016_insert_into_values.sql @@ -11,4 +11,22 @@ INSERT INTO t1 (a,b,c,d) VALUES(-1, 33, '2021-08-15 10:00:00', 'string1234'), select * from t1; select sum(a),sum(b) from t1; + +CREATE TABLE IF NOT EXISTS t2(id Int, var Variant) Engine = Memory; + +INSERT INTO t2 (id, var) VALUES(1, null), + (2, true), + (3, false), + (4, 1), + (5, -1), + (6, 1000), + (7, -1000), + (8, 9223372036854775807), + (9, -9223372036854775808), + (10, 18446744073709551615), + (11, 0.12345679), + (12, 0.12345678912121212); + +select * from t2; + DROP DATABASE db1;