Remove AsContiguousFn #346

Merged · 14 commits · Jun 11, 2024
10 changes: 1 addition & 9 deletions vortex-alp/src/compute.rs
@@ -1,16 +1,13 @@
use vortex::compute::as_contiguous::AsContiguousFn;
use vortex::compute::scalar_at::{scalar_at, ScalarAtFn};
use vortex::compute::slice::{slice, SliceFn};
use vortex::compute::take::{take, TakeFn};
use vortex::compute::ArrayCompute;
use vortex::{impl_default_as_contiguous_fn, Array, ArrayDType, IntoArray};
use vortex::{Array, ArrayDType, IntoArray};
use vortex_error::VortexResult;
use vortex_scalar::Scalar;

use crate::{match_each_alp_float_ptype, ALPArray};

impl_default_as_contiguous_fn!(ALPArray);

impl ArrayCompute for ALPArray {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
@@ -23,10 +20,6 @@ impl ArrayCompute for ALPArray {
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}

fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Some(self)
}
}

impl ScalarAtFn for ALPArray {
@@ -72,7 +65,6 @@ impl SliceFn for ALPArray {
#[cfg(test)]
mod test {
use vortex::array::primitive::PrimitiveArray;
use vortex::compute::as_contiguous::AsContiguousFn;
use vortex::compute::scalar_at::scalar_at;
use vortex::validity::Validity;
use vortex::IntoArray;
4 changes: 2 additions & 2 deletions vortex-array/Cargo.toml
@@ -15,8 +15,8 @@ rust-version = { workspace = true }
name = "vortex"
path = "src/lib.rs"

[lints]
workspace = true
#[lints]
#workspace = true

[dependencies]
arrow-array = { workspace = true }
27 changes: 0 additions & 27 deletions vortex-array/src/array/bool/compute/as_contiguous.rs

This file was deleted.

6 changes: 0 additions & 6 deletions vortex-array/src/array/bool/compute/mod.rs
@@ -1,6 +1,5 @@
use crate::array::bool::BoolArray;
use crate::compute::as_arrow::AsArrowArray;
use crate::compute::as_contiguous::AsContiguousFn;
use crate::compute::compare::CompareFn;
use crate::compute::fill::FillForwardFn;
use crate::compute::scalar_at::ScalarAtFn;
@@ -9,7 +8,6 @@ use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;

mod as_arrow;
mod as_contiguous;
mod compare;
mod fill;
mod flatten;
@@ -22,10 +20,6 @@ impl ArrayCompute for BoolArray {
Some(self)
}

fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
Some(self)
}

fn compare(&self) -> Option<&dyn CompareFn> {
Some(self)
}
25 changes: 3 additions & 22 deletions vortex-array/src/array/chunked/compute/mod.rs
@@ -2,23 +2,21 @@ use vortex_error::VortexResult;
use vortex_scalar::Scalar;

use crate::array::chunked::ChunkedArray;
use crate::compute::as_contiguous::{as_contiguous, AsContiguousFn};
use crate::compute::ArrayCompute;
use crate::compute::scalar_at::{scalar_at, ScalarAtFn};
use crate::compute::scalar_subtract::SubtractScalarFn;
use crate::compute::slice::SliceFn;
use crate::compute::take::TakeFn;
use crate::compute::ArrayCompute;
use crate::Array;

mod slice;
mod take;

impl ArrayCompute for ChunkedArray {
fn as_contiguous(&self) -> Option<&dyn AsContiguousFn> {
fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
Some(self)
}

fn scalar_at(&self) -> Option<&dyn ScalarAtFn> {
fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
Some(self)
}

@@ -29,23 +27,6 @@ impl ArrayCompute for ChunkedArray {
fn take(&self) -> Option<&dyn TakeFn> {
Some(self)
}

fn subtract_scalar(&self) -> Option<&dyn SubtractScalarFn> {
Some(self)
}
}

impl AsContiguousFn for ChunkedArray {
fn as_contiguous(&self, arrays: &[Array]) -> VortexResult<Array> {
// Combine all the chunks into one, then call as_contiguous again.
let mut chunks = Vec::with_capacity(self.nchunks());
for array in arrays {
for chunk in Self::try_from(array).unwrap().chunks() {
chunks.push(chunk);
}
}
as_contiguous(&chunks)
}
}

impl ScalarAtFn for ChunkedArray {
1 change: 0 additions & 1 deletion vortex-array/src/array/chunked/compute/take.rs
@@ -55,7 +55,6 @@ mod test {
use itertools::Itertools;

use crate::array::chunked::ChunkedArray;
use crate::compute::as_contiguous::as_contiguous;
use crate::compute::take::take;
use crate::{ArrayDType, ArrayTrait, AsArray, IntoArray};

146 changes: 146 additions & 0 deletions vortex-array/src/array/chunked/flatten.rs
@@ -0,0 +1,146 @@
use arrow_buffer::{BooleanBuffer, MutableBuffer, ScalarBuffer};

use vortex_dtype::{DType, match_each_native_ptype, Nullability, PType, StructDType};
use vortex_error::{vortex_bail, VortexResult};

use crate::{Array, ArrayTrait, ArrayValidity, Flattened, IntoArray};
use crate::accessor::ArrayAccessor;
use crate::array::bool::BoolArray;
use crate::array::chunked::ChunkedArray;
use crate::array::primitive::PrimitiveArray;
use crate::array::r#struct::StructArray;
use crate::array::varbin::builder::VarBinBuilder;
use crate::array::varbin::VarBinArray;
use crate::validity::{LogicalValidity, Validity};

pub fn try_flatten_chunks(chunks: Vec<Array>, dtype: DType) -> VortexResult<Flattened> {
match &dtype {
// Structs can have their internal field pointers swizzled to push the chunking down
// one level internally without copying or decompressing any data.
DType::Struct(struct_dtype, _) => {
let struct_array = swizzle_struct_chunks(chunks.as_slice(), struct_dtype)?;
Ok(Flattened::Struct(struct_array))
}

// TODO(aduffy): can we pushdown chunks here?
DType::Extension(_, _) => {
// How can we stitch an unknown type back together?
todo!()
}

// Lists just flatten into their inner PType
DType::List(_, _) => {
todo!()
}

DType::Bool(nullability) => {
let bool_array = pack_bools(chunks.as_slice(), *nullability)?;
Ok(Flattened::Bool(bool_array))
}
DType::Primitive(ptype, nullability) => {
let prim_array = pack_primitives(chunks.as_slice(), *ptype, *nullability)?;
Ok(Flattened::Primitive(prim_array))
}
DType::Utf8(nullability) => {
let varbin_array = pack_varbin(chunks.as_slice(), dtype.clone(), *nullability)?;
Ok(Flattened::VarBin(varbin_array))
}
DType::Binary(nullability) => {
let varbin_array = pack_varbin(chunks.as_slice(), dtype.clone(), *nullability)?;
Ok(Flattened::VarBin(varbin_array))
}
DType::Null => {
vortex_bail!(ComputeError: "DType::Null cannot be flattened")
}
}
}

/// Swizzle the pointers within a ChunkedArray of StructArrays to instead be a single
/// StructArray pointed at ChunkedArrays of each constituent format.
fn swizzle_struct_chunks(chunks: &[Array], struct_dtype: &StructDType) -> VortexResult<StructArray> {
let chunks = chunks.iter()
.map(StructArray::try_from)
// Figure out how to unwrap result of things
.collect::<VortexResult<Vec<_>>>()?;

let len = chunks.iter().map(|chunk| chunk.len()).sum();
let validity = chunks.iter()
.map(|chunk| chunk.logical_validity())
.collect::<Validity>();

let mut field_arrays = Vec::new();
let field_names = struct_dtype.names().clone();
let field_dtypes = struct_dtype.dtypes().clone();

for (field_idx, field_dtype) in field_dtypes.iter().enumerate() {
let mut field_chunks = Vec::new();
for chunk in &chunks {
field_chunks.push(chunk.field(field_idx).expect("structarray should contain field"));
Contributor:

This expect is only really valid if you assert all dtypes are equal at the start of this function. Maybe you check that elsewhere, in which case just document the assumed precondition.

Contributor Author:

By "document", do you mean in the expect() string, or document as a comment and then call unwrap() instead?

Contributor Author:

I assumed you meant the former, since there seems to be a general preference for expect() over unwrap(), in which case this is done.

}
let field_array = ChunkedArray::try_new(field_chunks, field_dtype.clone())?;
field_arrays.push(field_array.into_array());
}

Ok(StructArray::try_new(field_names, field_arrays, len, validity)?)
}
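
For illustration only (not part of this PR): a minimal sketch of the up-front dtype check discussed in the review thread above, using only items already imported elsewhere in this diff (Array, ArrayDType, DType, vortex_bail, VortexResult). The helper name check_chunk_dtypes is hypothetical.

use vortex::{Array, ArrayDType};
use vortex_dtype::DType;
use vortex_error::{vortex_bail, VortexResult};

/// Hypothetical helper: verify every chunk shares the expected DType before
/// swizzling struct fields, so the later expect() on chunk.field(field_idx)
/// cannot fire because of a chunk with a different struct layout.
fn check_chunk_dtypes(chunks: &[Array], expected: &DType) -> VortexResult<()> {
    for chunk in chunks {
        if chunk.dtype() != expected {
            vortex_bail!(ComputeError: "all chunks must share the same DType to be flattened");
        }
    }
    Ok(())
}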

fn pack_bools(chunks: &[Array], nullability: Nullability) -> VortexResult<BoolArray> {
let len = chunks.iter().map(|chunk| chunk.len()).sum();
let mut logical_validities = Vec::new();
let mut bools = Vec::with_capacity(len);
for chunk in chunks {
let chunk = chunk.clone().flatten_bool()?;
logical_validities.push(chunk.logical_validity());
bools.extend(chunk.boolean_buffer().iter());
}

BoolArray::try_new(
BooleanBuffer::from(bools),
validity_from_chunks(logical_validities, nullability),
)
}

fn pack_primitives(chunks: &[Array], ptype: PType, nullability: Nullability) -> VortexResult<PrimitiveArray> {
let len: usize = chunks.iter().map(|chunk| chunk.len()).sum();
let mut logical_validities = Vec::new();
let mut buffer = MutableBuffer::with_capacity(len * ptype.byte_width());
for chunk in chunks {
let chunk = chunk.clone().flatten_primitive()?;
logical_validities.push(chunk.logical_validity());
buffer.extend_from_slice(chunk.buffer());
}

match_each_native_ptype!(ptype, |$T| {
Ok(PrimitiveArray::try_new(
ScalarBuffer::<$T>::from(buffer),
validity_from_chunks(logical_validities, nullability))?)
})
}

// TODO(aduffy): This can be slow for really large arrays.
// TODO(aduffy): this doesn't propagate the validity fully
fn pack_varbin(chunks: &[Array], dtype: DType, _nullability: Nullability) -> VortexResult<VarBinArray> {
let len = chunks.iter()
.map(|chunk| chunk.len())
.sum();
let mut builder = VarBinBuilder::<i32>::with_capacity(len);

for chunk in chunks {
let chunk = chunk.clone().flatten_varbin()?;
chunk.with_iterator(|iter| {
for datum in iter {
builder.push(datum);
}
})?;
}

Ok(builder.finish(dtype))
}

fn validity_from_chunks(logical_validities: Vec<LogicalValidity>, nullability: Nullability) -> Validity {
if nullability == Nullability::NonNullable {
Validity::NonNullable
} else {
logical_validities.into_iter().collect()
}
}
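
For context (illustrative only, not part of the change): a rough sketch of how the new flatten path replaces the old as_contiguous call from a caller's perspective. It assumes PrimitiveArray implements From<Vec<i32>>, and that IntoArray, ChunkedArray::try_new, and the flatten_primitive method behave as used elsewhere in this diff.

use vortex::array::chunked::ChunkedArray;
use vortex::array::primitive::PrimitiveArray;
use vortex::{ArrayTrait, IntoArray};
use vortex_dtype::{DType, Nullability, PType};
use vortex_error::VortexResult;

fn flatten_two_chunks() -> VortexResult<()> {
    // Two small primitive chunks; From<Vec<i32>> for PrimitiveArray is assumed here.
    let chunks = vec![
        PrimitiveArray::from(vec![1i32, 2, 3]).into_array(),
        PrimitiveArray::from(vec![4i32, 5, 6]).into_array(),
    ];
    let chunked = ChunkedArray::try_new(
        chunks,
        DType::Primitive(PType::I32, Nullability::NonNullable),
    )?;

    // Flattening now routes through try_flatten_chunks instead of as_contiguous.
    let flat = chunked.into_array().flatten_primitive()?;
    assert_eq!(flat.len(), 6);
    Ok(())
}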
17 changes: 7 additions & 10 deletions vortex-array/src/array/chunked/mod.rs
@@ -1,24 +1,26 @@
use futures_util::stream;
use itertools::Itertools;
use serde::{Deserialize, Serialize};

use vortex_dtype::{Nullability, PType};
use vortex_error::vortex_bail;
use vortex_scalar::Scalar;

use crate::{ArrayDType, ArrayFlatten, impl_encoding, IntoArrayData, ToArrayData};
use crate::array::chunked::flatten::try_flatten_chunks;
use crate::array::primitive::PrimitiveArray;
use crate::compute::as_contiguous::as_contiguous;
use crate::compute::scalar_at::scalar_at;
use crate::compute::scalar_subtract::{subtract_scalar, SubtractScalarFn};
use crate::compute::search_sorted::{search_sorted, SearchSortedSide};
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
use crate::stream::{ArrayStream, ArrayStreamAdapter};
use crate::validity::Validity::NonNullable;
use crate::validity::{ArrayValidity, LogicalValidity};
use crate::validity::Validity::NonNullable;
use crate::visitor::{AcceptArrayVisitor, ArrayVisitor};
use crate::{impl_encoding, ArrayDType, ArrayFlatten, IntoArrayData, ToArrayData};

mod compute;
mod stats;
mod flatten;

impl_encoding!("vortex.chunked", Chunked);

@@ -116,12 +118,7 @@ impl FromIterator<Array> for ChunkedArray {

impl ArrayFlatten for ChunkedArray {
fn flatten(self) -> VortexResult<Flattened> {
let chunks = self.chunks().collect_vec();
if chunks.is_empty() {
// TODO(ngates): return an empty FlattenedArray with the correct DType.
panic!("Cannot yet flatten an empty chunked array");
}
as_contiguous(chunks.as_slice())?.flatten()
try_flatten_chunks(self.chunks().collect(), self.dtype().clone())
}
}

@@ -171,10 +168,10 @@ mod test {
use vortex_dtype::{DType, Nullability};
use vortex_dtype::{NativePType, PType};

use crate::{Array, IntoArray, ToArray};
use crate::array::chunked::ChunkedArray;
use crate::compute::scalar_subtract::subtract_scalar;
use crate::compute::slice::slice;
use crate::{Array, IntoArray, ToArray};

fn chunked_array() -> ChunkedArray {
ChunkedArray::try_new(