fix: canonicalization of chunked ExtensionArray #499

Merged: 10 commits, Jul 24, 2024
10 changes: 7 additions & 3 deletions encodings/datetime-parts/src/compress.rs
@@ -10,7 +10,11 @@ use vortex_error::{vortex_bail, VortexResult};
/// Splitting the components by granularity creates more small values, which enables better
/// cascading compression.
pub fn compress_temporal(array: TemporalArray) -> VortexResult<(Array, Array, Array)> {
let timestamps = try_cast(&array.temporal_values(), PType::I64.into())?.into_primitive()?;
// After this operation, timestamps will be PrimitiveArray<i64>
let timestamps = try_cast(
&array.temporal_values().into_primitive()?.into_array(),
PType::I64.into(),
)?;
let divisor = match array.temporal_metadata().time_unit() {
TimeUnit::Ns => 1_000_000_000,
TimeUnit::Us => 1_000_000,
@@ -24,14 +28,14 @@
let mut seconds = Vec::with_capacity(length);
let mut subsecond = Vec::with_capacity(length);

for &t in timestamps.maybe_null_slice::<i64>().iter() {
for &t in timestamps.as_primitive().maybe_null_slice::<i64>().iter() {
days.push(t / (86_400 * divisor));
seconds.push((t % (86_400 * divisor)) / divisor);
subsecond.push((t % (86_400 * divisor)) % divisor);
}

Ok((
PrimitiveArray::from_vec(days, timestamps.validity()).into_array(),
PrimitiveArray::from_vec(days, timestamps.as_primitive().validity()).into_array(),
PrimitiveArray::from(seconds).into_array(),
PrimitiveArray::from(subsecond).into_array(),
))
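As a sanity check on the arithmetic in the loop above, here is a minimal, self-contained sketch in plain Rust. It is not the Vortex API: the decompose helper, the hard-coded nanosecond divisor, and the sample timestamp are illustrative only. It mirrors the days/seconds/subseconds split that compress_temporal performs on the primitive i64 slice.

// Toy model (not the Vortex API): split i64 epoch timestamps, expressed in a
// given time unit (divisor = ticks per second), into days since the epoch,
// seconds within the day, and subseconds within the second.
fn decompose(timestamps: &[i64], divisor: i64) -> (Vec<i64>, Vec<i64>, Vec<i64>) {
    let mut days = Vec::with_capacity(timestamps.len());
    let mut seconds = Vec::with_capacity(timestamps.len());
    let mut subsecond = Vec::with_capacity(timestamps.len());
    for &t in timestamps {
        days.push(t / (86_400 * divisor));
        seconds.push((t % (86_400 * divisor)) / divisor);
        subsecond.push((t % (86_400 * divisor)) % divisor);
    }
    (days, seconds, subsecond)
}

fn main() {
    // 2021-01-01T00:00:01.000000123 UTC as nanoseconds since the Unix epoch.
    let ts = [1_609_459_201_000_000_123i64];
    let (days, secs, subsec) = decompose(&ts, 1_000_000_000); // TimeUnit::Ns
    assert_eq!((days[0], secs[0], subsec[0]), (18_628, 1, 123));
}

Each of the three output columns holds smaller, more repetitive values than the raw timestamps, which is what the doc comment above means by enabling better cascading compression.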
50 changes: 43 additions & 7 deletions vortex-array/src/array/chunked/canonical.rs
@@ -28,6 +28,10 @@ pub(crate) fn try_canonicalize_chunks(
chunks: Vec<Array>,
dtype: &DType,
) -> VortexResult<Canonical> {
if chunks.is_empty() {
Review comment (Contributor):
You should add a FLUP to create Canonical::empty(dtype)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we end up redesigning extension arrays to contain their storage_dtype as part of the type definition, I'm not sure we'd need it.

vortex_bail!(InvalidArgument: "chunks must be non-empty")
}

let mismatched = chunks
.iter()
.filter(|chunk| !chunk.dtype().eq(dtype))
@@ -44,15 +48,47 @@
Ok(Canonical::Struct(struct_array))
}

// Extension arrays wrap an internal storage array, which can hold a ChunkedArray until
// it is safe to unpack them.
// Extension arrays are containers that wrap an inner storage array with some metadata.
// We delegate to the canonical format of the internal storage array for every chunk, and
// push the chunking down into the inner storage array.
//
// Input:
// ------
//
// ChunkedArray
// / \
// / \
// ExtensionArray ExtensionArray
// | |
// storage storage
//
//
// Output:
// ------
//
// ExtensionArray
// |
// ChunkedArray
// / \
// storage storage
//
DType::Extension(ext_dtype, _) => {
let ext_array = ExtensionArray::new(
// Recursively apply canonicalization and packing to the storage array backing
// each chunk of the extension array.
let storage_chunks: Vec<Array> = chunks
.iter()
// Extension-typed arrays can be compressed into something that is not an
// ExtensionArray, so we should canonicalize each chunk into ExtensionArray first.
.map(|chunk| chunk.clone().into_extension().unwrap().storage())
.collect();
let storage_dtype = storage_chunks.first().unwrap().dtype().clone();
let chunked_storage =
ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();

Ok(Canonical::Extension(ExtensionArray::new(
ext_dtype.clone(),
ChunkedArray::try_new(chunks, dtype.clone())?.into_array(),
);

Ok(Canonical::Extension(ext_array))
chunked_storage,
)))
}

// TODO(aduffy): better list support
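To make the restructuring concrete, here is a minimal toy sketch in plain Rust. Ext and Chunked are stand-in types invented for illustration, not the Vortex types, and canonicalize only mirrors the shape of the transformation in the diagram above: a chunked array of extension arrays becomes one extension array whose storage is a chunked array of the per-chunk storage arrays.

#[derive(Debug)]
struct Ext {
    ext_dtype: String, // extension type id (metadata carried by the wrapper)
    storage: Vec<i64>, // stand-in for the wrapped storage array
}

#[derive(Debug, PartialEq)]
struct Chunked<T>(Vec<T>);

fn canonicalize(chunks: Chunked<Ext>) -> Result<(String, Chunked<Vec<i64>>), String> {
    if chunks.0.is_empty() {
        // Mirrors the new vortex_bail!: there is no chunk to borrow a dtype from.
        return Err("chunks must be non-empty".to_string());
    }
    let ext_dtype = chunks.0[0].ext_dtype.clone();
    // Pull the storage out of every chunk, then re-chunk the storage itself.
    let storage_chunks: Vec<Vec<i64>> = chunks.0.into_iter().map(|c| c.storage).collect();
    Ok((ext_dtype, Chunked(storage_chunks)))
}

fn main() {
    let input = Chunked(vec![
        Ext { ext_dtype: "vortex.date".into(), storage: vec![1, 2] },
        Ext { ext_dtype: "vortex.date".into(), storage: vec![3] },
    ]);
    let (dtype, storage) = canonicalize(input).unwrap();
    assert_eq!(dtype, "vortex.date");
    assert_eq!(storage, Chunked(vec![vec![1, 2], vec![3]]));
}

As in the diff, the dtype for the re-chunked storage comes from the first chunk, which is one reason the new non-empty check at the top of try_canonicalize_chunks matters.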