Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rust): BinaryView/Utf8View IPC support #13464

Merged
merged 5 commits into from
Jan 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ arrow_rs = ["arrow-buffer", "arrow-schema", "arrow-data", "arrow-array"]
io_ipc = ["arrow-format", "polars-error/arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd"]
io_ipc_compression = ["lz4", "zstd", "io_ipc"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

io_avro = ["avro-schema", "polars-error/avro-schema"]
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binary/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ impl<O: Offset, A: ffi::ArrowArrayRef> FromFfi<A> for BinaryArray<O> {
// assumption that data from FFI is well constructed
let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };

Ok(Self::new(data_type, offsets, values, validity))
Self::try_new(data_type, offsets, values, validity)
}
}
7 changes: 6 additions & 1 deletion crates/polars-arrow/src/array/binview/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ impl<T: ViewType + ?Sized, A: ffi::ArrowArrayRef> FromFfi<A> for BinaryViewArray
buffers.push(values);
}

Self::try_new(data_type, views, Arc::from(buffers), validity)
Ok(Self::new_unchecked(
data_type,
views,
Arc::from(buffers),
validity,
))
}
}
11 changes: 11 additions & 0 deletions crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod private {
use private::Sealed;

use crate::array::binview::iterator::BinaryViewValueIter;
use crate::array::binview::mutable::MutableBinaryViewArray;
use crate::array::binview::view::{validate_binary_view, validate_utf8_view};
use crate::array::iterator::NonNullValuesIter;
use crate::bitmap::utils::{BitmapIter, ZipValidity};
Expand Down Expand Up @@ -147,6 +148,10 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
self.buffers.as_ref()
}

/// Returns the length (in bytes) of each variadic data buffer, as `i64`.
///
/// The Arrow IPC format records these lengths in the `variadicBufferCounts`
/// metadata when writing BinaryView/Utf8View arrays, which is presumably why
/// this helper exists — TODO confirm against the IPC writer.
pub fn variadic_buffer_lengths(&self) -> Vec<i64> {
self.buffers.iter().map(|buf| buf.len() as i64).collect()
}

pub fn views(&self) -> &Buffer<u128> {
&self.views
}
Expand Down Expand Up @@ -251,6 +256,12 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
impl_sliced!();
impl_mut_validity!();
impl_into_array!();

/// Builds an immutable view array from a slice of optional values.
///
/// `None` entries become null slots. Construction goes through
/// [`MutableBinaryViewArray`], which is then frozen via `into()`.
pub fn from<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
// Borrow each element (`Option<S>` -> `Option<&T>`) so the mutable
// builder can copy the bytes without taking ownership of the input.
let mutable =
MutableBinaryViewArray::from_iter(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()));
mutable.into()
}
}

impl<T: ViewType + ?Sized> Array for BinaryViewArrayGeneric<T> {
Expand Down
4 changes: 4 additions & 0 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
mutable.extend_values(iterator);
mutable
}

/// Builds a mutable view array from a slice of optional values.
///
/// `None` entries become null slots; delegates to `from_iter` after
/// borrowing each element (`Option<S>` -> `Option<&T>`).
pub fn from<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
Self::from_iter(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()))
}
}

impl<T: ViewType + ?Sized, P: AsRef<T>> Extend<Option<P>> for MutableBinaryViewArray<T> {
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/list/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ impl<O: Offset, A: ffi::ArrowArrayRef> FromFfi<A> for ListArray<O> {
// assumption that data from FFI is well constructed
let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };

Ok(Self::new(data_type, offsets, values, validity))
Self::try_new(data_type, offsets, values, validity)
}
}
15 changes: 4 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use std::io::{Read, Seek};
use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use super::super::{Compression, IpcBuffer, Node};
use crate::array::BinaryArray;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};
use crate::offset::Offset;

#[allow(clippy::too_many_arguments)]
Expand All @@ -22,11 +23,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<BinaryArray<O>> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
polars_err!(oos =
"IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
)
})?;
let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
Expand All @@ -39,11 +36,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
let length = try_get_array_length(field_node, limit)?;

let offsets: Buffer<O> = read_buffer(
buffers,
Expand Down
82 changes: 82 additions & 0 deletions crates/polars-arrow/src/io/ipc/read/array/binview.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};
use std::sync::Arc;

use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
use super::*;
use crate::array::{ArrayRef, BinaryViewArrayGeneric, ViewType};
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;

