Skip to content

Commit

Permalink
Support StringViewArray interop with python: fix lingering C Data Int…
Browse files Browse the repository at this point in the history
…erface issues for *ViewArray (#6368)

* fix lingering C Data Interface issues for *ViewArray

Fixes #6366

* report views length in elements -> bytes

* use pyarrow 17

* use only good versions

* fix support for View arrays in C FFI, add test

* update comment in github action

* more ffi test cases

* more byte_view tests for into_pyarrow
  • Loading branch information
a10y committed Sep 12, 2024
1 parent e838e62 commit 60ec869
Show file tree
Hide file tree
Showing 6 changed files with 247 additions and 26 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ on:
- arrow/**

jobs:

integration:
name: Archery test With other arrows
runs-on: ubuntu-latest
Expand Down Expand Up @@ -118,9 +117,9 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
rust: [ stable ]
# PyArrow 13 was the last version prior to introduction to Arrow PyCapsules
pyarrow: [ "13", "14" ]
rust: [stable]
# PyArrow 15 was the first version to introduce StringView/BinaryView support
pyarrow: ["15", "16", "17"]
steps:
- uses: actions/checkout@v4
with:
Expand Down
168 changes: 154 additions & 14 deletions arrow-array/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
"The datatype \"{data_type:?}\" expects 3 buffers, but requested {i}. Please verify that the C data interface is correctly implemented."
)))
}
// Variable-sized views: have 3 or more buffers.
// Buffer 1 is the u128 views buffer
// Buffers 2...N-1 are u8 byte buffers
(DataType::Utf8View, 1) | (DataType::BinaryView,1) => u128::BITS as _,
(DataType::Utf8View, _) | (DataType::BinaryView, _) => {
u8::BITS as _
}
// type ids. UnionArray doesn't have null bitmap so buffer index begins with 0.
(DataType::Union(_, _), 0) => i8::BITS as _,
// Only DenseUnion has 2nd buffer
Expand Down Expand Up @@ -300,7 +307,7 @@ impl<'a> ImportedArrowArray<'a> {
};

let data_layout = layout(&self.data_type);
let buffers = self.buffers(data_layout.can_contain_null_mask)?;
let buffers = self.buffers(data_layout.can_contain_null_mask, data_layout.variadic)?;

let null_bit_buffer = if data_layout.can_contain_null_mask {
self.null_bit_buffer()
Expand Down Expand Up @@ -373,13 +380,30 @@ impl<'a> ImportedArrowArray<'a> {

/// returns all buffers, as organized by Rust (i.e. null buffer is skipped if it's present
/// in the spec of the type)
fn buffers(&self, can_contain_null_mask: bool) -> Result<Vec<Buffer>> {
fn buffers(&self, can_contain_null_mask: bool, variadic: bool) -> Result<Vec<Buffer>> {
// + 1: skip null buffer
let buffer_begin = can_contain_null_mask as usize;
(buffer_begin..self.array.num_buffers())
.map(|index| {
let len = self.buffer_len(index, &self.data_type)?;
let buffer_end = self.array.num_buffers() - usize::from(variadic);

let variadic_buffer_lens = if variadic {
// Each views array has 1 (optional) null buffer, 1 views buffer, 1 lengths buffer.
// Rest are variadic.
let num_variadic_buffers =
self.array.num_buffers() - (2 + usize::from(can_contain_null_mask));
if num_variadic_buffers == 0 {
&[]
} else {
let lengths = self.array.buffer(self.array.num_buffers() - 1);
// SAFETY: if lengths is non-null, then it must be valid for up to num_variadic_buffers.
unsafe { std::slice::from_raw_parts(lengths.cast::<i64>(), num_variadic_buffers) }
}
} else {
&[]
};

(buffer_begin..buffer_end)
.map(|index| {
let len = self.buffer_len(index, variadic_buffer_lens, &self.data_type)?;
match unsafe { create_buffer(self.owner.clone(), self.array, index, len) } {
Some(buf) => Ok(buf),
None if len == 0 => {
Expand All @@ -399,7 +423,12 @@ impl<'a> ImportedArrowArray<'a> {
/// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`.
/// for variable-sized buffers, such as the second buffer of a stringArray, we need
/// to fetch offset buffer's len to build the second buffer.
fn buffer_len(&self, i: usize, dt: &DataType) -> Result<usize> {
fn buffer_len(
&self,
i: usize,
variadic_buffer_lengths: &[i64],
dt: &DataType,
) -> Result<usize> {
// Special handling for dictionary type as we only care about the key type in the case.
let data_type = match dt {
DataType::Dictionary(key_data_type, _) => key_data_type.as_ref(),
Expand Down Expand Up @@ -430,7 +459,7 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i32`, as Utf8 uses `i32` offsets.
#[allow(clippy::cast_ptr_alignment)]
Expand All @@ -444,14 +473,24 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i64`, as Large uses `i64` offsets.
#[allow(clippy::cast_ptr_alignment)]
let offset_buffer = self.array.buffer(1) as *const i64;
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i64>() - 1) }) as usize
}
// View types: these have variadic buffers.
// Buffer 1 is the views buffer, which stores 1 u128 per element of the array.
// Buffers 2..N-1 are the buffers holding the byte data. Their lengths are variable.
// Buffer N is of length (N - 2) and stores i64 containing the lengths of buffers 2..N-1
(DataType::Utf8View, 1) | (DataType::BinaryView, 1) => {
std::mem::size_of::<u128>() * length
}
(DataType::Utf8View, i) | (DataType::BinaryView, i) => {
variadic_buffer_lengths[i - 2] as usize
}
// buffer len of primitive types
_ => {
let bits = bit_width(data_type, i)?;
Expand Down Expand Up @@ -1229,18 +1268,18 @@ mod tests_from_ffi {
use arrow_data::ArrayData;
use arrow_schema::{DataType, Field};

use crate::types::Int32Type;
use super::{ImportedArrowArray, Result};
use crate::builder::GenericByteViewBuilder;
use crate::types::{BinaryViewType, ByteViewType, Int32Type, StringViewType};
use crate::{
array::{
Array, BooleanArray, DictionaryArray, FixedSizeBinaryArray, FixedSizeListArray,
Int32Array, Int64Array, StringArray, StructArray, UInt32Array, UInt64Array,
},
ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema},
make_array, ArrayRef, ListArray,
make_array, ArrayRef, GenericByteViewArray, ListArray,
};

use super::{ImportedArrowArray, Result};

fn test_round_trip(expected: &ArrayData) -> Result<()> {
// here we export the array
let array = FFI_ArrowArray::new(expected);
Expand Down Expand Up @@ -1453,8 +1492,8 @@ mod tests_from_ffi {
owner: &array,
};

let offset_buf_len = imported_array.buffer_len(1, &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &imported_array.data_type)?;
let offset_buf_len = imported_array.buffer_len(1, &[], &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &[], &imported_array.data_type)?;

assert_eq!(offset_buf_len, 4);
assert_eq!(data_buf_len, 0);
Expand All @@ -1472,6 +1511,18 @@ mod tests_from_ffi {
StringArray::from(array)
}

/// Round-trips a byte-view array (StringView/BinaryView) through the C Data
/// Interface: export via `FFI_ArrowArray` + `FFI_ArrowSchema`, then re-import
/// with `from_ffi` and rebuild a typed array.
///
/// Test-only helper: failures panic via `unwrap`.
fn roundtrip_byte_view_array<T: ByteViewType>(
array: GenericByteViewArray<T>,
) -> GenericByteViewArray<T> {
let data = array.into_data();

// Export: wrap the ArrayData and its DataType in the FFI structs.
let array = FFI_ArrowArray::new(&data);
let schema = FFI_ArrowSchema::try_from(data.data_type()).unwrap();

// Import: `from_ffi` takes ownership of both exported structs.
let array = unsafe { from_ffi(array, &schema) }.unwrap();
GenericByteViewArray::<T>::from(array)
}

fn extend_array(array: &dyn Array) -> ArrayRef {
let len = array.len();
let data = array.to_data();
Expand Down Expand Up @@ -1551,4 +1602,93 @@ mod tests_from_ffi {
&imported
);
}

/// Helper trait that lets test code build scalars from plain `&str` inputs for
/// either view flavor, i.e. as `StringViewType::Native` or
/// `BinaryViewType::Native` values.
trait NativeFromStr {
/// Borrows `value` as the implementing native type.
fn from_str(value: &str) -> &Self;
}

// Identity conversion: the string slice is used as-is.
impl NativeFromStr for str {
fn from_str(value: &str) -> &Self {
value
}
}

// Byte conversion: view the string's UTF-8 bytes as a byte slice.
impl NativeFromStr for [u8] {
fn from_str(value: &str) -> &Self {
value.as_bytes()
}
}

// Verifies that view arrays survive an FFI round trip for both StringView and
// BinaryView, covering 0, 1, and 2 variadic data buffers (the cases the
// variadic-buffer-length handling in `buffers`/`buffer_len` must support).
#[test]
fn test_round_trip_byte_view() {
fn test_case<T>()
where
T: ByteViewType,
T::Native: NativeFromStr,
{
// Shared assertion: round-trip the array through FFI, then copy it via
// MutableArrayData (`extend_array`) and check the copy matches.
macro_rules! run_test_case {
($array:expr) => {{
// round-trip through C Data Interface
let len = $array.len();
let imported = roundtrip_byte_view_array($array);
assert_eq!(imported.len(), len);

let copied = extend_array(&imported);
assert_eq!(
copied
.as_any()
.downcast_ref::<GenericByteViewArray<T>>()
.unwrap(),
&imported
);
}};
}

// Empty test case.
let empty = GenericByteViewBuilder::<T>::new().finish();
run_test_case!(empty);

// All inlined strings test case (values short enough to fit in the
// view words, so no variadic data buffers are created).
let mut all_inlined = GenericByteViewBuilder::<T>::new();
all_inlined.append_value(T::Native::from_str("inlined1"));
all_inlined.append_value(T::Native::from_str("inlined2"));
all_inlined.append_value(T::Native::from_str("inlined3"));
let all_inlined = all_inlined.finish();
assert_eq!(all_inlined.data_buffers().len(), 0);
run_test_case!(all_inlined);

// some inlined + non-inlined, 1 variadic buffer.
let mixed_one_variadic = {
let mut builder = GenericByteViewBuilder::<T>::new();
builder.append_value(T::Native::from_str("inlined"));
let block_id =
builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes()));
builder.try_append_view(block_id, 0, 25).unwrap();
builder.finish()
};
assert_eq!(mixed_one_variadic.data_buffers().len(), 1);
run_test_case!(mixed_one_variadic);

// inlined + non-inlined, 2 variadic buffers.
let mixed_two_variadic = {
let mut builder = GenericByteViewBuilder::<T>::new();
builder.append_value(T::Native::from_str("inlined"));
let block_id =
builder.append_block(Buffer::from("non-inlined-string-buffer".as_bytes()));
builder.try_append_view(block_id, 0, 25).unwrap();

let block_id = builder
.append_block(Buffer::from("another-non-inlined-string-buffer".as_bytes()));
builder.try_append_view(block_id, 0, 33).unwrap();
builder.finish()
};
assert_eq!(mixed_two_variadic.data_buffers().len(), 2);
run_test_case!(mixed_two_variadic);
}

// Run the same suite for both view types.
test_case::<StringViewType>();
test_case::<BinaryViewType>();
}
}
7 changes: 5 additions & 2 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl Buffer {
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: offset={} length={}",
offset,
self.length
);
self.length -= offset;
// Safety:
Expand All @@ -221,7 +223,8 @@ impl Buffer {
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
assert!(
offset.saturating_add(length) <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length} selflen={}",
self.length
);
// Safety:
// offset + length <= self.length
Expand Down
20 changes: 16 additions & 4 deletions arrow-data/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::bit_mask::set_bits;
use crate::{layout, ArrayData};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::{Buffer, MutableBuffer, ScalarBuffer};
use arrow_schema::DataType;
use std::ffi::c_void;

Expand Down Expand Up @@ -121,7 +121,7 @@ impl FFI_ArrowArray {
pub fn new(data: &ArrayData) -> Self {
let data_layout = layout(data.data_type());

let buffers = if data_layout.can_contain_null_mask {
let mut buffers = if data_layout.can_contain_null_mask {
// * insert the null buffer at the start
// * make all others `Option<Buffer>`.
std::iter::once(align_nulls(data.offset(), data.nulls()))
Expand All @@ -132,7 +132,7 @@ impl FFI_ArrowArray {
};

// `n_buffers` is the number of buffers by the spec.
let n_buffers = {
let mut n_buffers = {
data_layout.buffers.len() + {
// If the layout has a null buffer by Arrow spec.
// Note that even the array doesn't have a null buffer because it has
Expand All @@ -141,10 +141,22 @@ impl FFI_ArrowArray {
}
} as i64;

if data_layout.variadic {
// Save the lengths of all variadic buffers into a new buffer.
// The first buffer is `views`, and the rest are variadic.
let mut data_buffers_lengths = Vec::new();
for buffer in data.buffers().iter().skip(1) {
data_buffers_lengths.push(buffer.len() as i64);
n_buffers += 1;
}

buffers.push(Some(ScalarBuffer::from(data_buffers_lengths).into_inner()));
n_buffers += 1;
}

let buffers_ptr = buffers
.iter()
.flat_map(|maybe_buffer| match maybe_buffer {
// note that `raw_data` takes into account the buffer's offset
Some(b) => Some(b.as_ptr() as *const c_void),
// This is for null buffer. We only put a null pointer for
// null buffer if by spec it can contain null mask.
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl FromPyArrow for RecordBatch {
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
let array_data = unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
Expand Down
Loading

0 comments on commit 60ec869

Please sign in to comment.