Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Better GC and push_view for binviews #17627

Merged
merged 6 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions crates/polars-arrow/src/array/binview/buffers.rs

This file was deleted.

30 changes: 28 additions & 2 deletions crates/polars-arrow/src/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod private {
}
pub use iterator::BinaryViewValueIter;
pub use mutable::MutableBinaryViewArray;
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::slice::GetSaferUnchecked;
use private::Sealed;

Expand Down Expand Up @@ -353,6 +354,22 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
self.total_buffer_len
}

/// Sums the lengths of all buffers whose strong share count is exactly 1,
/// i.e. the buffers that are not shared with any other holder.
///
/// Note: this walks every buffer, so it is linear in the number of buffers
/// rather than O(1). It is only called from `maybe_gc()`, so caching the
/// value in an extra field is probably not worth the bookkeeping.
fn total_unshared_buffer_len(&self) -> usize {
    self.buffers
        .iter()
        .filter(|buf| buf.shared_count_strong() == 1)
        .map(|buf| buf.len())
        .sum()
}

#[inline(always)]
pub fn len(&self) -> usize {
self.views.len()
Expand All @@ -367,7 +384,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
let buffers = self.buffers.as_ref();

for view in self.views.as_ref() {
unsafe { mutable.push_view(*view, buffers) }
unsafe { mutable.push_view_copied_unchecked(*view, buffers) }
}
mutable.freeze().with_validity(self.validity)
}
Expand All @@ -383,13 +400,21 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
return self;
}

if Arc::strong_count(&self.buffers) != 1 {
// There are multiple holders of this `buffers`.
// If we allow gc in this case,
// it may end up copying the same content multiple times.
return self;
}

// Subtract the maximum amount of inlined strings to get a lower bound
// on the number of buffer bytes needed (assuming no dedup).
let total_bytes_len = self.total_bytes_len();
let buffer_req_lower_bound = total_bytes_len.saturating_sub(self.len() * 12);

let lower_bound_mem_usage_post_gc = self.len() * 16 + buffer_req_lower_bound;
let cur_mem_usage = self.len() * 16 + self.total_buffer_len();
// Use unshared buffer len. Shared buffer won't be freed; no savings.
let cur_mem_usage = self.len() * 16 + self.total_unshared_buffer_len();
let savings_upper_bound = cur_mem_usage.saturating_sub(lower_bound_mem_usage_post_gc);

if savings_upper_bound >= GC_MINIMUM_SAVINGS
Expand All @@ -413,6 +438,7 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
phantom: Default::default(),
total_bytes_len: self.total_bytes_len.load(Ordering::Relaxed) as usize,
total_buffer_len: self.total_buffer_len,
stolen_buffers: PlHashMap::new(),
}
}
}
Expand Down
161 changes: 152 additions & 9 deletions crates/polars-arrow/src/array/binview/mutable.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::ops::Deref;
use std::sync::Arc;

use hashbrown::hash_map::Entry;
use polars_error::PolarsResult;
use polars_utils::aliases::{InitHashMaps, PlHashMap};
use polars_utils::slice::GetSaferUnchecked;

use crate::array::binview::iterator::MutableBinaryViewValueIter;
Expand All @@ -19,15 +22,18 @@ use crate::types::NativeType;
const DEFAULT_BLOCK_SIZE: usize = 8 * 1024;

/// Mutable builder that is converted into a `BinaryViewArrayGeneric<T>`
/// (see `freeze_with_dtype()`, which goes through `Into`).
// NOTE(review): this listing is a rendered diff without +/- markers. The
// `pub(super)` field lines appear to be the pre-change declarations and the
// `pub(crate)` lines their post-change replacements — confirm against the
// actual file before relying on the duplicated fields.
pub struct MutableBinaryViewArray<T: ViewType + ?Sized> {
pub(super) views: Vec<View>,
pub(super) completed_buffers: Vec<Buffer<u8>>,
pub(super) in_progress_buffer: Vec<u8>,
pub(super) validity: Option<MutableBitmap>,
pub(super) phantom: std::marker::PhantomData<T>,
pub(crate) views: Vec<View>,
pub(crate) completed_buffers: Vec<Buffer<u8>>,
pub(crate) in_progress_buffer: Vec<u8>,
pub(crate) validity: Option<MutableBitmap>,
pub(crate) phantom: std::marker::PhantomData<T>,
/// Total bytes length if we would concatenate them all.
pub(super) total_bytes_len: usize,
pub(crate) total_bytes_len: usize,
/// Total bytes in the buffer (excluding remaining capacity)
pub(super) total_buffer_len: usize,
pub(crate) total_buffer_len: usize,
/// Maps the address of a source buffer's data
/// (`buffer.deref().as_ptr() as usize`) to the index in `completed_buffers`
/// where a clone of that buffer was stored.
/// Used in `push_view()` and `push_view_unchecked()` so that the same source
/// buffer is only added to `completed_buffers` once.
pub(crate) stolen_buffers: PlHashMap<usize, u32>,
}

