chore(rust): Document and simplify MutableBinView::push_ignore_validity (#17645)
coastalwhite authored Jul 16, 2024
1 parent 4ff51a3 commit 30bf890
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions crates/polars-arrow/src/array/binview/mutable.rs
@@ -14,9 +14,9 @@ use crate::buffer::Buffer;
 use crate::datatypes::ArrowDataType;
 use crate::legacy::trusted_len::TrustedLenPush;
 use crate::trusted_len::TrustedLen;
-use crate::types::NativeType;
 
 const DEFAULT_BLOCK_SIZE: usize = 8 * 1024;
+const MAX_EXP_BLOCK_SIZE: usize = 16 * 1024 * 1024;
 
 pub struct MutableBinaryViewArray<T: ViewType + ?Sized> {
     pub(super) views: Vec<View>,
@@ -154,38 +154,47 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
 
     #[inline]
     pub fn push_value_ignore_validity<V: AsRef<T>>(&mut self, value: V) {
-        let value = value.as_ref();
-        let bytes = value.to_bytes();
+        let bytes = value.as_ref().to_bytes();
         self.total_bytes_len += bytes.len();
-        let len: u32 = bytes.len().try_into().unwrap();
-        let mut payload = [0; 16];
-        payload[0..4].copy_from_slice(&len.to_le_bytes());
-        if len <= 12 {
-            payload[4..4 + bytes.len()].copy_from_slice(bytes);
+
+        // A string can only be maximum of 4GB in size.
+        let len = u32::try_from(bytes.len()).unwrap();
+
+        let view = if len <= View::MAX_INLINE_SIZE {
+            View::new_inline(bytes)
         } else {
             self.total_buffer_len += bytes.len();
-            let required_cap = self.in_progress_buffer.len() + bytes.len();
-            if self.in_progress_buffer.capacity() < required_cap {
+
+            // We want to make sure that we never have to memcopy between buffers. So if the
+            // current buffer is not large enough, create a new buffer that is large enough and try
+            // to anticipate the larger size.
+            let required_capacity = self.in_progress_buffer.len() + bytes.len();
+            let does_not_fit_in_buffer = self.in_progress_buffer.capacity() < required_capacity;
+
+            // We can only save offsets that are below u32::MAX
+            let offset_will_not_fit = self.in_progress_buffer.len() > u32::MAX as usize;
+
+            if does_not_fit_in_buffer || offset_will_not_fit {
+                // Allocate a new buffer and flush the old buffer
                 let new_capacity = (self.in_progress_buffer.capacity() * 2)
-                    .clamp(DEFAULT_BLOCK_SIZE, 16 * 1024 * 1024)
+                    .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
                     .max(bytes.len());
                 let in_progress = Vec::with_capacity(new_capacity);
                 let flushed = std::mem::replace(&mut self.in_progress_buffer, in_progress);
                 if !flushed.is_empty() {
                     self.completed_buffers.push(flushed.into())
                 }
             }
 
             let offset = self.in_progress_buffer.len() as u32;
             self.in_progress_buffer.extend_from_slice(bytes);
 
-            unsafe { payload[4..8].copy_from_slice(bytes.get_unchecked_release(0..4)) };
-            let buffer_idx: u32 = self.completed_buffers.len().try_into().unwrap();
-            payload[8..12].copy_from_slice(&buffer_idx.to_le_bytes());
-            payload[12..16].copy_from_slice(&offset.to_le_bytes());
-        }
-        let value = View::from_le_bytes(payload);
-        self.views.push(value);
+            let buffer_idx = u32::try_from(self.completed_buffers.len()).unwrap();
+
+            View::new_from_bytes(bytes, buffer_idx, offset)
+        };
+
+        self.views.push(view);
     }
 
     #[inline]
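For context, the 16-byte view layout that the removed code assembled by hand, and that View::new_inline / View::new_from_bytes now encapsulate, can be sketched in plain std-only Rust. This is an illustration of the encoding only; encode_view and MAX_INLINE_SIZE are hypothetical names, not the polars-arrow API.

// Standalone sketch (std only) of the 16-byte view layout that the removed code
// built by hand. `encode_view` and `MAX_INLINE_SIZE` are illustrative names,
// not the polars-arrow API.
const MAX_INLINE_SIZE: usize = 12;

fn encode_view(bytes: &[u8], buffer_idx: u32, offset: u32) -> [u8; 16] {
    // A value longer than u32::MAX bytes cannot be represented.
    let len = u32::try_from(bytes.len()).unwrap();
    let mut payload = [0u8; 16];
    // Bytes 0..4 always hold the value length (little-endian).
    payload[0..4].copy_from_slice(&len.to_le_bytes());

    if bytes.len() <= MAX_INLINE_SIZE {
        // Short values are stored inline in the remaining 12 bytes.
        payload[4..4 + bytes.len()].copy_from_slice(bytes);
    } else {
        // Long values store a 4-byte prefix plus the index of the buffer that
        // holds the full value and the offset of the value inside that buffer.
        payload[4..8].copy_from_slice(&bytes[0..4]);
        payload[8..12].copy_from_slice(&buffer_idx.to_le_bytes());
        payload[12..16].copy_from_slice(&offset.to_le_bytes());
    }
    payload
}

fn main() {
    let inline = encode_view(b"short", 0, 0);
    assert_eq!(&inline[4..9], &b"short"[..]);

    let long = encode_view(b"a value longer than twelve bytes", 3, 128);
    assert_eq!(&long[4..8], &b"a va"[..]); // prefix
    assert_eq!(u32::from_le_bytes(long[8..12].try_into().unwrap()), 3); // buffer index
    assert_eq!(u32::from_le_bytes(long[12..16].try_into().unwrap()), 128); // offset
}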

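The buffer-growth comment in the diff ("try to anticipate the larger size") amounts to a doubling policy clamped between DEFAULT_BLOCK_SIZE and MAX_EXP_BLOCK_SIZE, floored at the incoming value's length so a single value never has to span two buffers. A minimal sketch of that policy under those assumptions follows; new_buffer_capacity is an illustrative helper, not part of the crate.

// Minimal sketch of the growth policy described in the diff comment: double the
// current capacity, keep it between DEFAULT_BLOCK_SIZE and MAX_EXP_BLOCK_SIZE,
// and never go below the incoming value so a single value never spans buffers.
// `new_buffer_capacity` is an illustrative helper, not part of polars-arrow.
const DEFAULT_BLOCK_SIZE: usize = 8 * 1024;
const MAX_EXP_BLOCK_SIZE: usize = 16 * 1024 * 1024;

fn new_buffer_capacity(current_capacity: usize, value_len: usize) -> usize {
    (current_capacity * 2)
        .clamp(DEFAULT_BLOCK_SIZE, MAX_EXP_BLOCK_SIZE)
        .max(value_len)
}

fn main() {
    assert_eq!(new_buffer_capacity(0, 100), DEFAULT_BLOCK_SIZE); // first allocation
    assert_eq!(new_buffer_capacity(32 * 1024, 100), 64 * 1024); // exponential doubling
    assert_eq!(new_buffer_capacity(32 * 1024 * 1024, 100), MAX_EXP_BLOCK_SIZE); // growth is capped
    assert_eq!(new_buffer_capacity(8 * 1024, 40 * 1024 * 1024), 40 * 1024 * 1024); // oversized value wins
}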