Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: directly embed data ptr in Buffer #13744

Merged
merged 3 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/binary/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ unsafe impl<O: Offset> ToFfi for BinaryArray<O> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/binview/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ unsafe impl<T: ViewType + ?Sized> ToFfi for BinaryViewArrayGeneric<T> {
fn buffers(&self) -> Vec<Option<*const u8>> {
let mut buffers = Vec::with_capacity(self.buffers.len() + 2);
buffers.push(self.validity.as_ref().map(|x| x.as_ptr()));
buffers.push(Some(self.views.as_ptr().cast::<u8>()));
buffers.extend(self.buffers.iter().map(|b| Some(b.as_ptr())));
buffers.push(Some(self.views.storage_ptr().cast::<u8>()));
buffers.extend(self.buffers.iter().map(|b| Some(b.storage_ptr())));
buffers
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ unsafe impl<T: ViewType + ?Sized> Sync for BinaryViewArrayGeneric<T> {}
fn buffers_into_raw<T>(buffers: &[Buffer<T>]) -> Arc<[(*const T, usize)]> {
buffers
.iter()
.map(|buf| (buf.as_ptr(), buf.len()))
.map(|buf| (buf.storage_ptr(), buf.len()))
.collect()
}

Expand Down Expand Up @@ -262,7 +262,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
// data: 12 bytes

let bytes = if len <= 12 {
let ptr = self.views.as_ptr() as *const u8;
let ptr = self.views.storage_ptr() as *const u8;
std::slice::from_raw_parts(ptr.add(i * 16 + 4), len as usize)
} else {
let buffer_idx = (v >> 64) as u32;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/fixed_size_binary/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ unsafe impl ToFfi for FixedSizeBinaryArray {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/list/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ unsafe impl<O: Offset> ToFfi for ListArray<O> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
]
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/map/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ unsafe impl ToFfi for MapArray {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
]
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/array/primitive/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ unsafe impl<T: NativeType> ToFfi for PrimitiveArray<T> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
6 changes: 3 additions & 3 deletions crates/polars-arrow/src/array/union/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ unsafe impl ToFfi for UnionArray {
fn buffers(&self) -> Vec<Option<*const u8>> {
if let Some(offsets) = &self.offsets {
vec![
Some(self.types.as_ptr().cast::<u8>()),
Some(offsets.as_ptr().cast::<u8>()),
Some(self.types.storage_ptr().cast::<u8>()),
Some(offsets.storage_ptr().cast::<u8>()),
]
} else {
vec![Some(self.types.as_ptr().cast::<u8>())]
vec![Some(self.types.storage_ptr().cast::<u8>())]
}
}

Expand Down
4 changes: 2 additions & 2 deletions crates/polars-arrow/src/array/utf8/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ unsafe impl<O: Offset> ToFfi for Utf8Array<O> {
fn buffers(&self) -> Vec<Option<*const u8>> {
vec![
self.validity.as_ref().map(|x| x.as_ptr()),
Some(self.offsets.buffer().as_ptr().cast::<u8>()),
Some(self.values.as_ptr().cast::<u8>()),
Some(self.offsets.buffer().storage_ptr().cast::<u8>()),
Some(self.values.storage_ptr().cast::<u8>()),
]
}

Expand Down
123 changes: 46 additions & 77 deletions crates/polars-arrow/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,19 @@ use crate::array::ArrayAccessor;
/// ```
#[derive(Clone)]
pub struct Buffer<T> {
/// the internal byte buffer.
data: Arc<Bytes<T>>,
/// The internal byte buffer.
storage: Arc<Bytes<T>>,

/// The offset into the buffer.
offset: usize,
/// A pointer into the buffer where our data starts.
ptr: *const T,

// the length of the buffer. Given a region `data` of N bytes, [offset..offset+length] is visible
// to this buffer.
// The length of the buffer.
length: usize,
}

unsafe impl<T: Sync> Sync for Buffer<T> {}
unsafe impl<T: Send> Send for Buffer<T> {}

impl<T: PartialEq> PartialEq for Buffer<T> {
#[inline]
fn eq(&self, other: &Self) -> bool {
Expand Down Expand Up @@ -80,10 +82,11 @@ impl<T> Buffer<T> {

/// Auxiliary method to create a new Buffer
pub(crate) fn from_bytes(bytes: Bytes<T>) -> Self {
let ptr = bytes.as_ptr();
let length = bytes.len();
Buffer {
data: Arc::new(bytes),
offset: 0,
storage: Arc::new(bytes),
ptr,
length,
}
}
Expand All @@ -97,26 +100,23 @@ impl<T> Buffer<T> {
/// Returns whether the buffer is empty.
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
self.length == 0
}

/// Returns whether underlying data is sliced.
/// If sliced the [`Buffer`] is backed by
/// more data than the length of `Self`.
pub fn is_sliced(&self) -> bool {
self.data.len() != self.length
self.storage.len() != self.length
}

/// Returns the byte slice stored in this buffer
#[inline]
pub fn as_slice(&self) -> &[T] {
// Safety:
// invariant of this struct `offset + length <= data.len()`
debug_assert!(self.offset + self.length <= self.data.len());
unsafe {
self.data
.get_unchecked(self.offset..self.offset + self.length)
}
debug_assert!(self.offset() + self.length <= self.storage.len());
unsafe { std::slice::from_raw_parts(self.ptr, self.length) }
}

/// Returns the byte slice stored in this buffer
Expand All @@ -127,7 +127,7 @@ impl<T> Buffer<T> {
// Safety:
// invariant of this function
debug_assert!(index < self.length);
unsafe { self.data.get_unchecked(self.offset + index) }
unsafe { &*self.ptr.add(index) }
}

/// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`.
Expand Down Expand Up @@ -173,20 +173,24 @@ impl<T> Buffer<T> {
/// The caller must ensure `offset + length <= self.len()`
#[inline]
pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) {
self.offset += offset;
self.ptr = self.ptr.add(offset);
self.length = length;
}

/// Returns a pointer to the start of this buffer.
/// Returns a pointer to the start of the storage underlying this buffer.
#[inline]
pub(crate) fn as_ptr(&self) -> *const T {
self.data.deref().as_ptr()
pub(crate) fn storage_ptr(&self) -> *const T {
self.storage.as_ptr()
}

/// Returns the offset of this buffer.
/// Returns the start offset of this buffer within the underlying storage.
#[inline]
pub fn offset(&self) -> usize {
self.offset
unsafe {
let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize;
debug_assert!(ret <= self.storage.len());
ret
}
}

/// # Safety
Expand All @@ -200,14 +204,14 @@ impl<T> Buffer<T> {
///
/// This operation returns [`Either::Right`] iff this [`Buffer`]:
/// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`])
/// * has not been imported from the c data interface (FFI)
/// * has not been imported from the C data interface (FFI)
#[inline]
pub fn into_mut(mut self) -> Either<Self, Vec<T>> {
// We loose information if the data is sliced.
if self.length != self.data.len() {
// We lose information if the data is sliced.
if self.is_sliced() {
return Either::Left(self);
}
match Arc::get_mut(&mut self.data)
match Arc::get_mut(&mut self.storage)
.and_then(|b| b.get_vec())
.map(std::mem::take)
{
Expand All @@ -216,65 +220,27 @@ impl<T> Buffer<T> {
}
}

/// Returns a mutable reference to its underlying `Vec`, if possible.
/// Note that only `[self.offset(), self.offset() + self.len()[` in this vector is visible
/// by this buffer.
///
/// This operation returns [`Some`] iff this [`Buffer`]:
/// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`])
/// * has not been imported from the c data interface (FFI)
/// # Safety
/// The caller must ensure that the vector in the mutable reference keeps a length of at least `self.offset() + self.len() - 1`.
#[inline]
pub unsafe fn get_mut(&mut self) -> Option<&mut Vec<T>> {
Arc::get_mut(&mut self.data).and_then(|b| b.get_vec())
}

/// Returns a mutable reference to its slice, if possible.
///
/// This operation returns [`Some`] iff this [`Buffer`]:
/// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`])
/// * has not been imported from the c data interface (FFI)
/// * has not been imported from the C data interface (FFI)
#[inline]
pub fn get_mut_slice(&mut self) -> Option<&mut [T]> {
Arc::get_mut(&mut self.data)
.and_then(|b| b.get_vec())
// Safety: the invariant of this struct
.map(|x| unsafe { x.get_unchecked_mut(self.offset..self.offset + self.length) })
let offset = self.offset();
let unique = Arc::get_mut(&mut self.storage)?;
let vec = unique.get_vec()?;
Some(unsafe { vec.get_unchecked_mut(offset..offset + self.length) })
}

/// Get the strong count of underlying `Arc` data buffer.
pub fn shared_count_strong(&self) -> usize {
Arc::strong_count(&self.data)
Arc::strong_count(&self.storage)
}

/// Get the weak count of underlying `Arc` data buffer.
pub fn shared_count_weak(&self) -> usize {
Arc::weak_count(&self.data)
}

/// Returns its internal representation
#[must_use]
pub fn into_inner(self) -> (Arc<Bytes<T>>, usize, usize) {
let Self {
data,
offset,
length,
} = self;
(data, offset, length)
}

/// Creates a `[Bitmap]` from its internal representation.
/// This is the inverted from `[Bitmap::into_inner]`
///
/// # Safety
/// Callers must ensure all invariants of this struct are upheld.
pub unsafe fn from_inner_unchecked(data: Arc<Bytes<T>>, offset: usize, length: usize) -> Self {
Self {
data,
offset,
length,
}
Arc::weak_count(&self.storage)
}
}

Expand All @@ -288,10 +254,12 @@ impl<T> From<Vec<T>> for Buffer<T> {
#[inline]
fn from(p: Vec<T>) -> Self {
let bytes: Bytes<T> = p.into();
let ptr = bytes.as_ptr();
let length = bytes.len();
Self {
offset: 0,
length: bytes.len(),
data: Arc::new(bytes),
storage: Arc::new(bytes),
ptr,
length,
}
}
}
Expand Down Expand Up @@ -332,8 +300,9 @@ impl<T: crate::types::NativeType> From<arrow_buffer::Buffer> for Buffer<T> {
#[cfg(feature = "arrow_rs")]
impl<T: crate::types::NativeType> From<Buffer<T>> for arrow_buffer::Buffer {
fn from(value: Buffer<T>) -> Self {
crate::buffer::to_buffer(value.data).slice_with_length(
value.offset * std::mem::size_of::<T>(),
let offset = value.offset();
crate::buffer::to_buffer(value.storage).slice_with_length(
offset * std::mem::size_of::<T>(),
value.length * std::mem::size_of::<T>(),
)
}
Expand All @@ -343,7 +312,7 @@ unsafe impl<'a, T: 'a> ArrayAccessor<'a> for Buffer<T> {
type Item = &'a T;

unsafe fn value_unchecked(&'a self, index: usize) -> Self::Item {
self.as_slice().get_unchecked(index)
unsafe { &*self.ptr.add(index) }
}

fn len(&self) -> usize {
Expand Down
10 changes: 3 additions & 7 deletions crates/polars-arrow/src/legacy/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,7 @@ pub fn chunk_to_struct(chunk: Chunk<ArrayRef>, fields: Vec<Field>) -> StructArra
/// [Arc::get_mut]: std::sync::Arc::get_mut
pub fn primitive_to_vec<T: NativeType>(arr: ArrayRef) -> Option<Vec<T>> {
let arr_ref = arr.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
let mut buffer = arr_ref.values().clone();
drop(arr);
// Safety:
// if the `get_mut` is successful
// we are the only owner and we drop it
// so it is safe to take the vec
unsafe { buffer.get_mut().map(std::mem::take) }
let buffer = arr_ref.values().clone();
drop(arr); // Drop original reference so refcount becomes 1 if possible.
buffer.into_mut().right()
}
Loading