Skip to content

Commit

Permalink
Implement exponential block size growing strategy for `StringViewBuil…
Browse files Browse the repository at this point in the history
…der` (#6136)

* new block size growing strategy

* Update arrow-array/src/builder/generic_bytes_view_builder.rs

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* update function name, deprecate old function

* update comments

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
XiangpengHao and alamb authored Jul 29, 2024
1 parent 11f2bb8 commit bd1e76b
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 12 deletions.
4 changes: 2 additions & 2 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ mod tests {
fn test_in_progress_recreation() {
let array = {
// make a builder with small block size.
let mut builder = StringViewBuilder::new().with_block_size(14);
let mut builder = StringViewBuilder::new().with_fixed_block_size(14);
builder.append_value("large payload over 12 bytes");
builder.append_option(Some("another large payload over 12 bytes that double than the first one, so that we can trigger the in_progress in builder re-created"));
builder.finish()
Expand Down Expand Up @@ -848,7 +848,7 @@ mod tests {
];

let array = {
let mut builder = StringViewBuilder::new().with_block_size(8); // create multiple buffers
let mut builder = StringViewBuilder::new().with_fixed_block_size(8); // create multiple buffers
test_data.into_iter().for_each(|v| builder.append_option(v));
builder.finish()
};
Expand Down
104 changes: 98 additions & 6 deletions arrow-array/src/builder/generic_bytes_view_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,30 @@ use crate::types::bytes::ByteArrayNativeType;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};

const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024;
const STARTING_BLOCK_SIZE: u32 = 8 * 1024; // 8KiB
const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; // 2MiB

enum BlockSizeGrowthStrategy {
Fixed { size: u32 },
Exponential { current_size: u32 },
}

impl BlockSizeGrowthStrategy {
fn next_size(&mut self) -> u32 {
match self {
Self::Fixed { size } => *size,
Self::Exponential { current_size } => {
if *current_size < MAX_BLOCK_SIZE {
// we have fixed start/end block sizes, so we can't overflow
*current_size = current_size.saturating_mul(2);
*current_size
} else {
MAX_BLOCK_SIZE
}
}
}
}
}

/// A builder for [`GenericByteViewArray`]
///
Expand Down Expand Up @@ -58,7 +81,7 @@ pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
null_buffer_builder: NullBufferBuilder,
completed: Vec<Buffer>,
in_progress: Vec<u8>,
block_size: u32,
block_size: BlockSizeGrowthStrategy,
/// Some if deduplicating strings
/// map `<string hash> -> <index to the views>`
string_tracker: Option<(HashTable<usize>, ahash::RandomState)>,
Expand All @@ -78,15 +101,42 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
null_buffer_builder: NullBufferBuilder::new(capacity),
completed: vec![],
in_progress: vec![],
block_size: DEFAULT_BLOCK_SIZE,
block_size: BlockSizeGrowthStrategy::Exponential {
current_size: STARTING_BLOCK_SIZE,
},
string_tracker: None,
phantom: Default::default(),
}
}

/// Set a fixed buffer size for variable length strings
///
/// The block size is the size of the buffer used to store values greater
/// than 12 bytes. The builder allocates new buffers when the current
/// buffer is full.
///
/// By default the builder balances buffer size and buffer count by
/// growing buffer size exponentially from 8KB up to 2MB. The
/// first buffer allocated is 8KB, then 16KB, then 32KB, etc up to 2MB.
///
/// If this method is used, any new buffers allocated are
/// exactly this size. This can be useful for advanced users
/// that want to control the memory usage and buffer count.
///
/// See <https://github.com/apache/arrow-rs/issues/6094> for more details on the implications.
pub fn with_fixed_block_size(self, block_size: u32) -> Self {
debug_assert!(block_size > 0, "Block size must be greater than 0");
Self {
block_size: BlockSizeGrowthStrategy::Fixed { size: block_size },
..self
}
}

/// Override the size of buffers to allocate for holding string data
/// Use `with_fixed_block_size` instead.
#[deprecated(note = "Use `with_fixed_block_size` instead")]
pub fn with_block_size(self, block_size: u32) -> Self {
Self { block_size, ..self }
self.with_fixed_block_size(block_size)
}

/// Deduplicate strings while building the array
Expand Down Expand Up @@ -277,7 +327,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
let required_cap = self.in_progress.len() + v.len();
if self.in_progress.capacity() < required_cap {
self.flush_in_progress();
let to_reserve = v.len().max(self.block_size as usize);
let to_reserve = v.len().max(self.block_size.next_size() as usize);
self.in_progress.reserve(to_reserve);
};
let offset = self.in_progress.len() as u32;
Expand Down Expand Up @@ -478,7 +528,7 @@ mod tests {

let mut builder = StringViewBuilder::new()
.with_deduplicate_strings()
.with_block_size(value_1.len() as u32 * 2); // so that we will have multiple buffers
.with_fixed_block_size(value_1.len() as u32 * 2); // so that we will have multiple buffers

let values = vec![
Some(value_1),
Expand Down Expand Up @@ -585,4 +635,46 @@ mod tests {
"Invalid argument error: No block found with index 5"
);
}

#[test]
fn test_string_view_with_block_size_growth() {
let mut exp_builder = StringViewBuilder::new();
let mut fixed_builder = StringViewBuilder::new().with_fixed_block_size(STARTING_BLOCK_SIZE);

let long_string = String::from_utf8(vec![b'a'; STARTING_BLOCK_SIZE as usize]).unwrap();

for i in 0..9 {
// 8k, 16k, 32k, 64k, 128k, 256k, 512k, 1M, 2M
for _ in 0..(2_u32.pow(i)) {
exp_builder.append_value(&long_string);
fixed_builder.append_value(&long_string);
}
exp_builder.flush_in_progress();
fixed_builder.flush_in_progress();

// Every step only add one buffer, but the buffer size is much larger
assert_eq!(exp_builder.completed.len(), i as usize + 1);
assert_eq!(
exp_builder.completed[i as usize].len(),
STARTING_BLOCK_SIZE as usize * 2_usize.pow(i)
);

// This step we added 2^i blocks, the sum of blocks should be 2^(i+1) - 1
assert_eq!(fixed_builder.completed.len(), 2_usize.pow(i + 1) - 1);

// Every buffer is fixed size
assert!(fixed_builder
.completed
.iter()
.all(|b| b.len() == STARTING_BLOCK_SIZE as usize));
}

// Add one more value, and the buffer stop growing.
exp_builder.append_value(&long_string);
exp_builder.flush_in_progress();
assert_eq!(
exp_builder.completed.last().unwrap().capacity(),
MAX_BLOCK_SIZE as usize
);
}
}
8 changes: 4 additions & 4 deletions arrow-cast/src/cast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5321,7 +5321,7 @@ mod tests {
let typed_dict = string_dict_array.downcast_dict::<StringArray>().unwrap();

let string_view_array = {
let mut builder = StringViewBuilder::new().with_block_size(8); // multiple buffers.
let mut builder = StringViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
for v in typed_dict.into_iter() {
builder.append_option(v);
}
Expand All @@ -5338,7 +5338,7 @@ mod tests {
let typed_binary_dict = binary_dict_array.downcast_dict::<BinaryArray>().unwrap();

let binary_view_array = {
let mut builder = BinaryViewBuilder::new().with_block_size(8); // multiple buffers.
let mut builder = BinaryViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
for v in typed_binary_dict.into_iter() {
builder.append_option(v);
}
Expand Down Expand Up @@ -5381,7 +5381,7 @@ mod tests {
O: OffsetSizeTrait,
{
let view_array = {
let mut builder = StringViewBuilder::new().with_block_size(8); // multiple buffers.
let mut builder = StringViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
for s in VIEW_TEST_DATA.iter() {
builder.append_option(*s);
}
Expand Down Expand Up @@ -5410,7 +5410,7 @@ mod tests {
O: OffsetSizeTrait,
{
let view_array = {
let mut builder = BinaryViewBuilder::new().with_block_size(8); // multiple buffers.
let mut builder = BinaryViewBuilder::new().with_fixed_block_size(8); // multiple buffers.
for s in VIEW_TEST_DATA.iter() {
builder.append_option(*s);
}
Expand Down

0 comments on commit bd1e76b

Please sign in to comment.