From a69cd206f5d858d2eae80b6cdf697030928c9227 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Mon, 10 Jun 2024 20:28:58 +0100 Subject: [PATCH 01/14] Move AsContiguousFn out of arrays and into private ChunkedArray --- vortex-alp/src/compute.rs | 10 +- vortex-array/Cargo.toml | 4 +- .../src/array/bool/compute/as_contiguous.rs | 27 ---- vortex-array/src/array/bool/compute/mod.rs | 6 - vortex-array/src/array/chunked/compute/mod.rs | 25 +-- .../src/array/chunked/compute/take.rs | 1 - vortex-array/src/array/chunked/flatten.rs | 146 ++++++++++++++++++ vortex-array/src/array/chunked/mod.rs | 17 +- vortex-array/src/array/constant/compute.rs | 34 +--- .../src/array/datetime/localdatetime.rs | 2 +- vortex-array/src/array/extension/compute.rs | 16 -- .../array/primitive/compute/as_contiguous.rs | 31 ---- .../src/array/primitive/compute/mod.rs | 6 - vortex-array/src/array/sparse/compute/mod.rs | 37 +---- vortex-array/src/array/struct/compute.rs | 53 ------- vortex-array/src/array/struct/mod.rs | 17 +- vortex-array/src/array/varbin/compute/mod.rs | 46 +----- vortex-array/src/array/varbin/mod.rs | 2 +- vortex-array/src/compress.rs | 19 ++- vortex-array/src/compute/as_contiguous.rs | 67 -------- vortex-array/src/compute/mod.rs | 6 - vortex-array/src/flatten.rs | 9 +- vortex-array/src/lib.rs | 10 ++ vortex-array/src/stats/mod.rs | 3 + vortex-array/src/validity.rs | 28 ++-- vortex-datetime-parts/src/compute.rs | 42 +---- vortex-dict/src/compute.rs | 9 +- vortex-fastlanes/src/for/compute.rs | 43 +----- 28 files changed, 228 insertions(+), 488 deletions(-) delete mode 100644 vortex-array/src/array/bool/compute/as_contiguous.rs create mode 100644 vortex-array/src/array/chunked/flatten.rs delete mode 100644 vortex-array/src/array/primitive/compute/as_contiguous.rs delete mode 100644 vortex-array/src/compute/as_contiguous.rs diff --git a/vortex-alp/src/compute.rs b/vortex-alp/src/compute.rs index aa5c3d9c1..1a4260cc6 100644 --- a/vortex-alp/src/compute.rs +++ b/vortex-alp/src/compute.rs @@ -1,16 +1,13 @@ -use vortex::compute::as_contiguous::AsContiguousFn; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; -use vortex::{impl_default_as_contiguous_fn, Array, ArrayDType, IntoArray}; +use vortex::{Array, ArrayDType, IntoArray}; use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::{match_each_alp_float_ptype, ALPArray}; -impl_default_as_contiguous_fn!(ALPArray); - impl ArrayCompute for ALPArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) @@ -23,10 +20,6 @@ impl ArrayCompute for ALPArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } } impl ScalarAtFn for ALPArray { @@ -72,7 +65,6 @@ impl SliceFn for ALPArray { #[cfg(test)] mod test { use vortex::array::primitive::PrimitiveArray; - use vortex::compute::as_contiguous::AsContiguousFn; use vortex::compute::scalar_at::scalar_at; use vortex::validity::Validity; use vortex::IntoArray; diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 4c7238ab0..31a5f2011 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -15,8 +15,8 @@ rust-version = { workspace = true } name = "vortex" path = "src/lib.rs" -[lints] -workspace = true +#[lints] +#workspace = true [dependencies] arrow-array = { workspace = true } diff --git a/vortex-array/src/array/bool/compute/as_contiguous.rs 
b/vortex-array/src/array/bool/compute/as_contiguous.rs deleted file mode 100644 index 54f5aeba3..000000000 --- a/vortex-array/src/array/bool/compute/as_contiguous.rs +++ /dev/null @@ -1,27 +0,0 @@ -use arrow_buffer::BooleanBuffer; -use vortex_error::VortexResult; - -use crate::array::bool::BoolArray; -use crate::compute::as_contiguous::AsContiguousFn; -use crate::validity::Validity; -use crate::{Array, ArrayDType, IntoArray}; - -impl AsContiguousFn for BoolArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - let mut bools = Vec::with_capacity(arrays.iter().map(|a| a.len()).sum()); - for buffer in arrays - .iter() - .map(|a| Self::try_from(a).unwrap().boolean_buffer()) - { - bools.extend(buffer.iter()) - } - - Ok(Self::try_new(BooleanBuffer::from(bools), validity)?.into_array()) - } -} diff --git a/vortex-array/src/array/bool/compute/mod.rs b/vortex-array/src/array/bool/compute/mod.rs index b8832b811..36970c63c 100644 --- a/vortex-array/src/array/bool/compute/mod.rs +++ b/vortex-array/src/array/bool/compute/mod.rs @@ -1,6 +1,5 @@ use crate::array::bool::BoolArray; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::AsContiguousFn; use crate::compute::compare::CompareFn; use crate::compute::fill::FillForwardFn; use crate::compute::scalar_at::ScalarAtFn; @@ -9,7 +8,6 @@ use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; mod as_arrow; -mod as_contiguous; mod compare; mod fill; mod flatten; @@ -22,10 +20,6 @@ impl ArrayCompute for BoolArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn compare(&self) -> Option<&dyn CompareFn> { Some(self) } diff --git a/vortex-array/src/array/chunked/compute/mod.rs b/vortex-array/src/array/chunked/compute/mod.rs index 7d2950a58..e46fb9395 100644 --- a/vortex-array/src/array/chunked/compute/mod.rs +++ b/vortex-array/src/array/chunked/compute/mod.rs @@ -2,23 +2,21 @@ use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::chunked::ChunkedArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; +use crate::compute::ArrayCompute; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::scalar_subtract::SubtractScalarFn; use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; -use crate::compute::ArrayCompute; -use crate::Array; mod slice; mod take; impl ArrayCompute for ChunkedArray { - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> { Some(self) } @@ -29,23 +27,6 @@ impl ArrayCompute for ChunkedArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> { - Some(self) - } -} - -impl AsContiguousFn for ChunkedArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - // Combine all the chunks into one, then call as_contiguous again. 
- let mut chunks = Vec::with_capacity(self.nchunks()); - for array in arrays { - for chunk in Self::try_from(array).unwrap().chunks() { - chunks.push(chunk); - } - } - as_contiguous(&chunks) - } } impl ScalarAtFn for ChunkedArray { diff --git a/vortex-array/src/array/chunked/compute/take.rs b/vortex-array/src/array/chunked/compute/take.rs index 96b357af7..a60487627 100644 --- a/vortex-array/src/array/chunked/compute/take.rs +++ b/vortex-array/src/array/chunked/compute/take.rs @@ -55,7 +55,6 @@ mod test { use itertools::Itertools; use crate::array::chunked::ChunkedArray; - use crate::compute::as_contiguous::as_contiguous; use crate::compute::take::take; use crate::{ArrayDType, ArrayTrait, AsArray, IntoArray}; diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs new file mode 100644 index 000000000..989379b58 --- /dev/null +++ b/vortex-array/src/array/chunked/flatten.rs @@ -0,0 +1,146 @@ +use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer}; + +use vortex_dtype::{DType, match_each_native_ptype, Nullability, PType, StructDType}; +use vortex_error::{vortex_bail, VortexResult}; + +use crate::{Array, ArrayTrait, ArrayValidity, Flattened, IntoArray}; +use crate::accessor::ArrayAccessor; +use crate::array::bool::BoolArray; +use crate::array::chunked::ChunkedArray; +use crate::array::primitive::PrimitiveArray; +use crate::array::r#struct::StructArray; +use crate::array::varbin::builder::VarBinBuilder; +use crate::array::varbin::VarBinArray; +use crate::validity::{LogicalValidity, Validity}; + +pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { + match &dtype { + // Structs can have their internal field pointers swizzled to push the chunking down + // one level internally without copying or decompressing any data. + DType::Struct(struct_dtype, _) => { + let struct_array = swizzle_struct_chunks(chunks.as_slice(), struct_dtype)?; + Ok(Flattened::Struct(struct_array)) + } + + // TODO(aduffy): can we pushdown chunks here? + DType::Extension(_, _) => { + // How can we stitch an unknown type back together? + todo!() + } + + // Lists just flatten into their inner PType + DType::List(_, _) => { + todo!() + } + + DType::Bool(nullability) => { + let bool_array = pack_bools(chunks.as_slice(), *nullability)?; + Ok(Flattened::Bool(bool_array)) + } + DType::Primitive(ptype, nullability) => { + let prim_array = pack_primitives(chunks.as_slice(), *ptype, *nullability)?; + Ok(Flattened::Primitive(prim_array)) + } + DType::Utf8(nullability) => { + let varbin_array = pack_varbin(chunks.as_slice(), dtype.clone(), *nullability)?; + Ok(Flattened::VarBin(varbin_array)) + } + DType::Binary(nullability) => { + let varbin_array = pack_varbin(chunks.as_slice(), dtype.clone(), *nullability)?; + Ok(Flattened::VarBin(varbin_array)) + } + DType::Null => { + vortex_bail!(ComputeError: "DType::Null cannot be flattened") + } + } +} + +/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single +/// StructArray pointed at ChunkedArrays of each constituent format. 
+fn swizzle_struct_chunks(chunks: &[Array], struct_dtype: &StructDType) -> VortexResult { + let chunks = chunks.iter() + .map(StructArray::try_from) + // Figure out how to unwrap result of things + .collect::>>()?; + + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let validity = chunks.iter() + .map(|chunk| chunk.logical_validity()) + .collect::(); + + let mut field_arrays = Vec::new(); + let field_names = struct_dtype.names().clone(); + let field_dtypes = struct_dtype.dtypes().clone(); + + for (field_idx, field_dtype) in field_dtypes.iter().enumerate() { + let mut field_chunks = Vec::new(); + for chunk in &chunks { + field_chunks.push(chunk.field(field_idx).expect("structarray should contain field")); + } + let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?; + field_arrays.push(field_array.into_array()); + } + + Ok(StructArray::try_new(field_names, field_arrays, len, validity)?) +} + +fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult { + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let mut logical_validities = Vec::new(); + let mut bools = Vec::with_capacity(len); + for chunk in chunks { + let chunk = chunk.clone().flatten_bool()?; + logical_validities.push(chunk.logical_validity()); + bools.extend(chunk.boolean_buffer().iter()); + } + + BoolArray::try_new( + BooleanBuffer::from(bools), + validity_from_chunks(logical_validities, nullability), + ) +} + +fn pack_primitives(chunks: &[Array], ptype: PType, nullability: Nullability) -> VortexResult { + let len: usize = chunks.iter().map(|chunk| chunk.len()).sum(); + let mut logical_validities = Vec::new(); + let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width()); + for chunk in chunks { + let chunk = chunk.clone().flatten_primitive()?; + logical_validities.push(chunk.logical_validity()); + buffer.extend_from_slice(chunk.buffer()); + } + + match_each_native_ptype!(ptype, |$T| { + Ok(PrimitiveArray::try_new( + ScalarBuffer::<$T>::from(buffer), + validity_from_chunks(logical_validities, nullability))?) + }) +} + +// TODO(aduffy): This can be slow for really large arrays. 
+// TODO(aduffy): this doesn't propagate the validity fully +fn pack_varbin(chunks: &[Array], dtype: DType, _nullability: Nullability) -> VortexResult { + let len = chunks.iter() + .map(|chunk| chunk.len()) + .sum(); + let mut builder = VarBinBuilder::::with_capacity(len); + + for chunk in chunks { + let chunk = chunk.clone().flatten_varbin()?; + chunk.with_iterator(|iter| { + for datum in iter { + builder.push(datum); + } + })?; + } + + Ok(builder.finish(dtype)) +} + +fn validity_from_chunks(logical_validities: Vec, nullability: Nullability) -> Validity { + if nullability == Nullability::NonNullable { + Validity::NonNullable + } else { + logical_validities.into_iter().collect() + } +} \ No newline at end of file diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 25d5eb360..643dd0aee 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -1,24 +1,26 @@ use futures_util::stream; use itertools::Itertools; use serde::{Deserialize, Serialize}; + use vortex_dtype::{Nullability, PType}; use vortex_error::vortex_bail; use vortex_scalar::Scalar; +use crate::{ArrayDType, ArrayFlatten, impl_encoding, IntoArrayData, ToArrayData}; +use crate::array::chunked::flatten::try_flatten_chunks; use crate::array::primitive::PrimitiveArray; -use crate::compute::as_contiguous::as_contiguous; use crate::compute::scalar_at::scalar_at; use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn}; use crate::compute::search_sorted::{search_sorted, SearchSortedSide}; use crate::iter::{ArrayIterator, ArrayIteratorAdapter}; use crate::stream::{ArrayStream, ArrayStreamAdapter}; -use crate::validity::Validity::NonNullable; use crate::validity::{ArrayValidity, LogicalValidity}; +use crate::validity::Validity::NonNullable; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; -use crate::{impl_encoding, ArrayDType, ArrayFlatten, IntoArrayData, ToArrayData}; mod compute; mod stats; +mod flatten; impl_encoding!("vortex.chunked", Chunked); @@ -116,12 +118,7 @@ impl FromIterator for ChunkedArray { impl ArrayFlatten for ChunkedArray { fn flatten(self) -> VortexResult { - let chunks = self.chunks().collect_vec(); - if chunks.is_empty() { - // TODO(ngates): return an empty FlattenedArray with the correct DType. 
- panic!("Cannot yet flatten an empty chunked array"); - } - as_contiguous(chunks.as_slice())?.flatten() + try_flatten_chunks(self.chunks().collect(), self.dtype().clone()) } } @@ -171,10 +168,10 @@ mod test { use vortex_dtype::{DType, Nullability}; use vortex_dtype::{NativePType, PType}; + use crate::{Array, IntoArray, ToArray}; use crate::array::chunked::ChunkedArray; use crate::compute::scalar_subtract::subtract_scalar; use crate::compute::slice::slice; - use crate::{Array, IntoArray, ToArray}; fn chunked_array() -> ChunkedArray { ChunkedArray::try_new( diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs index 4183e7e8b..61d19cd7a 100644 --- a/vortex-array/src/array/constant/compute.rs +++ b/vortex-array/src/array/constant/compute.rs @@ -1,19 +1,13 @@ -use itertools::Itertools; -use vortex_error::{vortex_err, VortexResult}; +use vortex_error::VortexResult; use vortex_scalar::Scalar; +use crate::{Array, IntoArray}; use crate::array::constant::ConstantArray; -use crate::compute::as_contiguous::AsContiguousFn; +use crate::compute::ArrayCompute; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::take::TakeFn; -use crate::compute::ArrayCompute; -use crate::{Array, ArrayTrait, IntoArray}; impl ArrayCompute for ConstantArray { - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -23,28 +17,6 @@ impl ArrayCompute for ConstantArray { } } -impl AsContiguousFn for ConstantArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let chunks = arrays - .iter() - .map(|a| Self::try_from(a).unwrap()) - .collect_vec(); - - if chunks.iter().map(|c| c.scalar()).all_equal() { - Ok(Self::new( - chunks.first().unwrap().scalar().clone(), - chunks.iter().map(|c| c.len()).sum(), - ) - .into_array()) - } else { - // TODO(ngates): we need to flatten the constant arrays and then concatenate them - Err(vortex_err!( - "Cannot concatenate constant arrays with differing scalars" - )) - } - } -} - impl ScalarAtFn for ConstantArray { fn scalar_at(&self, _index: usize) -> VortexResult { Ok(self.scalar().clone()) diff --git a/vortex-array/src/array/datetime/localdatetime.rs b/vortex-array/src/array/datetime/localdatetime.rs index 849d84cf9..5d854731b 100644 --- a/vortex-array/src/array/datetime/localdatetime.rs +++ b/vortex-array/src/array/datetime/localdatetime.rs @@ -16,7 +16,7 @@ use crate::validity::ArrayValidity; use crate::{Array, ArrayDType, ArrayData, IntoArrayData}; lazy_static! 
{ - static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID); + pub static ref ID: ExtID = ExtID::from(LocalDateTimeArray::ID); } pub struct LocalDateTimeArray { diff --git a/vortex-array/src/array/extension/compute.rs b/vortex-array/src/array/extension/compute.rs index 70ab9706c..d7611d3f7 100644 --- a/vortex-array/src/array/extension/compute.rs +++ b/vortex-array/src/array/extension/compute.rs @@ -5,7 +5,6 @@ use vortex_scalar::Scalar; use crate::array::datetime::LocalDateTimeArray; use crate::array::extension::ExtensionArray; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::cast::CastFn; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::{slice, SliceFn}; @@ -18,10 +17,6 @@ impl ArrayCompute for ExtensionArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn cast(&self) -> Option<&dyn CastFn> { // It's not possible to cast an extension array to another type. // TODO(ngates): we should allow some extension arrays to implement a callback @@ -54,17 +49,6 @@ impl AsArrowArray for ExtensionArray { } } -impl AsContiguousFn for ExtensionArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let storage_arrays = arrays - .iter() - .map(|a| Self::try_from(a).expect("not an extension array").storage()) - .collect::>(); - - Ok(Self::new(self.ext_dtype().clone(), as_contiguous(&storage_arrays)?).into_array()) - } -} - impl ScalarAtFn for ExtensionArray { fn scalar_at(&self, index: usize) -> VortexResult { Ok(Scalar::extension( diff --git a/vortex-array/src/array/primitive/compute/as_contiguous.rs b/vortex-array/src/array/primitive/compute/as_contiguous.rs deleted file mode 100644 index 150849ef3..000000000 --- a/vortex-array/src/array/primitive/compute/as_contiguous.rs +++ /dev/null @@ -1,31 +0,0 @@ -use arrow_buffer::{MutableBuffer, ScalarBuffer}; -use vortex_dtype::match_each_native_ptype; -use vortex_error::VortexResult; - -use crate::array::primitive::PrimitiveArray; -use crate::compute::as_contiguous::AsContiguousFn; -use crate::validity::Validity; -use crate::ArrayDType; -use crate::{Array, IntoArray}; - -impl AsContiguousFn for PrimitiveArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - let mut buffer = MutableBuffer::with_capacity( - arrays.iter().map(|a| a.len()).sum::() * self.ptype().byte_width(), - ); - for array in arrays { - buffer.extend_from_slice(array.as_primitive().buffer()) - } - match_each_native_ptype!(self.ptype(), |$T| { - Ok(PrimitiveArray::try_new(ScalarBuffer::<$T>::from(buffer), validity) - .unwrap() - .into_array()) - }) - } -} diff --git a/vortex-array/src/array/primitive/compute/mod.rs b/vortex-array/src/array/primitive/compute/mod.rs index 8b87128a1..637f63f36 100644 --- a/vortex-array/src/array/primitive/compute/mod.rs +++ b/vortex-array/src/array/primitive/compute/mod.rs @@ -1,6 +1,5 @@ use crate::array::primitive::PrimitiveArray; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::AsContiguousFn; use crate::compute::cast::CastFn; use crate::compute::compare::CompareFn; use crate::compute::fill::FillForwardFn; @@ -13,7 +12,6 @@ use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; mod as_arrow; -mod as_contiguous; mod cast; mod compare; mod fill; @@ 
-29,10 +27,6 @@ impl ArrayCompute for PrimitiveArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn cast(&self) -> Option<&dyn CastFn> { Some(self) } diff --git a/vortex-array/src/array/sparse/compute/mod.rs b/vortex-array/src/array/sparse/compute/mod.rs index 722ba3e08..e9069fd97 100644 --- a/vortex-array/src/array/sparse/compute/mod.rs +++ b/vortex-array/src/array/sparse/compute/mod.rs @@ -1,26 +1,22 @@ use std::collections::HashMap; use itertools::Itertools; + use vortex_dtype::match_each_integer_ptype; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::VortexResult; use vortex_scalar::Scalar; +use crate::{Array, ArrayDType, IntoArray}; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::SparseArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; +use crate::compute::ArrayCompute; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::SliceFn; use crate::compute::take::{take, TakeFn}; -use crate::compute::ArrayCompute; -use crate::{Array, ArrayDType, ArrayTrait, IntoArray}; mod slice; impl ArrayCompute for SparseArray { - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -34,27 +30,6 @@ impl ArrayCompute for SparseArray { } } -impl AsContiguousFn for SparseArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let sparse = arrays - .iter() - .map(|a| Self::try_from(a).unwrap()) - .collect_vec(); - - if !sparse.iter().map(|a| a.fill_value()).all_equal() { - vortex_bail!("Cannot concatenate SparseArrays with differing fill values"); - } - - Ok(Self::new( - as_contiguous(&sparse.iter().map(|a| a.indices()).collect_vec())?, - as_contiguous(&sparse.iter().map(|a| a.values()).collect_vec())?, - sparse.iter().map(|a| a.len()).sum(), - self.fill_value().clone(), - ) - .into_array()) - } -} - impl ScalarAtFn for SparseArray { fn scalar_at(&self, index: usize) -> VortexResult { match self.find_index(index)? 
{ @@ -138,17 +113,17 @@ fn take_search_sorted( #[cfg(test)] mod test { use itertools::Itertools; + use vortex_dtype::{DType, Nullability, PType}; use vortex_scalar::Scalar; + use crate::{Array, ArrayTrait, IntoArray}; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::compute::take_map; use crate::array::sparse::SparseArray; - use crate::compute::as_contiguous::as_contiguous; use crate::compute::slice::slice; use crate::compute::take::take; use crate::validity::Validity; - use crate::{Array, ArrayTrait, IntoArray}; fn sparse_array() -> Array { SparseArray::new( diff --git a/vortex-array/src/array/struct/compute.rs b/vortex-array/src/array/struct/compute.rs index e7751652f..7b46317b5 100644 --- a/vortex-array/src/array/struct/compute.rs +++ b/vortex-array/src/array/struct/compute.rs @@ -10,12 +10,10 @@ use vortex_scalar::Scalar; use crate::array::r#struct::StructArray; use crate::compute::as_arrow::{as_arrow, AsArrowArray}; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::{slice, SliceFn}; use crate::compute::take::{take, TakeFn}; use crate::compute::ArrayCompute; -use crate::validity::Validity; use crate::{Array, ArrayDType, IntoArray}; impl ArrayCompute for StructArray { @@ -23,10 +21,6 @@ impl ArrayCompute for StructArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -68,53 +62,6 @@ impl AsArrowArray for StructArray { } } -impl AsContiguousFn for StructArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let struct_arrays = arrays - .iter() - .map(Self::try_from) - .collect::>>()?; - let mut fields = vec![Vec::new(); self.dtypes().len()]; - for array in struct_arrays.iter() { - for (f, field) in fields.iter_mut().enumerate() { - field.push(array.field(f).unwrap()); - } - } - - let fields_len = fields - .first() - .map(|field| field.iter().map(|a| a.len()).sum()) - .unwrap_or_default(); - - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - Self::try_new( - self.names().clone(), - fields - .iter() - .map(|field_arrays| { - // Currently, as_contiguous cannot handle sub-arrays with differing encodings. - // So, first flatten each constituent array, then as_contiguous them back into - // a single array. - let flattened = field_arrays - .iter() - .cloned() - .map(|array| array.flatten().unwrap().into_array()) - .collect::>(); - as_contiguous(flattened.as_slice()) - }) - .try_collect()?, - fields_len, - validity, - ) - .map(|a| a.into_array()) - } -} - impl ScalarAtFn for StructArray { fn scalar_at(&self, index: usize) -> VortexResult { Ok(Scalar::r#struct( diff --git a/vortex-array/src/array/struct/mod.rs b/vortex-array/src/array/struct/mod.rs index 91d4c6c10..997802bf9 100644 --- a/vortex-array/src/array/struct/mod.rs +++ b/vortex-array/src/array/struct/mod.rs @@ -53,7 +53,7 @@ impl StructArray { } impl<'a> StructArray { - pub fn children(&'a self) -> impl Iterator + '_ { + pub fn children(&'a self) -> impl Iterator + '_ { (0..self.nfields()).map(move |idx| self.field(idx).unwrap()) } } @@ -99,20 +99,9 @@ impl StructArray { } impl ArrayFlatten for StructArray { + /// StructEncoding is the canonical form for a [DType::Struct] array, so return self. 
fn flatten(self) -> VortexResult { - Ok(Flattened::Struct(Self::try_new( - self.names().clone(), - (0..self.nfields()) - .map(|i| { - self.field(i) - .expect("Missing child") - .flatten() - .map(|f| f.into_array()) - }) - .collect::>>()?, - self.len(), - self.validity(), - )?)) + Ok(Flattened::Struct(self)) } } diff --git a/vortex-array/src/array/varbin/compute/mod.rs b/vortex-array/src/array/varbin/compute/mod.rs index fbf94b6e7..7d3c31401 100644 --- a/vortex-array/src/array/varbin/compute/mod.rs +++ b/vortex-array/src/array/varbin/compute/mod.rs @@ -3,24 +3,21 @@ use std::sync::Arc; use arrow_array::{ ArrayRef as ArrowArrayRef, BinaryArray, LargeBinaryArray, LargeStringArray, StringArray, }; -use itertools::Itertools; use vortex_dtype::DType; use vortex_dtype::PType; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::Scalar; -use crate::array::primitive::PrimitiveArray; use crate::array::varbin::{varbin_scalar, VarBinArray}; use crate::arrow::wrappers::as_offset_buffer; use crate::compute::as_arrow::AsArrowArray; -use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use crate::compute::cast::cast; use crate::compute::scalar_at::ScalarAtFn; use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; use crate::compute::ArrayCompute; -use crate::validity::{ArrayValidity, Validity}; -use crate::{Array, ArrayDType, IntoArray, ToArray}; +use crate::validity::ArrayValidity; +use crate::{ArrayDType, ToArray}; mod slice; mod take; @@ -30,10 +27,6 @@ impl ArrayCompute for VarBinArray { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } @@ -47,41 +40,6 @@ impl ArrayCompute for VarBinArray { } } -impl AsContiguousFn for VarBinArray { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult { - let bytes_chunks: Vec = arrays - .iter() - .map(|a| Self::try_from(a).unwrap().sliced_bytes()) - .try_collect()?; - let bytes = as_contiguous(&bytes_chunks)?; - - let validity = if self.dtype().is_nullable() { - Validity::from_iter(arrays.iter().map(|a| a.with_dyn(|a| a.logical_validity()))) - } else { - Validity::NonNullable - }; - - let mut offsets = Vec::new(); - offsets.push(0); - for a in arrays.iter().map(|a| Self::try_from(a).unwrap()) { - let first_offset: u64 = a.first_offset()?; - let offsets_array = cast(&a.offsets(), PType::U64.into())?.flatten_primitive()?; - let shift = offsets.last().copied().unwrap_or(0); - offsets.extend( - offsets_array - .typed_data::() - .iter() - .skip(1) // Ignore the zero offset for each array - .map(|o| o + shift - first_offset), - ); - } - - let offsets_array = PrimitiveArray::from(offsets).into_array(); - - Self::try_new(offsets_array, bytes, self.dtype().clone(), validity).map(|a| a.into_array()) - } -} - impl AsArrowArray for VarBinArray { fn as_arrow(&self) -> VortexResult { // Ensure the offsets are either i32 or i64 diff --git a/vortex-array/src/array/varbin/mod.rs b/vortex-array/src/array/varbin/mod.rs index 4a7ec6afa..cfd8de133 100644 --- a/vortex-array/src/array/varbin/mod.rs +++ b/vortex-array/src/array/varbin/mod.rs @@ -41,7 +41,7 @@ impl VarBinArray { if !offsets.dtype().is_int() || offsets.dtype().is_nullable() { vortex_bail!(MismatchedTypes: "non nullable int", offsets.dtype()); } - if !matches!(bytes.dtype(), &DType::BYTES,) { + if !matches!(bytes.dtype(), &DType::BYTES) { vortex_bail!(MismatchedTypes: "u8", bytes.dtype()); } if !matches!(dtype, DType::Binary(_) | DType::Utf8(_)) { diff --git 
a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index 26cf51cc4..c3a4aca81 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -2,8 +2,10 @@ use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; use log::{debug, info, warn}; + use vortex_error::{vortex_bail, VortexResult}; +use crate::{Array, ArrayDef, ArrayDType, ArrayFlatten, ArrayTrait, Context, IntoArray}; use crate::array::chunked::{Chunked, ChunkedArray}; use crate::array::constant::{Constant, ConstantArray}; use crate::array::r#struct::{Struct, StructArray}; @@ -13,7 +15,6 @@ use crate::encoding::{ArrayEncoding, EncodingRef}; use crate::sampling::stratified_slices; use crate::stats::ArrayStatistics; use crate::validity::Validity; -use crate::{compute, Array, ArrayDType, ArrayDef, ArrayTrait, Context, IntoArray}; pub trait EncodingCompression: ArrayEncoding { fn cost(&self) -> u8 { @@ -203,7 +204,7 @@ impl<'a> Compressor<'a> { strct.len(), validity, )? - .into_array()) + .into_array()) } _ => { // Otherwise, we run sampled compression over pluggable encodings @@ -311,16 +312,18 @@ pub fn sampled_compression(array: &Array, compressor: &Compressor) -> VortexResu } // Take a sample of the array, then ask codecs for their best compression estimate. - let sample = compute::as_contiguous::as_contiguous( - &stratified_slices( + let sample = ChunkedArray::try_new( + stratified_slices( array.len(), compressor.options.sample_size, compressor.options.sample_count, ) - .into_iter() - .map(|(start, stop)| slice(array, start, stop).unwrap()) - .collect::>(), - )?; + .into_iter() + .map(|(start, stop)| slice(array, start, stop)) + .collect::>>()?, + array.dtype().clone())? + .flatten()? + .into_array(); find_best_compression(candidates, &sample, compressor)? .map(|(compression, best)| { diff --git a/vortex-array/src/compute/as_contiguous.rs b/vortex-array/src/compute/as_contiguous.rs deleted file mode 100644 index 3dd51c39d..000000000 --- a/vortex-array/src/compute/as_contiguous.rs +++ /dev/null @@ -1,67 +0,0 @@ -use itertools::Itertools; -use vortex_error::{vortex_bail, vortex_err, VortexResult}; - -use crate::{Array, ArrayDType}; - -/// Trait that exposes an operation for repacking (and possibly decompressing) an [Array] into -/// a new Array that occupies a contiguous memory range. -pub trait AsContiguousFn { - fn as_contiguous(&self, arrays: &[Array]) -> VortexResult; -} - -#[macro_export] -macro_rules! 
impl_default_as_contiguous_fn { - ($typ:ty) => { - impl $crate::compute::as_contiguous::AsContiguousFn for $typ { - fn as_contiguous(&self, arrays: &[$crate::Array]) -> vortex_error::VortexResult<$crate::Array> { - let dtype = $crate::ArrayDType::dtype(self).clone(); - if !arrays - .iter() - .map(|array| $crate::ArrayDType::dtype(array).clone()) - .all(|dty| dty == dtype) - { - vortex_error::vortex_bail!(ComputeError: "mismatched dtypes in call to as_contiguous"); - } - - let mut chunks = Vec::with_capacity(arrays.len()); - for array in arrays { - chunks.push(array.clone().flatten()?.into_array()); - } - - let chunked_array = $crate::array::chunked::ChunkedArray::try_new(chunks, dtype)?.into_array(); - $crate::compute::as_contiguous::as_contiguous(&[chunked_array]) - } - } - }; -} - -pub fn as_contiguous(arrays: &[Array]) -> VortexResult { - // Simple case: slice with 1 element - if arrays.len() == 1 { - return Ok(arrays[0].clone()); - } - - if arrays.is_empty() { - vortex_bail!(ComputeError: "No arrays to concatenate"); - } - if !arrays.iter().map(|chunk| chunk.encoding().id()).all_equal() { - vortex_bail!(ComputeError: "Chunks have differing encodings"); - } - if !arrays.iter().map(|chunk| chunk.dtype()).all_equal() { - vortex_bail!(ComputeError: - "Chunks have differing dtypes", - ); - } - - let first = arrays.first().unwrap(); - first.with_dyn(|a| { - a.as_contiguous() - .map(|f| f.as_contiguous(arrays)) - .unwrap_or_else(|| { - Err(vortex_err!( - NotImplemented: "as_contiguous", - first.encoding().id() - )) - }) - }) -} diff --git a/vortex-array/src/compute/mod.rs b/vortex-array/src/compute/mod.rs index 8ca8fd181..9f2f18df6 100644 --- a/vortex-array/src/compute/mod.rs +++ b/vortex-array/src/compute/mod.rs @@ -1,5 +1,4 @@ use as_arrow::AsArrowArray; -use as_contiguous::AsContiguousFn; use cast::CastFn; use compare::CompareFn; use fill::FillForwardFn; @@ -13,7 +12,6 @@ use crate::compute::filter_indices::FilterIndicesFn; use crate::compute::scalar_subtract::SubtractScalarFn; pub mod as_arrow; -pub mod as_contiguous; pub mod cast; pub mod compare; pub mod fill; @@ -30,10 +28,6 @@ pub trait ArrayCompute { None } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - None - } - fn cast(&self) -> Option<&dyn CastFn> { None } diff --git a/vortex-array/src/flatten.rs b/vortex-array/src/flatten.rs index a51f31fbc..7bd3481e0 100644 --- a/vortex-array/src/flatten.rs +++ b/vortex-array/src/flatten.rs @@ -19,10 +19,9 @@ pub enum Flattened { Extension(ExtensionArray), } -/// Support trait for decompressing arrays that have been encoded via a [crate::compress::Compressor]. +/// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding. /// -/// A flattened array is a copying operation, returning new memory holding the same data in -/// its simplest form. +/// Flattening an Array ensures that the array's encoding matches one of the builtin /// /// DType remains the same before and after a flatten operation. pub trait ArrayFlatten { @@ -34,6 +33,10 @@ impl Array { ArrayEncoding::flatten(self.encoding(), self) } + pub fn flatten_extension(self) -> VortexResult { + ExtensionArray::try_from(self.flatten()?.into_array()) + } + pub fn flatten_bool(self) -> VortexResult { BoolArray::try_from(self.flatten()?.into_array()) } diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index ed3955222..e4bc1e5f3 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -1,3 +1,13 @@ +//! Core Array and Encoding crate for Vortex. +//! +//! 
At the heart of Vortex are [Arrays](Array) and [encodings](crate::encoding::EncodingCompression). +//! Arrays are typed views of memory buffers that hold [scalars](vortex_scalar::Scalar). Every data +//! type recognized by Vortex has a canonical physical encoding format, examples of which are +//! [crate::array::bool::BoolEncoding] and [crate::array::primitive::PrimitiveEncoding]. +//! +//! Vortex also supports a number of encodings backed by the latest databases research, including +//! +//! pub mod accessor; pub mod array; pub mod arrow; diff --git a/vortex-array/src/stats/mod.rs b/vortex-array/src/stats/mod.rs index b7ef190be..922782421 100644 --- a/vortex-array/src/stats/mod.rs +++ b/vortex-array/src/stats/mod.rs @@ -1,5 +1,6 @@ use std::fmt::{Display, Formatter}; use std::hash::Hash; +use std::sync::Arc; use enum_iterator::Sequence; pub use statsset::*; @@ -55,6 +56,8 @@ pub trait Statistics { fn compute(&self, stat: Stat) -> Option; } +pub type StatisticsRef = Arc; + pub struct EmptyStatistics; impl Statistics for EmptyStatistics { diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index bf0aa4967..fdcf6ee22 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -1,15 +1,15 @@ -use arrow_buffer::{BooleanBuffer, NullBuffer}; +use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use serde::{Deserialize, Serialize}; + use vortex_dtype::{DType, Nullability}; use vortex_error::{vortex_bail, VortexResult}; +use crate::{Array, ArrayData, IntoArray, IntoArrayData, ToArray, ToArrayData}; use crate::array::bool::BoolArray; -use crate::compute::as_contiguous::as_contiguous; use crate::compute::scalar_at::scalar_at; use crate::compute::slice::slice; use crate::compute::take::take; use crate::stats::ArrayStatistics; -use crate::{Array, ArrayData, IntoArray, IntoArrayData, ToArray, ToArrayData}; pub trait ArrayValidity { fn is_valid(&self, index: usize) -> bool; @@ -196,17 +196,17 @@ impl FromIterator for Validity { return Self::AllInvalid; } - // Otherwise, map each to a bool array and concatenate them. 
- let arrays = validities - .iter() - .map(|v| { - v.to_present_null_buffer() - .unwrap() - .into_array_data() - .into_array() - }) - .collect::>(); - Self::Array(as_contiguous(&arrays).unwrap()) + // Else, construct the boolean buffer + let mut buffer = BooleanBufferBuilder::new(validities.iter().map(|v| v.len()).sum()); + for validity in validities { + let present = validity.to_present_null_buffer() + .expect("Validity should expose NullBuffer") + .into_inner(); + buffer.append_buffer(&present); + } + let bool_array = BoolArray::try_new(buffer.finish(), Validity::NonNullable) + .expect("BoolArray::try_new from BooleanBuffer should always succeed"); + Self::Array(bool_array.into_array()) } } diff --git a/vortex-datetime-parts/src/compute.rs b/vortex-datetime-parts/src/compute.rs index 4a525af07..e19241a29 100644 --- a/vortex-datetime-parts/src/compute.rs +++ b/vortex-datetime-parts/src/compute.rs @@ -1,12 +1,11 @@ use vortex::array::datetime::{try_parse_time_unit, LocalDateTimeArray, TimeUnit}; use vortex::array::primitive::PrimitiveArray; -use vortex::compute::as_contiguous::{as_contiguous, AsContiguousFn}; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; use vortex::validity::ArrayValidity; -use vortex::{Array, ArrayDType, ArrayFlatten, IntoArray}; +use vortex::{Array, ArrayDType, IntoArray}; use vortex_dtype::DType; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::Scalar; @@ -14,19 +13,15 @@ use vortex_scalar::Scalar; use crate::DateTimePartsArray; impl ArrayCompute for DateTimePartsArray { - fn slice(&self) -> Option<&dyn SliceFn> { - Some(self) - } - - fn take(&self) -> Option<&dyn TakeFn> { + fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) } - fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { + fn slice(&self) -> Option<&dyn SliceFn> { Some(self) } - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { + fn take(&self) -> Option<&dyn TakeFn> { Some(self) } } @@ -142,35 +137,6 @@ pub fn decode_to_localdatetime(array: &Array) -> VortexResult VortexResult { - let dtype = self.dtype().clone(); - - if !arrays - .iter() - .map(|array| array.dtype().clone()) - .all(|dty| dty == dtype) - { - vortex_bail!(ComputeError: "mismatched dtypes in call to as_contiguous"); - } - - let mut chunks = Vec::with_capacity(arrays.len()); - - for array in arrays { - let dt_parts = Self::try_from(array)?; - chunks.push(dt_parts.flatten()?.into_array()); - } - - // Reduces down to as_contiguous on the flattened variants. 
- as_contiguous(chunks.as_slice()) - } -} - #[cfg(test)] mod test { use vortex::array::datetime::{LocalDateTimeArray, TimeUnit}; diff --git a/vortex-dict/src/compute.rs b/vortex-dict/src/compute.rs index f59febc8a..3ceba5134 100644 --- a/vortex-dict/src/compute.rs +++ b/vortex-dict/src/compute.rs @@ -1,16 +1,13 @@ -use vortex::compute::as_contiguous::AsContiguousFn; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; use vortex::compute::ArrayCompute; -use vortex::{impl_default_as_contiguous_fn, Array, IntoArray}; +use vortex::{Array, IntoArray}; use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::DictArray; -impl_default_as_contiguous_fn!(DictArray); - impl ArrayCompute for DictArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) @@ -23,10 +20,6 @@ impl ArrayCompute for DictArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } } impl ScalarAtFn for DictArray { diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index 4b18d308f..2707b8cbf 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ b/vortex-fastlanes/src/for/compute.rs @@ -1,17 +1,14 @@ -use vortex::compute::as_contiguous::AsContiguousFn; +use vortex::{Array, IntoArray}; +use vortex::compute::ArrayCompute; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; -use vortex::compute::ArrayCompute; -use vortex::{impl_default_as_contiguous_fn, Array, IntoArray}; use vortex_dtype::match_each_integer_ptype; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::{PrimitiveScalar, Scalar, ScalarValue}; use crate::FoRArray; -impl_default_as_contiguous_fn!(FoRArray); - impl ArrayCompute for FoRArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { Some(self) @@ -24,10 +21,6 @@ impl ArrayCompute for FoRArray { fn take(&self) -> Option<&dyn TakeFn> { Some(self) } - - fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> { - Some(self) - } } impl TakeFn for FoRArray { @@ -37,7 +30,7 @@ impl TakeFn for FoRArray { self.reference().clone(), self.shift(), ) - .map(|a| a.into_array()) + .map(|a| a.into_array()) } } @@ -67,7 +60,7 @@ impl SliceFn for FoRArray { self.reference().clone(), self.shift(), ) - .map(|a| a.into_array()) + .map(|a| a.into_array()) } } @@ -75,7 +68,6 @@ impl SliceFn for FoRArray { mod test { use vortex::array::primitive::PrimitiveArray; use vortex::compress::{Compressor, EncodingCompression}; - use vortex::compute::as_contiguous::as_contiguous; use vortex::compute::scalar_at::scalar_at; use vortex::Context; @@ -94,31 +86,4 @@ mod test { assert_eq!(scalar_at(&forarr, 1).unwrap(), 15.into()); assert_eq!(scalar_at(&forarr, 2).unwrap(), 19.into()); } - - #[test] - fn for_as_contiguous() { - let forarr1 = FoREncoding - .compress( - PrimitiveArray::from(vec![1, 2, 3, 4]).array(), - None, - Compressor::new(&Context::default()), - ) - .unwrap(); - let forarr2 = FoREncoding - .compress( - PrimitiveArray::from(vec![5, 6, 7, 8]).array(), - None, - Compressor::new(&Context::default()), - ) - .unwrap(); - - let flattened = as_contiguous(&[forarr1, forarr2]).unwrap(); - - [1, 2, 3, 4, 5, 6, 7, 8] - .iter() - .enumerate() - .for_each(|(idx, value)| { - assert_eq!(scalar_at(&flattened, idx).unwrap(), (*value).into()); - }); - } } From faa4561bf7a53a940c724ae488ba18ec9cbdf979 Mon Sep 17 00:00:00 
2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 11:50:19 +0100 Subject: [PATCH 02/14] first set of comments --- vortex-array/src/array/chunked/flatten.rs | 39 ++++++++++++++++------- vortex-array/src/array/chunked/mod.rs | 9 +----- vortex-array/src/flatten.rs | 5 +-- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index 989379b58..f430a6426 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -3,16 +3,24 @@ use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer}; use vortex_dtype::{DType, match_each_native_ptype, Nullability, PType, StructDType}; use vortex_error::{vortex_bail, VortexResult}; -use crate::{Array, ArrayTrait, ArrayValidity, Flattened, IntoArray}; +use itertools::Itertools; +use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray}; use crate::accessor::ArrayAccessor; use crate::array::bool::BoolArray; use crate::array::chunked::ChunkedArray; +use crate::array::extension::ExtensionArray; use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; use crate::array::varbin::builder::VarBinBuilder; use crate::array::varbin::VarBinArray; use crate::validity::{LogicalValidity, Validity}; +impl ArrayFlatten for ChunkedArray { + fn flatten(self) -> VortexResult { + try_flatten_chunks(self.chunks().collect(), self.dtype().clone()) + } +} + pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { match &dtype { // Structs can have their internal field pointers swizzled to push the chunking down @@ -22,10 +30,15 @@ pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { - // How can we stitch an unknown type back together? - todo!() + // Extension arrays contain an internal array, so we can push down a ChunkedArray + // to be the storage type of the extension DType. + DType::Extension(ext_dtype, _) => { + let ext_array = ExtensionArray::new( + ext_dtype.clone(), + ChunkedArray::try_new(chunks, dtype.clone())?.into_array() + ); + + Ok(Flattened::Extension(ext_array)) } // Lists just flatten into their inner PType @@ -42,11 +55,11 @@ pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { - let varbin_array = pack_varbin(chunks.as_slice(), dtype.clone(), *nullability)?; + let varbin_array = pack_varbin(chunks.as_slice(), &dtype, *nullability)?; Ok(Flattened::VarBin(varbin_array)) } DType::Binary(nullability) => { - let varbin_array = pack_varbin(chunks.as_slice(), dtype.clone(), *nullability)?; + let varbin_array = pack_varbin(chunks.as_slice(), &dtype, *nullability)?; Ok(Flattened::VarBin(varbin_array)) } DType::Null => { @@ -58,10 +71,9 @@ pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult VortexResult { - let chunks = chunks.iter() + let chunks: Vec = chunks.iter() .map(StructArray::try_from) - // Figure out how to unwrap result of things - .collect::>>()?; + .try_collect()?; let len = chunks.iter().map(|chunk| chunk.len()).sum(); let validity = chunks.iter() @@ -84,6 +96,7 @@ fn swizzle_struct_chunks(chunks: &[Array], struct_dtype: &StructDType) -> Vortex Ok(StructArray::try_new(field_names, field_arrays, len, validity)?) } +/// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array. 
fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult { let len = chunks.iter().map(|chunk| chunk.len()).sum(); let mut logical_validities = Vec::new(); @@ -100,6 +113,8 @@ fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult VortexResult { let len: usize = chunks.iter().map(|chunk| chunk.len()).sum(); let mut logical_validities = Vec::new(); @@ -119,7 +134,7 @@ fn pack_primitives(chunks: &[Array], ptype: PType, nullability: Nullability) -> // TODO(aduffy): This can be slow for really large arrays. // TODO(aduffy): this doesn't propagate the validity fully -fn pack_varbin(chunks: &[Array], dtype: DType, _nullability: Nullability) -> VortexResult { +fn pack_varbin(chunks: &[Array], dtype: &DType, _nullability: Nullability) -> VortexResult { let len = chunks.iter() .map(|chunk| chunk.len()) .sum(); @@ -134,7 +149,7 @@ fn pack_varbin(chunks: &[Array], dtype: DType, _nullability: Nullability) -> Vor })?; } - Ok(builder.finish(dtype)) + Ok(builder.finish(dtype.clone())) } fn validity_from_chunks(logical_validities: Vec, nullability: Nullability) -> Validity { diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index 643dd0aee..e221dafa5 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -6,8 +6,7 @@ use vortex_dtype::{Nullability, PType}; use vortex_error::vortex_bail; use vortex_scalar::Scalar; -use crate::{ArrayDType, ArrayFlatten, impl_encoding, IntoArrayData, ToArrayData}; -use crate::array::chunked::flatten::try_flatten_chunks; +use crate::{ArrayDType, impl_encoding, IntoArrayData, ToArrayData}; use crate::array::primitive::PrimitiveArray; use crate::compute::scalar_at::scalar_at; use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn}; @@ -116,12 +115,6 @@ impl FromIterator for ChunkedArray { } } -impl ArrayFlatten for ChunkedArray { - fn flatten(self) -> VortexResult { - try_flatten_chunks(self.chunks().collect(), self.dtype().clone()) - } -} - impl AcceptArrayVisitor for ChunkedArray { fn accept(&self, visitor: &mut dyn ArrayVisitor) -> VortexResult<()> { visitor.visit_child("chunk_ends", &self.chunk_ends())?; diff --git a/vortex-array/src/flatten.rs b/vortex-array/src/flatten.rs index 7bd3481e0..42ad2de57 100644 --- a/vortex-array/src/flatten.rs +++ b/vortex-array/src/flatten.rs @@ -21,9 +21,10 @@ pub enum Flattened { /// Support trait for transmuting an array into its [vortex_dtype::DType]'s canonical encoding. /// -/// Flattening an Array ensures that the array's encoding matches one of the builtin +/// Flattening an Array ensures that the array's encoding matches one of the builtin canonical +/// encodings, each of which has a corresponding [Flattened] variant. /// -/// DType remains the same before and after a flatten operation. +/// **Important**: DType remains the same before and after a flatten operation. 
pub trait ArrayFlatten { fn flatten(self) -> VortexResult; } From 4d6458ba7b20a37d3ca85336a1e9e22282dddc41 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 12:21:26 +0100 Subject: [PATCH 03/14] check dtypes --- vortex-array/src/array/chunked/flatten.rs | 39 ++++++++++++++++------- 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index f430a6426..70a7b33d7 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -1,7 +1,7 @@ use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer}; use vortex_dtype::{DType, match_each_native_ptype, Nullability, PType, StructDType}; -use vortex_error::{vortex_bail, VortexResult}; +use vortex_error::{ErrString, vortex_bail, VortexResult}; use itertools::Itertools; use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray}; @@ -22,6 +22,13 @@ impl ArrayFlatten for ChunkedArray { } pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { + let mismatched = chunks.iter() + .filter(|chunk| !chunk.dtype().eq(&dtype)) + .collect::>(); + if !mismatched.is_empty() { + vortex_bail!(MismatchedTypes: dtype, ErrString::from(format!("{:?}", mismatched))) + } + match &dtype { // Structs can have their internal field pointers swizzled to push the chunking down // one level internally without copying or decompressing any data. @@ -30,12 +37,12 @@ pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { let ext_array = ExtensionArray::new( ext_dtype.clone(), - ChunkedArray::try_new(chunks, dtype.clone())?.into_array() + ChunkedArray::try_new(chunks, dtype.clone())?.into_array(), ); Ok(Flattened::Extension(ext_array)) @@ -70,6 +77,9 @@ pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult VortexResult { let chunks: Vec = chunks.iter() .map(StructArray::try_from) @@ -81,22 +91,23 @@ fn swizzle_struct_chunks(chunks: &[Array], struct_dtype: &StructDType) -> Vortex .collect::(); let mut field_arrays = Vec::new(); - let field_names = struct_dtype.names().clone(); - let field_dtypes = struct_dtype.dtypes().clone(); - for (field_idx, field_dtype) in field_dtypes.iter().enumerate() { + for (field_idx, field_dtype) in struct_dtype.dtypes().iter().enumerate() { let mut field_chunks = Vec::new(); for chunk in &chunks { - field_chunks.push(chunk.field(field_idx).expect("structarray should contain field")); + field_chunks.push(chunk.field(field_idx).expect("all chunks must have same dtype")); } let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?; field_arrays.push(field_array.into_array()); } - Ok(StructArray::try_new(field_names, field_arrays, len, validity)?) + Ok(StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)?) } /// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array. +/// +/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have +/// been checked to have the same DType already. 
fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult { let len = chunks.iter().map(|chunk| chunk.len()).sum(); let mut logical_validities = Vec::new(); @@ -115,6 +126,9 @@ fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult VortexResult { let len: usize = chunks.iter().map(|chunk| chunk.len()).sum(); let mut logical_validities = Vec::new(); @@ -132,8 +146,11 @@ fn pack_primitives(chunks: &[Array], ptype: PType, nullability: Nullability) -> }) } -// TODO(aduffy): This can be slow for really large arrays. -// TODO(aduffy): this doesn't propagate the validity fully +/// Builds a new [VarBinArray] by repacking the values from the chunks into a single +/// contiguous array. +/// +/// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have +/// been checked to have the same DType already. fn pack_varbin(chunks: &[Array], dtype: &DType, _nullability: Nullability) -> VortexResult { let len = chunks.iter() .map(|chunk| chunk.len()) From de74afaebdb4e535db80f2f7dfd4ed0186be756c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 12:59:45 +0100 Subject: [PATCH 04/14] nulls --- vortex-array/src/array/chunked/flatten.rs | 6 +++++- vortex-array/src/flatten.rs | 3 +++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index 70a7b33d7..8eda365dd 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -4,10 +4,12 @@ use vortex_dtype::{DType, match_each_native_ptype, Nullability, PType, StructDTy use vortex_error::{ErrString, vortex_bail, VortexResult}; use itertools::Itertools; +use vortex_scalar::Scalar; use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray}; use crate::accessor::ArrayAccessor; use crate::array::bool::BoolArray; use crate::array::chunked::ChunkedArray; +use crate::array::constant::ConstantArray; use crate::array::extension::ExtensionArray; use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; @@ -70,7 +72,9 @@ pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { - vortex_bail!(ComputeError: "DType::Null cannot be flattened") + let len = chunks.iter().map(|chunk| chunk.len()).sum(); + let const_array = ConstantArray::new(Scalar::null(DType::Null), len); + Ok(Flattened::Null(const_array)) } } } diff --git a/vortex-array/src/flatten.rs b/vortex-array/src/flatten.rs index 42ad2de57..14a368abe 100644 --- a/vortex-array/src/flatten.rs +++ b/vortex-array/src/flatten.rs @@ -8,9 +8,11 @@ use crate::array::varbin::VarBinArray; use crate::array::varbinview::VarBinViewArray; use crate::encoding::ArrayEncoding; use crate::{Array, IntoArray}; +use crate::array::constant::ConstantArray; /// The set of encodings that can be converted to Arrow with zero-copy. 
pub enum Flattened { + Null(ConstantArray), Bool(BoolArray), Primitive(PrimitiveArray), Struct(StructArray), @@ -54,6 +56,7 @@ impl Array { impl IntoArray for Flattened { fn into_array(self) -> Array { match self { + Self::Null(a) => a.into_array(), Self::Bool(a) => a.into_array(), Self::Primitive(a) => a.into_array(), Self::Struct(a) => a.into_array(), From 272352cb816ee5fb99acfd97dc5132546fe0ce59 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 15:19:05 +0100 Subject: [PATCH 05/14] fix tests + benchmarks --- vortex-alp/src/compute.rs | 29 ---------------- .../src/array/chunked/compute/take.rs | 12 +++---- vortex-array/src/array/chunked/flatten.rs | 2 +- vortex-array/src/array/sparse/compute/mod.rs | 34 ------------------- vortex-array/src/validity.rs | 14 ++++++-- vortex-fastlanes/src/for/compress.rs | 6 ++++ 6 files changed, 22 insertions(+), 75 deletions(-) diff --git a/vortex-alp/src/compute.rs b/vortex-alp/src/compute.rs index 1a4260cc6..19f7e3636 100644 --- a/vortex-alp/src/compute.rs +++ b/vortex-alp/src/compute.rs @@ -61,32 +61,3 @@ impl SliceFn for ALPArray { .into_array()) } } - -#[cfg(test)] -mod test { - use vortex::array::primitive::PrimitiveArray; - use vortex::compute::scalar_at::scalar_at; - use vortex::validity::Validity; - use vortex::IntoArray; - - use crate::ALPArray; - - #[test] - fn test_as_contiguous() { - let values = vec![1.0, 2.0, 3.0]; - let primitives = PrimitiveArray::from_vec(values, Validity::NonNullable); - let encoded = ALPArray::encode(primitives.into_array()).unwrap(); - let alp = ALPArray::try_from(&encoded).unwrap(); - - let flat = alp.as_contiguous(&[encoded]).unwrap(); - - let a: f64 = scalar_at(&flat, 0).unwrap().try_into().unwrap(); - let b: f64 = scalar_at(&flat, 1).unwrap().try_into().unwrap(); - - let c: f64 = scalar_at(&flat, 2).unwrap().try_into().unwrap(); - - assert_eq!(a, 1.0); - assert_eq!(b, 2.0); - assert_eq!(c, 3.0); - } -} diff --git a/vortex-array/src/array/chunked/compute/take.rs b/vortex-array/src/array/chunked/compute/take.rs index a60487627..4869db957 100644 --- a/vortex-array/src/array/chunked/compute/take.rs +++ b/vortex-array/src/array/chunked/compute/take.rs @@ -52,8 +52,6 @@ impl TakeFn for ChunkedArray { #[cfg(test)] mod test { - use itertools::Itertools; - use crate::array::chunked::ChunkedArray; use crate::compute::take::take; use crate::{ArrayDType, ArrayTrait, AsArray, IntoArray}; @@ -67,14 +65,12 @@ mod test { assert_eq!(arr.len(), 9); let indices = vec![0, 0, 6, 4].into_array(); - let result = as_contiguous( + let result = &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap()) .unwrap() - .chunks() - .collect_vec(), - ) - .unwrap() - .into_primitive(); + .into_array() + .flatten_primitive() + .unwrap(); assert_eq!(result.typed_data::(), &[1, 1, 1, 2]); } } diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index 8eda365dd..b69adbded 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -23,7 +23,7 @@ impl ArrayFlatten for ChunkedArray { } } -pub fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { +pub(crate) fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { let mismatched = chunks.iter() .filter(|chunk| !chunk.dtype().eq(&dtype)) .collect::>(); diff --git a/vortex-array/src/array/sparse/compute/mod.rs b/vortex-array/src/array/sparse/compute/mod.rs index e9069fd97..e2c822c63 100644 --- a/vortex-array/src/array/sparse/compute/mod.rs +++ 
b/vortex-array/src/array/sparse/compute/mod.rs @@ -121,7 +121,6 @@ mod test { use crate::array::primitive::PrimitiveArray; use crate::array::sparse::compute::take_map; use crate::array::sparse::SparseArray; - use crate::compute::slice::slice; use crate::compute::take::take; use crate::validity::Validity; @@ -181,39 +180,6 @@ mod test { assert_eq!(taken.len(), 2); } - #[test] - fn take_slices_and_reassemble() { - let sparse = sparse_array(); - let slices = (0..10) - .map(|i| slice(&sparse, i * 10, (i + 1) * 10).unwrap()) - .collect_vec(); - - let taken = slices - .iter() - .map(|s| take(s, &(0u64..10).collect_vec().into_array()).unwrap()) - .collect_vec(); - for i in [1, 2, 5, 6, 7, 8] { - assert_eq!(SparseArray::try_from(&taken[i]).unwrap().indices().len(), 0); - } - for i in [0, 3, 4, 9] { - assert_eq!(SparseArray::try_from(&taken[i]).unwrap().indices().len(), 1); - } - - let contiguous = SparseArray::try_from(as_contiguous(&taken).unwrap()).unwrap(); - assert_eq!( - contiguous.indices().into_primitive().typed_data::(), - [0u64, 7, 7, 9] // relative offsets - ); - assert_eq!( - contiguous.values().into_primitive().typed_data::(), - SparseArray::try_from(sparse) - .unwrap() - .values() - .into_primitive() - .typed_data::() - ); - } - #[test] fn test_take_map() { let sparse = SparseArray::try_from(sparse_array()).unwrap(); diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index fdcf6ee22..d2cf82874 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -199,9 +199,17 @@ impl FromIterator for Validity { // Else, construct the boolean buffer let mut buffer = BooleanBufferBuilder::new(validities.iter().map(|v| v.len()).sum()); for validity in validities { - let present = validity.to_present_null_buffer() - .expect("Validity should expose NullBuffer") - .into_inner(); + let present = match validity { + LogicalValidity::AllValid(count) => { + BooleanBuffer::new_set(count) + } + LogicalValidity::AllInvalid(count) => { + BooleanBuffer::new_unset(count) + } + LogicalValidity::Array(array) => { + array.into_array().flatten_bool().expect("validity must flatten to BoolArray").boolean_buffer() + } + }; buffer.append_buffer(&present); } let bool_array = BoolArray::try_new(buffer.finish(), Validity::NonNullable) diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index 7f56d821b..e684ec056 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -5,6 +5,7 @@ use vortex::array::primitive::PrimitiveArray; use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::stats::{ArrayStatistics, Stat}; use vortex::{Array, ArrayDType, ArrayTrait, IntoArray}; +use vortex::validity::ArrayValidity; use vortex_dtype::{match_each_integer_ptype, NativePType, PType}; use vortex_error::{vortex_err, VortexResult}; use vortex_scalar::Scalar; @@ -29,6 +30,11 @@ impl EncodingCompression for FoREncoding { return None; } + // For all-null, cannot encode. 
+ if parray.logical_validity().all_invalid() { + return None; + } + // Nothing for us to do if the min is already zero and tz == 0 let shift = trailing_zeros(array); let min = parray.statistics().compute_as_cast::(Stat::Min)?; From 63ff347523184e0c584fe6e4639f2fa9df635fd3 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 15:30:56 +0100 Subject: [PATCH 06/14] fmt --- bench-vortex/benches/compress_benchmark.rs | 4 +- vortex-array/src/array/chunked/compute/mod.rs | 2 +- .../src/array/chunked/compute/take.rs | 11 ++-- vortex-array/src/array/chunked/flatten.rs | 62 ++++++++++++------- vortex-array/src/array/chunked/mod.rs | 9 ++- vortex-array/src/array/constant/compute.rs | 4 +- vortex-array/src/array/sparse/compute/mod.rs | 8 +-- vortex-array/src/array/struct/mod.rs | 2 +- vortex-array/src/compress.rs | 18 +++--- vortex-array/src/flatten.rs | 2 +- vortex-array/src/validity.rs | 19 +++--- vortex-fastlanes/src/for/compress.rs | 2 +- vortex-fastlanes/src/for/compute.rs | 8 +-- 13 files changed, 82 insertions(+), 69 deletions(-) diff --git a/bench-vortex/benches/compress_benchmark.rs b/bench-vortex/benches/compress_benchmark.rs index b41bda049..e586cc404 100644 --- a/bench-vortex/benches/compress_benchmark.rs +++ b/bench-vortex/benches/compress_benchmark.rs @@ -7,7 +7,7 @@ use criterion::{black_box, criterion_group, criterion_main, Criterion}; fn vortex_compress_taxi(c: &mut Criterion) { taxi_data_parquet(); - let mut group = c.benchmark_group("end to end"); + let mut group = c.benchmark_group("end to end - taxi"); group.sample_size(10); group.bench_function("compress", |b| b.iter(|| black_box(compress_taxi_data()))); group.finish() @@ -16,7 +16,7 @@ fn vortex_compress_taxi(c: &mut Criterion) { fn vortex_compress_medicare1(c: &mut Criterion) { let dataset = BenchmarkDatasets::PBI(Medicare1); dataset.as_uncompressed(); - let mut group = c.benchmark_group("end to end"); + let mut group = c.benchmark_group("end to end - medicare"); group.sample_size(10); group.bench_function("compress", |b| { b.iter(|| black_box(dataset.compress_to_vortex())) diff --git a/vortex-array/src/array/chunked/compute/mod.rs b/vortex-array/src/array/chunked/compute/mod.rs index e46fb9395..0469a11b9 100644 --- a/vortex-array/src/array/chunked/compute/mod.rs +++ b/vortex-array/src/array/chunked/compute/mod.rs @@ -2,11 +2,11 @@ use vortex_error::VortexResult; use vortex_scalar::Scalar; use crate::array::chunked::ChunkedArray; -use crate::compute::ArrayCompute; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::scalar_subtract::SubtractScalarFn; use crate::compute::slice::SliceFn; use crate::compute::take::TakeFn; +use crate::compute::ArrayCompute; mod slice; mod take; diff --git a/vortex-array/src/array/chunked/compute/take.rs b/vortex-array/src/array/chunked/compute/take.rs index 4869db957..a4c6e5671 100644 --- a/vortex-array/src/array/chunked/compute/take.rs +++ b/vortex-array/src/array/chunked/compute/take.rs @@ -65,12 +65,11 @@ mod test { assert_eq!(arr.len(), 9); let indices = vec![0, 0, 6, 4].into_array(); - let result = - &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap()) - .unwrap() - .into_array() - .flatten_primitive() - .unwrap(); + let result = &ChunkedArray::try_from(take(arr.as_array_ref(), &indices).unwrap()) + .unwrap() + .into_array() + .flatten_primitive() + .unwrap(); assert_eq!(result.typed_data::(), &[1, 1, 1, 2]); } } diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index b69adbded..13ed4d09f 
100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -1,11 +1,9 @@ use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer}; - -use vortex_dtype::{DType, match_each_native_ptype, Nullability, PType, StructDType}; -use vortex_error::{ErrString, vortex_bail, VortexResult}; - use itertools::Itertools; +use vortex_dtype::{match_each_native_ptype, DType, Nullability, PType, StructDType}; +use vortex_error::{vortex_bail, ErrString, VortexResult}; use vortex_scalar::Scalar; -use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray}; + use crate::accessor::ArrayAccessor; use crate::array::bool::BoolArray; use crate::array::chunked::ChunkedArray; @@ -16,6 +14,7 @@ use crate::array::r#struct::StructArray; use crate::array::varbin::builder::VarBinBuilder; use crate::array::varbin::VarBinArray; use crate::validity::{LogicalValidity, Validity}; +use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray}; impl ArrayFlatten for ChunkedArray { fn flatten(self) -> VortexResult { @@ -24,7 +23,8 @@ impl ArrayFlatten for ChunkedArray { } pub(crate) fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResult { - let mismatched = chunks.iter() + let mismatched = chunks + .iter() .filter(|chunk| !chunk.dtype().eq(&dtype)) .collect::>(); if !mismatched.is_empty() { @@ -51,7 +51,7 @@ pub(crate) fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResu } // Lists just flatten into their inner PType - DType::List(_, _) => { + DType::List(..) => { todo!() } @@ -84,13 +84,15 @@ pub(crate) fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResu /// /// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have /// been checked to have the same DType already. -fn swizzle_struct_chunks(chunks: &[Array], struct_dtype: &StructDType) -> VortexResult { - let chunks: Vec = chunks.iter() - .map(StructArray::try_from) - .try_collect()?; +fn swizzle_struct_chunks( + chunks: &[Array], + struct_dtype: &StructDType, +) -> VortexResult { + let chunks: Vec = chunks.iter().map(StructArray::try_from).try_collect()?; let len = chunks.iter().map(|chunk| chunk.len()).sum(); - let validity = chunks.iter() + let validity = chunks + .iter() .map(|chunk| chunk.logical_validity()) .collect::(); @@ -99,13 +101,22 @@ fn swizzle_struct_chunks(chunks: &[Array], struct_dtype: &StructDType) -> Vortex for (field_idx, field_dtype) in struct_dtype.dtypes().iter().enumerate() { let mut field_chunks = Vec::new(); for chunk in &chunks { - field_chunks.push(chunk.field(field_idx).expect("all chunks must have same dtype")); + field_chunks.push( + chunk + .field(field_idx) + .expect("all chunks must have same dtype"), + ); } let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?; field_arrays.push(field_array.into_array()); } - Ok(StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity)?) + Ok(StructArray::try_new( + struct_dtype.names().clone(), + field_arrays, + len, + validity, + )?) } /// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array. 
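// A self-contained, std-only sketch (illustrative only, not part of this diff and not a
// Vortex API) of the repacking pattern that pack_bools/pack_primitives above implement:
// concatenate every chunk's values into one contiguous buffer and carry the per-chunk
// validity along with it. The names `Chunk` and `repack` are invented for illustration.
struct Chunk {
    values: Vec<i64>,
    validity: Vec<bool>, // true marks a valid (non-null) slot
}

fn repack(chunks: &[Chunk]) -> (Vec<i64>, Vec<bool>) {
    // Pre-size the output to the total length of all chunks, then append each chunk in order.
    let len: usize = chunks.iter().map(|c| c.values.len()).sum();
    let mut values = Vec::with_capacity(len);
    let mut validity = Vec::with_capacity(len);
    for chunk in chunks {
        values.extend_from_slice(&chunk.values);
        validity.extend_from_slice(&chunk.validity);
    }
    (values, validity)
}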
@@ -133,7 +144,11 @@ fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult VortexResult { +fn pack_primitives( + chunks: &[Array], + ptype: PType, + nullability: Nullability, +) -> VortexResult { let len: usize = chunks.iter().map(|chunk| chunk.len()).sum(); let mut logical_validities = Vec::new(); let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width()); @@ -155,10 +170,12 @@ fn pack_primitives(chunks: &[Array], ptype: PType, nullability: Nullability) -> /// /// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have /// been checked to have the same DType already. -fn pack_varbin(chunks: &[Array], dtype: &DType, _nullability: Nullability) -> VortexResult { - let len = chunks.iter() - .map(|chunk| chunk.len()) - .sum(); +fn pack_varbin( + chunks: &[Array], + dtype: &DType, + _nullability: Nullability, +) -> VortexResult { + let len = chunks.iter().map(|chunk| chunk.len()).sum(); let mut builder = VarBinBuilder::::with_capacity(len); for chunk in chunks { @@ -173,10 +190,13 @@ fn pack_varbin(chunks: &[Array], dtype: &DType, _nullability: Nullability) -> Vo Ok(builder.finish(dtype.clone())) } -fn validity_from_chunks(logical_validities: Vec, nullability: Nullability) -> Validity { +fn validity_from_chunks( + logical_validities: Vec, + nullability: Nullability, +) -> Validity { if nullability == Nullability::NonNullable { Validity::NonNullable } else { logical_validities.into_iter().collect() } -} \ No newline at end of file +} diff --git a/vortex-array/src/array/chunked/mod.rs b/vortex-array/src/array/chunked/mod.rs index e221dafa5..8d13e642a 100644 --- a/vortex-array/src/array/chunked/mod.rs +++ b/vortex-array/src/array/chunked/mod.rs @@ -1,25 +1,24 @@ use futures_util::stream; use itertools::Itertools; use serde::{Deserialize, Serialize}; - use vortex_dtype::{Nullability, PType}; use vortex_error::vortex_bail; use vortex_scalar::Scalar; -use crate::{ArrayDType, impl_encoding, IntoArrayData, ToArrayData}; use crate::array::primitive::PrimitiveArray; use crate::compute::scalar_at::scalar_at; use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn}; use crate::compute::search_sorted::{search_sorted, SearchSortedSide}; use crate::iter::{ArrayIterator, ArrayIteratorAdapter}; use crate::stream::{ArrayStream, ArrayStreamAdapter}; -use crate::validity::{ArrayValidity, LogicalValidity}; use crate::validity::Validity::NonNullable; +use crate::validity::{ArrayValidity, LogicalValidity}; use crate::visitor::{AcceptArrayVisitor, ArrayVisitor}; +use crate::{impl_encoding, ArrayDType, IntoArrayData, ToArrayData}; mod compute; -mod stats; mod flatten; +mod stats; impl_encoding!("vortex.chunked", Chunked); @@ -161,10 +160,10 @@ mod test { use vortex_dtype::{DType, Nullability}; use vortex_dtype::{NativePType, PType}; - use crate::{Array, IntoArray, ToArray}; use crate::array::chunked::ChunkedArray; use crate::compute::scalar_subtract::subtract_scalar; use crate::compute::slice::slice; + use crate::{Array, IntoArray, ToArray}; fn chunked_array() -> ChunkedArray { ChunkedArray::try_new( diff --git a/vortex-array/src/array/constant/compute.rs b/vortex-array/src/array/constant/compute.rs index 61d19cd7a..103b1693f 100644 --- a/vortex-array/src/array/constant/compute.rs +++ b/vortex-array/src/array/constant/compute.rs @@ -1,11 +1,11 @@ use vortex_error::VortexResult; use vortex_scalar::Scalar; -use crate::{Array, IntoArray}; use crate::array::constant::ConstantArray; -use crate::compute::ArrayCompute; use 
crate::compute::scalar_at::ScalarAtFn; use crate::compute::take::TakeFn; +use crate::compute::ArrayCompute; +use crate::{Array, IntoArray}; impl ArrayCompute for ConstantArray { fn scalar_at(&self) -> Option<&dyn ScalarAtFn> { diff --git a/vortex-array/src/array/sparse/compute/mod.rs b/vortex-array/src/array/sparse/compute/mod.rs index e2c822c63..bb772bd0b 100644 --- a/vortex-array/src/array/sparse/compute/mod.rs +++ b/vortex-array/src/array/sparse/compute/mod.rs @@ -1,18 +1,17 @@ use std::collections::HashMap; use itertools::Itertools; - use vortex_dtype::match_each_integer_ptype; use vortex_error::VortexResult; use vortex_scalar::Scalar; -use crate::{Array, ArrayDType, IntoArray}; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::SparseArray; -use crate::compute::ArrayCompute; use crate::compute::scalar_at::{scalar_at, ScalarAtFn}; use crate::compute::slice::SliceFn; use crate::compute::take::{take, TakeFn}; +use crate::compute::ArrayCompute; +use crate::{Array, ArrayDType, IntoArray}; mod slice; @@ -113,16 +112,15 @@ fn take_search_sorted( #[cfg(test)] mod test { use itertools::Itertools; - use vortex_dtype::{DType, Nullability, PType}; use vortex_scalar::Scalar; - use crate::{Array, ArrayTrait, IntoArray}; use crate::array::primitive::PrimitiveArray; use crate::array::sparse::compute::take_map; use crate::array::sparse::SparseArray; use crate::compute::take::take; use crate::validity::Validity; + use crate::{Array, ArrayTrait, IntoArray}; fn sparse_array() -> Array { SparseArray::new( diff --git a/vortex-array/src/array/struct/mod.rs b/vortex-array/src/array/struct/mod.rs index 997802bf9..f25ddcc0b 100644 --- a/vortex-array/src/array/struct/mod.rs +++ b/vortex-array/src/array/struct/mod.rs @@ -53,7 +53,7 @@ impl StructArray { } impl<'a> StructArray { - pub fn children(&'a self) -> impl Iterator + '_ { + pub fn children(&'a self) -> impl Iterator + '_ { (0..self.nfields()).map(move |idx| self.field(idx).unwrap()) } } diff --git a/vortex-array/src/compress.rs b/vortex-array/src/compress.rs index c3a4aca81..8e1d0843d 100644 --- a/vortex-array/src/compress.rs +++ b/vortex-array/src/compress.rs @@ -2,10 +2,8 @@ use std::collections::HashSet; use std::fmt::{Debug, Display, Formatter}; use log::{debug, info, warn}; - use vortex_error::{vortex_bail, VortexResult}; -use crate::{Array, ArrayDef, ArrayDType, ArrayFlatten, ArrayTrait, Context, IntoArray}; use crate::array::chunked::{Chunked, ChunkedArray}; use crate::array::constant::{Constant, ConstantArray}; use crate::array::r#struct::{Struct, StructArray}; @@ -15,6 +13,7 @@ use crate::encoding::{ArrayEncoding, EncodingRef}; use crate::sampling::stratified_slices; use crate::stats::ArrayStatistics; use crate::validity::Validity; +use crate::{Array, ArrayDType, ArrayDef, ArrayFlatten, ArrayTrait, Context, IntoArray}; pub trait EncodingCompression: ArrayEncoding { fn cost(&self) -> u8 { @@ -204,7 +203,7 @@ impl<'a> Compressor<'a> { strct.len(), validity, )? - .into_array()) + .into_array()) } _ => { // Otherwise, we run sampled compression over pluggable encodings @@ -318,12 +317,13 @@ pub fn sampled_compression(array: &Array, compressor: &Compressor) -> VortexResu compressor.options.sample_size, compressor.options.sample_count, ) - .into_iter() - .map(|(start, stop)| slice(array, start, stop)) - .collect::>>()?, - array.dtype().clone())? - .flatten()? - .into_array(); + .into_iter() + .map(|(start, stop)| slice(array, start, stop)) + .collect::>>()?, + array.dtype().clone(), + )? + .flatten()? 
+ .into_array(); find_best_compression(candidates, &sample, compressor)? .map(|(compression, best)| { diff --git a/vortex-array/src/flatten.rs b/vortex-array/src/flatten.rs index 14a368abe..da991117a 100644 --- a/vortex-array/src/flatten.rs +++ b/vortex-array/src/flatten.rs @@ -1,6 +1,7 @@ use vortex_error::VortexResult; use crate::array::bool::BoolArray; +use crate::array::constant::ConstantArray; use crate::array::extension::ExtensionArray; use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; @@ -8,7 +9,6 @@ use crate::array::varbin::VarBinArray; use crate::array::varbinview::VarBinViewArray; use crate::encoding::ArrayEncoding; use crate::{Array, IntoArray}; -use crate::array::constant::ConstantArray; /// The set of encodings that can be converted to Arrow with zero-copy. pub enum Flattened { diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index d2cf82874..a88fa6671 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -1,15 +1,14 @@ use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder, NullBuffer}; use serde::{Deserialize, Serialize}; - use vortex_dtype::{DType, Nullability}; use vortex_error::{vortex_bail, VortexResult}; -use crate::{Array, ArrayData, IntoArray, IntoArrayData, ToArray, ToArrayData}; use crate::array::bool::BoolArray; use crate::compute::scalar_at::scalar_at; use crate::compute::slice::slice; use crate::compute::take::take; use crate::stats::ArrayStatistics; +use crate::{Array, ArrayData, IntoArray, IntoArrayData, ToArray, ToArrayData}; pub trait ArrayValidity { fn is_valid(&self, index: usize) -> bool; @@ -200,15 +199,13 @@ impl FromIterator for Validity { let mut buffer = BooleanBufferBuilder::new(validities.iter().map(|v| v.len()).sum()); for validity in validities { let present = match validity { - LogicalValidity::AllValid(count) => { - BooleanBuffer::new_set(count) - } - LogicalValidity::AllInvalid(count) => { - BooleanBuffer::new_unset(count) - } - LogicalValidity::Array(array) => { - array.into_array().flatten_bool().expect("validity must flatten to BoolArray").boolean_buffer() - } + LogicalValidity::AllValid(count) => BooleanBuffer::new_set(count), + LogicalValidity::AllInvalid(count) => BooleanBuffer::new_unset(count), + LogicalValidity::Array(array) => array + .into_array() + .flatten_bool() + .expect("validity must flatten to BoolArray") + .boolean_buffer(), }; buffer.append_buffer(&present); } diff --git a/vortex-fastlanes/src/for/compress.rs b/vortex-fastlanes/src/for/compress.rs index e684ec056..e7a3cc82f 100644 --- a/vortex-fastlanes/src/for/compress.rs +++ b/vortex-fastlanes/src/for/compress.rs @@ -4,8 +4,8 @@ use vortex::array::constant::ConstantArray; use vortex::array::primitive::PrimitiveArray; use vortex::compress::{CompressConfig, Compressor, EncodingCompression}; use vortex::stats::{ArrayStatistics, Stat}; -use vortex::{Array, ArrayDType, ArrayTrait, IntoArray}; use vortex::validity::ArrayValidity; +use vortex::{Array, ArrayDType, ArrayTrait, IntoArray}; use vortex_dtype::{match_each_integer_ptype, NativePType, PType}; use vortex_error::{vortex_err, VortexResult}; use vortex_scalar::Scalar; diff --git a/vortex-fastlanes/src/for/compute.rs b/vortex-fastlanes/src/for/compute.rs index 2707b8cbf..463ae7c6c 100644 --- a/vortex-fastlanes/src/for/compute.rs +++ b/vortex-fastlanes/src/for/compute.rs @@ -1,8 +1,8 @@ -use vortex::{Array, IntoArray}; -use vortex::compute::ArrayCompute; use vortex::compute::scalar_at::{scalar_at, ScalarAtFn}; use 
vortex::compute::slice::{slice, SliceFn}; use vortex::compute::take::{take, TakeFn}; +use vortex::compute::ArrayCompute; +use vortex::{Array, IntoArray}; use vortex_dtype::match_each_integer_ptype; use vortex_error::{vortex_bail, VortexResult}; use vortex_scalar::{PrimitiveScalar, Scalar, ScalarValue}; @@ -30,7 +30,7 @@ impl TakeFn for FoRArray { self.reference().clone(), self.shift(), ) - .map(|a| a.into_array()) + .map(|a| a.into_array()) } } @@ -60,7 +60,7 @@ impl SliceFn for FoRArray { self.reference().clone(), self.shift(), ) - .map(|a| a.into_array()) + .map(|a| a.into_array()) } } From 8696236a5c49f3df525fc55b24d45323f6261ef5 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 15:32:22 +0100 Subject: [PATCH 07/14] re-enable lints as errors --- vortex-array/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/vortex-array/Cargo.toml b/vortex-array/Cargo.toml index 31a5f2011..4c7238ab0 100644 --- a/vortex-array/Cargo.toml +++ b/vortex-array/Cargo.toml @@ -15,8 +15,8 @@ rust-version = { workspace = true } name = "vortex" path = "src/lib.rs" -#[lints] -#workspace = true +[lints] +workspace = true [dependencies] arrow-array = { workspace = true } From 0f24c36e39a1e116eab70a7e3014794941edf478 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 15:40:32 +0100 Subject: [PATCH 08/14] fix comments --- vortex-array/src/array/chunked/flatten.rs | 2 +- vortex-array/src/lib.rs | 14 +++++++------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index 13ed4d09f..300400907 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -80,7 +80,7 @@ pub(crate) fn try_flatten_chunks(chunks: Vec, dtype: DType) -> VortexResu } /// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single -/// StructArray pointed at ChunkedArrays of each constituent format. +/// StructArray, where the Array for each Field is a ChunkedArray. /// /// It is expected this function is only called from [try_flatten_chunks], and thus all chunks have /// been checked to have the same DType already. diff --git a/vortex-array/src/lib.rs b/vortex-array/src/lib.rs index e4bc1e5f3..b60e554d7 100644 --- a/vortex-array/src/lib.rs +++ b/vortex-array/src/lib.rs @@ -1,12 +1,12 @@ -//! Core Array and Encoding crate for Vortex. +//! Vortex crate containing core logic for encoding and memory representation of [arrays](Array). //! -//! At the heart of Vortex are [Arrays](Array) and [encodings](crate::encoding::EncodingCompression). -//! Arrays are typed views of memory buffers that hold [scalars](vortex_scalar::Scalar). Every data -//! type recognized by Vortex has a canonical physical encoding format, examples of which are -//! [crate::array::bool::BoolEncoding] and [crate::array::primitive::PrimitiveEncoding]. -//! -//! Vortex also supports a number of encodings backed by the latest databases research, including +//! At the heart of Vortex are [arrays](Array) and [encodings](crate::encoding::EncodingCompression). +//! Arrays are typed views of memory buffers that hold [scalars](vortex_scalar::Scalar). These +//! buffers can be held in a number of physical encodings to perform lightweight compression that +//! exploits the particular data distribution of the array's values. //! +//! Every data type recognized by Vortex also has a canonical physical encoding format, which +//! 
arrays can be [flattened](Flattened) into for ease of access in compute functions. //! pub mod accessor; pub mod array; From 95ac996081abb7e7ce44736b5940b582901de89c Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 15:41:35 +0100 Subject: [PATCH 09/14] remove unused --- vortex-array/src/stats/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/vortex-array/src/stats/mod.rs b/vortex-array/src/stats/mod.rs index 922782421..b7ef190be 100644 --- a/vortex-array/src/stats/mod.rs +++ b/vortex-array/src/stats/mod.rs @@ -1,6 +1,5 @@ use std::fmt::{Display, Formatter}; use std::hash::Hash; -use std::sync::Arc; use enum_iterator::Sequence; pub use statsset::*; @@ -56,8 +55,6 @@ pub trait Statistics { fn compute(&self, stat: Stat) -> Option; } -pub type StatisticsRef = Arc; - pub struct EmptyStatistics; impl Statistics for EmptyStatistics { From 2b9cfdf8969f0b3ed6b2e463c63a7c0b6380d5d3 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 15:48:31 +0100 Subject: [PATCH 10/14] =?UTF-8?q?please=20the=20clippy=20gods=20?= =?UTF-8?q?=F0=9F=99=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cargo.toml | 2 +- vortex-array/src/array/chunked/flatten.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f04c450fb..080a89a81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -107,5 +107,5 @@ zigzag = "0.1.0" warnings = "deny" [workspace.lints.clippy] -all = "deny" +all = { level = "deny", priority = -1 } or_fun_call = "deny" diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index 300400907..65231f733 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -111,12 +111,12 @@ fn swizzle_struct_chunks( field_arrays.push(field_array.into_array()); } - Ok(StructArray::try_new( + StructArray::try_new( struct_dtype.names().clone(), field_arrays, len, validity, - )?) + ) } /// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array. From a24b7d01308416c7946024fd1173b6eb6bae2100 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 15:54:42 +0100 Subject: [PATCH 11/14] format --- vortex-array/src/array/chunked/flatten.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index 65231f733..61043792f 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -111,12 +111,7 @@ fn swizzle_struct_chunks( field_arrays.push(field_array.into_array()); } - StructArray::try_new( - struct_dtype.names().clone(), - field_arrays, - len, - validity, - ) + StructArray::try_new(struct_dtype.names().clone(), field_arrays, len, validity) } /// Builds a new [BoolArray] by repacking the values from the chunks in a single contiguous array. 
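For orientation, a brief usage sketch of the flatten path assembled by the patches above. This is not part of any patch in the series; it assumes only APIs already exercised by the tests in this series (ChunkedArray::try_new, IntoArray, Array::flatten_primitive, PrimitiveArray::typed_data), and the function name flatten_chunked_example is illustrative.

    use vortex::array::chunked::ChunkedArray;
    use vortex::{ArrayDType, IntoArray};
    use vortex_error::VortexResult;

    fn flatten_chunked_example() -> VortexResult<()> {
        // Two chunks of u64 values become one logical array...
        let a = vec![1u64, 2, 3].into_array();
        let b = vec![4u64, 5, 6].into_array();
        let dtype = a.dtype().clone();
        let chunked = ChunkedArray::try_new(vec![a, b], dtype)?.into_array();

        // ...and flattening repacks them into a single contiguous PrimitiveArray,
        // the canonical encoding for this dtype.
        let flat = chunked.flatten_primitive()?;
        assert_eq!(flat.typed_data::<u64>(), &[1u64, 2, 3, 4, 5, 6]);
        Ok(())
    }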
From 9c7a20cf88df6727c744287ca6d551269bb78abf Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 16:05:35 +0100 Subject: [PATCH 12/14] i thought we had something special, clippy --- vortex-fastlanes/src/bitpacking/compress.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index 63ed4dba5..a074b7cc0 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -300,7 +300,8 @@ pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult( packed: &[u8], bit_width: usize, From 902fe244a551a2c5883755b7ff8040dba9bbaca6 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 16:13:40 +0100 Subject: [PATCH 13/14] fix finally --- vortex-fastlanes/src/bitpacking/compress.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/vortex-fastlanes/src/bitpacking/compress.rs b/vortex-fastlanes/src/bitpacking/compress.rs index a074b7cc0..bde98d42c 100644 --- a/vortex-fastlanes/src/bitpacking/compress.rs +++ b/vortex-fastlanes/src/bitpacking/compress.rs @@ -300,6 +300,7 @@ pub fn unpack_single(array: &BitPackedArray, index: usize) -> VortexResult( From 49b2453f6ea18ac826a996e659b50d7c3dfbff00 Mon Sep 17 00:00:00 2001 From: Andrew Duffy Date: Tue, 11 Jun 2024 18:33:04 +0100 Subject: [PATCH 14/14] fix SparseArray flatten for Bool, add AsArrowArray impl --- vortex-array/src/array/chunked/flatten.rs | 25 +++---- vortex-array/src/array/constant/as_arrow.rs | 50 +++++++++++++ vortex-array/src/array/constant/mod.rs | 1 + vortex-array/src/array/sparse/flatten.rs | 79 +++++++++++++++++---- vortex-array/src/typed.rs | 6 +- 5 files changed, 131 insertions(+), 30 deletions(-) create mode 100644 vortex-array/src/array/constant/as_arrow.rs diff --git a/vortex-array/src/array/chunked/flatten.rs b/vortex-array/src/array/chunked/flatten.rs index 61043792f..c70b5083d 100644 --- a/vortex-array/src/array/chunked/flatten.rs +++ b/vortex-array/src/array/chunked/flatten.rs @@ -13,7 +13,7 @@ use crate::array::primitive::PrimitiveArray; use crate::array::r#struct::StructArray; use crate::array::varbin::builder::VarBinBuilder; use crate::array::varbin::VarBinArray; -use crate::validity::{LogicalValidity, Validity}; +use crate::validity::Validity; use crate::{Array, ArrayDType, ArrayFlatten, ArrayTrait, ArrayValidity, Flattened, IntoArray}; impl ArrayFlatten for ChunkedArray { @@ -120,18 +120,14 @@ fn swizzle_struct_chunks( /// been checked to have the same DType already. 
fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult { let len = chunks.iter().map(|chunk| chunk.len()).sum(); - let mut logical_validities = Vec::new(); + let validity = validity_from_chunks(chunks, nullability); let mut bools = Vec::with_capacity(len); for chunk in chunks { let chunk = chunk.clone().flatten_bool()?; - logical_validities.push(chunk.logical_validity()); bools.extend(chunk.boolean_buffer().iter()); } - BoolArray::try_new( - BooleanBuffer::from(bools), - validity_from_chunks(logical_validities, nullability), - ) + BoolArray::try_new(BooleanBuffer::from(bools), validity) } /// Builds a new [PrimitiveArray] by repacking the values from the chunks into a single @@ -145,18 +141,17 @@ fn pack_primitives( nullability: Nullability, ) -> VortexResult { let len: usize = chunks.iter().map(|chunk| chunk.len()).sum(); - let mut logical_validities = Vec::new(); + let validity = validity_from_chunks(chunks, nullability); let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width()); for chunk in chunks { let chunk = chunk.clone().flatten_primitive()?; - logical_validities.push(chunk.logical_validity()); buffer.extend_from_slice(chunk.buffer()); } match_each_native_ptype!(ptype, |$T| { Ok(PrimitiveArray::try_new( ScalarBuffer::<$T>::from(buffer), - validity_from_chunks(logical_validities, nullability))?) + validity)?) }) } @@ -185,13 +180,13 @@ fn pack_varbin( Ok(builder.finish(dtype.clone())) } -fn validity_from_chunks( - logical_validities: Vec, - nullability: Nullability, -) -> Validity { +fn validity_from_chunks(chunks: &[Array], nullability: Nullability) -> Validity { if nullability == Nullability::NonNullable { Validity::NonNullable } else { - logical_validities.into_iter().collect() + chunks + .iter() + .map(|chunk| chunk.with_dyn(|a| a.logical_validity())) + .collect() } } diff --git a/vortex-array/src/array/constant/as_arrow.rs b/vortex-array/src/array/constant/as_arrow.rs new file mode 100644 index 000000000..f2e75cb0e --- /dev/null +++ b/vortex-array/src/array/constant/as_arrow.rs @@ -0,0 +1,50 @@ +//! Implementation of the [AsArrowArray] trait for [ConstantArray] that is representing +//! [DType::Null] values. 
+
+use std::sync::Arc;
+
+use arrow_array::{ArrayRef as ArrowArrayRef, NullArray};
+use vortex_dtype::DType;
+use vortex_error::{vortex_bail, VortexResult};
+
+use crate::array::constant::ConstantArray;
+use crate::compute::as_arrow::AsArrowArray;
+use crate::{ArrayDType, ArrayTrait};
+
+impl AsArrowArray for ConstantArray {
+    fn as_arrow(&self) -> VortexResult<ArrowArrayRef> {
+        if self.dtype() != &DType::Null {
+            vortex_bail!(InvalidArgument: "only null ConstantArrays convert to arrow");
+        }
+
+        let arrow_null = NullArray::new(self.len());
+        Ok(Arc::new(arrow_null))
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use arrow_array::{Array, NullArray};
+
+    use crate::array::constant::ConstantArray;
+    use crate::arrow::FromArrowArray;
+    use crate::compute::as_arrow::AsArrowArray;
+    use crate::{ArrayData, IntoArray};
+
+    #[test]
+    fn test_round_trip() {
+        let arrow_nulls = NullArray::new(10);
+        let vortex_nulls = ArrayData::from_arrow(&arrow_nulls, true).into_array();
+
+        assert_eq!(
+            *ConstantArray::try_from(vortex_nulls)
+                .unwrap()
+                .as_arrow()
+                .unwrap()
+                .as_any()
+                .downcast_ref::<NullArray>()
+                .unwrap(),
+            arrow_nulls
+        );
+    }
+}
diff --git a/vortex-array/src/array/constant/mod.rs b/vortex-array/src/array/constant/mod.rs
index 029239816..d3344fe6c 100644
--- a/vortex-array/src/array/constant/mod.rs
+++ b/vortex-array/src/array/constant/mod.rs
@@ -7,6 +7,7 @@ use crate::impl_encoding;
 use crate::stats::Stat;
 use crate::validity::{ArrayValidity, LogicalValidity};
 use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
+mod as_arrow;
 mod compute;
 mod flatten;
 mod stats;
diff --git a/vortex-array/src/array/sparse/flatten.rs b/vortex-array/src/array/sparse/flatten.rs
index 8d13350d8..0ebbef70c 100644
--- a/vortex-array/src/array/sparse/flatten.rs
+++ b/vortex-array/src/array/sparse/flatten.rs
@@ -1,13 +1,14 @@
-use arrow_buffer::BooleanBufferBuilder;
+use arrow_buffer::{BooleanBuffer, BooleanBufferBuilder};
 use itertools::Itertools;
-use vortex_dtype::{match_each_native_ptype, NativePType};
+use vortex_dtype::{match_each_native_ptype, DType, NativePType};
 use vortex_error::{VortexError, VortexResult};
 use vortex_scalar::Scalar;
 
+use crate::array::bool::BoolArray;
 use crate::array::primitive::PrimitiveArray;
 use crate::array::sparse::SparseArray;
 use crate::validity::Validity;
-use crate::{ArrayFlatten, ArrayTrait, Flattened};
+use crate::{ArrayDType, ArrayFlatten, ArrayTrait, Flattened};
 
 impl ArrayFlatten for SparseArray {
     fn flatten(self) -> VortexResult<Flattened> {
@@ -16,20 +17,50 @@ impl ArrayFlatten for SparseArray {
         let mut validity = BooleanBufferBuilder::new(self.len());
         validity.append_n(self.len(), false);
-        let values = self.values().flatten_primitive()?;
-        match_each_native_ptype!(values.ptype(), |$P| {
-            flatten_sparse_values(
-                values.typed_data::<$P>(),
-                &indices,
-                self.len(),
-                self.fill_value(),
-                validity
-            )
-        })
+
+        if matches!(self.dtype(), DType::Bool(_)) {
+            let values = self.values().flatten_bool()?.boolean_buffer();
+            flatten_sparse_bools(values, &indices, self.len(), self.fill_value(), validity)
+        } else {
+            let values = self.values().flatten_primitive()?;
+            match_each_native_ptype!(values.ptype(), |$P| {
+                flatten_sparse_primitives(
+                    values.typed_data::<$P>(),
+                    &indices,
+                    self.len(),
+                    self.fill_value(),
+                    validity
+                )
+            })
+        }
+    }
+}
+
+fn flatten_sparse_bools(
+    values: BooleanBuffer,
+    indices: &[usize],
+    len: usize,
+    fill_value: &Scalar,
+    mut validity: BooleanBufferBuilder,
+) -> VortexResult<Flattened> {
+    let fill_bool: bool = if fill_value.is_null() {
+        bool::default()
+    } else {
+        fill_value.try_into()?
+    };
+    let mut flat_bools = vec![fill_bool; len];
+    for (i, idx) in indices.iter().enumerate() {
+        flat_bools[*idx] = values.value(i);
+        validity.set_bit(*idx, true);
+    }
+
+    let validity = Validity::from(validity.finish());
+    let bool_values = BoolArray::from_vec(flat_bools, validity);
+
+    Ok(Flattened::Bool(bool_values))
+}
+
-fn flatten_sparse_values<T: NativePType + for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
+fn flatten_sparse_primitives<T: NativePType + for<'a> TryFrom<&'a Scalar, Error = VortexError>>(
     values: &[T],
     indices: &[usize],
     len: usize,
@@ -56,3 +87,23 @@ fn flatten_sparse_values<T: NativePType + for<'a> TryFrom<&'a Scalar, Error = Vo
     };
     Ok(Flattened::Primitive(array))
 }
+
+#[cfg(test)]
+mod test {
+    use vortex_dtype::{DType, Nullability};
+
+    use crate::array::bool::BoolArray;
+    use crate::array::sparse::SparseArray;
+    use crate::validity::Validity;
+    use crate::{ArrayDType, ArrayFlatten, Flattened, IntoArray};
+
+    #[test]
+    fn test_sparse_bool() {
+        let indices = vec![0u64].into_array();
+        let values = BoolArray::from_vec(vec![true], Validity::NonNullable).into_array();
+        let sparse_bools = SparseArray::new(indices, values, 10, true.into());
+        assert_eq!(*sparse_bools.dtype(), DType::Bool(Nullability::NonNullable));
+        let flat_bools = sparse_bools.flatten().unwrap();
+        assert!(matches!(flat_bools, Flattened::Bool(_)));
+    }
+}
diff --git a/vortex-array/src/typed.rs b/vortex-array/src/typed.rs
index 49184e8c5..4284284d6 100644
--- a/vortex-array/src/typed.rs
+++ b/vortex-array/src/typed.rs
@@ -48,7 +48,11 @@ impl<D: ArrayDef> TryFrom<Array> for TypedArray<D> {
     fn try_from(array: Array) -> Result<Self, Self::Error> {
         if array.encoding().id() != D::ENCODING.id() {
-            vortex_bail!("incorrect encoding");
+            vortex_bail!(
+                "incorrect encoding {}, expected {}",
+                array.encoding().id().as_ref(),
+                D::ENCODING.id().as_ref(),
+            );
         }
         let metadata = match &array {
             Array::Data(d) => d