feat(rust): BinaryView/Utf8View IPC support (#13464)
ritchie46 committed Jan 6, 2024
1 parent a1108fc commit f22e7fd
Showing 41 changed files with 822 additions and 540 deletions.
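For context: BinaryView/Utf8View arrays (the "German-style" string layout) store a buffer of fixed 16-byte views plus a variable number of data buffers, so the IPC reader has to be told how many data buffers each view column owns; that is what the new variadic_buffer_counts queue and the variadic_buffer_lengths accessor added in this commit are for. The sketch below shows how a single view is laid out according to the Arrow columnar format. It is illustrative only, not code from this commit, and the exact packing used by the crate's internal helpers may differ in detail.

// Illustrative sketch (not part of this commit): the layout of one 16-byte view
// in the u128 views buffer, following the Arrow columnar format's BinaryView
// definition. All names here are made up for the example.
enum ViewData {
    // length <= 12: the value is stored inline inside the view itself.
    Inline([u8; 12]),
    // length > 12: a 4-byte prefix plus (data-buffer index, byte offset).
    Buffered {
        prefix: [u8; 4],
        buffer_idx: u32,
        offset: u32,
    },
}

fn decode_view(view: u128) -> (u32, ViewData) {
    let b = view.to_le_bytes();
    let length = u32::from_le_bytes(b[0..4].try_into().unwrap());
    if length <= 12 {
        (length, ViewData::Inline(b[4..16].try_into().unwrap()))
    } else {
        (
            length,
            ViewData::Buffered {
                prefix: b[4..8].try_into().unwrap(),
                buffer_idx: u32::from_le_bytes(b[8..12].try_into().unwrap()),
                offset: u32::from_le_bytes(b[12..16].try_into().unwrap()),
            },
        )
    }
}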
2 changes: 1 addition & 1 deletion crates/polars-arrow/Cargo.toml
@@ -121,7 +121,7 @@ arrow_rs = ["arrow-buffer", "arrow-schema", "arrow-data", "arrow-array"]
io_ipc = ["arrow-format", "polars-error/arrow-format"]
io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
-io_ipc_compression = ["lz4", "zstd"]
+io_ipc_compression = ["lz4", "zstd", "io_ipc"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

io_avro = ["avro-schema", "polars-error/avro-schema"]
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/binary/ffi.rs
@@ -59,6 +59,6 @@ impl<O: Offset, A: ffi::ArrowArrayRef> FromFfi<A> for BinaryArray<O> {
// assumption that data from FFI is well constructed
let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };

-    Ok(Self::new(data_type, offsets, values, validity))
+    Self::try_new(data_type, offsets, values, validity)
}
}
7 changes: 6 additions & 1 deletion crates/polars-arrow/src/array/binview/ffi.rs
@@ -67,6 +67,11 @@ impl<T: ViewType + ?Sized, A: ffi::ArrowArrayRef> FromFfi<A> for BinaryViewArray
buffers.push(values);
}

-    Self::try_new(data_type, views, Arc::from(buffers), validity)
+    Ok(Self::new_unchecked(
+        data_type,
+        views,
+        Arc::from(buffers),
+        validity,
+    ))
}
}
11 changes: 11 additions & 0 deletions crates/polars-arrow/src/array/binview/mod.rs
@@ -26,6 +26,7 @@ mod private {
use private::Sealed;

use crate::array::binview::iterator::BinaryViewValueIter;
+use crate::array::binview::mutable::MutableBinaryViewArray;
use crate::array::binview::view::{validate_binary_view, validate_utf8_view};
use crate::array::iterator::NonNullValuesIter;
use crate::bitmap::utils::{BitmapIter, ZipValidity};
@@ -147,6 +148,10 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
self.buffers.as_ref()
}

+    pub fn variadic_buffer_lengths(&self) -> Vec<i64> {
+        self.buffers.iter().map(|buf| buf.len() as i64).collect()
+    }

pub fn views(&self) -> &Buffer<u128> {
&self.views
}
@@ -251,6 +256,12 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
impl_sliced!();
impl_mut_validity!();
impl_into_array!();

+    pub fn from<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
+        let mutable =
+            MutableBinaryViewArray::from_iter(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()));
+        mutable.into()
+    }
}

impl<T: ViewType + ?Sized> Array for BinaryViewArrayGeneric<T> {
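A minimal usage sketch of the two methods added above, assuming the crate's usual alias Utf8ViewArray = BinaryViewArrayGeneric<str> and its re-export from polars_arrow::array; both are defined outside these hunks, so treat the import path as an assumption.

use polars_arrow::array::{Array, Utf8ViewArray}; // assumed re-exports

fn demo() {
    // The new `from` constructor builds the array through MutableBinaryViewArray.
    let arr = Utf8ViewArray::from([
        Some("inline"), // <= 12 bytes: stored inside the 16-byte view
        None,
        Some("long enough to spill into a variadic data buffer"),
    ]);
    assert_eq!(arr.len(), 3);
    assert_eq!(arr.views().len(), 3);

    // One entry per data buffer; the IPC writer needs these lengths to fill in
    // the variadic buffer metadata of a record batch.
    let _lengths: Vec<i64> = arr.variadic_buffer_lengths();
}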
4 changes: 4 additions & 0 deletions crates/polars-arrow/src/array/binview/mutable.rs
@@ -165,6 +165,10 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
mutable.extend_values(iterator);
mutable
}

+    pub fn from<S: AsRef<T>, P: AsRef<[Option<S>]>>(slice: P) -> Self {
+        Self::from_iter(slice.as_ref().iter().map(|opt_v| opt_v.as_ref()))
+    }
}

impl<T: ViewType + ?Sized, P: AsRef<T>> Extend<Option<P>> for MutableBinaryViewArray<T> {
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/list/ffi.rs
@@ -64,6 +64,6 @@ impl<O: Offset, A: ffi::ArrowArrayRef> FromFfi<A> for ListArray<O> {
// assumption that data from FFI is well constructed
let offsets = unsafe { OffsetsBuffer::new_unchecked(offsets) };

-    Ok(Self::new(data_type, offsets, values, validity))
+    Self::try_new(data_type, offsets, values, validity)
}
}
15 changes: 4 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/binary.rs
@@ -4,10 +4,11 @@ use std::io::{Read, Seek};
use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
+use super::super::{Compression, IpcBuffer, Node};
use crate::array::BinaryArray;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
+use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};
use crate::offset::Offset;

#[allow(clippy::too_many_arguments)]
@@ -22,11 +23,7 @@ pub fn read_binary<O: Offset, R: Read + Seek>(
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<BinaryArray<O>> {
-    let field_node = field_nodes.pop_front().ok_or_else(|| {
-        polars_err!(oos =
-            "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
-        )
-    })?;
+    let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
@@ -39,11 +36,7 @@
scratch,
)?;

-    let length: usize = field_node
-        .length()
-        .try_into()
-        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
-    let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
+    let length = try_get_array_length(field_node, limit)?;

let offsets: Buffer<O> = read_buffer(
buffers,
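The helpers try_get_field_node and try_get_array_length used above are introduced in one of the files of this commit that is not rendered here. Below is a plausible sketch of their shape, reconstructed from the inline code they replace; the module location, visibility, import paths, and exact signatures are assumptions.

use std::collections::VecDeque;

use polars_error::{polars_err, PolarsResult};

// Assumed paths: the callers import these helpers from crate::io::ipc::read::array.
use super::{Node, OutOfSpecKind};
use crate::datatypes::ArrowDataType;

pub(crate) fn try_get_field_node(
    field_nodes: &mut VecDeque<Node>,
    data_type: &ArrowDataType,
) -> PolarsResult<Node> {
    field_nodes.pop_front().ok_or_else(|| {
        polars_err!(oos =
            "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
        )
    })
}

pub(crate) fn try_get_array_length(field_node: Node, limit: Option<usize>) -> PolarsResult<usize> {
    let length: usize = field_node
        .length()
        .try_into()
        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
    Ok(limit.map(|limit| limit.min(length)).unwrap_or(length))
}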
82 changes: 82 additions & 0 deletions crates/polars-arrow/src/io/ipc/read/array/binview.rs
@@ -0,0 +1,82 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};
use std::sync::Arc;

use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
use super::*;
use crate::array::{ArrayRef, BinaryViewArrayGeneric, ViewType};
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;

#[allow(clippy::too_many_arguments)]
pub fn read_binview<T: ViewType + ?Sized, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
variadic_buffer_counts: &mut VecDeque<usize>,
data_type: ArrowDataType,
buffers: &mut VecDeque<IpcBuffer>,
reader: &mut R,
block_offset: u64,
is_little_endian: bool,
compression: Option<Compression>,
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<ArrayRef> {
let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
field_node,
reader,
block_offset,
is_little_endian,
compression,
limit,
scratch,
)?;

let length = try_get_array_length(field_node, limit)?;
let views: Buffer<u128> = read_buffer(
buffers,
length,
reader,
block_offset,
is_little_endian,
compression,
scratch,
)?;

let n_variadic = variadic_buffer_counts.pop_front().ok_or_else(
|| polars_err!(ComputeError: "IPC: unable to fetch the variadic buffers\n\nThe file or stream is corrupted.")
)?;

let variadic_buffer_lengths: Buffer<i64> = read_buffer(
buffers,
n_variadic,
reader,
block_offset,
is_little_endian,
compression,
scratch,
)?;

let variadic_buffers = variadic_buffer_lengths
.iter()
.map(|length| {
let length = *length as usize;
read_buffer(
buffers,
length,
reader,
block_offset,
is_little_endian,
compression,
scratch,
)
})
.collect::<PolarsResult<Vec<Buffer<u8>>>>()?;

BinaryViewArrayGeneric::<T>::try_new(data_type, views, Arc::from(variadic_buffers), validity)
.map(|arr| arr.boxed())
}
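Note on the layout consumed by this new reader (derived from the code above): for each view column the IPC body contributes, in order, the validity bitmap, the u128 views buffer, an i64 buffer holding the length of every variadic data buffer, and finally the data buffers themselves. The number of data buffers is popped from variadic_buffer_counts, one entry per view array, presumably filled from the record batch metadata by the write side using the variadic_buffer_lengths accessor added in binview/mod.rs.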
15 changes: 4 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/boolean.rs
@@ -4,9 +4,10 @@ use std::io::{Read, Seek};
use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
+use super::super::{Compression, IpcBuffer, Node};
use crate::array::BooleanArray;
use crate::datatypes::ArrowDataType;
+use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};

#[allow(clippy::too_many_arguments)]
pub fn read_boolean<R: Read + Seek>(
@@ -20,11 +21,7 @@ pub fn read_boolean<R: Read + Seek>(
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<BooleanArray> {
-    let field_node = field_nodes.pop_front().ok_or_else(|| {
-        polars_err!(oos =
-            "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
-        )
-    })?;
+    let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
@@ -37,11 +34,7 @@
scratch,
)?;

-    let length: usize = field_node
-        .length()
-        .try_into()
-        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
-    let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
+    let length = try_get_array_length(field_node, limit)?;

let values = read_bitmap(
buffers,
15 changes: 4 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/fixed_size_binary.rs
@@ -4,9 +4,10 @@ use std::io::{Read, Seek};
use polars_error::{polars_err, PolarsResult};

use super::super::read_basic::*;
-use super::super::{Compression, IpcBuffer, Node, OutOfSpecKind};
+use super::super::{Compression, IpcBuffer, Node};
use crate::array::FixedSizeBinaryArray;
use crate::datatypes::ArrowDataType;
+use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_binary<R: Read + Seek>(
@@ -20,11 +21,7 @@ pub fn read_fixed_size_binary<R: Read + Seek>(
limit: Option<usize>,
scratch: &mut Vec<u8>,
) -> PolarsResult<FixedSizeBinaryArray> {
-    let field_node = field_nodes.pop_front().ok_or_else(|| {
-        polars_err!(ComputeError:
-            "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
-        )
-    })?;
+    let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
@@ -37,11 +34,7 @@
scratch,
)?;

-    let length: usize = field_node
-        .length()
-        .try_into()
-        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
-    let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
+    let length = try_get_array_length(field_node, limit)?;

let length = length.saturating_mul(FixedSizeBinaryArray::maybe_get_size(&data_type)?);
let values = read_buffer(
9 changes: 4 additions & 5 deletions crates/polars-arrow/src/io/ipc/read/array/fixed_size_list.rs
@@ -9,10 +9,12 @@ use super::super::read_basic::*;
use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
use crate::array::FixedSizeListArray;
use crate::datatypes::ArrowDataType;
+use crate::io::ipc::read::array::try_get_field_node;

#[allow(clippy::too_many_arguments)]
pub fn read_fixed_size_list<R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
+    variadic_buffer_counts: &mut VecDeque<usize>,
data_type: ArrowDataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
@@ -25,11 +27,7 @@ pub fn read_fixed_size_list<R: Read + Seek>(
version: Version,
scratch: &mut Vec<u8>,
) -> PolarsResult<FixedSizeListArray> {
-    let field_node = field_nodes.pop_front().ok_or_else(|| {
-        polars_err!(ComputeError:
-            "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
-        )
-    })?;
+    let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
@@ -48,6 +46,7 @@

let values = read(
field_nodes,
+        variadic_buffer_counts,
field,
&ipc_field.fields[0],
buffers,
17 changes: 6 additions & 11 deletions crates/polars-arrow/src/io/ipc/read/array/list.rs
@@ -7,15 +7,17 @@ use polars_error::{polars_err, PolarsResult};
use super::super::super::IpcField;
use super::super::deserialize::{read, skip};
use super::super::read_basic::*;
-use super::super::{Compression, Dictionaries, IpcBuffer, Node, OutOfSpecKind, Version};
+use super::super::{Compression, Dictionaries, IpcBuffer, Node, Version};
use crate::array::ListArray;
use crate::buffer::Buffer;
use crate::datatypes::ArrowDataType;
+use crate::io::ipc::read::array::{try_get_array_length, try_get_field_node};
use crate::offset::Offset;

#[allow(clippy::too_many_arguments)]
pub fn read_list<O: Offset, R: Read + Seek>(
field_nodes: &mut VecDeque<Node>,
+    variadic_buffer_counts: &mut VecDeque<usize>,
data_type: ArrowDataType,
ipc_field: &IpcField,
buffers: &mut VecDeque<IpcBuffer>,
@@ -31,11 +33,7 @@ pub fn read_list<O: Offset, R: Read + Seek>(
where
Vec<u8>: TryInto<O::Bytes>,
{
-    let field_node = field_nodes.pop_front().ok_or_else(|| {
-        polars_err!(ComputeError:
-            "IPC: unable to fetch the field for {data_type:?}. The file or stream is corrupted."
-        )
-    })?;
+    let field_node = try_get_field_node(field_nodes, &data_type)?;

let validity = read_validity(
buffers,
@@ -48,11 +46,7 @@
scratch,
)?;

-    let length: usize = field_node
-        .length()
-        .try_into()
-        .map_err(|_| polars_err!(oos = OutOfSpecKind::NegativeFooterLength))?;
-    let length = limit.map(|limit| limit.min(length)).unwrap_or(length);
+    let length = try_get_array_length(field_node, limit)?;

let offsets = read_buffer::<O, _>(
buffers,
@@ -72,6 +66,7 @@

let values = read(
field_nodes,
+        variadic_buffer_counts,
field,
&ipc_field.fields[0],
buffers,
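Note that the list readers (and the fixed-size-list reader above) do not consume variadic_buffer_counts themselves; they only forward the queue to the nested read call so that any view-typed child array can pop its own count, which keeps the queue aligned with the order in which nested arrays are deserialized. The remaining files of the 41 changed are not rendered on this page.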