/// Reads a BinaryView/Utf8View array from an Arrow IPC record batch.
///
/// Consumes, in order: one field node from `field_nodes`, one count from
/// `variadic_buffer_counts`, and several entries from `buffers` (validity,
/// views, variadic-buffer lengths, then each variadic data buffer). The pop
/// order must match the IPC message layout exactly.
///
/// # Errors
/// Returns an error when the stream is out of spec (missing field node or
/// variadic count), when any buffer read fails, or when the assembled parts
/// fail `try_new` validation.
#[allow(clippy::too_many_arguments)]
pub fn read_binview<T: ViewType + ?Sized, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
reader: &mut R,
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<ArrayRef> {
// Pop this array's field node; errors if the stream is truncated/corrupted.
let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
field_node,
reader,
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

// Array length from the field node, capped by `limit` when present.
let length = try_get_array_length(field_node, limit)?;
// Each view is a 16-byte (u128) struct: length + prefix + buffer-index/offset
// for long values, or the inlined bytes for short ones.
let views: Buffer<u128> = read_buffer(
buffers,
length,
reader,
block_offset,
is_little_endian,
compression,
scratch,
)?;

// Number of variadic data buffers for this array, recorded in the IPC
// message's variadicBufferCounts metadata.
let n_variadic = variadic_buffer_counts.pop_front().ok_or_else(
|| polars_err!(ComputeError: "IPC: unable to fetch the variadic buffers\n\nThe file or stream is corrupted.")
)?;

// Per-buffer byte lengths, needed to know how much to read for each
// variadic buffer below.
let variadic_buffer_lengths: Buffer<i64> = read_buffer(
buffers,
n_variadic,
reader,
block_offset,
is_little_endian,
compression,
scratch,
)?;

// Read each variadic data buffer in sequence. NOTE(review): `*length as
// usize` would wrap on a negative (out-of-spec) length on 64-bit targets —
// a checked conversion may be preferable; confirm upstream.
let variadic_buffers = variadic_buffer_lengths
.iter()
.map(|length| {
let length = *length as usize;
read_buffer(
buffers,
length,
reader,
block_offset,
is_little_endian,
compression,
scratch,
)
})
.collect::<PolarsResult<Vec<Buffer<u8>>>>()?;

// `try_new` validates the views against the buffers (and UTF-8 for
// Utf8View), since IPC input is untrusted.
BinaryViewArrayGeneric::<T>::try_new(data_type, views, Arc::from(variadic_buffers), validity)
.map(|arr| arr.boxed())
}
15 changes: 4 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::io::{Read, Seek};
use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use super::super::{Compression, IpcBuffer, Node};
use crate::array::BooleanArray;
use crate::datatypes::ArrowDataType;
use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};

#[allow(clippy::too_many_arguments)]
pub fn read_boolean<R: Read + Seek>(
Expand All @@ -20,11 +21,7 @@ pub fn read_boolean<R: Read + Seek>(
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<BooleanArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
polars_err!(oos =
"IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
)
})?;
let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
Expand All @@ -37,11 +34,7 @@ pub fn read_boolean<R: Read + Seek>(
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
let length = try_get_array_length(field_node, limit)?;

let values = read_bitmap(
buffers,
Expand Down
15 changes: 4 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::io::{Read, Seek};
use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
use super::super::{Compression, IpcBuffer, Node};
use crate::array::FixedSizeBinaryArray;
use crate::datatypes::ArrowDataType;
use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_binary<R: Read + Seek>(
Expand All @@ -20,11 +21,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<FixedSizeBinaryArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
polars_err!(ComputeError:
"IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
)
})?;
let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
Expand All @@ -37,11 +34,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
let length = try_get_array_length(field_node, limit)?;

let length = length.saturating_mul(FixedSizeBinaryArray::maybe_get_size(&data_type)?);
let values = read_buffer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
use crate::array::FixedSizeListArray;
use crate::datatypes::ArrowDataType;
use crate::io::ipc::read::array::try_get_field_node;

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_list<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: ArrowDataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -25,11 +27,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
version: Version,
scratch: &mut Vec<u8>,
) -> PolarsResult<FixedSizeListArray> {
let field_node = field_nodes.pop_front().ok_or_else(|| {
polars_err!(ComputeError:
"IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
)
})?;
let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
Expand All @@ -48,6 +46,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(

let values = read(
field_nodes,
variadic_buffer_counts,
field,
&ipc_field.fields[0],
buffers,
Expand Down
17 changes: 6 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ use polars_error::{polars_err, PolarsResult};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
use crate::array::ListArray;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};
use crate::offset::Offset;

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: ArrowDataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
Expand All @@ -31,11 +33,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
where
Vec<u8>: TryInto<O::Bytes>,
{
let field_node = field_nodes.pop_front().ok_or_else(|| {
polars_err!(ComputeError:
"IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
)
})?;
let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
Expand All @@ -48,11 +46,7 @@ where
scratch,
)?;

let length: usize = field_node
.length()
.try_into()
.map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
let length = try_get_array_length(field_node, limit)?;

let offsets = read_buffer::<O, _>(
buffers,
Expand All @@ -72,6 +66,7 @@ where

let values = read(
field_nodes,
variadic_buffer_counts,
field,
&ipc_field.fields[0],
buffers,
Expand Down
Loading