impl<T: ViewType + ?Sized> Clone for MutableBinaryViewArray<T> {
Expand All @@ -40,6 +46,7 @@ impl<T: ViewType + ?Sized> Clone for MutableBinaryViewArray<T> {
phantom: Default::default(),
total_bytes_len: self.total_bytes_len,
total_buffer_len: self.total_buffer_len,
stolen_buffers: PlHashMap::new(),
}
}
}
Expand Down Expand Up @@ -86,6 +93,7 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
phantom: Default::default(),
total_buffer_len: 0,
total_bytes_len: 0,
stolen_buffers: PlHashMap::new(),
}
}

Expand Down Expand Up @@ -135,8 +143,8 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
/// # Safety
/// - caller must allocate enough capacity
/// - caller must ensure the view and buffers match.
#[inline]
pub unsafe fn push_view(&mut self, v: View, buffers: &[Buffer<u8>]) {
/// - The array must not have validity.
pub(crate) unsafe fn push_view_copied_unchecked(&mut self, v: View, buffers: &[Buffer<u8>]) {
let len = v.length;
self.total_bytes_len += len as usize;
if len <= 12 {
Expand All @@ -152,6 +160,67 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
}
}

/// Appends the view `v` without copying the bytes it points to.
///
/// Views of at most 12 bytes are pushed unchanged. For longer views, the
/// buffer that `v.buffer_idx` selects from `buffers` is cloned into
/// `completed_buffers` the first time it is seen (keyed by the address of
/// its data), and the view's `buffer_idx` is rewritten to point at that
/// entry.
///
/// # Safety
/// - The caller must have allocated enough capacity.
/// - The caller must ensure the view and `buffers` match.
/// - The array must not have validity.
/// - The caller must not mix this function with the other push functions.
pub unsafe fn push_view_unchecked(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
    self.total_bytes_len += v.length as usize;
    if v.length > 12 {
        let source = buffers.get_unchecked_release(v.buffer_idx as usize);
        let address = source.deref().as_ptr() as usize;
        v.buffer_idx = match self.stolen_buffers.entry(address) {
            // This buffer was registered before: reuse its slot.
            Entry::Occupied(slot) => *slot.get(),
            // First time this buffer is seen: register it and share it.
            Entry::Vacant(slot) => {
                let fresh_idx = self.completed_buffers.len() as u32;
                slot.insert(fresh_idx);
                self.completed_buffers.push(source.clone());
                self.total_buffer_len += source.len();
                fresh_idx
            },
        };
    }
    self.views.push_unchecked(v);
}

/// Appends the view `v` as a valid (non-null) value without copying the
/// bytes it points to.
///
/// Views of at most 12 bytes are pushed unchanged. For longer views, the
/// buffer that `v.buffer_idx` selects from `buffers` is cloned into
/// `completed_buffers` the first time it is seen (keyed by the address of
/// its data), and the view's `buffer_idx` is rewritten to point at that
/// entry. If the array tracks validity, a `true` bit is appended.
///
/// # Panics
/// Panics if `v.buffer_idx` is out of bounds for `buffers` (long views only).
pub fn push_view(&mut self, mut v: View, buffers: &[Buffer<u8>]) {
    self.total_bytes_len += v.length as usize;
    if v.length > 12 {
        // Do not mix use of push_view and push_value_ignore_validity -
        // it causes fragmentation.
        self.finish_in_progress();

        let source = &buffers[v.buffer_idx as usize];
        let address = source.deref().as_ptr() as usize;
        v.buffer_idx = match self.stolen_buffers.entry(address) {
            // This buffer was registered before: reuse its slot.
            Entry::Occupied(slot) => {
                let known_idx = *slot.get();
                let registered = &self.completed_buffers[known_idx as usize];
                debug_assert_eq!(source, registered);
                known_idx
            },
            // First time this buffer is seen: register it and share it.
            Entry::Vacant(slot) => {
                let fresh_idx = self.completed_buffers.len() as u32;
                slot.insert(fresh_idx);
                self.completed_buffers.push(source.clone());
                self.total_buffer_len += source.len();
                fresh_idx
            },
        };
    }
    self.views.push(v);

    if let Some(validity) = self.validity.as_mut() {
        validity.push(true);
    }
}

#[inline]
pub fn push_value_ignore_validity<V: AsRef<T>>(&mut self, value: V) {
let value = value.as_ref();
Expand Down Expand Up @@ -303,6 +372,73 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.extend(iterator)
}

/// Appends every item of `iterator`: `Some(view)` goes through
/// `push_view(view, buffers)`, `None` goes through `push_null()`.
///
/// Capacity is reserved up front for the lower bound of the iterator's
/// size hint.
#[inline]
pub fn extend_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = Option<View>>,
{
    let (lower_bound, _) = iterator.size_hint();
    self.reserve(lower_bound);
    iterator.for_each(|item| {
        if let Some(view) = item {
            self.push_view(view, buffers)
        } else {
            self.push_null()
        }
    });
}

/// `TrustedLen` variant of `extend_views()`.
///
/// This body simply delegates to `extend_views()`; the `TrustedLen` bound is
/// not used for any additional optimization here.
#[inline]
pub fn extend_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
I: TrustedLen<Item = Option<View>>,
{
self.extend_views(iterator, buffers);
}

/// Appends every view of `iterator` through `push_view(view, buffers)`.
///
/// Capacity is reserved up front for the lower bound of the iterator's
/// size hint.
#[inline]
pub fn extend_non_null_views<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = View>,
{
    let (lower_bound, _) = iterator.size_hint();
    self.reserve(lower_bound);
    iterator.for_each(|view| self.push_view(view, buffers));
}

/// `TrustedLen` variant of `extend_non_null_views()`.
///
/// This body simply delegates to `extend_non_null_views()`; the `TrustedLen`
/// bound is not used for any additional optimization here.
#[inline]
pub fn extend_non_null_views_trusted_len<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
I: TrustedLen<Item = View>,
{
self.extend_non_null_views(iterator, buffers);
}

/// Appends every view of `iterator` through
/// `push_view_unchecked(view, buffers)`, after reserving capacity for the
/// lower bound of the iterator's size hint.
///
/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_unchecked<I>(&mut self, iterator: I, buffers: &[Buffer<u8>])
where
    I: Iterator<Item = View>,
{
    let (lower_bound, _) = iterator.size_hint();
    self.reserve(lower_bound);
    iterator.for_each(|view| self.push_view_unchecked(view, buffers));
}

/// `TrustedLen` variant of `extend_non_null_views_unchecked()`.
///
/// This body simply delegates to `extend_non_null_views_unchecked()`; the
/// `TrustedLen` bound is not used for any additional optimization here.
///
/// # Safety
/// Same as `push_view_unchecked()`.
#[inline]
pub unsafe fn extend_non_null_views_trusted_len_unchecked<I>(
&mut self,
iterator: I,
buffers: &[Buffer<u8>],
) where
I: TrustedLen<Item = View>,
{
self.extend_non_null_views_unchecked(iterator, buffers);
}

#[inline]
pub fn from_iterator<I, P>(iterator: I) -> Self
where
Expand Down Expand Up @@ -343,6 +479,13 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
self.into()
}

/// Converts this mutable array into a `BinaryViewArrayGeneric<T>` and then
/// overwrites its `data_type` with `dtype`.
///
/// No check is made here that `dtype` is compatible with `T`.
#[inline]
pub fn freeze_with_dtype(self, dtype: ArrowDataType) -> BinaryViewArrayGeneric<T> {
    let mut frozen = Into::<BinaryViewArrayGeneric<T>>::into(self);
    frozen.data_type = dtype;
    frozen
}

#[inline]
pub fn value(&self, i: usize) -> &T {
assert!(i < self.len());
Expand Down
Loading