Skip to content

Commit

Permalink
perf: directly embed data ptr in Buffer (pola-rs#13744)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored and r-brink committed Jan 15, 2024
1 parent cbdf0de commit 130ab2a
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 99 deletions.
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/binary/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ unsafe impl<O: Offset> ToFfi for BinaryArray<O> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/binview/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ unsafe impl<T: ViewType + ?Sized> ToFfi for BinaryViewArrayGeneric<T> {
fn buffers(&self) -> Vec<Option<*const u8>> {
let mut buffers = Vec::with_capacity(self.buffers.len() + 2);
buffers.push(self.validity.as_ref().map(|x| x.as_ptr()));
buffers.push(Some(self.views.as_ptr().cast::<u8>()));
buffers.extend(self.buffers.iter().map(|b| Some(b.as_ptr())));
buffers.push(Some(self.views.storage_ptr().cast::<u8>()));
buffers.extend(self.buffers.iter().map(|b| Some(b.storage_ptr())));
buffers
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ unsafe impl<T: ViewType + ?Sized> Sync for BinaryViewArrayGeneric<T> {}
fn buffers_into_raw<T>(buffers: &[Buffer<T>]) -> Arc<[(*const T, usize)]> {
buffers
.iter()
.map(|buf| (buf.as_ptr(), buf.len()))
.map(|buf| (buf.storage_ptr(), buf.len()))
.collect()
}

Expand Down Expand Up @@ -262,7 +262,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
// data: 12 bytes

let bytes = if len <= 12 {
let ptr = self.views.as_ptr() as *const u8;
let ptr = self.views.storage_ptr() as *const u8;
std::slice::from_raw_parts(ptr.add(i * 16 + 4), len as usize)
} else {
let buffer_idx = (v >> 64) as u32;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/fixed_size_binary/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ unsafe impl ToFfi for FixedSizeBinaryArray {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/list/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ unsafe impl<O: Offset> ToFfi for ListArray<O> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
]
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/map/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ unsafe impl ToFfi for MapArray {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
]
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/primitive/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ unsafe impl<T: NativeType> ToFfi for PrimitiveArray<T> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/array/union/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ unsafe impl ToFfi for UnionArray {
fn buffers(&self) -> Vec<Option<*const u8>> {
if let Some(offsets) = &self.offsets {
vec![
Some(self.types.as_ptr().cast::<u8>()),
Some(offsets.as_ptr().cast::<u8>()),
Some(self.types.storage_ptr().cast::<u8>()),
Some(offsets.storage_ptr().cast::<u8>()),
]
} else {
vec![Some(self.types.as_ptr().cast::<u8>())]
vec![Some(self.types.storage_ptr().cast::<u8>())]
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/utf8/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ unsafe impl<O: Offset> ToFfi for Utf8Array<O> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
123 changes: 46 additions & 77 deletions crates/polars-arrow/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ use crate::array::ArrayAccessor;
/// ```
#[derive(Clone)]
pub struct Buffer<T> {
/// the internal byte buffer.
data: Arc<Bytes<T>>,
/// The internal byte buffer.
storage: Arc<Bytes<T>>,

/// The offset into the buffer.
offset: usize,
/// A pointer into the buffer where our data starts.
ptr: *const T,

// the length of the buffer. Given a region `data` of N bytes, [offset..offset+length] is visible
// to this buffer.
// The length of the buffer.
length: usize,
}

unsafe impl<T: Sync> Sync for Buffer<T> {}
unsafe impl<T: Send> Send for Buffer<T> {}

impl<T: PartialEq> PartialEq for Buffer<T> {
#[inline]
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -80,10 +82,11 @@ impl<T> Buffer<T> {

/// Auxiliary method to create a new Buffer
pub(crate) fn from_bytes(bytes: Bytes<T>) -> Self {
let ptr = bytes.as_ptr();
let length = bytes.len();
Buffer {
data: Arc::new(bytes),
offset: 0,
storage: Arc::new(bytes),
ptr,
length,
}
}
Expand All @@ -97,26 +100,23 @@ impl<T> Buffer<T> {
/// Returns whether the buffer is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
self.length == 0
}

/// Returns whether underlying data is sliced.
/// If sliced the [`Buffer`] is backed by
/// more data than the length of `Self`.
pub fn is_sliced(&self) -> bool {
self.data.len() != self.length
self.storage.len() != self.length
}

/// Returns the byte slice stored in this buffer
#[inline]
pub fn as_slice(&self) -> &[T] {
// Safety:
// invariant of this struct `offset + length <= data.len()`
debug_assert!(self.offset + self.length <= self.data.len());
unsafe {
self.data
.get_unchecked(self.offset..self.offset + self.length)
}
debug_assert!(self.offset() + self.length <= self.storage.len());
unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
}

/// Returns the byte slice stored in this buffer
Expand All @@ -127,7 +127,7 @@ impl<T> Buffer<T> {
// Safety:
// invariant of this function
debug_assert!(index < self.length);
unsafe { self.data.get_unchecked(self.offset + index) }
unsafe { &*self.ptr.add(index) }
}

/// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
Expand Down Expand Up @@ -173,20 +173,24 @@ impl<T> Buffer<T> {
/// The caller must ensure `offset + length <= self.len()`
#[inline]
pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
self.offset += offset;
self.ptr = self.ptr.add(offset);
self.length = length;
}

/// Returns a pointer to the start of this buffer.
/// Returns a pointer to the start of the storage underlying this buffer.
#[inline]
pub(crate) fn as_ptr(&self) -> *const T {
self.data.deref().as_ptr()
pub(crate) fn storage_ptr(&self) -> *const T {
self.storage.as_ptr()
}

/// Returns the offset of this buffer.
/// Returns the start offset of this buffer within the underlying storage.
#[inline]
pub fn offset(&self) -> usize {
self.offset
unsafe {
let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
debug_assert!(ret <= self.storage.len());
ret
}
}

/// # Safety
Expand All @@ -200,14 +204,14 @@ impl<T> Buffer<T> {
///
/// This operation returns [`Either::Right`] iff this [`Buffer`]:
/// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`])
/// * has not been imported from the c data interface (FFI)
/// * has not been imported from the C data interface (FFI)
#[inline]
pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
// We loose information if the data is sliced.
if self.length != self.data.len() {
// We lose information if the data is sliced.
if self.is_sliced() {
return Either::Left(self);
}
match Arc::get_mut(&mut self.data)
match Arc::get_mut(&mut self.storage)
.and_then(|b| b.get_vec())
.map(std::mem::take)
{
Expand All @@ -216,65 +220,27 @@ impl<T> Buffer<T> {
}
}

/// Returns a mutable reference to its underlying `Vec`, if possible.
/// Note that only `[self.offset(), self.offset() + self.len()[` in this vector is visible
/// by this buffer.
///
/// This operation returns [`Some`] iff this [`Buffer`]:
/// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`])
/// * has not been imported from the c data interface (FFI)
/// # Safety
/// The caller must ensure that the vector in the mutable reference keeps a length of at least `self.offset() + self.len() - 1`.
#[inline]
pub unsafe fn get_mut(&mut self) -> Option<&mut Vec<T>> {
Arc::get_mut(&mut self.data).and_then(|b| b.get_vec())
}

/// Returns a mutable reference to its slice, if possible.
///
/// This operation returns [`Some`] iff this [`Buffer`]:
/// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`])
/// * has not been imported from the c data interface (FFI)
/// * has not been imported from the C data interface (FFI)
#[inline]
pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
Arc::get_mut(&mut self.data)
.and_then(|b| b.get_vec())
// Safety: the invariant of this struct
.map(|x| unsafe { x.get_unchecked_mut(self.offset..self.offset + self.length) })
let offset = self.offset();
let unique = Arc::get_mut(&mut self.storage)?;
let vec = unique.get_vec()?;
Some(unsafe { vec.get_unchecked_mut(offset..offset + self.length) })
}

/// Get the strong count of underlying `Arc` data buffer.
pub fn shared_count_strong(&self) -> usize {
Arc::strong_count(&self.data)
Arc::strong_count(&self.storage)
}

/// Get the weak count of underlying `Arc` data buffer.
pub fn shared_count_weak(&self) -> usize {
Arc::weak_count(&self.data)
}

/// Returns its internal representation
#[must_use]
pub fn into_inner(self) -> (Arc<Bytes<T>>, usize, usize) {
let Self {
data,
offset,
length,
} = self;
(data, offset, length)
}

/// Creates a `[Bitmap]` from its internal representation.
/// This is the inverted from `[Bitmap::into_inner]`
///
/// # Safety
/// Callers must ensure all invariants of this struct are upheld.
pub unsafe fn from_inner_unchecked(data: Arc<Bytes<T>>, offset: usize, length: usize) -> Self {
Self {
data,
offset,
length,
}
Arc::weak_count(&self.storage)
}
}

Expand All @@ -288,10 +254,12 @@ impl<T> From<Vec<T>> for Buffer<T> {
#[inline]
fn from(p: Vec<T>) -> Self {
let bytes: Bytes<T> = p.into();
let ptr = bytes.as_ptr();
let length = bytes.len();
Self {
offset: 0,
length: bytes.len(),
data: Arc::new(bytes),
storage: Arc::new(bytes),
ptr,
length,
}
}
}
Expand Down Expand Up @@ -332,8 +300,9 @@ impl<T: crate::types::NativeType> From<arrow_buffer::Buffer> for Buffer<T> {
#[cfg(feature = "arrow_rs")]
impl<T: crate::types::NativeType> From<Buffer<T>> for arrow_buffer::Buffer {
fn from(value: Buffer<T>) -> Self {
crate::buffer::to_buffer(value.data).slice_with_length(
value.offset * std::mem::size_of::<T>(),
let offset = value.offset();
crate::buffer::to_buffer(value.storage).slice_with_length(
offset * std::mem::size_of::<T>(),
value.length * std::mem::size_of::<T>(),
)
}
Expand All @@ -343,7 +312,7 @@ unsafe impl<'a, T: 'a> ArrayAccessor<'a> for Buffer<T> {
type Item = &'a T;

unsafe fn value_unchecked(&'a self, index: usize) -> Self::Item {
self.as_slice().get_unchecked(index)
unsafe { &*self.ptr.add(index) }
}

fn len(&self) -> usize {
Expand Down
10 changes: 3 additions & 7 deletions crates/polars-arrow/src/legacy/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ pub fn chunk_to_struct(chunk: Chunk<ArrayRef>, fields: Vec<Field>) -> StructArra
/// [Arc::get_mut]: std::sync::Arc::get_mut
pub fn primitive_to_vec<T: NativeType>(arr: ArrayRef) -> Option<Vec<T>> {
let arr_ref = arr.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
let mut buffer = arr_ref.values().clone();
drop(arr);
// Safety:
// if the `get_mut` is successful
// we are the only owner and we drop it
// so it is safe to take the vec
unsafe { buffer.get_mut().map(std::mem::take) }
let buffer = arr_ref.values().clone();
drop(arr); // Drop original reference so refcount becomes 1 if possible.
buffer.into_mut().right()
}

0 comments on commit 130ab2a

Please sign in to comment.