From 47e4b6166d67c50c87d99cd18efd770d5c331918 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 23 Feb 2023 09:52:34 +0000 Subject: [PATCH] Use Typed Buffers in Arrays (#1811) (#1176) (#3743) * Remove RawPtrBox (#1811) (#1176) * Clippy * Extract get_offsets function --- arrow-array/src/array/boolean_array.rs | 19 ++--- arrow-array/src/array/byte_array.rs | 42 +++++------ .../src/array/fixed_size_binary_array.rs | 10 +-- .../src/array/fixed_size_list_array.rs | 5 +- arrow-array/src/array/list_array.rs | 46 +++--------- arrow-array/src/array/map_array.rs | 27 ++----- arrow-array/src/array/mod.rs | 27 ++++++- arrow-array/src/array/primitive_array.rs | 42 ++++------- arrow-array/src/lib.rs | 1 - arrow-array/src/raw_pointer.rs | 75 ------------------- arrow-array/src/record_batch.rs | 2 +- arrow-buffer/src/buffer/immutable.rs | 70 ++++++++++++----- arrow-buffer/src/buffer/mod.rs | 2 + arrow-buffer/src/buffer/mutable.rs | 4 +- arrow-buffer/src/buffer/offset.rs | 58 ++++++++++++++ arrow-buffer/src/buffer/scalar.rs | 71 +++++++++++------- arrow-buffer/src/bytes.rs | 2 +- 17 files changed, 242 insertions(+), 261 deletions(-) delete mode 100644 arrow-array/src/raw_pointer.rs create mode 100644 arrow-buffer/src/buffer/offset.rs diff --git a/arrow-array/src/array/boolean_array.rs b/arrow-array/src/array/boolean_array.rs index 4c83dcf411d4..428a721ddb6c 100644 --- a/arrow-array/src/array/boolean_array.rs +++ b/arrow-array/src/array/boolean_array.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. +use crate::array::print_long_array; use crate::builder::BooleanBuilder; use crate::iterator::BooleanIter; -use crate::raw_pointer::RawPtrBox; -use crate::{print_long_array, Array, ArrayAccessor}; +use crate::{Array, ArrayAccessor}; use arrow_buffer::{bit_util, Buffer, MutableBuffer}; use arrow_data::bit_mask::combine_option_bitmap; use arrow_data::ArrayData; @@ -67,9 +67,7 @@ use std::any::Any; #[derive(Clone)] pub struct BooleanArray { data: ArrayData, - /// Pointer to the value array. The lifetime of this must be <= to the value buffer - /// stored in `data`, so it's safe to store. - raw_values: RawPtrBox, + raw_values: Buffer, } impl std::fmt::Debug for BooleanArray { @@ -102,7 +100,7 @@ impl BooleanArray { /// /// Note this doesn't take the offset of this array into account. pub fn values(&self) -> &Buffer { - &self.data.buffers()[0] + &self.raw_values } /// Returns the number of non null, true values within this array @@ -328,13 +326,8 @@ impl From for BooleanArray { 1, "BooleanArray data should contain a single buffer only (values buffer)" ); - let ptr = data.buffers()[0].as_ptr(); - Self { - data, - // SAFETY: - // ArrayData must be valid, and validated data type above - raw_values: unsafe { RawPtrBox::new(ptr) }, - } + let raw_values = data.buffers()[0].clone(); + Self { data, raw_values } } } diff --git a/arrow-array/src/array/byte_array.rs b/arrow-array/src/array/byte_array.rs index 2cb04efb8e89..f6946228c85c 100644 --- a/arrow-array/src/array/byte_array.rs +++ b/arrow-array/src/array/byte_array.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. -use crate::array::{empty_offsets, print_long_array}; +use crate::array::{get_offsets, print_long_array}; use crate::builder::GenericByteBuilder; use crate::iterator::ArrayIter; -use crate::raw_pointer::RawPtrBox; use crate::types::bytes::ByteArrayNativeType; use crate::types::ByteArrayType; use crate::{Array, ArrayAccessor, OffsetSizeTrait}; -use arrow_buffer::ArrowNativeType; +use arrow_buffer::buffer::OffsetBuffer; +use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_data::ArrayData; use arrow_schema::DataType; use std::any::Any; @@ -39,16 +39,16 @@ use std::any::Any; /// [`LargeBinaryArray`]: crate::LargeBinaryArray pub struct GenericByteArray { data: ArrayData, - value_offsets: RawPtrBox, - value_data: RawPtrBox, + value_offsets: OffsetBuffer, + value_data: Buffer, } impl Clone for GenericByteArray { fn clone(&self) -> Self { Self { data: self.data.clone(), - value_offsets: self.value_offsets, - value_data: self.value_data, + value_offsets: self.value_offsets.clone(), + value_data: self.value_data.clone(), } } } @@ -68,7 +68,7 @@ impl GenericByteArray { /// Returns the raw value data pub fn value_data(&self) -> &[u8] { - self.data.buffers()[1].as_slice() + self.value_data.as_slice() } /// Returns true if all data within this array is ASCII @@ -82,15 +82,7 @@ impl GenericByteArray { /// Returns the offset values in the offsets buffer #[inline] pub fn value_offsets(&self) -> &[T::Offset] { - // Soundness - // pointer alignment & location is ensured by RawPtrBox - // buffer bounds/offset is ensured by the ArrayData instance. - unsafe { - std::slice::from_raw_parts( - self.value_offsets.as_ptr().add(self.data.offset()), - self.len() + 1, - ) - } + &self.value_offsets } /// Returns the element at index `i` @@ -161,6 +153,8 @@ impl GenericByteArray { .slice_with_length(self.data.offset() * element_len, value_len * element_len); drop(self.data); + drop(self.value_data); + drop(self.value_offsets); let try_mutable_null_buffer = match null_bit_buffer { None => Ok(None), @@ -280,18 +274,16 @@ impl From for GenericByteArray { T::Offset::PREFIX, T::PREFIX, ); - // Handle case of empty offsets - let offsets = match data.is_empty() && data.buffers()[0].is_empty() { - true => empty_offsets::().as_ptr() as *const _, - false => data.buffers()[0].as_ptr(), - }; - let values = data.buffers()[1].as_ptr(); + // SAFETY: + // ArrayData is valid, and verified type above + let value_offsets = unsafe { get_offsets(&data) }; + let value_data = data.buffers()[1].clone(); Self { data, // SAFETY: // ArrayData must be valid, and validated data type above - value_offsets: unsafe { RawPtrBox::new(offsets) }, - value_data: unsafe { RawPtrBox::new(values) }, + value_offsets, + value_data, } } } diff --git a/arrow-array/src/array/fixed_size_binary_array.rs b/arrow-array/src/array/fixed_size_binary_array.rs index 936fb3025cd4..89ace430d8af 100644 --- a/arrow-array/src/array/fixed_size_binary_array.rs +++ b/arrow-array/src/array/fixed_size_binary_array.rs @@ -15,9 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::array::print_long_array; use crate::iterator::FixedSizeBinaryIter; -use crate::raw_pointer::RawPtrBox; -use crate::{print_long_array, Array, ArrayAccessor, FixedSizeListArray}; +use crate::{Array, ArrayAccessor, FixedSizeListArray}; use arrow_buffer::{bit_util, Buffer, MutableBuffer}; use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType}; @@ -50,7 +50,7 @@ use std::any::Any; #[derive(Clone)] pub struct FixedSizeBinaryArray { data: ArrayData, - value_data: RawPtrBox, + value_data: Buffer, length: i32, } @@ -357,14 +357,14 @@ impl From for FixedSizeBinaryArray { 1, "FixedSizeBinaryArray data should contain 1 buffer only (values)" ); - let value_data = data.buffers()[0].as_ptr(); + let value_data = data.buffers()[0].clone(); let length = match data.data_type() { DataType::FixedSizeBinary(len) => *len, _ => panic!("Expected data type to be FixedSizeBinary"), }; Self { data, - value_data: unsafe { RawPtrBox::new(value_data) }, + value_data, length, } } diff --git a/arrow-array/src/array/fixed_size_list_array.rs b/arrow-array/src/array/fixed_size_list_array.rs index c361d2d4462b..6e228ba3c770 100644 --- a/arrow-array/src/array/fixed_size_list_array.rs +++ b/arrow-array/src/array/fixed_size_list_array.rs @@ -15,10 +15,9 @@ // specific language governing permissions and limitations // under the License. +use crate::array::print_long_array; use crate::builder::{FixedSizeListBuilder, PrimitiveBuilder}; -use crate::{ - make_array, print_long_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, -}; +use crate::{make_array, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType}; use arrow_data::ArrayData; use arrow_schema::DataType; use std::any::Any; diff --git a/arrow-array/src/array/list_array.rs b/arrow-array/src/array/list_array.rs index b378549ebf20..6b63269d1615 100644 --- a/arrow-array/src/array/list_array.rs +++ b/arrow-array/src/array/list_array.rs @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -use crate::array::make_array; +use crate::array::{get_offsets, make_array, print_long_array}; use crate::builder::{GenericListBuilder, PrimitiveBuilder}; use crate::{ - iterator::GenericListArrayIter, print_long_array, raw_pointer::RawPtrBox, Array, - ArrayAccessor, ArrayRef, ArrowPrimitiveType, + iterator::GenericListArrayIter, Array, ArrayAccessor, ArrayRef, ArrowPrimitiveType, }; +use arrow_buffer::buffer::OffsetBuffer; use arrow_buffer::ArrowNativeType; use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, Field}; @@ -45,35 +45,24 @@ impl OffsetSizeTrait for i64 { const PREFIX: &'static str = "Large"; } -/// Returns a slice of `OffsetSize` consisting of a single zero value -#[inline] -pub(crate) fn empty_offsets() -> &'static [OffsetSize] { - static OFFSET: &[i64] = &[0]; - // SAFETY: - // OffsetSize is ArrowNativeType and is therefore trivially transmutable - let (prefix, val, suffix) = unsafe { OFFSET.align_to::() }; - assert!(prefix.is_empty() && suffix.is_empty()); - val -} - /// Generic struct for a variable-size list array. /// /// Columnar format in Apache Arrow: /// /// /// For non generic lists, you may wish to consider using [`ListArray`] or [`LargeListArray`]` -pub struct GenericListArray { +pub struct GenericListArray { data: ArrayData, values: ArrayRef, - value_offsets: RawPtrBox, + value_offsets: OffsetBuffer, } -impl Clone for GenericListArray { +impl Clone for GenericListArray { fn clone(&self) -> Self { Self { data: self.data.clone(), values: self.values.clone(), - value_offsets: self.value_offsets, + value_offsets: self.value_offsets.clone(), } } } @@ -118,15 +107,7 @@ impl GenericListArray { /// Returns the offset values in the offsets buffer #[inline] pub fn value_offsets(&self) -> &[OffsetSize] { - // Soundness - // pointer alignment & location is ensured by RawPtrBox - // buffer bounds/offset is ensured by the ArrayData instance. - unsafe { - std::slice::from_raw_parts( - self.value_offsets.as_ptr().add(self.data.offset()), - self.len() + 1, - ) - } + &self.value_offsets } /// Returns the length for value at index `i`. @@ -242,15 +223,10 @@ impl GenericListArray { } let values = make_array(values); - // Handle case of empty offsets - let offsets = match data.is_empty() && data.buffers()[0].is_empty() { - true => empty_offsets::().as_ptr() as *const _, - false => data.buffers()[0].as_ptr(), - }; - // SAFETY: - // Verified list type in call to `Self::get_type` - let value_offsets = unsafe { RawPtrBox::new(offsets) }; + // ArrayData is valid, and verified type above + let value_offsets = unsafe { get_offsets(&data) }; + Ok(Self { data, values, diff --git a/arrow-array/src/array/map_array.rs b/arrow-array/src/array/map_array.rs index b0eb4a3c98ab..8c9b02921781 100644 --- a/arrow-array/src/array/map_array.rs +++ b/arrow-array/src/array/map_array.rs @@ -15,8 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::raw_pointer::RawPtrBox; -use crate::{make_array, print_long_array, Array, ArrayRef, StringArray, StructArray}; +use crate::array::{get_offsets, print_long_array}; +use crate::{make_array, Array, ArrayRef, StringArray, StructArray}; +use arrow_buffer::buffer::OffsetBuffer; use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; use arrow_data::ArrayData; use arrow_schema::{ArrowError, DataType, Field}; @@ -38,7 +39,7 @@ pub struct MapArray { /// The second child of `entries`, the "values" of this MapArray values: ArrayRef, /// The start and end offsets of each entry - value_offsets: RawPtrBox, + value_offsets: OffsetBuffer, } impl MapArray { @@ -86,15 +87,7 @@ impl MapArray { /// Returns the offset values in the offsets buffer #[inline] pub fn value_offsets(&self) -> &[i32] { - // Soundness - // pointer alignment & location is ensured by RawPtrBox - // buffer bounds/offset is ensured by the ArrayData instance. - unsafe { - std::slice::from_raw_parts( - self.value_offsets.as_ptr().add(self.data.offset()), - self.len() + 1, - ) - } + &self.value_offsets } /// Returns the length for value at index `i`. @@ -159,18 +152,10 @@ impl MapArray { let keys = make_array(entries.child_data()[0].clone()); let values = make_array(entries.child_data()[1].clone()); let entries = make_array(entries); - let value_offsets = data.buffers()[0].as_ptr(); // SAFETY: // ArrayData is valid, and verified type above - let value_offsets = unsafe { RawPtrBox::::new(value_offsets) }; - unsafe { - if (*value_offsets.as_ptr().offset(0)) != 0 { - return Err(ArrowError::InvalidArgumentError(String::from( - "offsets do not start at zero", - ))); - } - } + let value_offsets = unsafe { get_offsets(&data) }; Ok(Self { data, diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index b293d797e46e..27973a40faa9 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -20,6 +20,8 @@ mod binary_array; use crate::types::*; +use arrow_buffer::buffer::{OffsetBuffer, ScalarBuffer}; +use arrow_buffer::ArrowNativeType; use arrow_data::ArrayData; use arrow_schema::{DataType, IntervalUnit, TimeUnit}; use std::any::Any; @@ -636,8 +638,29 @@ pub fn new_null_array(data_type: &DataType, length: usize) -> ArrayRef { make_array(ArrayData::new_null(data_type, length)) } -// Helper function for printing potentially long arrays. -pub(crate) fn print_long_array( +/// Helper function that gets offset from an [`ArrayData`] +/// +/// # Safety +/// +/// - ArrayData must contain a valid [`OffsetBuffer`] as its first buffer +unsafe fn get_offsets(data: &ArrayData) -> OffsetBuffer { + match data.is_empty() && data.buffers()[0].is_empty() { + true => OffsetBuffer::new_empty(), + false => { + let buffer = ScalarBuffer::new( + data.buffers()[0].clone(), + data.offset(), + data.len() + 1, + ); + // Safety: + // ArrayData is valid + unsafe { OffsetBuffer::new_unchecked(buffer) } + } + } +} + +/// Helper function for printing potentially long arrays. +fn print_long_array( array: &A, f: &mut std::fmt::Formatter, print_item: F, diff --git a/arrow-array/src/array/primitive_array.rs b/arrow-array/src/array/primitive_array.rs index b64534e9835f..53217a06f497 100644 --- a/arrow-array/src/array/primitive_array.rs +++ b/arrow-array/src/array/primitive_array.rs @@ -15,16 +15,17 @@ // specific language governing permissions and limitations // under the License. +use crate::array::print_long_array; use crate::builder::{BooleanBufferBuilder, BufferBuilder, PrimitiveBuilder}; use crate::iterator::PrimitiveIter; -use crate::raw_pointer::RawPtrBox; use crate::temporal_conversions::{ as_date, as_datetime, as_datetime_with_timezone, as_duration, as_time, }; use crate::timezone::Tz; use crate::trusted_len::trusted_len_unzip; -use crate::{print_long_array, Array, ArrayAccessor}; use crate::{types::*, ArrowNativeTypeOp}; +use crate::{Array, ArrayAccessor}; +use arrow_buffer::buffer::ScalarBuffer; use arrow_buffer::{i256, ArrowNativeType, Buffer}; use arrow_data::bit_iterator::try_for_each_valid_idx; use arrow_data::ArrayData; @@ -266,22 +267,16 @@ pub trait ArrowPrimitiveType: 'static { /// ``` pub struct PrimitiveArray { /// Underlying ArrayData - /// # Safety - /// must have exactly one buffer, aligned to type T data: ArrayData, - /// Pointer to the value array. The lifetime of this must be <= to the value buffer - /// stored in `data`, so it's safe to store. - /// # Safety - /// raw_values must have a value equivalent to `data.buffers()[0].raw_data()` - /// raw_values must have alignment for type T::NativeType - raw_values: RawPtrBox, + /// Values data + raw_values: ScalarBuffer, } impl Clone for PrimitiveArray { fn clone(&self) -> Self { Self { data: self.data.clone(), - raw_values: self.raw_values, + raw_values: self.raw_values.clone(), } } } @@ -301,15 +296,7 @@ impl PrimitiveArray { /// Returns a slice of the values of this array #[inline] pub fn values(&self) -> &[T::Native] { - // Soundness - // raw_values alignment & location is ensured by fn from(ArrayDataRef) - // buffer bounds/offset is ensured by the ArrayData instance. - unsafe { - std::slice::from_raw_parts( - self.raw_values.as_ptr().add(self.data.offset()), - self.len(), - ) - } + &self.raw_values } /// Returns a new primitive array builder @@ -339,8 +326,7 @@ impl PrimitiveArray { /// caller must ensure that the passed in offset is less than the array len() #[inline] pub unsafe fn value_unchecked(&self, i: usize) -> T::Native { - let offset = i + self.offset(); - *self.raw_values.as_ptr().add(offset) + *self.raw_values.get_unchecked(i) } /// Returns the primitive value at index `i`. @@ -632,6 +618,7 @@ impl PrimitiveArray { .slice_with_length(self.data.offset() * element_len, len * element_len); drop(self.data); + drop(self.raw_values); let try_mutable_null_buffer = match null_bit_buffer { None => Ok(None), @@ -724,6 +711,7 @@ impl<'a, T: ArrowPrimitiveType> ArrayAccessor for &'a PrimitiveArray { PrimitiveArray::value(self, index) } + #[inline] unsafe fn value_unchecked(&self, index: usize) -> Self::Item { PrimitiveArray::value_unchecked(self, index) } @@ -1085,13 +1073,9 @@ impl From for PrimitiveArray { "PrimitiveArray data should contain a single buffer only (values buffer)" ); - let ptr = data.buffers()[0].as_ptr(); - Self { - data, - // SAFETY: - // ArrayData must be valid, and validated data type above - raw_values: unsafe { RawPtrBox::new(ptr) }, - } + let raw_values = + ScalarBuffer::new(data.buffers()[0].clone(), data.offset(), data.len()); + Self { data, raw_values } } } diff --git a/arrow-array/src/lib.rs b/arrow-array/src/lib.rs index 2cee2650eb7e..400b6e262faa 100644 --- a/arrow-array/src/lib.rs +++ b/arrow-array/src/lib.rs @@ -179,7 +179,6 @@ pub mod builder; pub mod cast; mod delta; pub mod iterator; -mod raw_pointer; pub mod run_iterator; pub mod temporal_conversions; pub mod timezone; diff --git a/arrow-array/src/raw_pointer.rs b/arrow-array/src/raw_pointer.rs deleted file mode 100644 index 0fea8c186d4c..000000000000 --- a/arrow-array/src/raw_pointer.rs +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::ptr::NonNull; - -/// This struct is highly `unsafe` and offers the possibility to -/// self-reference a [arrow_buffer::Buffer] from -/// [arrow_data::ArrayData], as a pointer to the beginning of its -/// contents. -pub(super) struct RawPtrBox { - ptr: NonNull, -} - -impl Clone for RawPtrBox { - fn clone(&self) -> Self { - Self { ptr: self.ptr } - } -} - -impl Copy for RawPtrBox {} - -impl RawPtrBox { - /// # Safety - /// The user must guarantee that: - /// * the contents where `ptr` points to are never `moved`. This is guaranteed when they are Pinned. - /// * the lifetime of this struct does not outlive the lifetime of `ptr`. - /// Failure to fulfill any the above conditions results in undefined behavior. - /// # Panic - /// This function panics if: - /// * `ptr` is null - /// * `ptr` is not aligned to a slice of type `T`. This is guaranteed if it was built from a slice of type `T`. - pub(super) unsafe fn new(ptr: *const u8) -> Self { - let ptr = NonNull::new(ptr as *mut u8).expect("Pointer cannot be null"); - assert_eq!( - ptr.as_ptr().align_offset(std::mem::align_of::()), - 0, - "memory is not aligned" - ); - Self { ptr: ptr.cast() } - } - - pub(super) fn as_ptr(&self) -> *const T { - self.ptr.as_ptr() - } -} - -unsafe impl Send for RawPtrBox {} -unsafe impl Sync for RawPtrBox {} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - #[should_panic(expected = "memory is not aligned")] - #[cfg_attr(miri, ignore)] // sometimes does not panic as expected - fn test_primitive_array_alignment() { - let bytes = vec![0u8, 1u8]; - unsafe { RawPtrBox::::new(bytes.as_ptr().offset(1)) }; - } -} diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 3b517872aac4..04a559f21603 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -603,7 +603,7 @@ mod tests { let record_batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]) .unwrap(); - assert_eq!(record_batch.get_array_memory_size(), 592); + assert_eq!(record_batch.get_array_memory_size(), 640); } fn check_batch(record_batch: RecordBatch, num_rows: usize) { diff --git a/arrow-buffer/src/buffer/immutable.rs b/arrow-buffer/src/buffer/immutable.rs index 4048787c6a1f..cbfba1e0540c 100644 --- a/arrow-buffer/src/buffer/immutable.rs +++ b/arrow-buffer/src/buffer/immutable.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +use std::convert::AsRef; use std::fmt::Debug; use std::iter::FromIterator; use std::ptr::NonNull; use std::sync::Arc; -use std::{convert::AsRef, usize}; use crate::alloc::{Allocation, Deallocation}; use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk}; @@ -30,26 +30,41 @@ use super::MutableBuffer; /// Buffer represents a contiguous memory region that can be shared with other buffers and across /// thread boundaries. -#[derive(Clone, PartialEq, Debug)] +#[derive(Clone, Debug)] pub struct Buffer { /// the internal byte buffer. data: Arc, - /// The offset into the buffer. - offset: usize, + /// Pointer into `data` valid + /// + /// We store a pointer instead of an offset to avoid pointer arithmetic + /// which causes LLVM to fail to vectorise code correctly + ptr: *const u8, /// Byte length of the buffer. length: usize, } +impl PartialEq for Buffer { + fn eq(&self, other: &Self) -> bool { + self.as_slice().eq(other.as_slice()) + } +} + +impl Eq for Buffer {} + +unsafe impl Send for Buffer where Bytes: Send {} +unsafe impl Sync for Buffer where Bytes: Sync {} + impl Buffer { /// Auxiliary method to create a new Buffer #[inline] pub fn from_bytes(bytes: Bytes) -> Self { let length = bytes.len(); + let ptr = bytes.as_ptr(); Buffer { data: Arc::new(bytes), - offset: 0, + ptr, length, } } @@ -108,9 +123,10 @@ impl Buffer { deallocation: Deallocation, ) -> Self { let bytes = Bytes::new(ptr, len, deallocation); + let ptr = bytes.as_ptr(); Buffer { + ptr, data: Arc::new(bytes), - offset: 0, length: len, } } @@ -136,7 +152,7 @@ impl Buffer { /// Returns the byte slice stored in this buffer pub fn as_slice(&self) -> &[u8] { - &self.data[self.offset..(self.offset + self.length)] + unsafe { std::slice::from_raw_parts(self.ptr, self.length) } } /// Returns a new [Buffer] that is a slice of this buffer starting at `offset`. @@ -145,13 +161,18 @@ impl Buffer { /// Panics iff `offset` is larger than `len`. pub fn slice(&self, offset: usize) -> Self { assert!( - offset <= self.len(), + offset <= self.length, "the offset of the new Buffer cannot exceed the existing length" ); + // Safety: + // This cannot overflow as + // `self.offset + self.length < self.data.len()` + // `offset < self.length` + let ptr = unsafe { self.ptr.add(offset) }; Self { data: self.data.clone(), - offset: self.offset + offset, length: self.length - offset, + ptr, } } @@ -162,12 +183,15 @@ impl Buffer { /// Panics iff `(offset + length)` is larger than the existing length. pub fn slice_with_length(&self, offset: usize, length: usize) -> Self { assert!( - offset + length <= self.len(), + offset.saturating_add(length) <= self.length, "the offset of the new Buffer cannot exceed the existing length" ); + // Safety: + // offset + length <= self.length + let ptr = unsafe { self.ptr.add(offset) }; Self { data: self.data.clone(), - offset: self.offset + offset, + ptr, length, } } @@ -178,7 +202,7 @@ impl Buffer { /// stored anywhere, to avoid dangling pointers. #[inline] pub fn as_ptr(&self) -> *const u8 { - unsafe { self.data.ptr().as_ptr().add(self.offset) } + self.ptr } /// View buffer as a slice of a specific type. @@ -231,18 +255,17 @@ impl Buffer { /// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared. /// Returns `Err` if this is shared or its allocation is from an external source. pub fn into_mutable(self) -> Result { - let offset_ptr = self.as_ptr(); - let offset = self.offset; + let ptr = self.ptr; let length = self.length; Arc::try_unwrap(self.data) .and_then(|bytes| { // The pointer of underlying buffer should not be offset. - assert_eq!(offset_ptr, bytes.ptr().as_ptr()); + assert_eq!(ptr, bytes.ptr().as_ptr()); MutableBuffer::from_bytes(bytes).map_err(Arc::new) }) .map_err(|bytes| Buffer { data: bytes, - offset, + ptr, length, }) } @@ -262,7 +285,7 @@ impl> From for Buffer { } /// Creating a `Buffer` instance by storing the boolean values into the buffer -impl std::iter::FromIterator for Buffer { +impl FromIterator for Buffer { fn from_iter(iter: I) -> Self where I: IntoIterator, @@ -321,10 +344,10 @@ impl Buffer { pub unsafe fn try_from_trusted_len_iter< E, T: ArrowNativeType, - I: Iterator>, + I: Iterator>, >( iterator: I, - ) -> std::result::Result { + ) -> Result { Ok(MutableBuffer::try_from_trusted_len_iter(iterator)?.into()) } } @@ -600,4 +623,13 @@ mod tests { let slice = buffer.typed_data::(); assert_eq!(slice, &[2, 3, 4, 5]); } + + #[test] + #[should_panic( + expected = "the offset of the new Buffer cannot exceed the existing length" + )] + fn slice_overflow() { + let buffer = Buffer::from(MutableBuffer::from_len_zeroed(12)); + buffer.slice_with_length(2, usize::MAX); + } } diff --git a/arrow-buffer/src/buffer/mod.rs b/arrow-buffer/src/buffer/mod.rs index b9201f774fe0..7c12e1804f9f 100644 --- a/arrow-buffer/src/buffer/mod.rs +++ b/arrow-buffer/src/buffer/mod.rs @@ -18,6 +18,8 @@ //! This module contains two main structs: [Buffer] and [MutableBuffer]. A buffer represents //! a contiguous memory region that can be shared via `offsets`. +mod offset; +pub use offset::*; mod immutable; pub use immutable::*; mod mutable; diff --git a/arrow-buffer/src/buffer/mutable.rs b/arrow-buffer/src/buffer/mutable.rs index b70a74e84249..2e6e2f1d7b08 100644 --- a/arrow-buffer/src/buffer/mutable.rs +++ b/arrow-buffer/src/buffer/mutable.rs @@ -581,10 +581,10 @@ impl MutableBuffer { pub unsafe fn try_from_trusted_len_iter< E, T: ArrowNativeType, - I: Iterator>, + I: Iterator>, >( iterator: I, - ) -> std::result::Result { + ) -> Result { let item_size = std::mem::size_of::(); let (_, upper) = iterator.size_hint(); let upper = upper.expect("try_from_trusted_len_iter requires an upper limit"); diff --git a/arrow-buffer/src/buffer/offset.rs b/arrow-buffer/src/buffer/offset.rs new file mode 100644 index 000000000000..a80c3c7ecb69 --- /dev/null +++ b/arrow-buffer/src/buffer/offset.rs @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::buffer::ScalarBuffer; +use crate::{ArrowNativeType, MutableBuffer}; +use std::ops::Deref; + +/// A non-empty buffer of monotonically increasing, positive integers +#[derive(Debug, Clone)] +pub struct OffsetBuffer(ScalarBuffer); + +impl OffsetBuffer { + /// Create a new [`OffsetBuffer`] from the provided [`ScalarBuffer`] + /// + /// # Safety + /// + /// `buffer` must be a non-empty buffer containing monotonically increasing + /// values greater than zero + pub unsafe fn new_unchecked(buffer: ScalarBuffer) -> Self { + Self(buffer) + } + + /// Create a new [`OffsetBuffer`] containing a single 0 value + pub fn new_empty() -> Self { + let buffer = MutableBuffer::from_len_zeroed(std::mem::size_of::()); + Self(buffer.into_buffer().into()) + } +} + +impl Deref for OffsetBuffer { + type Target = [T]; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl AsRef<[T]> for OffsetBuffer { + #[inline] + fn as_ref(&self) -> &[T] { + self + } +} diff --git a/arrow-buffer/src/buffer/scalar.rs b/arrow-buffer/src/buffer/scalar.rs index 124f3f6f5894..e688e52fea5c 100644 --- a/arrow-buffer/src/buffer/scalar.rs +++ b/arrow-buffer/src/buffer/scalar.rs @@ -17,6 +17,7 @@ use crate::buffer::Buffer; use crate::native::ArrowNativeType; +use std::marker::PhantomData; use std::ops::Deref; /// Provides a safe API for interpreting a [`Buffer`] as a slice of [`ArrowNativeType`] @@ -25,14 +26,11 @@ use std::ops::Deref; /// /// All [`ArrowNativeType`] are valid for all possible backing byte representations, and as /// a result they are "trivially safely transmutable". -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ScalarBuffer { - #[allow(unused)] + /// Underlying data buffer buffer: Buffer, - // Borrows from `buffer` and is valid for the lifetime of `buffer` - ptr: *const T, - // The length of this slice - len: usize, + phantom: PhantomData, } impl ScalarBuffer { @@ -48,39 +46,50 @@ impl ScalarBuffer { /// * `bytes` is not large enough for the requested slice pub fn new(buffer: Buffer, offset: usize, len: usize) -> Self { let size = std::mem::size_of::(); - let offset_len = offset.checked_add(len).expect("length overflow"); - let start_bytes = offset.checked_mul(size).expect("start bytes overflow"); - let end_bytes = offset_len.checked_mul(size).expect("end bytes overflow"); - - let bytes = &buffer.as_slice()[start_bytes..end_bytes]; - - // SAFETY: all byte sequences correspond to a valid instance of T - let (prefix, offsets, suffix) = unsafe { bytes.align_to::() }; - assert!( - prefix.is_empty() && suffix.is_empty(), - "buffer is not aligned to {size} byte boundary" - ); - - let ptr = offsets.as_ptr(); - Self { buffer, ptr, len } + let byte_offset = offset.checked_mul(size).expect("offset overflow"); + let byte_len = len.checked_mul(size).expect("length overflow"); + buffer.slice_with_length(byte_offset, byte_len).into() } } impl Deref for ScalarBuffer { type Target = [T]; + #[inline] fn deref(&self) -> &Self::Target { - // SAFETY: Bounds checked in constructor and ptr is valid for the lifetime of self - unsafe { std::slice::from_raw_parts(self.ptr, self.len) } + // SAFETY: Verified alignment in From + unsafe { + std::slice::from_raw_parts( + self.buffer.as_ptr() as *const T, + self.buffer.len() / std::mem::size_of::(), + ) + } } } impl AsRef<[T]> for ScalarBuffer { + #[inline] fn as_ref(&self) -> &[T] { self } } +impl From for ScalarBuffer { + fn from(buffer: Buffer) -> Self { + let align = std::mem::align_of::(); + assert_eq!( + buffer.as_ptr().align_offset(align), + 0, + "memory is not aligned" + ); + + Self { + buffer, + phantom: Default::default(), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -103,7 +112,7 @@ mod tests { } #[test] - #[should_panic(expected = "buffer is not aligned to 4 byte boundary")] + #[should_panic(expected = "memory is not aligned")] fn test_unaligned() { let expected = [0_i32, 1, 2]; let buffer = Buffer::from_iter(expected.iter().cloned()); @@ -112,35 +121,39 @@ mod tests { } #[test] - #[should_panic(expected = "range end index 16 out of range for slice of length 12")] + #[should_panic( + expected = "the offset of the new Buffer cannot exceed the existing length" + )] fn test_length_out_of_bounds() { let buffer = Buffer::from_iter([0_i32, 1, 2]); ScalarBuffer::::new(buffer, 1, 3); } #[test] - #[should_panic(expected = "range end index 16 out of range for slice of length 12")] + #[should_panic( + expected = "the offset of the new Buffer cannot exceed the existing length" + )] fn test_offset_out_of_bounds() { let buffer = Buffer::from_iter([0_i32, 1, 2]); ScalarBuffer::::new(buffer, 4, 0); } #[test] - #[should_panic(expected = "length overflow")] + #[should_panic(expected = "offset overflow")] fn test_length_overflow() { let buffer = Buffer::from_iter([0_i32, 1, 2]); ScalarBuffer::::new(buffer, usize::MAX, 1); } #[test] - #[should_panic(expected = "start bytes overflow")] + #[should_panic(expected = "offset overflow")] fn test_start_overflow() { let buffer = Buffer::from_iter([0_i32, 1, 2]); ScalarBuffer::::new(buffer, usize::MAX / 4 + 1, 0); } #[test] - #[should_panic(expected = "end bytes overflow")] + #[should_panic(expected = "length overflow")] fn test_end_overflow() { let buffer = Buffer::from_iter([0_i32, 1, 2]); ScalarBuffer::::new(buffer, 0, usize::MAX / 4 + 1); diff --git a/arrow-buffer/src/bytes.rs b/arrow-buffer/src/bytes.rs index fea04ad0d50b..3320dfc261c7 100644 --- a/arrow-buffer/src/bytes.rs +++ b/arrow-buffer/src/bytes.rs @@ -61,7 +61,7 @@ impl Bytes { /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed. #[inline] pub(crate) unsafe fn new( - ptr: std::ptr::NonNull, + ptr: NonNull, len: usize, deallocation: Deallocation, ) -> Bytes {