From eb9e2e621646ff2b90361a0e77028d11e4d0d8a5 Mon Sep 17 00:00:00 2001 From: andylokandy Date: Sun, 13 Oct 2024 16:57:25 +0800 Subject: [PATCH] feat: implement StringColumn using StringViewArray --- .../arrow/src/arrow/array/binview/mutable.rs | 4 +- .../expression/src/converts/arrow2/from.rs | 289 ++++++++--------- .../expression/src/converts/arrow2/to.rs | 56 +--- .../select_value/select_column_scalar.rs | 96 ++---- src/query/expression/src/kernels/concat.rs | 35 +-- src/query/expression/src/kernels/filter.rs | 124 +------- .../src/kernels/group_by_hash/method.rs | 5 +- .../group_by_hash/method_dict_serializer.rs | 7 +- .../group_by_hash/method_fixed_keys.rs | 25 +- .../group_by_hash/method_serializer.rs | 14 +- .../group_by_hash/method_single_string.rs | 41 +-- .../src/kernels/group_by_hash/utils.rs | 27 +- src/query/expression/src/kernels/take.rs | 50 +-- .../expression/src/kernels/take_chunks.rs | 51 +-- .../expression/src/kernels/take_compact.rs | 69 +--- .../expression/src/kernels/take_ranges.rs | 35 +-- src/query/expression/src/row/row_converter.rs | 5 +- src/query/expression/src/types/binary.rs | 161 ++++------ src/query/expression/src/types/bitmap.rs | 2 +- src/query/expression/src/types/geography.rs | 34 +- src/query/expression/src/types/geometry.rs | 2 +- src/query/expression/src/types/string.rs | 295 +++++------------- src/query/expression/src/types/variant.rs | 2 +- src/query/expression/src/utils/arrow.rs | 5 + src/query/expression/src/utils/display.rs | 12 +- src/query/expression/src/values.rs | 11 +- src/query/expression/tests/it/group_by.rs | 4 +- src/query/expression/tests/it/row.rs | 37 +-- .../formats/src/field_decoder/fast_values.rs | 2 +- src/query/formats/src/field_decoder/nested.rs | 2 +- src/query/functions/src/scalars/arithmetic.rs | 30 +- src/query/functions/src/scalars/binary.rs | 66 ++-- src/query/functions/src/scalars/comparison.rs | 31 +- src/query/functions/src/scalars/datetime.rs | 8 +- src/query/functions/src/scalars/hash.rs | 24 +- src/query/functions/src/scalars/other.rs | 12 +- src/query/functions/src/scalars/string.rs | 92 ++---- .../processors/transforms/sort/rows/common.rs | 6 +- .../group_by/aggregator_polymorphic_keys.rs | 5 +- .../storages/common/index/src/bloom_index.rs | 2 +- .../operations/read/runtime_filter_prunner.rs | 4 +- .../row_based/formats/csv/block_builder.rs | 8 +- 42 files changed, 521 insertions(+), 1269 deletions(-) diff --git a/src/common/arrow/src/arrow/array/binview/mutable.rs b/src/common/arrow/src/arrow/array/binview/mutable.rs index 64ef7880bcf85..38c088ec88b13 100644 --- a/src/common/arrow/src/arrow/array/binview/mutable.rs +++ b/src/common/arrow/src/arrow/array/binview/mutable.rs @@ -41,9 +41,9 @@ pub struct MutableBinaryViewArray { pub(super) validity: Option, pub(super) phantom: std::marker::PhantomData, /// Total bytes length if we would concatenate them all. - pub(super) total_bytes_len: usize, + pub total_bytes_len: usize, /// Total bytes in the buffer (excluding remaining capacity) - pub(super) total_buffer_len: usize, + pub total_buffer_len: usize, } impl Clone for MutableBinaryViewArray { diff --git a/src/query/expression/src/converts/arrow2/from.rs b/src/query/expression/src/converts/arrow2/from.rs index 0a264afce0d0c..d58fe5a0b202c 100644 --- a/src/query/expression/src/converts/arrow2/from.rs +++ b/src/query/expression/src/converts/arrow2/from.rs @@ -12,12 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_arrow::arrow::array::BinaryArray; +use databend_common_arrow::arrow::array::FixedSizeBinaryArray; +use databend_common_arrow::arrow::array::Utf8Array; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::datatypes::DataType as ArrowDataType; use databend_common_arrow::arrow::datatypes::Field as ArrowField; use databend_common_arrow::arrow::datatypes::Schema as ArrowSchema; use databend_common_arrow::arrow::datatypes::TimeUnit; +use databend_common_arrow::arrow::types::Offset; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -27,7 +31,9 @@ use super::ARROW_EXT_TYPE_EMPTY_MAP; use super::ARROW_EXT_TYPE_GEOMETRY; use super::ARROW_EXT_TYPE_VARIANT; use crate::types::array::ArrayColumn; +use crate::types::binary; use crate::types::binary::BinaryColumn; +use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::geography::GeographyColumn; use crate::types::nullable::NullableColumn; @@ -348,16 +354,7 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Binary(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Binary(binary_array_to_binary_column(arrow_col)) } (DataType::Binary, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -366,10 +363,7 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Binary(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Binary(binary_array_to_binary_column(arrow_col)) } (DataType::Binary, ArrowDataType::FixedSizeBinary(size)) => { let arrow_col = arrow_col @@ -378,13 +372,7 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `FixedSizeBinaryArray`", ); - let offsets = (0..arrow_col.len() as u64 + 1) - .map(|x| x * (*size) as u64) - .collect::>(); - Column::Binary(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Binary(fixed_size_binary_array_to_binary_column(arrow_col)) } (DataType::Binary, ArrowDataType::Utf8) => { let arrow_col = arrow_col @@ -393,16 +381,7 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Binary(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Binary(utf8_array_to_binary_column(arrow_col)) } (DataType::Binary, ArrowDataType::LargeUtf8) => { let arrow_col = arrow_col @@ -411,10 +390,16 @@ impl Column { .expect( "fail to read `Binary` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Binary(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Binary(utf8_array_to_binary_column(arrow_col)) + } + (DataType::Binary, ArrowDataType::BinaryView) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `String` from arrow: array should be `BinaryViewArray`", + ); + Column::Binary(BinaryColumn::new(arrow_col.clone())) } (DataType::String, ArrowDataType::Binary) => { let arrow_col = arrow_col @@ -423,14 +408,8 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - let column = StringColumn::new(arrow_col.values().clone(), offsets.into()); - Column::String(column) + let binary_col = binary_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) } (DataType::String, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -439,24 +418,18 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - let column = StringColumn::new(arrow_col.values().clone(), offsets); - Column::String(column) + let binary_col = binary_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) } - (DataType::String, ArrowDataType::FixedSizeBinary(size)) => { + (DataType::String, ArrowDataType::FixedSizeBinary(_)) => { let arrow_col = arrow_col .as_any() .downcast_ref::() .expect( "fail to read `String` from arrow: array should be `FixedSizeBinaryArray`", ); - let offsets = (0..arrow_col.len() as u64 + 1) - .map(|x| x * (*size) as u64) - .collect::>(); - let column = StringColumn::new(arrow_col.values().clone(), offsets.into()); - Column::String(column) + let binary_col = fixed_size_binary_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) } (DataType::String, ArrowDataType::Utf8) => { let arrow_col = arrow_col @@ -465,18 +438,8 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - unsafe { - Column::String(StringColumn::new_unchecked( - arrow_col.values().clone(), - offsets.into(), - )) - } + let binary_col = utf8_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) } (DataType::String, ArrowDataType::LargeUtf8) => { let arrow_col = arrow_col @@ -485,15 +448,18 @@ impl Column { .expect( "fail to read `String` from arrow: array should be `Utf8Array`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - unsafe { - Column::String(StringColumn::new_unchecked( - arrow_col.values().clone(), - offsets, - )) - } + let binary_col = utf8_array_to_binary_column(arrow_col); + Column::String(StringColumn::try_from(binary_col)?) + } + (DataType::String, ArrowDataType::Utf8View) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `String` from arrow: array should be `Utf8ViewArray`", + ); + let binary_col = BinaryColumn::new(arrow_col.to_binview()); + Column::String(StringColumn::try_from(binary_col)?) } (DataType::Timestamp, ArrowDataType::Timestamp(uint, _)) => { let values = arrow_col @@ -534,32 +500,14 @@ impl Column { .as_any() .downcast_ref::>() .expect("fail to read from arrow: array should be `BinaryArray`"); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Variant(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Variant(binary_array_to_binary_column(arrow_col)) } (DataType::Variant, ArrowDataType::Binary) => { let arrow_col = arrow_col .as_any() .downcast_ref::>() .expect("fail to read from arrow: array should be `BinaryArray`"); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Variant(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Variant(binary_array_to_binary_column(arrow_col)) } ( DataType::Variant, @@ -571,10 +519,19 @@ impl Column { .expect( "fail to read `Variant` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Variant(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Variant(binary_array_to_binary_column(arrow_col)) + } + ( + DataType::Variant, + ArrowDataType::Extension(name, box ArrowDataType::BinaryView, None), + ) if name == ARROW_EXT_TYPE_VARIANT => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Variant` from arrow: array should be `BinaryViewArray`", + ); + Column::Variant(BinaryColumn::new(arrow_col.clone())) } (DataType::Variant, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -583,10 +540,7 @@ impl Column { .expect( "fail to read `Variant` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Variant(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Variant(binary_array_to_binary_column(arrow_col)) } (DataType::Array(ty), ArrowDataType::List(_)) => { let values_col = arrow_col @@ -660,16 +614,7 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Bitmap(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) } ( DataType::Bitmap, @@ -681,10 +626,7 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Bitmap(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) } (DataType::Bitmap, ArrowDataType::Binary) => { let arrow_col = arrow_col @@ -693,16 +635,7 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Bitmap(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) } (DataType::Bitmap, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -711,10 +644,16 @@ impl Column { .expect( "fail to read `Bitmap` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Bitmap(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Bitmap(binary_array_to_binary_column(arrow_col)) + } + (DataType::Bitmap, ArrowDataType::BinaryView) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Bitmap` from arrow: array should be `BinaryViewArray`", + ); + Column::Bitmap(BinaryColumn::new(arrow_col.clone())) } ( DataType::Geometry, @@ -726,16 +665,7 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Geometry(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Geometry(binary_array_to_binary_column(arrow_col)) } ( DataType::Geometry, @@ -747,10 +677,19 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Geometry(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Geometry(binary_array_to_binary_column(arrow_col)) + } + ( + DataType::Geometry, + ArrowDataType::Extension(name, box ArrowDataType::BinaryView, None), + ) if name == ARROW_EXT_TYPE_GEOMETRY => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Geometry` from arrow: array should be `BinaryViewArray`", + ); + Column::Geometry(BinaryColumn::new(arrow_col.clone())) } (DataType::Geometry, ArrowDataType::Binary) => { let arrow_col = arrow_col @@ -759,16 +698,7 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col - .offsets() - .buffer() - .iter() - .map(|x| *x as u64) - .collect::>(); - Column::Geometry(BinaryColumn::new( - arrow_col.values().clone(), - offsets.into(), - )) + Column::Geometry(binary_array_to_binary_column(arrow_col)) } (DataType::Geometry, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -777,10 +707,7 @@ impl Column { .expect( "fail to read `Geometry` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Geometry(BinaryColumn::new(arrow_col.values().clone(), offsets)) + Column::Geometry(binary_array_to_binary_column(arrow_col)) } (DataType::Geography, ArrowDataType::LargeBinary) => { let arrow_col = arrow_col @@ -789,13 +716,16 @@ impl Column { .expect( "fail to read `Geography` from arrow: array should be `BinaryArray`", ); - let offsets = arrow_col.offsets().clone().into_inner(); - let offsets = - unsafe { std::mem::transmute::, Buffer>(offsets) }; - Column::Geography(GeographyColumn(BinaryColumn::new( - arrow_col.values().clone(), - offsets, - ))) + Column::Geography(GeographyColumn(binary_array_to_binary_column(arrow_col))) + } + (DataType::Geography, ArrowDataType::BinaryView) => { + let arrow_col = arrow_col + .as_any() + .downcast_ref::() + .expect( + "fail to read `Geography` from arrow: array should be `BinaryViewArray`", + ); + Column::Geography(GeographyColumn(BinaryColumn::new(arrow_col.clone()))) } (data_type, ArrowDataType::Extension(_, arrow_type, _)) => { from_arrow_with_arrow_type(arrow_col, arrow_type, data_type)? @@ -820,3 +750,30 @@ impl Column { from_arrow_with_arrow_type(arrow_col, arrow_col.data_type(), data_type) } } + +fn binary_array_to_binary_column(array: &BinaryArray) -> BinaryColumn { + let mut builder = BinaryColumnBuilder::with_capacity(array.len(), array.values().len()); + for value in array.values_iter() { + builder.put_slice(value); + builder.commit_row(); + } + builder.build() +} + +fn utf8_array_to_binary_column(array: &Utf8Array) -> BinaryColumn { + let mut builder = BinaryColumnBuilder::with_capacity(array.len(), array.values().len()); + for value in array.values_iter() { + builder.put_str(value); + builder.commit_row(); + } + builder.build() +} + +fn fixed_size_binary_array_to_binary_column(array: &FixedSizeBinaryArray) -> BinaryColumn { + let mut builder = BinaryColumnBuilder::with_capacity(array.len(), array.values().len()); + for value in array.values_iter() { + builder.put_slice(value); + builder.commit_row(); + } + builder.build() +} diff --git a/src/query/expression/src/converts/arrow2/to.rs b/src/query/expression/src/converts/arrow2/to.rs index 5d01b76fe873e..e154406aae576 100644 --- a/src/query/expression/src/converts/arrow2/to.rs +++ b/src/query/expression/src/converts/arrow2/to.rs @@ -92,8 +92,8 @@ fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { None, ), TableDataType::Boolean => ArrowDataType::Boolean, - TableDataType::Binary => ArrowDataType::LargeBinary, - TableDataType::String => ArrowDataType::LargeUtf8, + TableDataType::Binary => ArrowDataType::BinaryView, + TableDataType::String => ArrowDataType::Utf8View, TableDataType::Number(ty) => with_number_type!(|TYPE| match ty { NumberDataType::TYPE => ArrowDataType::TYPE, }), @@ -135,7 +135,7 @@ fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { } TableDataType::Bitmap => ArrowDataType::Extension( ARROW_EXT_TYPE_BITMAP.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), TableDataType::Tuple { @@ -157,17 +157,17 @@ fn table_type_to_arrow_type(ty: &TableDataType) -> ArrowDataType { } TableDataType::Variant => ArrowDataType::Extension( ARROW_EXT_TYPE_VARIANT.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), TableDataType::Geometry => ArrowDataType::Extension( ARROW_EXT_TYPE_GEOMETRY.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), TableDataType::Geography => ArrowDataType::Extension( ARROW_EXT_TYPE_GEOGRAPHY.to_string(), - Box::new(ArrowDataType::LargeBinary), + Box::new(ArrowDataType::BinaryView), None, ), } @@ -304,32 +304,10 @@ impl Column { ) .unwrap(), ), - Column::Binary(col) => { - let offsets: Buffer = - col.offsets().iter().map(|offset| *offset as i64).collect(); - Box::new( - databend_common_arrow::arrow::array::BinaryArray::::try_new( - arrow_type, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - col.data().clone(), - None, - ) - .unwrap(), - ) - } - Column::String(col) => { - let offsets: Buffer = - col.offsets().iter().map(|offset| *offset as i64).collect(); - Box::new( - databend_common_arrow::arrow::array::Utf8Array::::try_new( - arrow_type, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - col.data().clone(), - None, - ) - .unwrap(), - ) - } + Column::Binary(col) => Box::new(col.clone().into_inner()), + Column::String(col) => unsafe { + Box::new(col.clone().into_inner().to_utf8view_unchecked()) + }, Column::Timestamp(col) => Box::new( databend_common_arrow::arrow::array::PrimitiveArray::::try_new( arrow_type, @@ -401,19 +379,7 @@ impl Column { Column::Bitmap(col) | Column::Variant(col) | Column::Geometry(col) - | Column::Geography(GeographyColumn(col)) => { - let offsets: Buffer = - col.offsets().iter().map(|offset| *offset as i64).collect(); - Box::new( - databend_common_arrow::arrow::array::BinaryArray::::try_new( - arrow_type, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - col.data().clone(), - None, - ) - .unwrap(), - ) - } + | Column::Geography(GeographyColumn(col)) => Box::new(col.clone().into_inner()), } } } diff --git a/src/query/expression/src/filter/select_value/select_column_scalar.rs b/src/query/expression/src/filter/select_value/select_column_scalar.rs index 3d36884cde645..d85e44427ca30 100644 --- a/src/query/expression/src/filter/select_value/select_column_scalar.rs +++ b/src/query/expression/src/filter/select_value/select_column_scalar.rs @@ -239,48 +239,19 @@ impl<'a> Selector<'a> { Some(validity) => { // search the whole string buffer if let LikePattern::SurroundByPercent(searcher) = like_pattern { - let needle = searcher.needle(); - let needle_byte_len = needle.len(); - let data = column.data().as_slice(); - let offsets = column.offsets().as_slice(); - let mut idx = 0; - let mut pos = (*offsets.first().unwrap()) as usize; - let end = (*offsets.last().unwrap()) as usize; - - while pos < end && idx < count { - if let Some(p) = searcher.search(&data[pos..end]) { - while offsets[idx + 1] as usize <= pos + p { - let ret = NOT && validity.get_bit_unchecked(idx); - update_index( - ret, - idx as u32, - true_selection, - false_selection, - ); - idx += 1; - } - - // check if the substring is in bound - let ret = - pos + p + needle_byte_len <= offsets[idx + 1] as usize; - - let ret = if NOT { - validity.get_bit_unchecked(idx) && !ret - } else { - validity.get_bit_unchecked(idx) && ret - }; - update_index(ret, idx as u32, true_selection, false_selection); - - pos = offsets[idx + 1] as usize; - idx += 1; + for idx in 0u32..count as u32 { + let ret = if NOT { + validity.get_bit_unchecked(idx as usize) + && !searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_some() } else { - break; - } - } - while idx < count { - let ret = NOT && validity.get_bit_unchecked(idx); - update_index(ret, idx as u32, true_selection, false_selection); - idx += 1; + validity.get_bit_unchecked(idx as usize) + && searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_some() + }; + update_index(ret, idx, true_selection, false_selection); } } else { for idx in 0u32..count as u32 { @@ -300,40 +271,17 @@ impl<'a> Selector<'a> { None => { // search the whole string buffer if let LikePattern::SurroundByPercent(searcher) = like_pattern { - let needle = searcher.needle(); - let needle_byte_len = needle.len(); - let data = column.data().as_slice(); - let offsets = column.offsets().as_slice(); - let mut idx = 0; - let mut pos = (*offsets.first().unwrap()) as usize; - let end = (*offsets.last().unwrap()) as usize; - - while pos < end && idx < count { - if let Some(p) = searcher.search(&data[pos..end]) { - while offsets[idx + 1] as usize <= pos + p { - update_index( - NOT, - idx as u32, - true_selection, - false_selection, - ); - idx += 1; - } - // check if the substring is in bound - let ret = - pos + p + needle_byte_len <= offsets[idx + 1] as usize; - let ret = if NOT { !ret } else { ret }; - update_index(ret, idx as u32, true_selection, false_selection); - - pos = offsets[idx + 1] as usize; - idx += 1; + for idx in 0u32..count as u32 { + let ret = if NOT { + !searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_some() } else { - break; - } - } - while idx < count { - update_index(NOT, idx as u32, true_selection, false_selection); - idx += 1; + searcher + .search(column.index_unchecked_bytes(idx as usize)) + .is_some() + }; + update_index(ret, idx, true_selection, false_selection); } } else { for idx in 0u32..count as u32 { diff --git a/src/query/expression/src/kernels/concat.rs b/src/query/expression/src/kernels/concat.rs index 957fb93e6180d..39035c5a02cf3 100644 --- a/src/query/expression/src/kernels/concat.rs +++ b/src/query/expression/src/kernels/concat.rs @@ -28,6 +28,7 @@ use crate::kernels::utils::set_vec_len_by_ptr; use crate::store_advance_aligned; use crate::types::array::ArrayColumnBuilder; use crate::types::binary::BinaryColumn; +use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::geography::GeographyColumn; use crate::types::geometry::GeometryType; @@ -387,37 +388,11 @@ impl Column { cols: impl Iterator + Clone, num_rows: usize, ) -> BinaryColumn { - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - offsets.push(0); - for col in cols.clone() { - let mut start = col.offsets()[0]; - for end in col.offsets()[1..].iter() { - data_size += end - start; - start = *end; - offsets.push(data_size); - } - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for col in cols { - let offsets = col.offsets(); - let col_data = &(col.data().as_slice()) - [offsets[0] as usize..offsets[offsets.len() - 1] as usize]; - copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); - } - set_vec_len_by_ptr(&mut data, data_ptr); + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, 0); + for col in cols { + builder.append_column(&col); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } pub fn concat_string_types( diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index 1b730323a24fc..aee6644801232 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::utils::BitChunkIterExact; use databend_common_arrow::arrow::bitmap::utils::BitChunksExact; use databend_common_arrow::arrow::bitmap::Bitmap; @@ -519,124 +520,15 @@ impl<'a> FilterVisitor<'a> { } fn filter_binary_types(&mut self, values: &BinaryColumn) -> BinaryColumn { - // Each element of `items` is (string pointer(u64), string length). - let mut items: Vec<(u64, usize)> = Vec::with_capacity(self.num_rows); - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let values_offset = values.offsets().as_slice(); - let values_data_ptr = values.data().as_slice().as_ptr(); - let mut offsets: Vec = Vec::with_capacity(self.num_rows + 1); - let mut offsets_ptr = offsets.as_mut_ptr(); - let mut items_ptr = items.as_mut_ptr(); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - store_advance_aligned::(0, &mut offsets_ptr); - let mut idx = 0; - let (mut slice, offset, mut length) = self.filter.as_slice(); - if offset > 0 { - let mut mask = slice[0]; - while mask != 0 { - let n = mask.trailing_zeros() as usize; - // If `offset` > 0, the valid bits of this byte start at `offset`, we also - // need to ensure that we cannot iterate more than `length` bits. - if n >= offset && n < offset + length { - let start = *values_offset.get_unchecked(n - offset) as usize; - let len = *values_offset.get_unchecked(n - offset + 1) as usize - start; - data_size += len as u64; - store_advance_aligned(data_size, &mut offsets_ptr); - store_advance_aligned( - (values_data_ptr.add(start) as u64, len), - &mut items_ptr, - ); - } - mask = mask & (mask - 1); + let mut builder = BinaryColumnBuilder::with_capacity(self.num_rows, 0); + for i in 0..self.num_rows { + if self.filter.get_bit(i) { + unsafe { + builder.put_slice(values.index_unchecked(i)); + builder.commit_row(); } - let bits_to_align = 8 - offset; - length = if length >= bits_to_align { - length - bits_to_align - } else { - 0 - }; - slice = &slice[1..]; - idx += bits_to_align; } - - const CHUNK_SIZE: usize = 64; - let mut mask_chunks = BitChunksExact::::new(slice, length); - let mut continuous_selected = 0; - for mut mask in mask_chunks.by_ref() { - if mask == u64::MAX { - continuous_selected += CHUNK_SIZE; - } else { - if continuous_selected > 0 { - let start = *values_offset.get_unchecked(idx) as usize; - let len = *values_offset.get_unchecked(idx + continuous_selected) as usize - - start; - store_advance_aligned( - (values_data_ptr.add(start) as u64, len), - &mut items_ptr, - ); - for i in 0..continuous_selected { - data_size += *values_offset.get_unchecked(idx + i + 1) - - *values_offset.get_unchecked(idx + i); - store_advance_aligned(data_size, &mut offsets_ptr); - } - idx += continuous_selected; - continuous_selected = 0; - } - while mask != 0 { - let n = mask.trailing_zeros() as usize; - let start = *values_offset.get_unchecked(idx + n) as usize; - let len = *values_offset.get_unchecked(idx + n + 1) as usize - start; - data_size += len as u64; - store_advance_aligned( - (values_data_ptr.add(start) as u64, len), - &mut items_ptr, - ); - store_advance_aligned(data_size, &mut offsets_ptr); - mask = mask & (mask - 1); - } - idx += CHUNK_SIZE; - } - } - if continuous_selected > 0 { - let start = *values_offset.get_unchecked(idx) as usize; - let len = *values_offset.get_unchecked(idx + continuous_selected) as usize - start; - store_advance_aligned((values_data_ptr.add(start) as u64, len), &mut items_ptr); - for i in 0..continuous_selected { - data_size += *values_offset.get_unchecked(idx + i + 1) - - *values_offset.get_unchecked(idx + i); - store_advance_aligned(data_size, &mut offsets_ptr); - } - idx += continuous_selected; - } - - for (i, is_selected) in mask_chunks.remainder_iter().enumerate() { - if is_selected { - let start = *values_offset.get_unchecked(idx + i) as usize; - let len = *values_offset.get_unchecked(idx + i + 1) as usize - start; - data_size += len as u64; - store_advance_aligned((values_data_ptr.add(start) as u64, len), &mut items_ptr); - store_advance_aligned(data_size, &mut offsets_ptr); - } - } - set_vec_len_by_ptr(&mut items, items_ptr); - set_vec_len_by_ptr(&mut offsets, offsets_ptr); } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for (str_ptr, len) in items.iter() { - copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); - } - set_vec_len_by_ptr(&mut data, data_ptr); - } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/kernels/group_by_hash/method.rs b/src/query/expression/src/kernels/group_by_hash/method.rs index 0485c07ab17a0..17f17e64a40e4 100644 --- a/src/query/expression/src/kernels/group_by_hash/method.rs +++ b/src/query/expression/src/kernels/group_by_hash/method.rs @@ -67,14 +67,11 @@ pub trait KeyAccessor { pub trait HashMethod: Clone + Sync + Send + 'static { type HashKey: ?Sized + Eq + FastHash + Debug; - type HashKeyIter<'a>: Iterator + TrustedLen - where Self: 'a; - fn name(&self) -> String; fn build_keys_state(&self, group_columns: InputColumns, rows: usize) -> Result; - fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result>; + fn for_each_keys(&self, keys_state: &KeysState, f: &mut dyn FnMut(&Self::HashKey)); fn build_keys_accessor( &self, diff --git a/src/query/expression/src/kernels/group_by_hash/method_dict_serializer.rs b/src/query/expression/src/kernels/group_by_hash/method_dict_serializer.rs index a2ce1d4c57e97..35b6f314abad3 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_dict_serializer.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_dict_serializer.rs @@ -32,7 +32,6 @@ pub struct HashMethodDictionarySerializer { impl HashMethod for HashMethodDictionarySerializer { type HashKey = DictionaryKeys; - type HashKeyIter<'a> = std::slice::Iter<'a, DictionaryKeys>; fn name(&self) -> String { "DictionarySerializer".to_string() @@ -91,9 +90,11 @@ impl HashMethod for HashMethodDictionarySerializer { }) } - fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result> { + fn for_each_keys(&self, keys_state: &KeysState, f: &mut dyn FnMut(&Self::HashKey)) { match keys_state { - KeysState::Dictionary { dictionaries, .. } => Ok(dictionaries.iter()), + KeysState::Dictionary { dictionaries, .. } => { + dictionaries.iter().for_each(f); + } _ => unreachable!(), } } diff --git a/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs b/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs index a50ca23a9746c..68971bede0133 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_fixed_keys.rs @@ -229,7 +229,6 @@ macro_rules! impl_hash_method_fixed_keys { ($dt: ident, $ty:ty, $signed_ty: ty) => { impl HashMethod for HashMethodFixedKeys<$ty> { type HashKey = $ty; - type HashKeyIter<'a> = std::slice::Iter<'a, $ty>; fn name(&self) -> String { format!("FixedKeys{}", std::mem::size_of::()) @@ -259,15 +258,12 @@ macro_rules! impl_hash_method_fixed_keys { let col = Buffer::<$ty>::from(keys); Ok(KeysState::Column(NumberType::<$ty>::upcast_column(col))) } - - #[inline] - fn build_keys_iter<'a>( - &self, - key_state: &'a KeysState, - ) -> Result> { + fn for_each_keys(&self, keys_state: &KeysState, f: &mut dyn FnMut(&Self::HashKey)) { use crate::types::ArgType; - match key_state { - KeysState::Column(Column::Number(NumberColumn::$dt(col))) => Ok(col.iter()), + match keys_state { + KeysState::Column(Column::Number(NumberColumn::$dt(col))) => { + col.iter().for_each(f); + } other => unreachable!("{:?} -> {}", other, NumberType::<$ty>::data_type()), } } @@ -308,8 +304,6 @@ macro_rules! impl_hash_method_fixed_large_keys { impl HashMethod for HashMethodFixedKeys<$ty> { type HashKey = $ty; - type HashKeyIter<'a> = std::slice::Iter<'a, $ty>; - fn name(&self) -> String { format!("FixedKeys{}", std::mem::size_of::()) } @@ -340,12 +334,9 @@ macro_rules! impl_hash_method_fixed_large_keys { Ok(KeysState::$name(keys.into())) } - fn build_keys_iter<'a>( - &self, - key_state: &'a KeysState, - ) -> Result> { - match key_state { - KeysState::$name(v) => Ok(v.iter()), + fn for_each_keys(&self, keys_state: &KeysState, f: &mut dyn FnMut(&Self::HashKey)) { + match keys_state { + KeysState::$name(v) => v.iter().for_each(f), _ => unreachable!(), } } diff --git a/src/query/expression/src/kernels/group_by_hash/method_serializer.rs b/src/query/expression/src/kernels/group_by_hash/method_serializer.rs index 4cb277bf416c8..37cdad18f02f7 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_serializer.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_serializer.rs @@ -30,8 +30,6 @@ pub struct HashMethodSerializer {} impl HashMethod for HashMethodSerializer { type HashKey = [u8]; - type HashKeyIter<'a> = BinaryIterator<'a>; - fn name(&self) -> String { "Serializer".to_string() } @@ -49,9 +47,11 @@ impl HashMethod for HashMethodSerializer { )))) } - fn build_keys_iter<'a>(&self, key_state: &'a KeysState) -> Result> { - match key_state { - KeysState::Column(Column::Binary(col)) => Ok(col.iter()), + fn for_each_keys(&self, keys_state: &KeysState, f: &mut dyn FnMut(&Self::HashKey)) { + match keys_state { + KeysState::Column(Column::Binary(col)) => { + col.iter().for_each(|v| f(v)); + } _ => unreachable!(), } } @@ -62,8 +62,8 @@ impl HashMethod for HashMethodSerializer { ) -> Result>> { match keys_state { KeysState::Column(Column::Binary(col)) => { - let (data, offsets) = col.into_buffer(); - Ok(Box::new(BinaryKeyAccessor::new(data, offsets))) + let data = col.into_inner(); + Ok(Box::new(BinaryKeyAccessor::new(data))) } _ => unreachable!(), } diff --git a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs index 3c77a7bd58afa..d41a159f51324 100644 --- a/src/query/expression/src/kernels/group_by_hash/method_single_string.rs +++ b/src/query/expression/src/kernels/group_by_hash/method_single_string.rs @@ -12,11 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +use databend_common_arrow::arrow::array::BinaryViewArray; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; use databend_common_hashtable::hash_join_fast_string_hash; use crate::types::binary::BinaryIterator; +use crate::types::BinaryColumn; use crate::Column; use crate::HashMethod; use crate::InputColumns; @@ -29,8 +31,6 @@ pub struct HashMethodSingleBinary {} impl HashMethod for HashMethodSingleBinary { type HashKey = [u8]; - type HashKeyIter<'a> = BinaryIterator<'a>; - fn name(&self) -> String { "SingleBinary".to_string() } @@ -39,12 +39,16 @@ impl HashMethod for HashMethodSingleBinary { Ok(KeysState::Column(group_columns[0].clone())) } - fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result> { + fn for_each_keys(&self, keys_state: &KeysState, f: &mut dyn FnMut(&Self::HashKey)) { match keys_state { KeysState::Column(Column::Binary(col)) | KeysState::Column(Column::Variant(col)) - | KeysState::Column(Column::Bitmap(col)) => Ok(col.iter()), - KeysState::Column(Column::String(col)) => Ok(col.iter_binary()), + | KeysState::Column(Column::Bitmap(col)) => { + col.iter().for_each(|v| f(v)); + } + KeysState::Column(Column::String(col)) => { + BinaryColumn::from(col.clone()).iter().for_each(|v| f(v)); + } _ => unreachable!(), } } @@ -57,12 +61,12 @@ impl HashMethod for HashMethodSingleBinary { KeysState::Column(Column::Binary(col)) | KeysState::Column(Column::Variant(col)) | KeysState::Column(Column::Bitmap(col)) => { - let (data, offsets) = col.into_buffer(); - Ok(Box::new(BinaryKeyAccessor::new(data, offsets))) + let data = col.into_inner(); + Ok(Box::new(BinaryKeyAccessor::new(data))) } KeysState::Column(Column::String(col)) => { - let (data, offsets) = col.into_buffer(); - Ok(Box::new(BinaryKeyAccessor::new(data, offsets))) + let data = col.into_inner(); + Ok(Box::new(BinaryKeyAccessor::new(data))) } _ => unreachable!(), } @@ -76,7 +80,11 @@ impl HashMethod for HashMethodSingleBinary { hashes.extend(col.iter().map(hash_join_fast_string_hash)); } KeysState::Column(Column::String(col)) => { - hashes.extend(col.iter_binary().map(hash_join_fast_string_hash)); + hashes.extend( + BinaryColumn::from(col.clone()) + .iter() + .map(hash_join_fast_string_hash), + ); } _ => unreachable!(), } @@ -84,13 +92,12 @@ impl HashMethod for HashMethodSingleBinary { } pub struct BinaryKeyAccessor { - data: Buffer, - offsets: Buffer, + data: BinaryViewArray, } impl BinaryKeyAccessor { - pub fn new(data: Buffer, offsets: Buffer) -> Self { - Self { data, offsets } + pub fn new(data: BinaryViewArray) -> Self { + Self { data } } } @@ -100,9 +107,7 @@ impl KeyAccessor for BinaryKeyAccessor { /// # Safety /// Calling this method with an out-of-bounds index is *[undefined behavior]*. unsafe fn key_unchecked(&self, index: usize) -> &Self::Key { - debug_assert!(index + 1 < self.offsets.len()); - - &self.data[*self.offsets.get_unchecked(index) as usize - ..*self.offsets.get_unchecked(index + 1) as usize] + debug_assert!(index < self.data.len()); + self.data.value_unchecked(index) } } diff --git a/src/query/expression/src/kernels/group_by_hash/utils.rs b/src/query/expression/src/kernels/group_by_hash/utils.rs index 7bd7f6240614e..85bcb54d6ae49 100644 --- a/src/query/expression/src/kernels/group_by_hash/utils.rs +++ b/src/query/expression/src/kernels/group_by_hash/utils.rs @@ -19,6 +19,7 @@ use crate::kernels::utils::set_vec_len_by_ptr; use crate::kernels::utils::store_advance; use crate::kernels::utils::store_advance_aligned; use crate::types::binary::BinaryColumn; +use crate::types::binary::BinaryColumnBuilder; use crate::types::decimal::DecimalColumn; use crate::types::NumberColumn; use crate::with_decimal_mapped_type; @@ -32,29 +33,15 @@ pub fn serialize_group_columns( num_rows: usize, serialize_size: usize, ) -> BinaryColumn { - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut data: Vec = Vec::with_capacity(serialize_size); - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_ptr = data.as_mut_ptr(); - let mut offsets_ptr = offsets.as_mut_ptr(); - let mut offset = 0; - - unsafe { - store_advance_aligned::(0, &mut offsets_ptr); - for i in 0..num_rows { - let old_ptr = data_ptr; - for col in columns.iter() { - serialize_column_binary(col, i, &mut data_ptr); + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, serialize_size); + for i in 0..num_rows { + for col in columns.iter() { + unsafe { + serialize_column_binary(col, i, &mut builder.data.as_mut_ptr()); } - offset += data_ptr as u64 - old_ptr as u64; - store_advance_aligned::(offset, &mut offsets_ptr); } - set_vec_len_by_ptr(&mut data, data_ptr); - set_vec_len_by_ptr(&mut offsets, offsets_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } /// This function must be consistent with the `push_binary` function of `src/query/expression/src/values.rs`. diff --git a/src/query/expression/src/kernels/take.rs b/src/query/expression/src/kernels/take.rs index 84d120b0390cb..c68870ed5e7fe 100644 --- a/src/query/expression/src/kernels/take.rs +++ b/src/query/expression/src/kernels/take.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; @@ -243,50 +244,13 @@ where I: databend_common_arrow::arrow::types::Index fn take_binary_types(&mut self, col: &BinaryColumn) -> BinaryColumn { let num_rows = self.indices.len(); - - // Each element of `items` is (string pointer(u64), string length), if `string_items_buf` - // can be reused, we will not re-allocate memory. - let mut items: Option> = match &self.string_items_buf { - Some(string_items_buf) if string_items_buf.capacity() >= num_rows => None, - _ => Some(Vec::with_capacity(num_rows)), - }; - let items = match items.is_some() { - true => items.as_mut().unwrap(), - false => self.string_items_buf.as_mut().unwrap(), - }; - - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let col_offset = col.offsets().as_slice(); - let col_data_ptr = col.data().as_slice().as_ptr(); - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - items.set_len(num_rows); - offsets.set_len(num_rows + 1); - *offsets.get_unchecked_mut(0) = 0; - for (i, index) in self.indices.iter().enumerate() { - let start = *col_offset.get_unchecked(index.to_usize()) as usize; - let len = *col_offset.get_unchecked(index.to_usize() + 1) as usize - start; - data_size += len as u64; - *items.get_unchecked_mut(i) = (col_data_ptr.add(start) as u64, len); - *offsets.get_unchecked_mut(i + 1) = data_size; - } - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for (str_ptr, len) in items.iter() { - copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, 0); + for index in self.indices.iter() { + unsafe { + builder.put_slice(col.index_unchecked(index.to_usize())); + builder.commit_row(); } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/kernels/take_chunks.rs b/src/query/expression/src/kernels/take_chunks.rs index 34b0d2598fb9f..bc4fb363ff45f 100644 --- a/src/query/expression/src/kernels/take_chunks.rs +++ b/src/query/expression/src/kernels/take_chunks.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::compute::merge_sort::MergeSlice; @@ -789,50 +790,16 @@ impl Column { indices: &[RowPtr], binary_items_buf: Option<&mut Vec<(u64, usize)>>, ) -> BinaryColumn { - let num_rows = indices.len(); - - // Each element of `items` is (string pointer(u64), string length), if `binary_items_buf` - // can be reused, we will not re-allocate memory. - let mut items: Option> = match &binary_items_buf { - Some(binary_items_buf) if binary_items_buf.capacity() >= num_rows => None, - _ => Some(Vec::with_capacity(num_rows)), - }; - let items = match items.is_some() { - true => items.as_mut().unwrap(), - false => binary_items_buf.unwrap(), - }; - - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut offsets: Vec = Vec::with_capacity(num_rows + 1); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - items.set_len(num_rows); - offsets.set_len(num_rows + 1); - *offsets.get_unchecked_mut(0) = 0; - for (i, row_ptr) in indices.iter().enumerate() { - let item = - col[row_ptr.chunk_index as usize].index_unchecked(row_ptr.row_index as usize); - data_size += item.len() as u64; - *items.get_unchecked_mut(i) = (item.as_ptr() as u64, item.len()); - *offsets.get_unchecked_mut(i + 1) = data_size; - } - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for (str_ptr, len) in items.iter() { - copy_advance_aligned(*str_ptr as *const u8, &mut data_ptr, *len); + let mut builder = BinaryColumnBuilder::with_capacity(indices.len(), 0); + for row_ptr in indices { + unsafe { + builder.put_slice( + col[row_ptr.chunk_index as usize].index_unchecked(row_ptr.row_index as usize), + ); + builder.commit_row(); } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } pub fn take_block_vec_string_types( diff --git a/src/query/expression/src/kernels/take_compact.rs b/src/query/expression/src/kernels/take_compact.rs index a2f97b8949569..0f33479f31001 100644 --- a/src/query/expression/src/kernels/take_compact.rs +++ b/src/query/expression/src/kernels/take_compact.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; @@ -218,68 +219,16 @@ impl<'a> TakeCompactVisitor<'a> { } fn take_binary_types(&mut self, col: &BinaryColumn) -> BinaryColumn { - // Each element of `items` is (string(&[u8]), repeat times). - let mut items = Vec::with_capacity(self.indices.len()); - let mut items_ptr = items.as_mut_ptr(); - - // [`BinaryColumn`] consists of [`data`] and [`offset`], we build [`data`] and [`offset`] respectively, - // and then call `BinaryColumn::new(data.into(), offsets.into())` to create [`BinaryColumn`]. - let mut offsets = Vec::with_capacity(self.num_rows + 1); - let mut offsets_ptr = offsets.as_mut_ptr(); - let mut data_size = 0; - - // Build [`offset`] and calculate `data_size` required by [`data`]. - unsafe { - store_advance_aligned::(0, &mut offsets_ptr); - for (index, cnt) in self.indices.iter() { - let item = col.index_unchecked(*index as usize); - store_advance_aligned((item, *cnt), &mut items_ptr); - for _ in 0..*cnt { - data_size += item.len() as u64; - store_advance_aligned(data_size, &mut offsets_ptr); - } - } - set_vec_len_by_ptr(&mut offsets, offsets_ptr); - set_vec_len_by_ptr(&mut items, items_ptr); - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - let mut remain; - - unsafe { - for (item, cnt) in items { - let len = item.len(); - if cnt == 1 { - copy_advance_aligned(item.as_ptr(), &mut data_ptr, len); - continue; - } - - // Using the doubling method to copy the max segment memory. - // [___________] => [x__________] => [xx_________] => [xxxx_______] => [xxxxxxxx___] - // Since cnt > 0, then 31 - cnt.leading_zeros() >= 0. - let max_bit_num = 1 << (31 - cnt.leading_zeros()); - let max_segment = max_bit_num * len; - let base_data_ptr = data_ptr; - copy_advance_aligned(item.as_ptr(), &mut data_ptr, len); - let mut cur_segment = len; - while cur_segment < max_segment { - copy_advance_aligned(base_data_ptr, &mut data_ptr, cur_segment); - cur_segment <<= 1; - } - - // Copy the remaining memory directly. - // [xxxxxxxxxx____] => [xxxxxxxxxxxxxx] - // ^^^^ ---> ^^^^ - remain = cnt as usize - max_bit_num; - if remain > 0 { - copy_advance_aligned(base_data_ptr, &mut data_ptr, remain * len); + let num_rows = self.num_rows; + let mut builder = BinaryColumnBuilder::with_capacity(num_rows, 0); + for (index, cnt) in self.indices.iter() { + for _ in 0..*cnt { + unsafe { + builder.put_slice(col.index_unchecked(*index as usize)); + builder.commit_row(); } } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/kernels/take_ranges.rs b/src/query/expression/src/kernels/take_ranges.rs index 71fbd0acc1af2..4a89a84a2a4f3 100644 --- a/src/query/expression/src/kernels/take_ranges.rs +++ b/src/query/expression/src/kernels/take_ranges.rs @@ -15,6 +15,7 @@ use core::ops::Range; use std::sync::Arc; +use binary::BinaryColumnBuilder; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_exception::Result; @@ -249,36 +250,14 @@ impl<'a> TakeRangeVisitor<'a> { } fn take_binary_types(&mut self, values: &BinaryColumn) -> BinaryColumn { - let mut offsets: Vec = Vec::with_capacity(self.num_rows + 1); - let mut data_size = 0; - - let value_data = values.data().as_slice(); - let values_offset = values.offsets().as_slice(); - // Build [`offset`] and calculate `data_size` required by [`data`]. - offsets.push(0); + let mut builder = BinaryColumnBuilder::with_capacity(self.num_rows, 0); for range in self.ranges { - let mut offset_start = values_offset[range.start as usize]; - for offset_end in values_offset[range.start as usize + 1..range.end as usize + 1].iter() - { - data_size += offset_end - offset_start; - offset_start = *offset_end; - offsets.push(data_size); - } - } - - // Build [`data`]. - let mut data: Vec = Vec::with_capacity(data_size as usize); - let mut data_ptr = data.as_mut_ptr(); - - unsafe { - for range in self.ranges { - let col_data = &value_data[values_offset[range.start as usize] as usize - ..values_offset[range.end as usize] as usize]; - copy_advance_aligned(col_data.as_ptr(), &mut data_ptr, col_data.len()); + for index in range.start as usize..range.end as usize { + let value = unsafe { values.index_unchecked(index) }; + builder.put_slice(value); + builder.commit_row(); } - set_vec_len_by_ptr(&mut data, data_ptr); } - - BinaryColumn::new(data.into(), offsets.into()) + builder.build() } } diff --git a/src/query/expression/src/row/row_converter.rs b/src/query/expression/src/row/row_converter.rs index 3c4d967ae5c0a..f259df0731920 100644 --- a/src/query/expression/src/row/row_converter.rs +++ b/src/query/expression/src/row/row_converter.rs @@ -84,10 +84,7 @@ impl RowConverter { encode_column(&mut builder, column, field.asc, field.nulls_first); } - let rows = builder.build(); - debug_assert_eq!(*rows.offsets().last().unwrap(), rows.data().len() as u64); - debug_assert!(rows.offsets().windows(2).all(|w| w[0] <= w[1])); - rows + builder.build() } fn new_empty_rows(&self, cols: &[Column], num_rows: usize) -> BinaryColumnBuilder { diff --git a/src/query/expression/src/types/binary.rs b/src/query/expression/src/types/binary.rs index cf2708bfc1d75..25378f6ac13e0 100644 --- a/src/query/expression/src/types/binary.rs +++ b/src/query/expression/src/types/binary.rs @@ -17,6 +17,9 @@ use std::iter::once; use std::marker::PhantomData; use std::ops::Range; +use databend_common_arrow::arrow::array::Array; +use databend_common_arrow::arrow::array::BinaryViewArray; +use databend_common_arrow::arrow::array::MutableBinaryViewArray; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::trusted_len::TrustedLen; use databend_common_exception::ErrorCode; @@ -167,7 +170,7 @@ impl ValueType for BinaryType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] @@ -190,45 +193,32 @@ impl ArgType for BinaryType { #[derive(Clone, PartialEq)] pub struct BinaryColumn { - pub(crate) data: Buffer, - pub(crate) offsets: Buffer, + pub(crate) data: BinaryViewArray, } impl BinaryColumn { - pub fn new(data: Buffer, offsets: Buffer) -> Self { - debug_assert!({ offsets.windows(2).all(|w| w[0] <= w[1]) }); - - BinaryColumn { data, offsets } + pub fn new(data: BinaryViewArray) -> Self { + BinaryColumn { data } } pub fn len(&self) -> usize { - self.offsets.len() - 1 + self.data.len() } pub fn current_buffer_len(&self) -> usize { - (*self.offsets().last().unwrap() - *self.offsets().first().unwrap()) as _ - } - - pub fn data(&self) -> &Buffer { - &self.data - } - - pub fn offsets(&self) -> &Buffer { - &self.offsets + self.data.total_bytes_len() } pub fn memory_size(&self) -> usize { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - len * 8 + (offsets[len - 1] - offsets[0]) as usize + self.data.total_buffer_len() } pub fn index(&self, index: usize) -> Option<&[u8]> { - if index + 1 < self.offsets.len() { - Some(&self.data[(self.offsets[index] as usize)..(self.offsets[index + 1] as usize)]) - } else { - None + if index >= self.len() { + return None; } + + Some(unsafe { self.index_unchecked(index) }) } /// # Safety @@ -236,93 +226,53 @@ impl BinaryColumn { /// Calling this method with an out-of-bounds index is *[undefined behavior]* #[inline] pub unsafe fn index_unchecked(&self, index: usize) -> &[u8] { - let start = *self.offsets.get_unchecked(index) as usize; - let end = *self.offsets.get_unchecked(index + 1) as usize; - self.data.get_unchecked(start..end) + debug_assert!(index < self.data.len()); + self.data.value_unchecked(index) } pub fn slice(&self, range: Range) -> Self { - let offsets = self - .offsets + let data = self + .data .clone() - .sliced(range.start, range.end - range.start + 1); - BinaryColumn { - data: self.data.clone(), - offsets, - } + .sliced(range.start, range.end - range.start); + BinaryColumn { data } } pub fn iter(&self) -> BinaryIterator { BinaryIterator { - data: &self.data, - offsets: self.offsets.windows(2), - _t: PhantomData, + col: self, + index: 0, } } - pub fn into_buffer(self) -> (Buffer, Buffer) { - (self.data, self.offsets) - } - - pub fn check_valid(&self) -> Result<()> { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - if len < 1 { - return Err(ErrorCode::Internal(format!( - "BinaryColumn offsets length must be equal or greater than 1, but got {}", - len - ))); - } - - for i in 1..len { - if offsets[i] < offsets[i - 1] { - return Err(ErrorCode::Internal(format!( - "BinaryColumn offsets value must be equal or greater than previous value, but got {}", - offsets[i] - ))); - } - } - Ok(()) - } -} - -pub type BinaryIterator<'a> = BinaryLikeIterator<'a, &'a [u8]>; - -pub trait BinaryLike<'a> { - fn from(value: &'a [u8]) -> Self; -} - -impl<'a> BinaryLike<'a> for &'a [u8] { - fn from(value: &'a [u8]) -> Self { - value + pub fn into_inner(self) -> BinaryViewArray { + self.data } } -pub struct BinaryLikeIterator<'a, T> -where T: BinaryLike<'a> -{ - pub(crate) data: &'a [u8], - pub(crate) offsets: std::slice::Windows<'a, u64>, - pub(crate) _t: PhantomData, +pub struct BinaryIterator<'a> { + col: &'a BinaryColumn, + index: usize, } -impl<'a, T: BinaryLike<'a>> Iterator for BinaryLikeIterator<'a, T> { - type Item = T; +impl<'a> Iterator for BinaryIterator<'a> { + type Item = &'a [u8]; fn next(&mut self) -> Option { - self.offsets - .next() - .map(|range| T::from(&self.data[(range[0] as usize)..(range[1] as usize)])) + let value = self.col.index(self.index)?; + self.index += 1; + Some(value) } fn size_hint(&self) -> (usize, Option) { - self.offsets.size_hint() + let remaining = self.col.len() - self.index; + (remaining, Some(remaining)) } } -unsafe impl<'a, T: BinaryLike<'a>> TrustedLen for BinaryLikeIterator<'a, T> {} +unsafe impl<'a> TrustedLen for BinaryIterator<'a> {} -unsafe impl<'a, T: BinaryLike<'a>> std::iter::TrustedLen for BinaryLikeIterator<'a, T> {} +unsafe impl<'a> std::iter::TrustedLen for BinaryIterator<'a> {} #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct BinaryColumnBuilder { @@ -344,11 +294,12 @@ impl BinaryColumnBuilder { } pub fn from_column(col: BinaryColumn) -> Self { - BinaryColumnBuilder { - need_estimated: col.data.is_empty(), - data: buffer_into_mut(col.data), - offsets: col.offsets.to_vec(), + let mut builder = BinaryColumnBuilder::with_capacity(col.len(), col.current_buffer_len()); + for item in col.iter() { + builder.put_slice(item); + builder.commit_row(); } + builder } pub fn from_data(data: Vec, offsets: Vec) -> Self { @@ -443,23 +394,25 @@ impl BinaryColumnBuilder { } pub fn append_column(&mut self, other: &BinaryColumn) { - // the first offset of other column may not be zero - let other_start = *other.offsets.first().unwrap(); - let other_last = *other.offsets.last().unwrap(); - let start = self.offsets.last().cloned().unwrap(); - self.data - .extend_from_slice(&other.data[(other_start as usize)..(other_last as usize)]); - self.offsets.extend( - other - .offsets - .iter() - .skip(1) - .map(|offset| start + offset - other_start), - ); + self.data.reserve(other.current_buffer_len()); + for item in other.iter() { + self.put_slice(item); + self.commit_row(); + } } pub fn build(self) -> BinaryColumn { - BinaryColumn::new(self.data.into(), self.offsets.into()) + let mut bulider = MutableBinaryViewArray::with_capacity(self.len()); + for (start, end) in self + .offsets + .windows(2) + .map(|w| (w[0] as usize, w[1] as usize)) + { + bulider.push_value(&self.data[start..end]); + } + BinaryColumn { + data: bulider.into(), + } } pub fn build_scalar(self) -> Vec { diff --git a/src/query/expression/src/types/bitmap.rs b/src/query/expression/src/types/bitmap.rs index 1823941ba2b73..ab411346980ab 100644 --- a/src/query/expression/src/types/bitmap.rs +++ b/src/query/expression/src/types/bitmap.rs @@ -161,7 +161,7 @@ impl ValueType for BitmapType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] diff --git a/src/query/expression/src/types/geography.rs b/src/query/expression/src/types/geography.rs index 8c0d95e92c4e0..403afa408a061 100644 --- a/src/query/expression/src/types/geography.rs +++ b/src/query/expression/src/types/geography.rs @@ -15,10 +15,13 @@ use std::cmp::Ordering; use std::fmt::Debug; use std::hash::Hash; +use std::marker::PhantomData; use std::ops::Range; use borsh::BorshDeserialize; use borsh::BorshSerialize; +use databend_common_arrow::arrow::trusted_len::TrustedLen; +use databend_common_arrow::parquet::encoding::plain_byte_array::BinaryIter; use databend_common_exception::Result; use databend_common_io::geography::*; use databend_common_io::wkb::make_point; @@ -29,8 +32,7 @@ use geozero::ToWkt; use serde::Deserialize; use serde::Serialize; -use super::binary::BinaryLike; -use super::binary::BinaryLikeIterator; +use super::binary::BinaryIterator; use crate::property::Domain; use crate::types::binary::BinaryColumn; use crate::types::binary::BinaryColumnBuilder; @@ -83,12 +85,6 @@ impl<'a> GeographyRef<'a> { } } -impl<'a> BinaryLike<'a> for GeographyRef<'a> { - fn from(value: &'a [u8]) -> Self { - GeographyRef(value) - } -} - #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct GeographyType; @@ -281,16 +277,24 @@ impl GeographyColumn { } pub fn iter(&self) -> GeographyIterator<'_> { - BinaryLikeIterator { - data: &self.0.data, - offsets: self.0.offsets.windows(2), - _t: std::marker::PhantomData, + GeographyIterator { + inner: self.0.iter(), } } +} - pub fn check_valid(&self) -> Result<()> { - self.0.check_valid() +pub struct GeographyIterator<'a> { + inner: BinaryIterator<'a>, +} + +impl<'a> Iterator for GeographyIterator<'a> { + type Item = GeographyRef<'a>; + + fn next(&mut self) -> Option { + self.inner.next().map(GeographyRef) } } -pub type GeographyIterator<'a> = BinaryLikeIterator<'a, GeographyRef<'a>>; +unsafe impl<'a> TrustedLen for GeographyIterator<'a> {} + +unsafe impl<'a> std::iter::TrustedLen for GeographyIterator<'a> {} diff --git a/src/query/expression/src/types/geometry.rs b/src/query/expression/src/types/geometry.rs index 67f1afc95ec92..d1dfe01911cc2 100644 --- a/src/query/expression/src/types/geometry.rs +++ b/src/query/expression/src/types/geometry.rs @@ -165,7 +165,7 @@ impl ValueType for GeometryType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] diff --git a/src/query/expression/src/types/string.rs b/src/query/expression/src/types/string.rs index c4fe32534c961..8aba9adafa2d8 100644 --- a/src/query/expression/src/types/string.rs +++ b/src/query/expression/src/types/string.rs @@ -16,6 +16,8 @@ use std::cmp::Ordering; use std::iter::once; use std::ops::Range; +use databend_common_arrow::arrow::array::Array; +use databend_common_arrow::arrow::array::BinaryViewArray; use databend_common_arrow::arrow::buffer::Buffer; use databend_common_arrow::arrow::trusted_len::TrustedLen; use databend_common_exception::ErrorCode; @@ -166,7 +168,7 @@ impl ValueType for StringType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] @@ -224,13 +226,12 @@ impl ArgType for StringType { #[derive(Clone, PartialEq)] pub struct StringColumn { - data: Buffer, - offsets: Buffer, + pub(crate) data: BinaryViewArray, } impl StringColumn { - pub fn new(data: Buffer, offsets: Buffer) -> Self { - let col = BinaryColumn::new(data, offsets); + pub fn new(data: BinaryViewArray) -> Self { + let col = BinaryColumn::new(data); col.check_utf8().unwrap(); @@ -241,8 +242,8 @@ impl StringColumn { /// This function is unsound iff: /// * the offsets are not monotonically increasing /// * The `data` between two consecutive `offsets` are not valid utf8 - pub unsafe fn new_unchecked(data: Buffer, offsets: Buffer) -> Self { - let col = BinaryColumn::new(data, offsets); + pub unsafe fn new_unchecked(data: BinaryViewArray) -> Self { + let col = BinaryColumn::new(data); #[cfg(debug_assertions)] col.check_utf8().unwrap(); @@ -259,44 +260,28 @@ impl StringColumn { col.check_utf8().unwrap(); StringColumn { - data: col.data, - offsets: col.offsets, + data: col.into_inner(), } } pub fn len(&self) -> usize { - self.offsets.len() - 1 + self.data.len() } pub fn current_buffer_len(&self) -> usize { - (*self.offsets().last().unwrap() - *self.offsets().first().unwrap()) as _ - } - - pub fn data(&self) -> &Buffer { - &self.data - } - - pub fn offsets(&self) -> &Buffer { - &self.offsets + self.data.total_bytes_len() } pub fn memory_size(&self) -> usize { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - len * 8 + (offsets[len - 1] - offsets[0]) as usize + self.data.total_buffer_len() } pub fn index(&self, index: usize) -> Option<&str> { - if index + 1 >= self.offsets.len() { + if index >= self.len() { return None; } - let bytes = &self.data[(self.offsets[index] as usize)..(self.offsets[index + 1] as usize)]; - - #[cfg(debug_assertions)] - bytes.check_utf8().unwrap(); - - unsafe { Some(std::str::from_utf8_unchecked(bytes)) } + Some(unsafe { self.index_unchecked(index) }) } /// # Safety @@ -304,11 +289,9 @@ impl StringColumn { /// Calling this method with an out-of-bounds index is *[undefined behavior]* #[inline] pub unsafe fn index_unchecked(&self, index: usize) -> &str { - debug_assert!(index + 1 < self.offsets.len()); + debug_assert!(index < self.data.len()); - let start = *self.offsets.get_unchecked(index) as usize; - let end = *self.offsets.get_unchecked(index + 1) as usize; - let bytes = &self.data.get_unchecked(start..end); + let bytes = self.data.value_unchecked(index); #[cfg(debug_assertions)] bytes.check_utf8().unwrap(); @@ -321,71 +304,34 @@ impl StringColumn { /// Calling this method with an out-of-bounds index is *[undefined behavior]* #[inline] pub unsafe fn index_unchecked_bytes(&self, index: usize) -> &[u8] { - debug_assert!(index + 1 < self.offsets.len()); + debug_assert!(index < self.data.len()); - let start = *self.offsets.get_unchecked(index) as usize; - let end = *self.offsets.get_unchecked(index + 1) as usize; - self.data.get_unchecked(start..end) + self.data.value_unchecked(index) } pub fn slice(&self, range: Range) -> Self { - let offsets = self - .offsets + let data = self + .data .clone() - .sliced(range.start, range.end - range.start + 1); - StringColumn { - data: self.data.clone(), - offsets, - } + .sliced(range.start, range.end - range.start); + unsafe { Self::from_binary_unchecked(BinaryColumn::new(data)) } } pub fn iter(&self) -> StringIterator { StringIterator { - data: &self.data, - offsets: self.offsets.windows(2), - } - } - - pub fn iter_binary(&self) -> BinaryIterator { - BinaryIterator { - data: &self.data, - offsets: self.offsets.windows(2), - _t: std::marker::PhantomData, + col: self, + index: 0, } } - pub fn into_buffer(self) -> (Buffer, Buffer) { - (self.data, self.offsets) - } - - pub fn check_valid(&self) -> Result<()> { - let offsets = self.offsets.as_slice(); - let len = offsets.len(); - if len < 1 { - return Err(ErrorCode::Internal(format!( - "StringColumn offsets length must be equal or greater than 1, but got {}", - len - ))); - } - - for i in 1..len { - if offsets[i] < offsets[i - 1] { - return Err(ErrorCode::Internal(format!( - "StringColumn offsets value must be equal or greater than previous value, but got {}", - offsets[i] - ))); - } - } - Ok(()) + pub fn into_inner(self) -> BinaryViewArray { + self.data } } impl From for BinaryColumn { fn from(col: StringColumn) -> BinaryColumn { - BinaryColumn { - data: col.data, - offsets: col.offsets, - } + BinaryColumn::new(col.into_inner()) } } @@ -395,34 +341,28 @@ impl TryFrom for StringColumn { fn try_from(col: BinaryColumn) -> Result { col.check_utf8()?; Ok(StringColumn { - data: col.data, - offsets: col.offsets, + data: col.into_inner(), }) } } pub struct StringIterator<'a> { - data: &'a [u8], - offsets: std::slice::Windows<'a, u64>, + col: &'a StringColumn, + index: usize, } impl<'a> Iterator for StringIterator<'a> { type Item = &'a str; fn next(&mut self) -> Option { - let bytes = self - .offsets - .next() - .map(|range| &self.data[(range[0] as usize)..(range[1] as usize)])?; - - #[cfg(debug_assertions)] - bytes.check_utf8().unwrap(); - - unsafe { Some(std::str::from_utf8_unchecked(bytes)) } + let value = self.col.index(self.index)?; + self.index += 1; + Some(value) } fn size_hint(&self) -> (usize, Option) { - self.offsets.size_hint() + let remaining = self.col.len() - self.index; + (remaining, Some(remaining)) } } @@ -432,29 +372,19 @@ unsafe impl<'a> std::iter::TrustedLen for StringIterator<'a> {} #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct StringColumnBuilder { - // if the StringColumnBuilder is created with `data_capacity`, need_estimated is false - pub need_estimated: bool, - pub data: Vec, - pub offsets: Vec, + inner: BinaryColumnBuilder, } impl StringColumnBuilder { pub fn with_capacity(len: usize, data_capacity: usize) -> Self { - let mut offsets = Vec::with_capacity(len + 1); - offsets.push(0); StringColumnBuilder { - need_estimated: data_capacity == 0 && len > 0, - data: Vec::with_capacity(data_capacity), - offsets, + inner: BinaryColumnBuilder::with_capacity(len, data_capacity), } } pub fn from_column(col: StringColumn) -> Self { - StringColumnBuilder { - need_estimated: col.data.is_empty(), - data: buffer_into_mut(col.data), - offsets: col.offsets.to_vec(), - } + let builder = BinaryColumnBuilder::from_column(col.into()); + unsafe { StringColumnBuilder::from_binary_unchecked(builder) } } pub fn from_data(data: Vec, offsets: Vec) -> Self { @@ -471,114 +401,69 @@ impl StringColumnBuilder { #[cfg(debug_assertions)] col.check_utf8().unwrap(); - StringColumnBuilder { - need_estimated: col.need_estimated, - data: col.data, - offsets: col.offsets, - } + StringColumnBuilder { inner: col } } pub fn repeat(scalar: &str, n: usize) -> Self { - let len = scalar.len(); - let data = scalar.as_bytes().repeat(n); - let offsets = once(0) - .chain((0..n).map(|i| (len * (i + 1)) as u64)) - .collect(); - StringColumnBuilder { - data, - offsets, - need_estimated: false, - } + let builder = BinaryColumnBuilder::repeat(scalar.as_bytes(), n); + unsafe { StringColumnBuilder::from_binary_unchecked(builder) } } pub fn repeat_default(n: usize) -> Self { - StringColumnBuilder { - data: vec![], - offsets: vec![0; n + 1], - need_estimated: false, - } + let builder = BinaryColumnBuilder::repeat_default(n); + unsafe { StringColumnBuilder::from_binary_unchecked(builder) } } pub fn len(&self) -> usize { - self.offsets.len() - 1 + self.inner.len() } pub fn memory_size(&self) -> usize { - self.offsets.len() * 8 + self.data.len() + self.inner.memory_size() } pub fn put_char(&mut self, item: char) { - self.data - .extend_from_slice(item.encode_utf8(&mut [0; 4]).as_bytes()); + self.inner.put_char(item); } #[inline] - #[deprecated] pub fn put_slice(&mut self, item: &[u8]) { #[cfg(debug_assertions)] item.check_utf8().unwrap(); - self.data.extend_from_slice(item); + self.inner.put_slice(item); } #[inline] pub fn put_str(&mut self, item: &str) { - self.data.extend_from_slice(item.as_bytes()); + self.inner.put_str(item); } pub fn put_char_iter(&mut self, iter: impl Iterator) { - for c in iter { - let mut buf = [0; 4]; - let result = c.encode_utf8(&mut buf); - self.data.extend_from_slice(result.as_bytes()); - } + self.inner.put_char_iter(iter); } #[inline] pub fn commit_row(&mut self) { - self.offsets.push(self.data.len() as u64); - - if self.need_estimated - && self.offsets.len() - 1 == 64 - && self.offsets.len() < self.offsets.capacity() - { - let bytes_per_row = self.data.len() / 64 + 1; - let bytes_estimate = bytes_per_row * self.offsets.capacity(); - - const MAX_HINT_SIZE: usize = 1_000_000_000; - // if we are more than 10% over the capacity, we reserve more - if bytes_estimate < MAX_HINT_SIZE - && bytes_estimate as f64 > self.data.capacity() as f64 * 1.10f64 - { - self.data.reserve(bytes_estimate - self.data.capacity()); - } - } + self.inner.commit_row(); } pub fn append_column(&mut self, other: &StringColumn) { - // the first offset of other column may not be zero - let other_start = *other.offsets.first().unwrap(); - let other_last = *other.offsets.last().unwrap(); - let start = self.offsets.last().cloned().unwrap(); - self.data - .extend_from_slice(&other.data[(other_start as usize)..(other_last as usize)]); - self.offsets.extend( - other - .offsets - .iter() - .skip(1) - .map(|offset| start + offset - other_start), - ); + let other = BinaryColumn::from(other.clone()); + self.inner.append_column(&other); } pub fn build(self) -> StringColumn { - unsafe { StringColumn::new_unchecked(self.data.into(), self.offsets.into()) } + let col = self.inner.build(); + + #[cfg(debug_assertions)] + col.check_utf8().unwrap(); + + unsafe { StringColumn::from_binary_unchecked(col) } } pub fn build_scalar(self) -> String { - assert_eq!(self.offsets.len(), 2); - - let bytes = self.data[(self.offsets[0] as usize)..(self.offsets[1] as usize)].to_vec(); + let bytes = self.inner.build_scalar(); #[cfg(debug_assertions)] bytes.check_utf8().unwrap(); @@ -588,18 +473,14 @@ impl StringColumnBuilder { #[inline] pub fn may_resize(&self, add_size: usize) -> bool { - self.data.len() + add_size > self.data.capacity() + self.inner.may_resize(add_size) } /// # Safety /// /// Calling this method with an out-of-bounds index is *[undefined behavior]* pub unsafe fn index_unchecked(&self, row: usize) -> &str { - debug_assert!(row + 1 < self.offsets.len()); - - let start = *self.offsets.get_unchecked(row) as usize; - let end = *self.offsets.get_unchecked(row + 1) as usize; - let bytes = self.data.get_unchecked(start..end); + let bytes = self.inner.index_unchecked(row); #[cfg(debug_assertions)] bytes.check_utf8().unwrap(); @@ -608,37 +489,20 @@ impl StringColumnBuilder { } pub fn push_repeat(&mut self, item: &str, n: usize) { - self.data.reserve(item.len() * n); - if self.need_estimated && self.offsets.len() - 1 < 64 { - for _ in 0..n { - self.data.extend_from_slice(item.as_bytes()); - self.commit_row(); - } - } else { - let start = self.data.len(); - let len = item.len(); - for _ in 0..n { - self.data.extend_from_slice(item.as_bytes()); - } - self.offsets - .extend((1..=n).map(|i| (start + len * i) as u64)); - } + self.inner.push_repeat(item.as_bytes(), n); } pub fn pop(&mut self) -> Option { - if self.len() > 0 { - let index = self.len() - 1; - let start = unsafe { *self.offsets.get_unchecked(index) as usize }; - self.offsets.pop(); - let val = self.data.split_off(start); - + self.inner.pop().map(|bytes| unsafe { #[cfg(debug_assertions)] - val.check_utf8().unwrap(); + bytes.check_utf8().unwrap(); - Some(unsafe { String::from_utf8_unchecked(val) }) - } else { - None - } + String::from_utf8_unchecked(bytes) + }) + } + + pub fn as_inner_mut(&mut self) -> &mut BinaryColumnBuilder { + &mut self.inner } } @@ -656,11 +520,7 @@ impl<'a> FromIterator<&'a str> for StringColumnBuilder { impl From for BinaryColumnBuilder { fn from(builder: StringColumnBuilder) -> BinaryColumnBuilder { - BinaryColumnBuilder { - need_estimated: builder.need_estimated, - data: builder.data, - offsets: builder.offsets, - } + builder.inner } } @@ -669,11 +529,7 @@ impl TryFrom for StringColumnBuilder { fn try_from(builder: BinaryColumnBuilder) -> Result { builder.check_utf8()?; - Ok(StringColumnBuilder { - need_estimated: builder.need_estimated, - data: builder.data, - offsets: builder.offsets, - }) + Ok(StringColumnBuilder { inner: builder }) } } @@ -712,7 +568,10 @@ impl CheckUTF8 for Vec { impl CheckUTF8 for BinaryColumn { fn check_utf8(&self) -> Result<()> { - check_utf8_column(&self.offsets, &self.data) + for bytes in self.iter() { + bytes.check_utf8()?; + } + Ok(()) } } diff --git a/src/query/expression/src/types/variant.rs b/src/query/expression/src/types/variant.rs index 6d7ab89a3c8d1..262d2f36aa72a 100644 --- a/src/query/expression/src/types/variant.rs +++ b/src/query/expression/src/types/variant.rs @@ -176,7 +176,7 @@ impl ValueType for VariantType { } fn column_memory_size(col: &Self::Column) -> usize { - col.data().len() + col.offsets().len() * 8 + col.memory_size() } #[inline(always)] diff --git a/src/query/expression/src/utils/arrow.rs b/src/query/expression/src/utils/arrow.rs index a57a871c09a6a..adfd8e8320067 100644 --- a/src/query/expression/src/utils/arrow.rs +++ b/src/query/expression/src/utils/arrow.rs @@ -15,6 +15,8 @@ use std::io::Cursor; use databend_common_arrow::arrow::array::Array; +use databend_common_arrow::arrow::array::BinaryArray; +use databend_common_arrow::arrow::array::BinaryViewArray; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_arrow::arrow::buffer::Buffer; @@ -23,9 +25,12 @@ use databend_common_arrow::arrow::io::ipc::read::read_file_metadata; use databend_common_arrow::arrow::io::ipc::read::FileReader; use databend_common_arrow::arrow::io::ipc::write::FileWriter; use databend_common_arrow::arrow::io::ipc::write::WriteOptions as IpcWriteOptions; +use databend_common_arrow::arrow::types::Offset; use databend_common_exception::ErrorCode; use databend_common_exception::Result; +use crate::types::binary::BinaryColumnBuilder; +use crate::types::BinaryColumn; use crate::BlockEntry; use crate::Column; use crate::ColumnBuilder; diff --git a/src/query/expression/src/utils/display.rs b/src/query/expression/src/utils/display.rs index c0023fc7b8a92..2932f8e5d7cc5 100755 --- a/src/query/expression/src/utils/display.rs +++ b/src/query/expression/src/utils/display.rs @@ -417,11 +417,7 @@ impl Debug for DecimalColumn { impl Debug for BinaryColumn { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("BinaryColumn") - .field( - "data", - &format_args!("0x{}", &hex::encode(self.data().as_slice())), - ) - .field("offsets", &self.offsets()) + .field("data", &format_args!("{:?}", self.data)) .finish() } } @@ -429,11 +425,7 @@ impl Debug for BinaryColumn { impl Debug for StringColumn { fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { f.debug_struct("StringColumn") - .field( - "data", - &format_args!("0x{}", &hex::encode(self.data().as_slice())), - ) - .field("offsets", &self.offsets()) + .field("data", &format_args!("{:?}", self.data)) .finish() } } diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index 685d948684a8e..0666d1603e676 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -1171,12 +1171,6 @@ impl Column { pub fn check_valid(&self) -> Result<()> { match self { - Column::Binary(x) => x.check_valid(), - Column::String(x) => x.check_valid(), - Column::Variant(x) => x.check_valid(), - Column::Geometry(x) => x.check_valid(), - Column::Geography(x) => x.check_valid(), - Column::Bitmap(x) => x.check_valid(), Column::Map(x) => { for y in x.iter() { y.check_valid()?; @@ -1665,8 +1659,8 @@ impl ColumnBuilder { builder.len() * 32 } ColumnBuilder::Boolean(c) => c.as_slice().len(), - ColumnBuilder::Binary(col) => col.data.len() + col.offsets.len() * 8, - ColumnBuilder::String(col) => col.data.len() + col.offsets.len() * 8, + ColumnBuilder::Binary(col) => col.memory_size(), + ColumnBuilder::String(col) => col.memory_size(), ColumnBuilder::Timestamp(col) => col.len() * 8, ColumnBuilder::Date(col) => col.len() * 4, ColumnBuilder::Array(col) => col.builder.memory_size() + col.offsets.len() * 8, @@ -2045,6 +2039,7 @@ impl ColumnBuilder { builder.commit_row(); } ColumnBuilder::String(builder) => { + let builder = builder.as_inner_mut(); let offset = reader.read_scalar::()? as usize; builder.data.resize(offset + builder.data.len(), 0); let last = *builder.offsets.last().unwrap() as usize; diff --git a/src/query/expression/tests/it/group_by.rs b/src/query/expression/tests/it/group_by.rs index 735e9171f95ae..821a51bbf1989 100644 --- a/src/query/expression/tests/it/group_by.rs +++ b/src/query/expression/tests/it/group_by.rs @@ -51,8 +51,8 @@ fn test_group_by_hash() -> Result<()> { let hash = HashMethodKeysU32::default(); let state = hash.build_keys_state(group_columns, block.num_rows())?; - let keys_iter = hash.build_keys_iter(&state)?; - let keys: Vec = keys_iter.copied().collect(); + let mut keys = vec![]; + hash.for_each_keys(&state, &mut |key| keys.push(*key)); assert_eq!(keys, vec![ 0x10101, 0x10101, 0x20202, 0x10101, 0x20202, 0x30303 ]); diff --git a/src/query/expression/tests/it/row.rs b/src/query/expression/tests/it/row.rs index f7ba9b75dd9ca..2f534821a4f20 100644 --- a/src/query/expression/tests/it/row.rs +++ b/src/query/expression/tests/it/row.rs @@ -70,31 +70,6 @@ fn test_fixed_width() { let rows = converter.convert_columns(&cols, cols[0].len()); - assert_eq!( - rows.offsets().clone(), - vec![0, 8, 16, 24, 32, 40, 48, 56].into() - ); - assert_eq!( - rows.data().clone(), - vec![ - 1, 128, 1, // - 1, 191, 166, 102, 102, // - 1, 128, 2, // - 1, 192, 32, 0, 0, // - 0, 0, 0, // - 0, 0, 0, 0, 0, // - 1, 127, 251, // - 1, 192, 128, 0, 0, // - 1, 128, 2, // - 1, 189, 204, 204, 205, // - 1, 128, 2, // - 1, 63, 127, 255, 255, // - 1, 128, 0, // - 1, 127, 255, 255, 255 // - ] - .into() - ); - unsafe { assert!(rows.index_unchecked(3) < rows.index_unchecked(6)); assert!(rows.index_unchecked(0) < rows.index_unchecked(1)); @@ -549,17 +524,7 @@ fn fuzz_test() { // arrow_ord does not support LargeBinary converted from Databend String Column::Nullable(c) => match &c.column { Column::String(sc) => { - let offsets = - sc.offsets().iter().map(|offset| *offset as i64).collect(); - let array = Box::new( - databend_common_arrow::arrow::array::Utf8Array::::try_new( - databend_common_arrow::arrow::datatypes::DataType::LargeUtf8, - unsafe { OffsetsBuffer::new_unchecked(offsets) }, - sc.data().clone(), - None, - ) - .unwrap(), - ); + let array = Box::new(sc.clone().into_inner()); set_validities(array, &c.validity) } _ => col.as_arrow(), diff --git a/src/query/formats/src/field_decoder/fast_values.rs b/src/query/formats/src/field_decoder/fast_values.rs index df66a4eef3748..407fa6549503a 100644 --- a/src/query/formats/src/field_decoder/fast_values.rs +++ b/src/query/formats/src/field_decoder/fast_values.rs @@ -270,7 +270,7 @@ impl FastFieldDecoderValues { reader: &mut Cursor, positions: &mut VecDeque, ) -> Result<()> { - self.read_string_inner(reader, &mut column.data, positions)?; + self.read_string_inner(reader, &mut column.as_inner_mut().data, positions)?; column.commit_row(); Ok(()) } diff --git a/src/query/formats/src/field_decoder/nested.rs b/src/query/formats/src/field_decoder/nested.rs index 6b1f7323ef539..f75b8c0bba390 100644 --- a/src/query/formats/src/field_decoder/nested.rs +++ b/src/query/formats/src/field_decoder/nested.rs @@ -196,7 +196,7 @@ impl NestedValues { column: &mut StringColumnBuilder, reader: &mut Cursor, ) -> Result<()> { - reader.read_quoted_text(&mut column.data, b'\'')?; + reader.read_quoted_text(&mut column.as_inner_mut().data, b'\'')?; column.commit_row(); Ok(()) } diff --git a/src/query/functions/src/scalars/arithmetic.rs b/src/query/functions/src/scalars/arithmetic.rs index 7d58f69782089..2d3533ea4336f 100644 --- a/src/query/functions/src/scalars/arithmetic.rs +++ b/src/query/functions/src/scalars/arithmetic.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_expression::serialize::read_decimal_with_size; +use databend_common_expression::types::binary::BinaryColumnBuilder; use databend_common_expression::types::decimal::DecimalDomain; use databend_common_expression::types::decimal::DecimalType; use databend_common_expression::types::nullable::NullableColumn; @@ -37,6 +38,7 @@ use databend_common_expression::types::NullableType; use databend_common_expression::types::NumberClass; use databend_common_expression::types::NumberDataType; use databend_common_expression::types::SimpleDomain; +use databend_common_expression::types::StringColumn; use databend_common_expression::types::StringType; use databend_common_expression::types::ALL_FLOAT_TYPES; use databend_common_expression::types::ALL_INTEGER_TYPES; @@ -972,29 +974,23 @@ pub fn register_number_to_string(registry: &mut FunctionRegistry) { let options = NUM_TYPE::lexical_options(); const FORMAT: u128 = lexical_core::format::STANDARD; + type Native = ::Native; + let mut builder = StringColumnBuilder::with_capacity(from.len(), from.len() + 1); - let values = &mut builder.data; + let mut buffer = Vec::with_capacity(::Native::FORMATTED_SIZE_DECIMAL); - type Native = ::Native; - let mut offset: usize = 0; unsafe { for x in from.iter() { - values.reserve(offset + Native::FORMATTED_SIZE_DECIMAL); - values.set_len(offset + Native::FORMATTED_SIZE_DECIMAL); - let bytes = &mut values[offset..]; - - let len = lexical_core::write_with_options_unchecked::< - _, - FORMAT, - >( - Native::from(*x), bytes, &options + let len = lexical_core::write_with_options_unchecked::<_, FORMAT>( + Native::from(*x), + &mut buffer, + &options, ) .len(); - offset += len; - builder.offsets.push(offset as u64); + builder.put_slice(&buffer[..len]); + builder.commit_row(); } - values.set_len(offset); } Value::Column(builder.build()) } @@ -1009,7 +1005,7 @@ pub fn register_number_to_string(registry: &mut FunctionRegistry) { let options = NUM_TYPE::lexical_options(); const FORMAT: u128 = lexical_core::format::STANDARD; let mut builder = - StringColumnBuilder::with_capacity(from.len(), from.len() + 1); + BinaryColumnBuilder::with_capacity(from.len(), from.len() + 1); let values = &mut builder.data; type Native = ::Native; @@ -1031,7 +1027,7 @@ pub fn register_number_to_string(registry: &mut FunctionRegistry) { } values.set_len(offset); } - let result = builder.build(); + let result = StringColumn::try_from(builder.build()).unwrap(); Value::Column(NullableColumn::new( result, Bitmap::new_constant(true, from.len()), diff --git a/src/query/functions/src/scalars/binary.rs b/src/query/functions/src/scalars/binary.rs index 6c76598768f6a..0fa31bb5e92e8 100644 --- a/src/query/functions/src/scalars/binary.rs +++ b/src/query/functions/src/scalars/binary.rs @@ -31,6 +31,8 @@ use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::types::UInt8Type; use databend_common_expression::types::ValueType; +use databend_common_expression::vectorize_1_arg; +use databend_common_expression::vectorize_2_arg; use databend_common_expression::Column; use databend_common_expression::EvalContext; use databend_common_expression::Function; @@ -49,19 +51,7 @@ pub fn register(registry: &mut FunctionRegistry) { registry.register_passthrough_nullable_1_arg::, _, _>( "length", |_, _| FunctionDomain::Full, - |val, _| match val { - ValueRef::Scalar(s) => Value::Scalar(s.len() as u64), - ValueRef::Column(c) => { - let diffs = c - .offsets() - .iter() - .zip(c.offsets().iter().skip(1)) - .map(|(a, b)| b - a) - .collect::>(); - - Value::Column(diffs.into()) - } - }, + vectorize_1_arg::>(|val, _| val.len() as u64), ); registry.register_passthrough_nullable_1_arg::( @@ -101,12 +91,12 @@ pub fn register(registry: &mut FunctionRegistry) { "to_hex", |_, _| FunctionDomain::Full, vectorize_binary_to_string( - |col| col.data().len() * 2, + |col| col.current_buffer_len() * 2, |val, output, _| { - let old_len = output.data.len(); + let old_len = output.as_inner_mut().data.len(); let extra_len = val.len() * 2; - output.data.resize(old_len + extra_len, 0); - hex::encode_to_slice(val, &mut output.data[old_len..]).unwrap(); + output.as_inner_mut().data.resize(old_len + extra_len, 0); + hex::encode_to_slice(val, &mut output.as_inner_mut().data[old_len..]).unwrap(); output.commit_row(); }, ), @@ -128,10 +118,10 @@ pub fn register(registry: &mut FunctionRegistry) { "to_base64", |_, _| FunctionDomain::Full, vectorize_binary_to_string( - |col| col.data().len() * 4 / 3 + col.len() * 4, + |col| col.current_buffer_len() * 4 / 3 + col.len() * 4, |val, output, _| { base64::write::EncoderWriter::new( - &mut output.data, + &mut output.as_inner_mut().data, &base64::engine::general_purpose::STANDARD, ) .write_all(val) @@ -203,7 +193,7 @@ pub fn register(registry: &mut FunctionRegistry) { fn eval_binary_to_string(val: ValueRef, ctx: &mut EvalContext) -> Value { vectorize_binary_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, ctx| { if let Ok(val) = simdutf8::basic::from_utf8(val) { output.put_str(val); @@ -217,7 +207,7 @@ fn eval_binary_to_string(val: ValueRef, ctx: &mut EvalContext) -> Va fn eval_unhex(val: ValueRef, ctx: &mut EvalContext) -> Value { vectorize_string_to_binary( - |col| col.data().len() / 2, + |col| col.current_buffer_len() / 2, |val, output, ctx| { let old_len = output.data.len(); let extra_len = val.len() / 2; @@ -232,7 +222,7 @@ fn eval_unhex(val: ValueRef, ctx: &mut EvalContext) -> Value, ctx: &mut EvalContext) -> Value { vectorize_string_to_binary( - |col| col.data().len() * 4 / 3 + col.len() * 4, + |col| col.current_buffer_len() * 4 / 3 + col.len() * 4, |val, output, ctx| { if let Err(err) = base64::Engine::decode_vec( &base64::engine::general_purpose::STANDARD, @@ -304,34 +294,18 @@ fn char_fn(args: &[ValueRef], _: &mut EvalContext) -> Value { }); let input_rows = len.unwrap_or(1); - let mut values: Vec = vec![0; input_rows * args.len()]; - let values_ptr = values.as_mut_ptr(); + let mut builder = BinaryColumnBuilder::with_capacity(input_rows, 0); - for (i, arg) in args.iter().enumerate() { - match arg { - ValueRef::Scalar(v) => { - for j in 0..input_rows { - unsafe { - *values_ptr.add(args.len() * j + i) = *v; - } - } - } - ValueRef::Column(c) => { - for (j, ch) in UInt8Type::iter_column(c).enumerate() { - unsafe { - *values_ptr.add(args.len() * j + i) = ch; - } - } - } + for _ in 0..input_rows { + for arg in &args { + let val = arg.index(0).unwrap(); + builder.put_u8(val); } + builder.commit_row(); } - let offsets = (0..(input_rows + 1) as u64 * args.len() as u64) - .step_by(args.len()) - .collect::>(); - let result = BinaryColumn::new(values.into(), offsets.into()); match len { - Some(_) => Value::Column(Column::Binary(result)), - _ => Value::Scalar(Scalar::Binary(result.index(0).unwrap().to_vec())), + Some(_) => Value::Column(Column::Binary(builder.build())), + _ => Value::Scalar(Scalar::Binary(builder.build_scalar())), } } diff --git a/src/query/functions/src/scalars/comparison.rs b/src/query/functions/src/scalars/comparison.rs index 71f667de8bd59..d1e0c3d46ea02 100644 --- a/src/query/functions/src/scalars/comparison.rs +++ b/src/query/functions/src/scalars/comparison.rs @@ -531,35 +531,8 @@ fn vectorize_like( let mut builder = MutableBitmap::with_capacity(arg1.len()); let pattern_type = generate_like_pattern(arg2.as_bytes(), arg1.current_buffer_len()); if let LikePattern::SurroundByPercent(searcher) = pattern_type { - let needle_byte_len = searcher.needle().len(); - let data = arg1.data().as_slice(); - let offsets = arg1.offsets().as_slice(); - let mut idx = 0; - let mut pos = (*offsets.first().unwrap()) as usize; - let end = (*offsets.last().unwrap()) as usize; - - while pos < end { - if let Some(p) = searcher.search(&data[pos..end]) { - // data: {3x}googlex|{3x}googlex|{3x}googlex - // needle_size: 6 - // offsets: 0, 10, 20, 30 - // (pos, p): (0, 3) , (10, 3), (20, 3), () - while offsets[idx + 1] as usize <= pos + p { - builder.push(false); - idx += 1; - } - // check if the substring is in bound - builder.push(pos + p + needle_byte_len <= offsets[idx + 1] as usize); - pos = offsets[idx + 1] as usize; - idx += 1; - } else { - break; - } - } - - while idx < arg1.len() { - builder.push(false); - idx += 1; + for arg1 in arg1_iter { + builder.push(searcher.search(arg1.as_bytes()).is_some()); } } else { for arg1 in arg1_iter { diff --git a/src/query/functions/src/scalars/datetime.rs b/src/query/functions/src/scalars/datetime.rs index a8c54dc8c0fbc..3a1e9e8aa4402 100644 --- a/src/query/functions/src/scalars/datetime.rs +++ b/src/query/functions/src/scalars/datetime.rs @@ -660,7 +660,7 @@ fn register_to_string(registry: &mut FunctionRegistry) { "to_string", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::(|val, output, ctx| { - write!(output.data, "{}", date_to_string(val, ctx.func_ctx.tz.tz)).unwrap(); + write!(output.as_inner_mut().data, "{}", date_to_string(val, ctx.func_ctx.tz.tz)).unwrap(); output.commit_row(); }), ); @@ -670,7 +670,7 @@ fn register_to_string(registry: &mut FunctionRegistry) { |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::(|val, output, ctx| { write!( - output.data, + output.as_inner_mut().data, "{}", timestamp_to_string(val, ctx.func_ctx.tz.tz) ) @@ -692,7 +692,7 @@ fn register_to_string(registry: &mut FunctionRegistry) { }, vectorize_with_builder_1_arg::>(|val, output, ctx| { write!( - output.builder.data, + output.builder.as_inner_mut().data, "{}", date_to_string(val, ctx.func_ctx.tz.tz) ) @@ -716,7 +716,7 @@ fn register_to_string(registry: &mut FunctionRegistry) { vectorize_with_builder_1_arg::>( |val, output, ctx| { write!( - output.builder.data, + output.builder.as_inner_mut().data, "{}", timestamp_to_string(val, ctx.func_ctx.tz.tz) ) diff --git a/src/query/functions/src/scalars/hash.rs b/src/query/functions/src/scalars/hash.rs index 4e00a7f4aeab3..80797e2b96b01 100644 --- a/src/query/functions/src/scalars/hash.rs +++ b/src/query/functions/src/scalars/hash.rs @@ -80,14 +80,14 @@ pub fn register(registry: &mut FunctionRegistry) { "md5", |_, _| FunctionDomain::MayThrow, vectorize_string_to_string( - |col| col.data().len() * 32, + |col| col.current_buffer_len() * 32, |val, output, ctx| { // TODO md5 lib doesn't allow encode into buffer... - let old_len = output.data.len(); - output.data.resize(old_len + 32, 0); + let old_len = output.as_inner_mut().data.len(); + output.as_inner_mut().data.resize(old_len + 32, 0); if let Err(err) = hex::encode_to_slice( Md5Hasher::digest(val).as_slice(), - &mut output.data[old_len..], + &mut output.as_inner_mut().data[old_len..], ) { ctx.set_error(output.len(), err.to_string()); } @@ -100,16 +100,16 @@ pub fn register(registry: &mut FunctionRegistry) { "sha", |_, _| FunctionDomain::MayThrow, vectorize_string_to_string( - |col| col.data().len() * 40, + |col| col.current_buffer_len() * 40, |val, output, ctx| { - let old_len = output.data.len(); - output.data.resize(old_len + 40, 0); + let old_len = output.as_inner_mut().data.len(); + output.as_inner_mut().data.resize(old_len + 40, 0); // TODO sha1 lib doesn't allow encode into buffer... let mut m = ::sha1::Sha1::new(); sha1::digest::Update::update(&mut m, val.as_bytes()); if let Err(err) = - hex::encode_to_slice(m.finalize().as_slice(), &mut output.data[old_len..]) + hex::encode_to_slice(m.finalize().as_slice(), &mut output.as_inner_mut().data[old_len..]) { ctx.set_error(output.len(), err.to_string()); } @@ -122,13 +122,13 @@ pub fn register(registry: &mut FunctionRegistry) { "blake3", |_, _| FunctionDomain::MayThrow, vectorize_string_to_string( - |col| col.data().len() * 64, + |col| col.current_buffer_len() * 64, |val, output, ctx| { - let old_len = output.data.len(); - output.data.resize(old_len + 64, 0); + let old_len = output.as_inner_mut().data.len(); + output.as_inner_mut().data.resize(old_len + 64, 0); if let Err(err) = hex::encode_to_slice( blake3::hash(val.as_bytes()).as_bytes(), - &mut output.data[old_len..], + &mut output.as_inner_mut().data[old_len..], ) { ctx.set_error(output.len(), err.to_string()); } diff --git a/src/query/functions/src/scalars/other.rs b/src/query/functions/src/scalars/other.rs index 2ed8f5eeb4569..d6a0276a3d269 100644 --- a/src/query/functions/src/scalars/other.rs +++ b/src/query/functions/src/scalars/other.rs @@ -22,6 +22,7 @@ use databend_common_base::base::convert_number_size; use databend_common_base::base::uuid::Uuid; use databend_common_base::base::OrderedFloat; use databend_common_expression::error_to_null; +use databend_common_expression::types::binary::BinaryColumnBuilder; use databend_common_expression::types::boolean::BooleanDomain; use databend_common_expression::types::nullable::NullableColumn; use databend_common_expression::types::number::Float32Type; @@ -31,6 +32,7 @@ use databend_common_expression::types::number::UInt32Type; use databend_common_expression::types::number::UInt8Type; use databend_common_expression::types::number::F64; use databend_common_expression::types::string::StringColumn; +use databend_common_expression::types::string::StringColumnBuilder; use databend_common_expression::types::ArgType; use databend_common_expression::types::DataType; use databend_common_expression::types::DateType; @@ -232,16 +234,14 @@ pub fn register(registry: &mut FunctionRegistry) { "gen_random_uuid", |_| FunctionDomain::Full, |ctx| { - let mut values: Vec = Vec::with_capacity(ctx.num_rows * 36); - let mut offsets: Vec = Vec::with_capacity(ctx.num_rows); - offsets.push(0); + let mut builder = BinaryColumnBuilder::with_capacity(ctx.num_rows, 0); for _ in 0..ctx.num_rows { let value = Uuid::new_v4(); - offsets.push(offsets.last().unwrap() + 36u64); - write!(&mut values, "{:x}", value).unwrap(); + write!(&mut builder.data, "{}", value).unwrap(); } - let col = StringColumn::new(values.into(), offsets.into()); + + let col = StringColumn::try_from(builder.build()).unwrap(); Value::Column(col) }, ); diff --git a/src/query/functions/src/scalars/string.rs b/src/query/functions/src/scalars/string.rs index 25f10b82ccd26..ec8d3c343f987 100644 --- a/src/query/functions/src/scalars/string.rs +++ b/src/query/functions/src/scalars/string.rs @@ -26,6 +26,7 @@ use databend_common_expression::types::ArrayType; use databend_common_expression::types::NumberType; use databend_common_expression::types::StringType; use databend_common_expression::unify_string; +use databend_common_expression::vectorize_1_arg; use databend_common_expression::vectorize_with_builder_1_arg; use databend_common_expression::vectorize_with_builder_2_arg; use databend_common_expression::vectorize_with_builder_3_arg; @@ -103,7 +104,7 @@ pub fn register(registry: &mut FunctionRegistry) { "upper", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { for ch in val.chars() { if ch.is_ascii() { @@ -123,7 +124,7 @@ pub fn register(registry: &mut FunctionRegistry) { "lower", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { for ch in val.chars() { if ch.is_ascii() { @@ -148,19 +149,7 @@ pub fn register(registry: &mut FunctionRegistry) { registry.register_passthrough_nullable_1_arg::, _, _>( "octet_length", |_, _| FunctionDomain::Full, - |val, _| match val { - ValueRef::Scalar(s) => Value::Scalar(s.len() as u64), - ValueRef::Column(c) => { - let diffs = c - .offsets() - .iter() - .zip(c.offsets().iter().skip(1)) - .map(|(a, b)| b - a) - .collect::>(); - - Value::Column(diffs.into()) - } - }, + vectorize_1_arg::>(|val, _| val.len() as u64), ); registry.register_1_arg::, _, _>( @@ -383,7 +372,7 @@ pub fn register(registry: &mut FunctionRegistry) { "quote", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len() * 2, + |col| col.current_buffer_len() * 2, |val, output, _| { for ch in val.chars() { match ch { @@ -407,7 +396,7 @@ pub fn register(registry: &mut FunctionRegistry) { "reverse", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { for char in val.chars().rev() { output.put_char(char); @@ -437,7 +426,7 @@ pub fn register(registry: &mut FunctionRegistry) { "ltrim", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { output.put_str(val.trim_start()); output.commit_row(); @@ -449,7 +438,7 @@ pub fn register(registry: &mut FunctionRegistry) { "rtrim", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { output.put_str(val.trim_end()); output.commit_row(); @@ -461,7 +450,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len(), + |col| col.current_buffer_len(), |val, output, _| { output.put_str(val.trim()); output.commit_row(); @@ -473,7 +462,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim_leading", |_, _, _| FunctionDomain::Full, vectorize_string_to_string_2_arg( - |col, _| col.data().len(), + |col, _| col.current_buffer_len(), |val, trim_str, _, output| { if trim_str.is_empty() { output.put_str(val); @@ -491,7 +480,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim_trailing", |_, _, _| FunctionDomain::Full, vectorize_string_to_string_2_arg( - |col, _| col.data().len(), + |col, _| col.current_buffer_len(), |val, trim_str, _, output| { if trim_str.is_empty() { output.put_str(val); @@ -509,7 +498,7 @@ pub fn register(registry: &mut FunctionRegistry) { "trim_both", |_, _, _| FunctionDomain::Full, vectorize_string_to_string_2_arg( - |col, _| col.data().len(), + |col, _| col.current_buffer_len(), |val, trim_str, _, output| { if trim_str.is_empty() { output.put_str(val); @@ -536,12 +525,12 @@ pub fn register(registry: &mut FunctionRegistry) { "to_hex", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| col.data().len() * 2, + |col| col.current_buffer_len() * 2, |val, output, _| { - let old_len = output.data.len(); + let old_len = output.as_inner_mut().data.len(); let extra_len = val.len() * 2; - output.data.resize(old_len + extra_len, 0); - hex::encode_to_slice(val, &mut output.data[old_len..]).unwrap(); + output.as_inner_mut().data.resize(old_len + extra_len, 0); + hex::encode_to_slice(val, &mut output.as_inner_mut().data[old_len..]).unwrap(); output.commit_row(); }, ), @@ -553,7 +542,7 @@ pub fn register(registry: &mut FunctionRegistry) { "bin", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::, StringType>(|val, output, _| { - write!(output.data, "{val:b}").unwrap(); + write!(output.as_inner_mut().data, "{val:b}").unwrap(); output.commit_row(); }), ); @@ -561,7 +550,7 @@ pub fn register(registry: &mut FunctionRegistry) { "oct", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::, StringType>(|val, output, _| { - write!(output.data, "{val:o}").unwrap(); + write!(output.as_inner_mut().data, "{val:o}").unwrap(); output.commit_row(); }), ); @@ -569,7 +558,7 @@ pub fn register(registry: &mut FunctionRegistry) { "to_hex", |_, _| FunctionDomain::Full, vectorize_with_builder_1_arg::, StringType>(|val, output, _| { - write!(output.data, "{val:x}").unwrap(); + write!(output.as_inner_mut().data, "{val:x}").unwrap(); output.commit_row(); }), ); @@ -625,7 +614,7 @@ pub fn register(registry: &mut FunctionRegistry) { "soundex", |_, _| FunctionDomain::Full, vectorize_string_to_string( - |col| usize::max(col.data().len(), 4 * col.len()), + |col| usize::max(col.current_buffer_len(), 4 * col.len()), soundex::soundex, ), ); @@ -634,46 +623,21 @@ pub fn register(registry: &mut FunctionRegistry) { registry.register_passthrough_nullable_1_arg::, StringType, _, _>( "space", |_, _| FunctionDomain::MayThrow, - |times, ctx| match times { - ValueRef::Scalar(times) => { + vectorize_with_builder_1_arg::, StringType>( + |times, output, ctx| { if times > MAX_SPACE_LENGTH { ctx.set_error( - 0, + output.len(), format!("space length is too big, max is: {}", MAX_SPACE_LENGTH), ); - Value::Scalar("".to_string()) } else { - Value::Scalar(" ".repeat(times as usize)) - } - } - ValueRef::Column(col) => { - let mut total_space: u64 = 0; - let mut offsets: Vec = Vec::with_capacity(col.len() + 1); - offsets.push(0); - for (i, times) in col.iter().enumerate() { - if times > &MAX_SPACE_LENGTH { - ctx.set_error( - i, - format!("space length is too big, max is: {}", MAX_SPACE_LENGTH), - ); - break; + for _ in 0..times { + output.put_char(' '); } - total_space += times; - offsets.push(total_space); - } - if ctx.errors.is_some() { - offsets.truncate(1); - total_space = 0; } - let col = StringColumnBuilder { - data: " ".repeat(total_space as usize).into_bytes(), - offsets, - need_estimated: false, - } - .build(); - Value::Column(col) - } - }, + output.commit_row(); + }, + ), ); registry.register_passthrough_nullable_2_arg::, StringType, _, _>( diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs index 6db4b91886016..f554b6157f38b 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs @@ -98,8 +98,10 @@ impl RowConverter for CommonRowConverter { let (_, validity) = c.validity(); let col = c.remove_nullable(); let col = col.as_variant().unwrap(); - let mut builder = - BinaryColumnBuilder::with_capacity(col.len(), col.data().len()); + let mut builder = BinaryColumnBuilder::with_capacity( + col.len(), + col.current_buffer_len(), + ); for (i, val) in col.iter().enumerate() { if let Some(validity) = validity { if unsafe { !validity.get_bit_unchecked(i) } { diff --git a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs index 1a7e3591deedb..e45c11006f28a 100644 --- a/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs +++ b/src/query/service/src/pipelines/processors/transforms/group_by/aggregator_polymorphic_keys.rs @@ -604,7 +604,6 @@ impl PartitionedHashMethod { impl HashMethod for PartitionedHashMethod { type HashKey = Method::HashKey; - type HashKeyIter<'a> = Method::HashKeyIter<'a> where Self: 'a; fn name(&self) -> String { format!("Partitioned{}", self.method.name()) @@ -614,8 +613,8 @@ impl HashMethod for PartitionedHashMethod { self.method.build_keys_state(group_columns, rows) } - fn build_keys_iter<'a>(&self, keys_state: &'a KeysState) -> Result> { - self.method.build_keys_iter(keys_state) + fn for_each_keys(&self, keys_state: &KeysState, f: &mut dyn FnMut(&Self::HashKey)) { + self.method.for_each_keys(keys_state, f) } fn build_keys_accessor( diff --git a/src/query/storages/common/index/src/bloom_index.rs b/src/query/storages/common/index/src/bloom_index.rs index a5614b8b2a734..8763f35ae9a5a 100644 --- a/src/query/storages/common/index/src/bloom_index.rs +++ b/src/query/storages/common/index/src/bloom_index.rs @@ -545,7 +545,7 @@ impl BloomIndex { /// If it does, the bloom index for the column will not be established. fn check_large_string(column: &Column) -> bool { if let Column::String(v) = &column { - let bytes_per_row = v.data().len() / v.len().max(1); + let bytes_per_row = v.current_buffer_len() / v.len().max(1); if bytes_per_row > 256 { return true; } diff --git a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs index 4d4674b50d4a9..066a418b626df 100644 --- a/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs +++ b/src/query/storages/fuse/src/operations/read/runtime_filter_prunner.rs @@ -150,8 +150,8 @@ pub(crate) fn update_bitmap_with_bloom_filter( } idx += 1; }), - KeysState::Column(Column::String(col)) => col.iter_binary().for_each(|key| { - let hash = key.fast_hash(); + KeysState::Column(Column::String(col)) => col.iter().for_each(|key| { + let hash = key.as_bytes().fast_hash(); if filter.contains(&hash) { bitmap.set(idx, true); } diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs b/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs index 199900f4d2832..dbfa146210669 100644 --- a/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs +++ b/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs @@ -186,14 +186,10 @@ impl RowDecoder for CsvDecoder { Ok(vec![]) } - fn flush(&self, columns: Vec, num_rows: usize) -> Vec { + fn flush(&self, columns: Vec, _num_rows: usize) -> Vec { if let Some(projection) = &self.load_context.pos_projection { let empty_strings = Column::String( - StringColumnBuilder { - need_estimated: false, - data: vec![], - offsets: vec![0; num_rows + 1], - } + StringColumnBuilder::with_capacity(0, 0) .build(), ); columns