diff --git a/crates/polars-arrow/src/array/binary/ffi.rs b/crates/polars-arrow/src/array/binary/ffi.rs index 5b16f18dcf50..c135c8d3d8dd 100644 --- a/crates/polars-arrow/src/array/binary/ffi.rs +++ b/crates/polars-arrow/src/array/binary/ffi.rs @@ -10,8 +10,8 @@ unsafe impl ToFfi for BinaryArray { fn buffers(&self) -> Vec> { vec![ self.validity.as_ref().map(|x| x.as_ptr()), - Some(self.offsets.buffer().as_ptr().cast::()), - Some(self.values.as_ptr().cast::()), + Some(self.offsets.buffer().storage_ptr().cast::()), + Some(self.values.storage_ptr().cast::()), ] } diff --git a/crates/polars-arrow/src/array/binview/ffi.rs b/crates/polars-arrow/src/array/binview/ffi.rs index d5fffc9919a5..71e1e56abf5d 100644 --- a/crates/polars-arrow/src/array/binview/ffi.rs +++ b/crates/polars-arrow/src/array/binview/ffi.rs @@ -12,8 +12,8 @@ unsafe impl ToFfi for BinaryViewArrayGeneric { fn buffers(&self) -> Vec> { let mut buffers = Vec::with_capacity(self.buffers.len() + 2); buffers.push(self.validity.as_ref().map(|x| x.as_ptr())); - buffers.push(Some(self.views.as_ptr().cast::())); - buffers.extend(self.buffers.iter().map(|b| Some(b.as_ptr()))); + buffers.push(Some(self.views.storage_ptr().cast::())); + buffers.extend(self.buffers.iter().map(|b| Some(b.storage_ptr()))); buffers } diff --git a/crates/polars-arrow/src/array/binview/mod.rs b/crates/polars-arrow/src/array/binview/mod.rs index 95e0f00fd100..325966d5ecdf 100644 --- a/crates/polars-arrow/src/array/binview/mod.rs +++ b/crates/polars-arrow/src/array/binview/mod.rs @@ -126,7 +126,7 @@ unsafe impl Sync for BinaryViewArrayGeneric {} fn buffers_into_raw(buffers: &[Buffer]) -> Arc<[(*const T, usize)]> { buffers .iter() - .map(|buf| (buf.as_ptr(), buf.len())) + .map(|buf| (buf.storage_ptr(), buf.len())) .collect() } @@ -262,7 +262,7 @@ impl BinaryViewArrayGeneric { // data: 12 bytes let bytes = if len <= 12 { - let ptr = self.views.as_ptr() as *const u8; + let ptr = self.views.storage_ptr() as *const u8; std::slice::from_raw_parts(ptr.add(i * 16 + 4), len as usize) } else { let buffer_idx = (v >> 64) as u32; diff --git a/crates/polars-arrow/src/array/fixed_size_binary/ffi.rs b/crates/polars-arrow/src/array/fixed_size_binary/ffi.rs index aaa38e461eca..43af7fef58ad 100644 --- a/crates/polars-arrow/src/array/fixed_size_binary/ffi.rs +++ b/crates/polars-arrow/src/array/fixed_size_binary/ffi.rs @@ -9,7 +9,7 @@ unsafe impl ToFfi for FixedSizeBinaryArray { fn buffers(&self) -> Vec> { vec![ self.validity.as_ref().map(|x| x.as_ptr()), - Some(self.values.as_ptr().cast::()), + Some(self.values.storage_ptr().cast::()), ] } diff --git a/crates/polars-arrow/src/array/list/ffi.rs b/crates/polars-arrow/src/array/list/ffi.rs index 2709634681c7..e536a713cbc2 100644 --- a/crates/polars-arrow/src/array/list/ffi.rs +++ b/crates/polars-arrow/src/array/list/ffi.rs @@ -12,7 +12,7 @@ unsafe impl ToFfi for ListArray { fn buffers(&self) -> Vec> { vec![ self.validity.as_ref().map(|x| x.as_ptr()), - Some(self.offsets.buffer().as_ptr().cast::()), + Some(self.offsets.buffer().storage_ptr().cast::()), ] } diff --git a/crates/polars-arrow/src/array/map/ffi.rs b/crates/polars-arrow/src/array/map/ffi.rs index 3436e06b6360..fad531671703 100644 --- a/crates/polars-arrow/src/array/map/ffi.rs +++ b/crates/polars-arrow/src/array/map/ffi.rs @@ -12,7 +12,7 @@ unsafe impl ToFfi for MapArray { fn buffers(&self) -> Vec> { vec![ self.validity.as_ref().map(|x| x.as_ptr()), - Some(self.offsets.buffer().as_ptr().cast::()), + Some(self.offsets.buffer().storage_ptr().cast::()), ] } diff --git a/crates/polars-arrow/src/array/primitive/ffi.rs b/crates/polars-arrow/src/array/primitive/ffi.rs index 22b7f3cfacad..ae22cf2e9a9c 100644 --- a/crates/polars-arrow/src/array/primitive/ffi.rs +++ b/crates/polars-arrow/src/array/primitive/ffi.rs @@ -10,7 +10,7 @@ unsafe impl ToFfi for PrimitiveArray { fn buffers(&self) -> Vec> { vec![ self.validity.as_ref().map(|x| x.as_ptr()), - Some(self.values.as_ptr().cast::()), + Some(self.values.storage_ptr().cast::()), ] } diff --git a/crates/polars-arrow/src/array/union/ffi.rs b/crates/polars-arrow/src/array/union/ffi.rs index 4cbcb2d35ced..1510b29e2588 100644 --- a/crates/polars-arrow/src/array/union/ffi.rs +++ b/crates/polars-arrow/src/array/union/ffi.rs @@ -10,11 +10,11 @@ unsafe impl ToFfi for UnionArray { fn buffers(&self) -> Vec> { if let Some(offsets) = &self.offsets { vec![ - Some(self.types.as_ptr().cast::()), - Some(offsets.as_ptr().cast::()), + Some(self.types.storage_ptr().cast::()), + Some(offsets.storage_ptr().cast::()), ] } else { - vec![Some(self.types.as_ptr().cast::())] + vec![Some(self.types.storage_ptr().cast::())] } } diff --git a/crates/polars-arrow/src/array/utf8/ffi.rs b/crates/polars-arrow/src/array/utf8/ffi.rs index 8328b8a66f2a..5bdced4df6f1 100644 --- a/crates/polars-arrow/src/array/utf8/ffi.rs +++ b/crates/polars-arrow/src/array/utf8/ffi.rs @@ -10,8 +10,8 @@ unsafe impl ToFfi for Utf8Array { fn buffers(&self) -> Vec> { vec![ self.validity.as_ref().map(|x| x.as_ptr()), - Some(self.offsets.buffer().as_ptr().cast::()), - Some(self.values.as_ptr().cast::()), + Some(self.offsets.buffer().storage_ptr().cast::()), + Some(self.values.storage_ptr().cast::()), ] } diff --git a/crates/polars-arrow/src/buffer/immutable.rs b/crates/polars-arrow/src/buffer/immutable.rs index 8435cba4d725..5371cc71030c 100644 --- a/crates/polars-arrow/src/buffer/immutable.rs +++ b/crates/polars-arrow/src/buffer/immutable.rs @@ -40,17 +40,19 @@ use crate::array::ArrayAccessor; /// ``` #[derive(Clone)] pub struct Buffer { - /// the internal byte buffer. - data: Arc>, + /// The internal byte buffer. + storage: Arc>, - /// The offset into the buffer. - offset: usize, + /// A pointer into the buffer where our data starts. + ptr: *const T, - // the length of the buffer. Given a region `data` of N bytes, [offset..offset+length] is visible - // to this buffer. + // The length of the buffer. length: usize, } +unsafe impl Sync for Buffer {} +unsafe impl Send for Buffer {} + impl PartialEq for Buffer { #[inline] fn eq(&self, other: &Self) -> bool { @@ -80,10 +82,11 @@ impl Buffer { /// Auxiliary method to create a new Buffer pub(crate) fn from_bytes(bytes: Bytes) -> Self { + let ptr = bytes.as_ptr(); let length = bytes.len(); Buffer { - data: Arc::new(bytes), - offset: 0, + storage: Arc::new(bytes), + ptr, length, } } @@ -97,14 +100,14 @@ impl Buffer { /// Returns whether the buffer is empty. #[inline] pub fn is_empty(&self) -> bool { - self.len() == 0 + self.length == 0 } /// Returns whether underlying data is sliced. /// If sliced the [`Buffer`] is backed by /// more data than the length of `Self`. pub fn is_sliced(&self) -> bool { - self.data.len() != self.length + self.storage.len() != self.length } /// Returns the byte slice stored in this buffer @@ -112,11 +115,8 @@ impl Buffer { pub fn as_slice(&self) -> &[T] { // Safety: // invariant of this struct `offset + length <= data.len()` - debug_assert!(self.offset + self.length <= self.data.len()); - unsafe { - self.data - .get_unchecked(self.offset..self.offset + self.length) - } + debug_assert!(self.offset() + self.length <= self.storage.len()); + unsafe { std::slice::from_raw_parts(self.ptr, self.length) } } /// Returns the byte slice stored in this buffer @@ -127,7 +127,7 @@ impl Buffer { // Safety: // invariant of this function debug_assert!(index < self.length); - unsafe { self.data.get_unchecked(self.offset + index) } + unsafe { &*self.ptr.add(index) } } /// Returns a new [`Buffer`] that is a slice of this buffer starting at `offset`. @@ -173,20 +173,24 @@ impl Buffer { /// The caller must ensure `offset + length <= self.len()` #[inline] pub unsafe fn slice_unchecked(&mut self, offset: usize, length: usize) { - self.offset += offset; + self.ptr = self.ptr.add(offset); self.length = length; } - /// Returns a pointer to the start of this buffer. + /// Returns a pointer to the start of the storage underlying this buffer. #[inline] - pub(crate) fn as_ptr(&self) -> *const T { - self.data.deref().as_ptr() + pub(crate) fn storage_ptr(&self) -> *const T { + self.storage.as_ptr() } - /// Returns the offset of this buffer. + /// Returns the start offset of this buffer within the underlying storage. #[inline] pub fn offset(&self) -> usize { - self.offset + unsafe { + let ret = self.ptr.offset_from(self.storage.as_ptr()) as usize; + debug_assert!(ret <= self.storage.len()); + ret + } } /// # Safety @@ -200,14 +204,14 @@ impl Buffer { /// /// This operation returns [`Either::Right`] iff this [`Buffer`]: /// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`]) - /// * has not been imported from the c data interface (FFI) + /// * has not been imported from the C data interface (FFI) #[inline] pub fn into_mut(mut self) -> Either> { - // We loose information if the data is sliced. - if self.length != self.data.len() { + // We lose information if the data is sliced. + if self.is_sliced() { return Either::Left(self); } - match Arc::get_mut(&mut self.data) + match Arc::get_mut(&mut self.storage) .and_then(|b| b.get_vec()) .map(std::mem::take) { @@ -216,65 +220,27 @@ impl Buffer { } } - /// Returns a mutable reference to its underlying `Vec`, if possible. - /// Note that only `[self.offset(), self.offset() + self.len()[` in this vector is visible - /// by this buffer. - /// - /// This operation returns [`Some`] iff this [`Buffer`]: - /// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`]) - /// * has not been imported from the c data interface (FFI) - /// # Safety - /// The caller must ensure that the vector in the mutable reference keeps a length of at least `self.offset() + self.len() - 1`. - #[inline] - pub unsafe fn get_mut(&mut self) -> Option<&mut Vec> { - Arc::get_mut(&mut self.data).and_then(|b| b.get_vec()) - } - /// Returns a mutable reference to its slice, if possible. /// /// This operation returns [`Some`] iff this [`Buffer`]: /// * has not been cloned (i.e. [`Arc`]`::get_mut` yields [`Some`]) - /// * has not been imported from the c data interface (FFI) + /// * has not been imported from the C data interface (FFI) #[inline] pub fn get_mut_slice(&mut self) -> Option<&mut [T]> { - Arc::get_mut(&mut self.data) - .and_then(|b| b.get_vec()) - // Safety: the invariant of this struct - .map(|x| unsafe { x.get_unchecked_mut(self.offset..self.offset + self.length) }) + let offset = self.offset(); + let unique = Arc::get_mut(&mut self.storage)?; + let vec = unique.get_vec()?; + Some(unsafe { vec.get_unchecked_mut(offset..offset + self.length) }) } /// Get the strong count of underlying `Arc` data buffer. pub fn shared_count_strong(&self) -> usize { - Arc::strong_count(&self.data) + Arc::strong_count(&self.storage) } /// Get the weak count of underlying `Arc` data buffer. pub fn shared_count_weak(&self) -> usize { - Arc::weak_count(&self.data) - } - - /// Returns its internal representation - #[must_use] - pub fn into_inner(self) -> (Arc>, usize, usize) { - let Self { - data, - offset, - length, - } = self; - (data, offset, length) - } - - /// Creates a `[Bitmap]` from its internal representation. - /// This is the inverted from `[Bitmap::into_inner]` - /// - /// # Safety - /// Callers must ensure all invariants of this struct are upheld. - pub unsafe fn from_inner_unchecked(data: Arc>, offset: usize, length: usize) -> Self { - Self { - data, - offset, - length, - } + Arc::weak_count(&self.storage) } } @@ -288,10 +254,12 @@ impl From> for Buffer { #[inline] fn from(p: Vec) -> Self { let bytes: Bytes = p.into(); + let ptr = bytes.as_ptr(); + let length = bytes.len(); Self { - offset: 0, - length: bytes.len(), - data: Arc::new(bytes), + storage: Arc::new(bytes), + ptr, + length, } } } @@ -332,8 +300,9 @@ impl From for Buffer { #[cfg(feature = "arrow_rs")] impl From> for arrow_buffer::Buffer { fn from(value: Buffer) -> Self { - crate::buffer::to_buffer(value.data).slice_with_length( - value.offset * std::mem::size_of::(), + let offset = value.offset(); + crate::buffer::to_buffer(value.storage).slice_with_length( + offset * std::mem::size_of::(), value.length * std::mem::size_of::(), ) } @@ -343,7 +312,7 @@ unsafe impl<'a, T: 'a> ArrayAccessor<'a> for Buffer { type Item = &'a T; unsafe fn value_unchecked(&'a self, index: usize) -> Self::Item { - self.as_slice().get_unchecked(index) + unsafe { &*self.ptr.add(index) } } fn len(&self) -> usize { diff --git a/crates/polars-arrow/src/legacy/conversion.rs b/crates/polars-arrow/src/legacy/conversion.rs index 77ed4cefda07..03cfafa452cf 100644 --- a/crates/polars-arrow/src/legacy/conversion.rs +++ b/crates/polars-arrow/src/legacy/conversion.rs @@ -18,11 +18,7 @@ pub fn chunk_to_struct(chunk: Chunk, fields: Vec) -> StructArra /// [Arc::get_mut]: std::sync::Arc::get_mut pub fn primitive_to_vec(arr: ArrayRef) -> Option> { let arr_ref = arr.as_any().downcast_ref::>().unwrap(); - let mut buffer = arr_ref.values().clone(); - drop(arr); - // Safety: - // if the `get_mut` is successful - // we are the only owner and we drop it - // so it is safe to take the vec - unsafe { buffer.get_mut().map(std::mem::take) } + let buffer = arr_ref.values().clone(); + drop(arr); // Drop original reference so refcount becomes 1 if possible. + buffer.into_mut().right() }