Skip to content

Commit

Permalink
fix lingering C Data Interface issues for *ViewArray
Browse files Browse the repository at this point in the history
  • Loading branch information
a10y committed Sep 6, 2024
1 parent 0491294 commit e8ecf7e
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 16 deletions.
53 changes: 43 additions & 10 deletions arrow-array/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ pub unsafe fn export_array_into_raw(

// returns the number of bits that buffer `i` (in the C data interface) is expected to have.
// This is set by the Arrow specification
fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
fn bit_width(data_type: &DataType, i: usize, num_buffers: usize) -> Result<usize> {
if let Some(primitive) = data_type.primitive_width() {
return match i {
0 => Err(ArrowError::CDataInterface(format!(
Expand All @@ -161,7 +161,7 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
}
(DataType::FixedSizeBinary(num_bytes), 1) => *num_bytes as usize * u8::BITS as usize,
(DataType::FixedSizeList(f, num_elems), 1) => {
let child_bit_width = bit_width(f.data_type(), 1)?;
let child_bit_width = bit_width(f.data_type(), 1, num_buffers)?;
child_bit_width * (*num_elems as usize)
},
(DataType::FixedSizeBinary(_), _) | (DataType::FixedSizeList(_, _), _) => {
Expand Down Expand Up @@ -193,6 +193,19 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
"The datatype \"{data_type:?}\" expects 3 buffers, but requested {i}. Please verify that the C data interface is correctly implemented."
)))
}
// Variable-sized views: have 3 or more buffers.
// Buffer 1 is u128 views
// Buffers 2...N-1 are u8
// Buffer N is i64
(DataType::Utf8View, 1) | (DataType::BinaryView,1) => {
u128::BITS as _
}
(DataType::Utf8View, i) | (DataType::BinaryView, i) if i < num_buffers - 1 =>{
u8::BITS as _
}
(DataType::Utf8View, i) | (DataType::BinaryView, i) if i == num_buffers - 1 => {
i64::BITS as _
}
// type ids. UnionArray doesn't have null bitmap so buffer index begins with 0.
(DataType::Union(_, _), 0) => i8::BITS as _,
// Only DenseUnion has 2nd buffer
Expand Down Expand Up @@ -378,7 +391,7 @@ impl<'a> ImportedArrowArray<'a> {
let buffer_begin = can_contain_null_mask as usize;
(buffer_begin..self.array.num_buffers())
.map(|index| {
let len = self.buffer_len(index, &self.data_type)?;
let len = self.buffer_len(index, self.array.num_buffers(), &self.data_type)?;

match unsafe { create_buffer(self.owner.clone(), self.array, index, len) } {
Some(buf) => Ok(buf),
Expand All @@ -399,7 +412,7 @@ impl<'a> ImportedArrowArray<'a> {
/// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`.
/// for variable-sized buffers, such as the second buffer of a stringArray, we need
/// to fetch offset buffer's len to build the second buffer.
fn buffer_len(&self, i: usize, dt: &DataType) -> Result<usize> {
fn buffer_len(&self, i: usize, num_buffers: usize, dt: &DataType) -> Result<usize> {
// Special handling for dictionary type as we only care about the key type in the case.
let data_type = match dt {
DataType::Dictionary(key_data_type, _) => key_data_type.as_ref(),
Expand All @@ -420,7 +433,7 @@ impl<'a> ImportedArrowArray<'a> {
| (DataType::LargeList(_), 1)
| (DataType::Map(_, _), 1) => {
// the len of the offset buffer (buffer 1) equals length + 1
let bits = bit_width(data_type, i)?;
let bits = bit_width(data_type, i, num_buffers)?;
debug_assert_eq!(bits % 8, 0);
(length + 1) * (bits / 8)
}
Expand All @@ -430,7 +443,7 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, num_buffers, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i32`, as Utf8 uses `i32` offsets.
#[allow(clippy::cast_ptr_alignment)]
Expand All @@ -444,17 +457,35 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, num_buffers, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i64`, as Large uses `i64` offsets.
#[allow(clippy::cast_ptr_alignment)]
let offset_buffer = self.array.buffer(1) as *const i64;
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i64>() - 1) }) as usize
}
// View types: these have variadic buffers.
// Buffer 1 is the views buffer; its element count equals the length of the array.
// Buffers 2..N-1 are the data buffers holding the byte data. Their lengths are variable.
// Buffer N holds (N - 2) i64 values: the lengths of buffers 2..N-1.
(DataType::Utf8View, 1) | (DataType::BinaryView, 1) => length,
(DataType::Utf8View, i) | (DataType::BinaryView, i) if i < num_buffers - 1 => {
// Read the length of data buffer `i` out of the last buffer,
// which stores one i64 size per variadic data buffer.
// we assume that pointer is aligned for `i64`, as the variadic buffer sizes are `i64`.
#[allow(clippy::cast_ptr_alignment)]
let variadic_buffer_lengths = self.array.buffer(num_buffers - 1) as *const i64;
// fetch the size of variadic data buffer `i` (buffers 0 and 1 are nulls and views)
(unsafe { *variadic_buffer_lengths.add(i - 2) }) as usize
}
(DataType::Utf8View, i) | (DataType::BinaryView, i) if i == num_buffers - 1 => {
// Length equals the number of variadic data buffers (num_buffers - 2).
num_buffers - 2
}
// buffer len of primitive types
_ => {
let bits = bit_width(data_type, i)?;
let bits = bit_width(data_type, i, num_buffers)?;
bit_util::ceil(length * bits, 8)
}
})
Expand Down Expand Up @@ -1453,8 +1484,10 @@ mod tests_from_ffi {
owner: &array,
};

let offset_buf_len = imported_array.buffer_len(1, &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &imported_array.data_type)?;
let offset_buf_len =
imported_array.buffer_len(1, array.num_buffers(), &imported_array.data_type)?;
let data_buf_len =
imported_array.buffer_len(2, array.num_buffers(), &imported_array.data_type)?;

assert_eq!(offset_buf_len, 4);
assert_eq!(data_buf_len, 0);
Expand Down
6 changes: 4 additions & 2 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl Buffer {
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: offset={} length={}",
offset,
self.length
);
self.length -= offset;
// Safety:
Expand All @@ -221,7 +223,7 @@ impl Buffer {
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
assert!(
offset.saturating_add(length) <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length}"
);
// Safety:
// offset + length <= self.length
Expand Down
25 changes: 22 additions & 3 deletions arrow-data/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::bit_mask::set_bits;
use crate::{layout, ArrayData};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::{Buffer, MutableBuffer, ScalarBuffer};
use arrow_schema::DataType;
use std::ffi::c_void;

Expand Down Expand Up @@ -121,7 +121,7 @@ impl FFI_ArrowArray {
pub fn new(data: &ArrayData) -> Self {
let data_layout = layout(data.data_type());

let buffers = if data_layout.can_contain_null_mask {
let mut buffers = if data_layout.can_contain_null_mask {
// * insert the null buffer at the start
// * make all others `Option<Buffer>`.
std::iter::once(align_nulls(data.offset(), data.nulls()))
Expand All @@ -132,7 +132,7 @@ impl FFI_ArrowArray {
};

// `n_buffers` is the number of buffers by the spec.
let n_buffers = {
let mut n_buffers = {
data_layout.buffers.len() + {
// If the layout has a null buffer by Arrow spec.
// Note that even the array doesn't have a null buffer because it has
Expand All @@ -153,6 +153,25 @@ impl FFI_ArrowArray {
})
.collect::<Box<[_]>>();

let need_variadic_buffer_sizes = match data.data_type() {
DataType::Utf8View | DataType::BinaryView => true,
_ => false,
};

if need_variadic_buffer_sizes {
n_buffers += 1;

// Skip null and views buffers.
let variadic_buffer_sizes: Vec<i64> = data
.buffers()
.iter()
.skip(2)
.map(|buf| buf.len() as _)
.collect();

buffers.push(Some(ScalarBuffer::from(variadic_buffer_sizes).into_inner()));
}

let empty = vec![];
let (child_data, dictionary) = match data.data_type() {
DataType::Dictionary(_, _) => (
Expand Down
4 changes: 3 additions & 1 deletion arrow/tests/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow::record_batch::RecordBatch;
use arrow_array::StringViewArray;
use pyo3::Python;
use std::sync::Arc;

Expand All @@ -27,7 +28,8 @@ fn test_to_pyarrow() {

let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
let c: ArrayRef = Arc::new(StringViewArray::from(vec!["a", "b"]));
let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
println!("input: {:?}", input);

let res = Python::with_gil(|py| {
Expand Down

0 comments on commit e8ecf7e

Please sign in to comment.