From f22e7fd5f2ec636f0013463b896a785a1aa33db2 Mon Sep 17 00:00:00 2001 From: Ritchie Vink Date: Sat, 6 Jan 2024 11:15:22 +0100 Subject: [PATCH] feat(rust): `BinaryView`/`Utf8View` IPC support (#13464) --- crates/polars-arrow/Cargo.toml | 2 +- crates/polars-arrow/src/array/binary/ffi.rs | 2 +- crates/polars-arrow/src/array/binview/ffi.rs | 7 +- crates/polars-arrow/src/array/binview/mod.rs | 11 + .../polars-arrow/src/array/binview/mutable.rs | 4 + crates/polars-arrow/src/array/list/ffi.rs | 2 +- .../src/io/ipc/read/array/binary.rs | 15 +- .../src/io/ipc/read/array/binview.rs | 82 ++++ .../src/io/ipc/read/array/boolean.rs | 15 +- .../io/ipc/read/array/fixed_size_binary.rs | 15 +- .../src/io/ipc/read/array/fixed_size_list.rs | 9 +- .../src/io/ipc/read/array/list.rs | 17 +- .../polars-arrow/src/io/ipc/read/array/map.rs | 17 +- .../polars-arrow/src/io/ipc/read/array/mod.rs | 26 + .../src/io/ipc/read/array/null.rs | 15 +- .../src/io/ipc/read/array/primitive.rs | 15 +- .../src/io/ipc/read/array/struct_.rs | 9 +- .../src/io/ipc/read/array/union.rs | 17 +- .../src/io/ipc/read/array/utf8.rs | 15 +- crates/polars-arrow/src/io/ipc/read/common.rs | 7 + .../src/io/ipc/read/deserialize.rs | 34 +- crates/polars-arrow/src/io/ipc/read/file.rs | 4 +- crates/polars-arrow/src/io/ipc/read/mod.rs | 5 +- crates/polars-arrow/src/io/ipc/read/schema.rs | 4 +- .../polars-arrow/src/io/ipc/write/common.rs | 3 +- .../polars-arrow/src/io/ipc/write/schema.rs | 3 +- .../src/io/ipc/write/serialize/binary.rs | 93 ++++ .../src/io/ipc/write/serialize/binview.rs | 44 ++ .../src/io/ipc/write/serialize/boolean.rs | 27 ++ .../src/io/ipc/write/serialize/dictionary.rs | 37 ++ .../ipc/write/serialize/fixed_size_binary.rs | 20 + .../ipc/write/serialize/fixed_sized_list.rs | 29 ++ .../src/io/ipc/write/serialize/list.rs | 58 +++ .../src/io/ipc/write/serialize/map.rs | 58 +++ .../write/{serialize.rs => serialize/mod.rs} | 457 ++---------------- .../src/io/ipc/write/serialize/primitive.rs | 28 ++ .../src/io/ipc/write/serialize/struct_.rs | 31 ++ .../src/io/ipc/write/serialize/union.rs | 42 ++ crates/polars-arrow/tests/it/io/ipc/mod.rs | 80 +++ crates/polars-arrow/tests/it/io/mod.rs | 1 + crates/polars-arrow/tests/it/main.rs | 2 + 41 files changed, 822 insertions(+), 540 deletions(-) create mode 100644 crates/polars-arrow/src/io/ipc/read/array/binview.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/binary.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/binview.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/boolean.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/dictionary.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/fixed_size_binary.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/fixed_sized_list.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/list.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/map.rs rename crates/polars-arrow/src/io/ipc/write/{serialize.rs => serialize/mod.rs} (55%) create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/primitive.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/struct_.rs create mode 100644 crates/polars-arrow/src/io/ipc/write/serialize/union.rs create mode 100644 crates/polars-arrow/tests/it/io/ipc/mod.rs create mode 100644 crates/polars-arrow/tests/it/io/mod.rs create mode 100644 crates/polars-arrow/tests/it/main.rs diff --git a/crates/polars-arrow/Cargo.toml b/crates/polars-arrow/Cargo.toml index db7897986c4d..288f998ecc71 100644 --- a/crates/polars-arrow/Cargo.toml +++ b/crates/polars-arrow/Cargo.toml @@ -121,7 +121,7 @@ arrow_rs = ["arrow-buffer", "arrow-schema", "arrow-data", "arrow-array"] io_ipc = ["arrow-format", "polars-error/arrow-format"] io_ipc_write_async = ["io_ipc", "futures"] io_ipc_read_async = ["io_ipc", "futures", "async-stream"] -io_ipc_compression = ["lz4", "zstd"] +io_ipc_compression = ["lz4", "zstd", "io_ipc"] io_flight = ["io_ipc", "arrow-format/flight-data"] io_avro = ["avro-schema", "polars-error/avro-schema"] diff --git a/crates/polars-arrow/src/array/binary/ffi.rs b/crates/polars-arrow/src/array/binary/ffi.rs index b9d2f2b4184c..5b16f18dcf50 100644 --- a/crates/polars-arrow/src/array/binary/ffi.rs +++ b/crates/polars-arrow/src/array/binary/ffi.rs @@ -59,6 +59,6 @@ impl FromFfi for BinaryArray { // assumption that data from FFI is well constructed let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) }; - Ok(Self::new(data_type, offsets, values, validity)) + Self::try_new(data_type, offsets, values, validity) } } diff --git a/crates/polars-arrow/src/array/binview/ffi.rs b/crates/polars-arrow/src/array/binview/ffi.rs index 00f9896b9e3d..697422d8f109 100644 --- a/crates/polars-arrow/src/array/binview/ffi.rs +++ b/crates/polars-arrow/src/array/binview/ffi.rs @@ -67,6 +67,11 @@ impl FromFfi for BinaryViewArray buffers.push(values); } - Self::try_new(data_type, views, Arc::from(buffers), validity) + Ok(Self::new_unchecked( + data_type, + views, + Arc::from(buffers), + validity, + )) } } diff --git a/crates/polars-arrow/src/array/binview/mod.rs b/crates/polars-arrow/src/array/binview/mod.rs index 98ed1b6f8e67..9757c96e6fd7 100644 --- a/crates/polars-arrow/src/array/binview/mod.rs +++ b/crates/polars-arrow/src/array/binview/mod.rs @@ -26,6 +26,7 @@ mod private { use private::Sealed; use crate::array::binview::iterator::BinaryViewValueIter; +use crate::array::binview::mutable::MutableBinaryViewArray; use crate::array::binview::view::{validate_binary_view, validate_utf8_view}; use crate::array::iterator::NonNullValuesIter; use crate::bitmap::utils::{BitmapIter, ZipValidity}; @@ -147,6 +148,10 @@ impl BinaryViewArrayGeneric { self.buffers.as_ref() } + pub fn variadic_buffer_lengths(&self) -> Vec { + self.buffers.iter().map(|buf| buf.len() as i64).collect() + } + pub fn views(&self) -> &Buffer { &self.views } @@ -251,6 +256,12 @@ impl BinaryViewArrayGeneric { impl_sliced!(); impl_mut_validity!(); impl_into_array!(); + + pub fn from, P: AsRef<[Option]>>(slice: P) -> Self { + let mutable = + MutableBinaryViewArray::from_iter(slice.as_ref().iter().map(|opt_v| opt_v.as_ref())); + mutable.into() + } } impl Array for BinaryViewArrayGeneric { diff --git a/crates/polars-arrow/src/array/binview/mutable.rs b/crates/polars-arrow/src/array/binview/mutable.rs index 5bbd0c170f70..b6fc86f7c5b8 100644 --- a/crates/polars-arrow/src/array/binview/mutable.rs +++ b/crates/polars-arrow/src/array/binview/mutable.rs @@ -165,6 +165,10 @@ impl MutableBinaryViewArray { mutable.extend_values(iterator); mutable } + + pub fn from, P: AsRef<[Option]>>(slice: P) -> Self { + Self::from_iter(slice.as_ref().iter().map(|opt_v| opt_v.as_ref())) + } } impl> Extend> for MutableBinaryViewArray { diff --git a/crates/polars-arrow/src/array/list/ffi.rs b/crates/polars-arrow/src/array/list/ffi.rs index 5b68cdce84be..2709634681c7 100644 --- a/crates/polars-arrow/src/array/list/ffi.rs +++ b/crates/polars-arrow/src/array/list/ffi.rs @@ -64,6 +64,6 @@ impl FromFfi for ListArray { // assumption that data from FFI is well constructed let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) }; - Ok(Self::new(data_type, offsets, values, validity)) + Self::try_new(data_type, offsets, values, validity) } } diff --git a/crates/polars-arrow/src/io/ipc/read/array/binary.rs b/crates/polars-arrow/src/io/ipc/read/array/binary.rs index e33c2dda05a8..9553212ec5c4 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/binary.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/binary.rs @@ -4,10 +4,11 @@ use std::io::{Read, Seek}; use polars_error::{polars_err, PolarsResult}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node}; use crate::array::BinaryArray; use crate::buffer::Buffer; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; use crate::offset::Offset; #[allow(clippy::too_many_arguments)] @@ -22,11 +23,7 @@ pub fn read_binary( limit: Option, scratch: &mut Vec, ) -> PolarsResult> { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(oos = - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -39,11 +36,7 @@ pub fn read_binary( scratch, )?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let offsets: Buffer = read_buffer( buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/binview.rs b/crates/polars-arrow/src/io/ipc/read/array/binview.rs new file mode 100644 index 000000000000..edf7675a8d6b --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/read/array/binview.rs @@ -0,0 +1,82 @@ +use std::collections::VecDeque; +use std::io::{Read, Seek}; +use std::sync::Arc; + +use polars_error::{polars_err, PolarsResult}; + +use super::super::read_basic::*; +use super::*; +use crate::array::{ArrayRef, BinaryViewArrayGeneric, ViewType}; +use crate::buffer::Buffer; +use crate::datatypes::ArrowDataType; + +#[allow(clippy::too_many_arguments)] +pub fn read_binview( + field_nodes: &mut VecDeque, + variadic_buffer_counts: &mut VecDeque, + data_type: ArrowDataType, + buffers: &mut VecDeque, + reader: &mut R, + block_offset: u64, + is_little_endian: bool, + compression: Option, + limit: Option, + scratch: &mut Vec, +) -> PolarsResult { + let field_node = try_get_field_node(field_nodes, &data_type)?; + + let validity = read_validity( + buffers, + field_node, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + )?; + + let length = try_get_array_length(field_node, limit)?; + let views: Buffer = read_buffer( + buffers, + length, + reader, + block_offset, + is_little_endian, + compression, + scratch, + )?; + + let n_variadic = variadic_buffer_counts.pop_front().ok_or_else( + || polars_err!(ComputeError: "IPC: unable to fetch the variadic buffers\n\nThe file or stream is corrupted.") + )?; + + let variadic_buffer_lengths: Buffer = read_buffer( + buffers, + n_variadic, + reader, + block_offset, + is_little_endian, + compression, + scratch, + )?; + + let variadic_buffers = variadic_buffer_lengths + .iter() + .map(|length| { + let length = *length as usize; + read_buffer( + buffers, + length, + reader, + block_offset, + is_little_endian, + compression, + scratch, + ) + }) + .collect::>>>()?; + + BinaryViewArrayGeneric::::try_new(data_type, views, Arc::from(variadic_buffers), validity) + .map(|arr| arr.boxed()) +} diff --git a/crates/polars-arrow/src/io/ipc/read/array/boolean.rs b/crates/polars-arrow/src/io/ipc/read/array/boolean.rs index da06930b0b87..16443b0b8af0 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/boolean.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/boolean.rs @@ -4,9 +4,10 @@ use std::io::{Read, Seek}; use polars_error::{polars_err, PolarsResult}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node}; use crate::array::BooleanArray; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; #[allow(clippy::too_many_arguments)] pub fn read_boolean( @@ -20,11 +21,7 @@ pub fn read_boolean( limit: Option, scratch: &mut Vec, ) -> PolarsResult { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(oos = - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -37,11 +34,7 @@ pub fn read_boolean( scratch, )?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let values = read_bitmap( buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/fixed_size_binary.rs b/crates/polars-arrow/src/io/ipc/read/array/fixed_size_binary.rs index c06366b09ff1..9683952c6d6c 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/fixed_size_binary.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/fixed_size_binary.rs @@ -4,9 +4,10 @@ use std::io::{Read, Seek}; use polars_error::{polars_err, PolarsResult}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node}; use crate::array::FixedSizeBinaryArray; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_binary( @@ -20,11 +21,7 @@ pub fn read_fixed_size_binary( limit: Option, scratch: &mut Vec, ) -> PolarsResult { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(ComputeError: - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -37,11 +34,7 @@ pub fn read_fixed_size_binary( scratch, )?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let length = length.saturating_mul(FixedSizeBinaryArray::maybe_get_size(&data_type)?); let values = read_buffer( diff --git a/crates/polars-arrow/src/io/ipc/read/array/fixed_size_list.rs b/crates/polars-arrow/src/io/ipc/read/array/fixed_size_list.rs index 36b11ac00b10..335a426d0e44 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/fixed_size_list.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/fixed_size_list.rs @@ -9,10 +9,12 @@ use super::super::read_basic::*; use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; use crate::array::FixedSizeListArray; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::try_get_field_node; #[allow(clippy::too_many_arguments)] pub fn read_fixed_size_list( field_nodes: &mut VecDeque, + variadic_buffer_counts: &mut VecDeque, data_type: ArrowDataType, ipc_field: &IpcField, buffers: &mut VecDeque, @@ -25,11 +27,7 @@ pub fn read_fixed_size_list( version: Version, scratch: &mut Vec, ) -> PolarsResult { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(ComputeError: - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -48,6 +46,7 @@ pub fn read_fixed_size_list( let values = read( field_nodes, + variadic_buffer_counts, field, &ipc_field.fields[0], buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/list.rs b/crates/polars-arrow/src/io/ipc/read/array/list.rs index 1f07d9dcb1b4..c36646fe0192 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/list.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/list.rs @@ -7,15 +7,17 @@ use polars_error::{polars_err, PolarsResult}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; use crate::array::ListArray; use crate::buffer::Buffer; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; use crate::offset::Offset; #[allow(clippy::too_many_arguments)] pub fn read_list( field_nodes: &mut VecDeque, + variadic_buffer_counts: &mut VecDeque, data_type: ArrowDataType, ipc_field: &IpcField, buffers: &mut VecDeque, @@ -31,11 +33,7 @@ pub fn read_list( where Vec: TryInto, { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(ComputeError: - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -48,11 +46,7 @@ where scratch, )?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let offsets = read_buffer::( buffers, @@ -72,6 +66,7 @@ where let values = read( field_nodes, + variadic_buffer_counts, field, &ipc_field.fields[0], buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/map.rs b/crates/polars-arrow/src/io/ipc/read/array/map.rs index 8e398b7c7168..2301085136b2 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/map.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/map.rs @@ -6,14 +6,16 @@ use polars_error::{polars_err, PolarsResult}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; use crate::array::MapArray; use crate::buffer::Buffer; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; #[allow(clippy::too_many_arguments)] pub fn read_map( field_nodes: &mut VecDeque, + variadic_buffer_counts: &mut VecDeque, data_type: ArrowDataType, ipc_field: &IpcField, buffers: &mut VecDeque, @@ -26,11 +28,7 @@ pub fn read_map( version: Version, scratch: &mut Vec, ) -> PolarsResult { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(oos = - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -43,11 +41,7 @@ pub fn read_map( scratch, )?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let offsets = read_buffer::( buffers, @@ -67,6 +61,7 @@ pub fn read_map( let field = read( field_nodes, + variadic_buffer_counts, field, &ipc_field.fields[0], buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/mod.rs b/crates/polars-arrow/src/io/ipc/read/array/mod.rs index 249e5e05e165..2ffe1a369c25 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/mod.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/mod.rs @@ -1,4 +1,7 @@ mod primitive; + +use std::collections::VecDeque; + pub use primitive::*; mod boolean; pub use boolean::*; @@ -20,5 +23,28 @@ mod dictionary; pub use dictionary::*; mod union; pub use union::*; +mod binview; mod map; +pub use binview::*; pub use map::*; +use polars_error::{PolarsResult, *}; + +use super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use crate::datatypes::ArrowDataType; + +fn try_get_field_node<'a>( + field_nodes: &mut VecDeque>, + data_type: &ArrowDataType, +) -> PolarsResult> { + field_nodes.pop_front().ok_or_else(|| { + polars_err!(ComputeError: "IPC: unable to fetch the field for {:?}\n\nThe file or stream is corrupted.", data_type) + }) +} + +fn try_get_array_length(field_node: Node, limit: Option) -> PolarsResult { + let length: usize = field_node + .length() + .try_into() + .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; + Ok(limit.map(|limit| limit.min(length)).unwrap_or(length)) +} diff --git a/crates/polars-arrow/src/io/ipc/read/array/null.rs b/crates/polars-arrow/src/io/ipc/read/array/null.rs index da0d78e6f5b9..f9df4d254900 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/null.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/null.rs @@ -2,24 +2,19 @@ use std::collections::VecDeque; use polars_error::{polars_err, PolarsResult}; -use super::super::{Node, OutOfSpecKind}; +use super::super::Node; use crate::array::NullArray; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; pub fn read_null( field_nodes: &mut VecDeque, data_type: ArrowDataType, + limit: Option, ) -> PolarsResult { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(oos = - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; + let length = try_get_array_length(field_node, limit)?; NullArray::try_new(data_type, length) } diff --git a/crates/polars-arrow/src/io/ipc/read/array/primitive.rs b/crates/polars-arrow/src/io/ipc/read/array/primitive.rs index 05dad5da4326..24b2a05ec6a4 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/primitive.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/primitive.rs @@ -5,9 +5,10 @@ use std::io::{Read, Seek}; use polars_error::{polars_err, PolarsResult}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::super::{Compression, IpcBuffer, Node}; use crate::array::PrimitiveArray; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; use crate::types::NativeType; #[allow(clippy::too_many_arguments)] @@ -25,11 +26,7 @@ pub fn read_primitive( where Vec: TryInto, { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(oos = - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -42,11 +39,7 @@ where scratch, )?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let values = read_buffer( buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/struct_.rs b/crates/polars-arrow/src/io/ipc/read/array/struct_.rs index 27db9ed9113e..b90ba11a4028 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/struct_.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/struct_.rs @@ -9,10 +9,12 @@ use super::super::read_basic::*; use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; use crate::array::StructArray; use crate::datatypes::ArrowDataType; +use crate::io::ipc::read::array::try_get_field_node; #[allow(clippy::too_many_arguments)] pub fn read_struct( field_nodes: &mut VecDeque, + variadic_buffer_counts: &mut VecDeque, data_type: ArrowDataType, ipc_field: &IpcField, buffers: &mut VecDeque, @@ -25,11 +27,7 @@ pub fn read_struct( version: Version, scratch: &mut Vec, ) -> PolarsResult { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(oos = - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -50,6 +48,7 @@ pub fn read_struct( .map(|(field, ipc_field)| { read( field_nodes, + variadic_buffer_counts, field, ipc_field, buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/union.rs b/crates/polars-arrow/src/io/ipc/read/array/union.rs index 407982fc97a1..00409ef58e68 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/union.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/union.rs @@ -6,14 +6,16 @@ use polars_error::{polars_err, PolarsResult}; use super::super::super::IpcField; use super::super::deserialize::{read, skip}; use super::super::read_basic::*; -use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version}; +use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version}; use crate::array::UnionArray; use crate::datatypes::ArrowDataType; use crate::datatypes::UnionMode::Dense; +use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node}; #[allow(clippy::too_many_arguments)] pub fn read_union( field_nodes: &mut VecDeque, + variadic_buffer_counts: &mut VecDeque, data_type: ArrowDataType, ipc_field: &IpcField, buffers: &mut VecDeque, @@ -26,11 +28,7 @@ pub fn read_union( version: Version, scratch: &mut Vec, ) -> PolarsResult { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(ComputeError: - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; if version != Version::V5 { let _ = buffers @@ -38,11 +36,7 @@ pub fn read_union( .ok_or_else(|| polars_err!(oos = "IPC: missing validity buffer."))?; }; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let types = read_buffer( buffers, @@ -80,6 +74,7 @@ pub fn read_union( .map(|(field, ipc_field)| { read( field_nodes, + variadic_buffer_counts, field, ipc_field, buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/array/utf8.rs b/crates/polars-arrow/src/io/ipc/read/array/utf8.rs index 1ec11eb1e22e..1408ff41435e 100644 --- a/crates/polars-arrow/src/io/ipc/read/array/utf8.rs +++ b/crates/polars-arrow/src/io/ipc/read/array/utf8.rs @@ -4,7 +4,7 @@ use std::io::{Read, Seek}; use polars_error::{polars_err, PolarsResult}; use super::super::read_basic::*; -use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind}; +use super::*; use crate::array::Utf8Array; use crate::buffer::Buffer; use crate::datatypes::ArrowDataType; @@ -22,11 +22,7 @@ pub fn read_utf8( limit: Option, scratch: &mut Vec, ) -> PolarsResult> { - let field_node = field_nodes.pop_front().ok_or_else(|| { - polars_err!(oos = - "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted." - ) - })?; + let field_node = try_get_field_node(field_nodes, &data_type)?; let validity = read_validity( buffers, @@ -39,12 +35,7 @@ pub fn read_utf8( scratch, )?; - let length: usize = field_node - .length() - .try_into() - .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?; - - let length = limit.map(|limit| limit.min(length)).unwrap_or(length); + let length = try_get_array_length(field_node, limit)?; let offsets: Buffer = read_buffer( buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/common.rs b/crates/polars-arrow/src/io/ipc/read/common.rs index 0c7937516c30..87005dc76cc4 100644 --- a/crates/polars-arrow/src/io/ipc/read/common.rs +++ b/crates/polars-arrow/src/io/ipc/read/common.rs @@ -93,6 +93,11 @@ pub fn read_record_batch( .buffers() .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferBuffers(err)))? .ok_or_else(|| polars_err!(oos = OutOfSpecKind::MissingMessageBuffers))?; + let mut variadic_buffer_counts = batch + .variadic_buffer_counts() + .map_err(|err| polars_err!(oos = OutOfSpecKind::InvalidFlatbufferRecordBatches(err)))? + .map(|v| v.iter().map(|v| v as usize).collect::>()) + .unwrap_or_else(VecDeque::new); let mut buffers: VecDeque = buffers.iter().collect(); // check that the sum of the sizes of all buffers is <= than the size of the file @@ -129,6 +134,7 @@ pub fn read_record_batch( .map(|maybe_field| match maybe_field { ProjectionResult::Selected((field, ipc_field)) => Ok(Some(read( &mut field_nodes, + &mut variadic_buffer_counts, field, ipc_field, &mut buffers, @@ -157,6 +163,7 @@ pub fn read_record_batch( .map(|(field, ipc_field)| { read( &mut field_nodes, + &mut variadic_buffer_counts, field, ipc_field, &mut buffers, diff --git a/crates/polars-arrow/src/io/ipc/read/deserialize.rs b/crates/polars-arrow/src/io/ipc/read/deserialize.rs index 4736b3dfe650..972884c0af3f 100644 --- a/crates/polars-arrow/src/io/ipc/read/deserialize.rs +++ b/crates/polars-arrow/src/io/ipc/read/deserialize.rs @@ -14,6 +14,7 @@ use crate::{match_integer_type, with_match_primitive_type_full}; #[allow(clippy::too_many_arguments)] pub fn read( field_nodes: &mut VecDeque, + variadic_buffer_counts: &mut VecDeque, field: &Field, ipc_field: &IpcField, buffers: &mut VecDeque, @@ -30,7 +31,7 @@ pub fn read( let data_type = field.data_type.clone(); match data_type.to_physical_type() { - Null => read_null(field_nodes, data_type).map(|x| x.boxed()), + Null => read_null(field_nodes, data_type, limit).map(|x| x.boxed()), Boolean => read_boolean( field_nodes, data_type, @@ -119,6 +120,7 @@ pub fn read( .map(|x| x.boxed()), List => read_list::( field_nodes, + variadic_buffer_counts, data_type, ipc_field, buffers, @@ -134,6 +136,7 @@ pub fn read( .map(|x| x.boxed()), LargeList => read_list::( field_nodes, + variadic_buffer_counts, data_type, ipc_field, buffers, @@ -149,6 +152,7 @@ pub fn read( .map(|x| x.boxed()), FixedSizeList => read_fixed_size_list( field_nodes, + variadic_buffer_counts, data_type, ipc_field, buffers, @@ -164,6 +168,7 @@ pub fn read( .map(|x| x.boxed()), Struct => read_struct( field_nodes, + variadic_buffer_counts, data_type, ipc_field, buffers, @@ -197,6 +202,7 @@ pub fn read( }, Union => read_union( field_nodes, + variadic_buffer_counts, data_type, ipc_field, buffers, @@ -212,6 +218,7 @@ pub fn read( .map(|x| x.boxed()), Map => read_map( field_nodes, + variadic_buffer_counts, data_type, ipc_field, buffers, @@ -225,7 +232,30 @@ pub fn read( scratch, ) .map(|x| x.boxed()), - Utf8View | BinaryView => todo!(), + Utf8View => read_binview::( + field_nodes, + variadic_buffer_counts, + data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + ), + BinaryView => read_binview::<[u8], _>( + field_nodes, + variadic_buffer_counts, + data_type, + buffers, + reader, + block_offset, + is_little_endian, + compression, + limit, + scratch, + ), } } diff --git a/crates/polars-arrow/src/io/ipc/read/file.rs b/crates/polars-arrow/src/io/ipc/read/file.rs index 711e8b85fa59..6f1f4ca8f511 100644 --- a/crates/polars-arrow/src/io/ipc/read/file.rs +++ b/crates/polars-arrow/src/io/ipc/read/file.rs @@ -2,9 +2,9 @@ use std::convert::TryInto; use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; -use ahash::AHashMap; use arrow_format::ipc::planus::ReadAsRoot; use polars_error::{polars_bail, polars_err, PolarsResult}; +use polars_utils::aliases::{InitHashMaps, PlHashMap}; use super::super::{ARROW_MAGIC_V1, ARROW_MAGIC_V2, CONTINUATION_MARKER}; use super::common::*; @@ -123,7 +123,7 @@ pub fn read_file_dictionaries( let blocks = if let Some(blocks) = &metadata.dictionaries { blocks } else { - return Ok(AHashMap::new()); + return Ok(PlHashMap::new()); }; // use a temporary smaller scratch for the messages let mut message_scratch = Default::default(); diff --git a/crates/polars-arrow/src/io/ipc/read/mod.rs b/crates/polars-arrow/src/io/ipc/read/mod.rs index 887cf7b36258..3688816273e5 100644 --- a/crates/polars-arrow/src/io/ipc/read/mod.rs +++ b/crates/polars-arrow/src/io/ipc/read/mod.rs @@ -4,8 +4,6 @@ //! which provides arbitrary access to any of its messages, and the //! [`StreamReader`](stream::StreamReader), which only supports reading //! data in the order it was written in. -use ahash::AHashMap; - use crate::array::Array; mod array; @@ -32,12 +30,13 @@ pub(crate) use common::first_dict_field; #[cfg(feature = "io_flight")] pub(crate) use common::{read_dictionary, read_record_batch}; pub use file::{read_batch, read_file_dictionaries, read_file_metadata, FileMetadata}; +use polars_utils::aliases::PlHashMap; pub use reader::FileReader; pub use schema::deserialize_schema; pub use stream::{read_stream_metadata, StreamMetadata, StreamReader, StreamState}; /// how dictionaries are tracked in this crate -pub type Dictionaries = AHashMap>; +pub type Dictionaries = PlHashMap>; pub(crate) type Node<'a> = arrow_format::ipc::FieldNodeRef<'a>; pub(crate) type IpcBuffer<'a> = arrow_format::ipc::BufferRef<'a>; diff --git a/crates/polars-arrow/src/io/ipc/read/schema.rs b/crates/polars-arrow/src/io/ipc/read/schema.rs index 9f704f3ef5cc..a6c1743e6a0b 100644 --- a/crates/polars-arrow/src/io/ipc/read/schema.rs +++ b/crates/polars-arrow/src/io/ipc/read/schema.rs @@ -277,6 +277,8 @@ fn get_data_type( LargeBinary(_) => (ArrowDataType::LargeBinary, IpcField::default()), Utf8(_) => (ArrowDataType::Utf8, IpcField::default()), LargeUtf8(_) => (ArrowDataType::LargeUtf8, IpcField::default()), + BinaryView(_) => (ArrowDataType::BinaryView, IpcField::default()), + Utf8View(_) => (ArrowDataType::Utf8View, IpcField::default()), FixedSizeBinary(fixed) => ( ArrowDataType::FixedSizeBinary( fixed @@ -350,8 +352,6 @@ fn get_data_type( Union(union_) => deserialize_union(union_, field)?, Map(map) => deserialize_map(map, field)?, RunEndEncoded(_) => todo!(), - BinaryView(_) => todo!(), - Utf8View(_) => todo!(), LargeListView(_) | ListView(_) => todo!(), }) } diff --git a/crates/polars-arrow/src/io/ipc/write/common.rs b/crates/polars-arrow/src/io/ipc/write/common.rs index ec3852ead81f..d4d0fd4a259c 100644 --- a/crates/polars-arrow/src/io/ipc/write/common.rs +++ b/crates/polars-arrow/src/io/ipc/write/common.rs @@ -39,7 +39,7 @@ fn encode_dictionary( use PhysicalType::*; match array.data_type().to_physical_type() { Utf8 | LargeUtf8 | Binary | LargeBinary | Primitive(_) | Boolean | Null - | FixedSizeBinary => Ok(()), + | FixedSizeBinary | BinaryView | Utf8View => Ok(()), Dictionary(key_type) => match_integer_type!(key_type, |$T| { let dict_id = field.dictionary_id .ok_or_else(|| polars_err!(InvalidOperation: "Dictionaries must have an associated id"))?; @@ -167,7 +167,6 @@ fn encode_dictionary( encoded_dictionaries, ) }, - Utf8View | BinaryView => todo!(), } } diff --git a/crates/polars-arrow/src/io/ipc/write/schema.rs b/crates/polars-arrow/src/io/ipc/write/schema.rs index eb4f1dac9036..41e88b29f7ea 100644 --- a/crates/polars-arrow/src/io/ipc/write/schema.rs +++ b/crates/polars-arrow/src/io/ipc/write/schema.rs @@ -257,7 +257,8 @@ fn serialize_type(data_type: &ArrowDataType) -> arrow_format::ipc::Type { Struct(_) => ipc::Type::Struct(Box::new(ipc::Struct {})), Dictionary(_, v, _) => serialize_type(v), Extension(_, v, _) => serialize_type(v), - Utf8View | BinaryView => todo!(), + Utf8View => ipc::Type::Utf8View(Box::new(ipc::Utf8View {})), + BinaryView => ipc::Type::BinaryView(Box::new(ipc::BinaryView {})), } } diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/binary.rs b/crates/polars-arrow/src/io/ipc/write/serialize/binary.rs new file mode 100644 index 000000000000..9642ded1f78b --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/binary.rs @@ -0,0 +1,93 @@ +use super::*; + +#[allow(clippy::too_many_arguments)] +fn write_generic_binary( + validity: Option<&Bitmap>, + offsets: &OffsetsBuffer, + values: &[u8], + buffers: &mut Vec, + arrow_data: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + let offsets = offsets.buffer(); + write_bitmap( + validity, + offsets.len() - 1, + buffers, + arrow_data, + offset, + compression, + ); + + let first = *offsets.first().unwrap(); + let last = *offsets.last().unwrap(); + if first == O::default() { + write_buffer( + offsets, + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + } else { + write_buffer_from_iter( + offsets.iter().map(|x| *x - first), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + } + + write_bytes( + &values[first.to_usize()..last.to_usize()], + buffers, + arrow_data, + offset, + compression, + ); +} + +pub(super) fn write_binary( + array: &BinaryArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + write_generic_binary( + array.validity(), + array.offsets(), + array.values(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); +} + +pub(super) fn write_utf8( + array: &Utf8Array, + buffers: &mut Vec, + arrow_data: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + write_generic_binary( + array.validity(), + array.offsets(), + array.values(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/binview.rs b/crates/polars-arrow/src/io/ipc/write/serialize/binview.rs new file mode 100644 index 000000000000..bcf5d98970db --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/binview.rs @@ -0,0 +1,44 @@ +use super::*; +use crate::array; + +#[allow(clippy::too_many_arguments)] +pub(super) fn write_binview( + array: &BinaryViewArrayGeneric, + buffers: &mut Vec, + arrow_data: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + write_bitmap( + array.validity(), + array::Array::len(array), + buffers, + arrow_data, + offset, + compression, + ); + + write_buffer( + array.views(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + + let vbl = array.variadic_buffer_lengths(); + write_buffer( + &vbl, + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + + for data in array.data_buffers() { + write_bytes(data, buffers, arrow_data, offset, compression); + } +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/boolean.rs b/crates/polars-arrow/src/io/ipc/write/serialize/boolean.rs new file mode 100644 index 000000000000..f699860b89cd --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/boolean.rs @@ -0,0 +1,27 @@ +use super::*; + +pub(super) fn write_boolean( + array: &BooleanArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + offset: &mut i64, + _: bool, + compression: Option, +) { + write_bitmap( + array.validity(), + array.len(), + buffers, + arrow_data, + offset, + compression, + ); + write_bitmap( + Some(&array.values().clone()), + array.len(), + buffers, + arrow_data, + offset, + compression, + ); +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/dictionary.rs b/crates/polars-arrow/src/io/ipc/write/serialize/dictionary.rs new file mode 100644 index 000000000000..0d1eb96ea7e3 --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/dictionary.rs @@ -0,0 +1,37 @@ +use super::*; + +// use `write_keys` to either write keys or values +#[allow(clippy::too_many_arguments)] +pub fn write_dictionary( + array: &DictionaryArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + nodes: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, + write_keys: bool, +) -> usize { + if write_keys { + write_primitive( + array.keys(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + array.keys().len() + } else { + write( + array.values().as_ref(), + buffers, + arrow_data, + nodes, + offset, + is_little_endian, + compression, + ); + array.values().len() + } +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/fixed_size_binary.rs b/crates/polars-arrow/src/io/ipc/write/serialize/fixed_size_binary.rs new file mode 100644 index 000000000000..dc1e973b4d4a --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/fixed_size_binary.rs @@ -0,0 +1,20 @@ +use super::*; + +pub(super) fn write_fixed_size_binary( + array: &FixedSizeBinaryArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + offset: &mut i64, + _is_little_endian: bool, + compression: Option, +) { + write_bitmap( + array.validity(), + array.len(), + buffers, + arrow_data, + offset, + compression, + ); + write_bytes(array.values(), buffers, arrow_data, offset, compression); +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/fixed_sized_list.rs b/crates/polars-arrow/src/io/ipc/write/serialize/fixed_sized_list.rs new file mode 100644 index 000000000000..da8fa7db962b --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/fixed_sized_list.rs @@ -0,0 +1,29 @@ +use super::*; + +pub(super) fn write_fixed_size_list( + array: &FixedSizeListArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + nodes: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + write_bitmap( + array.validity(), + array.len(), + buffers, + arrow_data, + offset, + compression, + ); + write( + array.values().as_ref(), + buffers, + arrow_data, + nodes, + offset, + is_little_endian, + compression, + ); +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/list.rs b/crates/polars-arrow/src/io/ipc/write/serialize/list.rs new file mode 100644 index 000000000000..8cca7eba1b87 --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/list.rs @@ -0,0 +1,58 @@ +use super::*; + +pub(super) fn write_list( + array: &ListArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + nodes: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + let offsets = array.offsets().buffer(); + let validity = array.validity(); + + write_bitmap( + validity, + offsets.len() - 1, + buffers, + arrow_data, + offset, + compression, + ); + + let first = *offsets.first().unwrap(); + let last = *offsets.last().unwrap(); + if first == O::zero() { + write_buffer( + offsets, + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + } else { + write_buffer_from_iter( + offsets.iter().map(|x| *x - first), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + } + + write( + array + .values() + .sliced(first.to_usize(), last.to_usize() - first.to_usize()) + .as_ref(), + buffers, + arrow_data, + nodes, + offset, + is_little_endian, + compression, + ); +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/map.rs b/crates/polars-arrow/src/io/ipc/write/serialize/map.rs new file mode 100644 index 000000000000..19492679e418 --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/map.rs @@ -0,0 +1,58 @@ +use super::*; + +pub(super) fn write_map( + array: &MapArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + nodes: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + let offsets = array.offsets().buffer(); + let validity = array.validity(); + + write_bitmap( + validity, + offsets.len() - 1, + buffers, + arrow_data, + offset, + compression, + ); + + let first = *offsets.first().unwrap(); + let last = *offsets.last().unwrap(); + if first == 0 { + write_buffer( + offsets, + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + } else { + write_buffer_from_iter( + offsets.iter().map(|x| *x - first), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + } + + write( + array + .field() + .sliced(first as usize, last as usize - first as usize) + .as_ref(), + buffers, + arrow_data, + nodes, + offset, + is_little_endian, + compression, + ); +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize.rs b/crates/polars-arrow/src/io/ipc/write/serialize/mod.rs similarity index 55% rename from crates/polars-arrow/src/io/ipc/write/serialize.rs rename to crates/polars-arrow/src/io/ipc/write/serialize/mod.rs index 670bf9b11ba8..b33f50b2277a 100644 --- a/crates/polars-arrow/src/io/ipc/write/serialize.rs +++ b/crates/polars-arrow/src/io/ipc/write/serialize/mod.rs @@ -11,419 +11,29 @@ use crate::offset::{Offset, OffsetsBuffer}; use crate::trusted_len::TrustedLen; use crate::types::NativeType; use crate::{match_integer_type, with_match_primitive_type_full}; - -fn write_primitive( - array: &PrimitiveArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - write_bitmap( - array.validity(), - array.len(), - buffers, - arrow_data, - offset, - compression, - ); - - write_buffer( - array.values(), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ) -} - -fn write_boolean( - array: &BooleanArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: &mut i64, - _: bool, - compression: Option, -) { - write_bitmap( - array.validity(), - array.len(), - buffers, - arrow_data, - offset, - compression, - ); - write_bitmap( - Some(&array.values().clone()), - array.len(), - buffers, - arrow_data, - offset, - compression, - ); -} - -#[allow(clippy::too_many_arguments)] -fn write_generic_binary( - validity: Option<&Bitmap>, - offsets: &OffsetsBuffer, - values: &[u8], - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - let offsets = offsets.buffer(); - write_bitmap( - validity, - offsets.len() - 1, - buffers, - arrow_data, - offset, - compression, - ); - - let first = *offsets.first().unwrap(); - let last = *offsets.last().unwrap(); - if first == O::default() { - write_buffer( - offsets, - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - } else { - write_buffer_from_iter( - offsets.iter().map(|x| *x - first), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - } - - write_bytes( - &values[first.to_usize()..last.to_usize()], - buffers, - arrow_data, - offset, - compression, - ); -} - -fn write_binary( - array: &BinaryArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - write_generic_binary( - array.validity(), - array.offsets(), - array.values(), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); -} - -fn write_utf8( - array: &Utf8Array, - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - write_generic_binary( - array.validity(), - array.offsets(), - array.values(), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); -} - -fn write_fixed_size_binary( - array: &FixedSizeBinaryArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: &mut i64, - _is_little_endian: bool, - compression: Option, -) { - write_bitmap( - array.validity(), - array.len(), - buffers, - arrow_data, - offset, - compression, - ); - write_bytes(array.values(), buffers, arrow_data, offset, compression); -} - -fn write_list( - array: &ListArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - nodes: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - let offsets = array.offsets().buffer(); - let validity = array.validity(); - - write_bitmap( - validity, - offsets.len() - 1, - buffers, - arrow_data, - offset, - compression, - ); - - let first = *offsets.first().unwrap(); - let last = *offsets.last().unwrap(); - if first == O::zero() { - write_buffer( - offsets, - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - } else { - write_buffer_from_iter( - offsets.iter().map(|x| *x - first), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - } - - write( - array - .values() - .sliced(first.to_usize(), last.to_usize() - first.to_usize()) - .as_ref(), - buffers, - arrow_data, - nodes, - offset, - is_little_endian, - compression, - ); -} - -pub fn write_struct( - array: &StructArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - nodes: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - write_bitmap( - array.validity(), - array.len(), - buffers, - arrow_data, - offset, - compression, - ); - array.values().iter().for_each(|array| { - write( - array.as_ref(), - buffers, - arrow_data, - nodes, - offset, - is_little_endian, - compression, - ); - }); -} - -pub fn write_union( - array: &UnionArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - nodes: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - write_buffer( - array.types(), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - - if let Some(offsets) = array.offsets() { - write_buffer( - offsets, - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - } - array.fields().iter().for_each(|array| { - write( - array.as_ref(), - buffers, - arrow_data, - nodes, - offset, - is_little_endian, - compression, - ) - }); -} - -fn write_map( - array: &MapArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - nodes: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - let offsets = array.offsets().buffer(); - let validity = array.validity(); - - write_bitmap( - validity, - offsets.len() - 1, - buffers, - arrow_data, - offset, - compression, - ); - - let first = *offsets.first().unwrap(); - let last = *offsets.last().unwrap(); - if first == 0 { - write_buffer( - offsets, - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - } else { - write_buffer_from_iter( - offsets.iter().map(|x| *x - first), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - } - - write( - array - .field() - .sliced(first as usize, last as usize - first as usize) - .as_ref(), - buffers, - arrow_data, - nodes, - offset, - is_little_endian, - compression, - ); -} - -fn write_fixed_size_list( - array: &FixedSizeListArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - nodes: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - write_bitmap( - array.validity(), - array.len(), - buffers, - arrow_data, - offset, - compression, - ); - write( - array.values().as_ref(), - buffers, - arrow_data, - nodes, - offset, - is_little_endian, - compression, - ); -} - -// use `write_keys` to either write keys or values -#[allow(clippy::too_many_arguments)] -pub(super) fn write_dictionary( - array: &DictionaryArray, - buffers: &mut Vec, - arrow_data: &mut Vec, - nodes: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, - write_keys: bool, -) -> usize { - if write_keys { - write_primitive( - array.keys(), - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); - array.keys().len() - } else { - write( - array.values().as_ref(), - buffers, - arrow_data, - nodes, - offset, - is_little_endian, - compression, - ); - array.values().len() - } -} +mod binary; +mod binview; +mod boolean; +mod dictionary; +mod fixed_size_binary; +mod fixed_sized_list; +mod list; +mod map; +mod primitive; +mod struct_; +mod union; + +use binary::*; +use binview::*; +use boolean::*; +pub(super) use dictionary::*; +use fixed_size_binary::*; +use fixed_sized_list::*; +use list::*; +use map::*; +use primitive::*; +use struct_::*; +use union::*; /// Writes an [`Array`] to `arrow_data` pub fn write( @@ -564,14 +174,31 @@ pub fn write( compression, ); }, - Utf8View | BinaryView => todo!(), + Utf8View => write_binview( + array.as_any().downcast_ref::().unwrap(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ), + BinaryView => write_binview( + array.as_any().downcast_ref::().unwrap(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ), } } #[inline] fn pad_buffer_to_64(buffer: &mut Vec, length: usize) { let pad_len = pad_to_64(length); - buffer.extend_from_slice(&vec![0u8; pad_len]); + for _ in 0..pad_len { + buffer.push(0u8); + } } /// writes `bytes` to `arrow_data` updating `buffers` and `offset` and guaranteeing a 8 byte boundary. diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/primitive.rs b/crates/polars-arrow/src/io/ipc/write/serialize/primitive.rs new file mode 100644 index 000000000000..acd3ad672f78 --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/primitive.rs @@ -0,0 +1,28 @@ +use super::*; + +pub(super) fn write_primitive( + array: &PrimitiveArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + write_bitmap( + array.validity(), + array.len(), + buffers, + arrow_data, + offset, + compression, + ); + + write_buffer( + array.values(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ) +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/struct_.rs b/crates/polars-arrow/src/io/ipc/write/serialize/struct_.rs new file mode 100644 index 000000000000..67353746d4cd --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/struct_.rs @@ -0,0 +1,31 @@ +use super::*; + +pub(super) fn write_struct( + array: &StructArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + nodes: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + write_bitmap( + array.validity(), + array.len(), + buffers, + arrow_data, + offset, + compression, + ); + array.values().iter().for_each(|array| { + write( + array.as_ref(), + buffers, + arrow_data, + nodes, + offset, + is_little_endian, + compression, + ); + }); +} diff --git a/crates/polars-arrow/src/io/ipc/write/serialize/union.rs b/crates/polars-arrow/src/io/ipc/write/serialize/union.rs new file mode 100644 index 000000000000..9f0e53fcf67b --- /dev/null +++ b/crates/polars-arrow/src/io/ipc/write/serialize/union.rs @@ -0,0 +1,42 @@ +use super::*; + +pub(super) fn write_union( + array: &UnionArray, + buffers: &mut Vec, + arrow_data: &mut Vec, + nodes: &mut Vec, + offset: &mut i64, + is_little_endian: bool, + compression: Option, +) { + write_buffer( + array.types(), + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + + if let Some(offsets) = array.offsets() { + write_buffer( + offsets, + buffers, + arrow_data, + offset, + is_little_endian, + compression, + ); + } + array.fields().iter().for_each(|array| { + write( + array.as_ref(), + buffers, + arrow_data, + nodes, + offset, + is_little_endian, + compression, + ) + }); +} diff --git a/crates/polars-arrow/tests/it/io/ipc/mod.rs b/crates/polars-arrow/tests/it/io/ipc/mod.rs new file mode 100644 index 000000000000..fe490da7886a --- /dev/null +++ b/crates/polars-arrow/tests/it/io/ipc/mod.rs @@ -0,0 +1,80 @@ +use std::io::Cursor; +use std::sync::Arc; + +use polars_arrow::array::*; +use polars_arrow::chunk::Chunk; +use polars_arrow::datatypes::{ArrowSchema, ArrowSchemaRef, Field}; +use polars_arrow::io::ipc::read::{read_file_metadata, FileReader}; +use polars_arrow::io::ipc::write::*; +use polars_arrow::io::ipc::IpcField; +use polars_error::*; + +pub(crate) fn write( + batches: &[Chunk>], + schema: &ArrowSchemaRef, + ipc_fields: Option>, + compression: Option, +) -> PolarsResult> { + let result = vec![]; + let options = WriteOptions { compression }; + let mut writer = FileWriter::try_new(result, schema.clone(), ipc_fields.clone(), options)?; + for batch in batches { + writer.write(batch, ipc_fields.as_ref().map(|x| x.as_ref()))?; + } + writer.finish()?; + Ok(writer.into_inner()) +} + +fn round_trip( + columns: Chunk>, + schema: ArrowSchemaRef, + ipc_fields: Option>, + compression: Option, +) -> PolarsResult<()> { + let (expected_schema, expected_batches) = (schema.clone(), vec![columns]); + + let result = write(&expected_batches, &schema, ipc_fields, compression)?; + let mut reader = Cursor::new(result); + let metadata = read_file_metadata(&mut reader)?; + let schema = metadata.schema.clone(); + + let reader = FileReader::new(reader, metadata, None, None); + + assert_eq!(schema, expected_schema); + + let batches = reader.collect::>>()?; + + assert_eq!(batches, expected_batches); + Ok(()) +} + +fn prep_schema(array: &dyn Array) -> ArrowSchemaRef { + let fields = vec![Field::new("a", array.data_type().clone(), true)]; + Arc::new(ArrowSchema::from(fields)) +} + +#[test] +fn write_boolean() -> PolarsResult<()> { + let array = BooleanArray::from([Some(true), Some(false), None, Some(true)]).boxed(); + let schema = prep_schema(array.as_ref()); + let columns = Chunk::try_new(vec![array])?; + round_trip(columns, schema, None, Some(Compression::ZSTD)) +} + +#[test] +fn write_sliced_utf8() -> PolarsResult<()> { + let array = Utf8Array::::from_slice(["aa", "bb"]) + .sliced(1, 1) + .boxed(); + let schema = prep_schema(array.as_ref()); + let columns = Chunk::try_new(vec![array])?; + round_trip(columns, schema, None, Some(Compression::ZSTD)) +} + +#[test] +fn write_binview() -> PolarsResult<()> { + let array = Utf8ViewArray::from([Some("foo"), Some("bar"), None, Some("hamlet")]).boxed(); + let schema = prep_schema(array.as_ref()); + let columns = Chunk::try_new(vec![array])?; + round_trip(columns, schema, None, Some(Compression::ZSTD)) +} diff --git a/crates/polars-arrow/tests/it/io/mod.rs b/crates/polars-arrow/tests/it/io/mod.rs new file mode 100644 index 000000000000..c00b27ad365d --- /dev/null +++ b/crates/polars-arrow/tests/it/io/mod.rs @@ -0,0 +1 @@ +mod ipc; diff --git a/crates/polars-arrow/tests/it/main.rs b/crates/polars-arrow/tests/it/main.rs new file mode 100644 index 000000000000..481814590a8c --- /dev/null +++ b/crates/polars-arrow/tests/it/main.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "io_ipc_compression")] +mod io;