Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid a copy in the new parquet decoder #3

Open
wants to merge 12 commits into
base: better-decoder
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ exclude = [
]

[workspace.package]
version = "53.2.0"
version = "53.3.0"
homepage = "https://github.com/apache/arrow-rs"
repository = "https://github.com/apache/arrow-rs"
authors = ["Apache Arrow <dev@arrow.apache.org>"]
Expand Down
45 changes: 45 additions & 0 deletions arrow-buffer/src/buffer/boolean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,51 @@ impl BooleanBuffer {
pub fn set_slices(&self) -> BitSliceIterator<'_> {
BitSliceIterator::new(self.values(), self.offset, self.len)
}

/// Combines this [`BooleanBuffer`] with another using logical AND on the selected bits.
///
/// Unlike intersection, the `other` [`BooleanBuffer`] must have exactly as many **set bits** as `self`,
/// i.e., self.count_set_bits() == other.len().
///
/// This method will keep only the bits in `self` that are also set in `other`
/// at the positions corresponding to `self`'s set bits.
/// For example:
/// self: NNYYYNNYYNYN
/// other: YNY NY N
/// result: NNYNYNNNYNNN
pub fn and_then(&self, other: &Self) -> Self {
// Ensure that 'other' has exactly as many set bits as 'self'
debug_assert_eq!(
self.count_set_bits(),
other.len(),
"The 'other' selection must have exactly as many set bits as 'self'."
);

if self.len() == other.len() {
// fast path if the two bool masks are the same length
// this happens when self selects all rows
debug_assert_eq!(self.count_set_bits(), self.len());
return other.clone();
}

let mut buffer = MutableBuffer::from_len_zeroed(self.values().len());
buffer.copy_from_slice(self.values());
let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, self.len());

// Create iterators for 'self' and 'other' bits
let mut other_bits = other.iter();

for bit_idx in self.set_indices() {
let predicate = other_bits
.next()
.expect("Mismatch in set bits between self and other");
if !predicate {
builder.set_bit(bit_idx, false);
}
}

builder.finish()
}
}

impl Not for &BooleanBuffer {
Expand Down
1 change: 1 addition & 0 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.32.0", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }
simdutf8 = "0.1.5"

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
Expand Down
31 changes: 24 additions & 7 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,17 @@ impl ColumnValueDecoder for ByteViewArrayColumnValueDecoder {
));
}

let mut buffer = ViewBuffer::default();
let mut buffer = ViewBuffer::with_capacity(num_values as usize, 1);
let mut decoder = ByteViewArrayDecoderPlain::new(
buf,
num_values as usize,
Some(num_values as usize),
self.validate_utf8,
);
decoder.read(&mut buffer, usize::MAX)?;

self.dict = Some(buffer);

Ok(())
}

Expand Down Expand Up @@ -290,7 +292,7 @@ impl ByteViewArrayDecoder {

/// Decoder from [`Encoding::PLAIN`] data to [`ViewBuffer`]
pub struct ByteViewArrayDecoderPlain {
buf: Bytes,
buf: Buffer,
offset: usize,

validate_utf8: bool,
Expand All @@ -307,6 +309,9 @@ impl ByteViewArrayDecoderPlain {
num_values: Option<usize>,
validate_utf8: bool,
) -> Self {
// Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
// Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy
let buf = arrow_buffer::Buffer::from_bytes(buf.clone().into());
Self {
buf,
offset: 0,
Expand All @@ -316,10 +321,20 @@ impl ByteViewArrayDecoderPlain {
}

pub fn read(&mut self, output: &mut ViewBuffer, len: usize) -> Result<usize> {
// Here we convert `bytes::Bytes` into `arrow_buffer::Bytes`, which is zero copy
// Then we convert `arrow_buffer::Bytes` into `arrow_buffer:Buffer`, which is also zero copy
let buf = arrow_buffer::Buffer::from_bytes(self.buf.clone().into());
let block_id = output.append_block(buf);
// let block_id = output.append_block(self.buf.clone());
let need_to_create_new_buffer = {
if let Some(last_buffer) = output.buffers.last() {
!last_buffer.ptr_eq(&self.buf)
} else {
true
}
};

let block_id = if need_to_create_new_buffer {
output.append_block(self.buf.clone())
} else {
output.buffers.len() as u32 - 1
};

let to_read = len.min(self.max_remaining_values);

Expand Down Expand Up @@ -458,6 +473,8 @@ impl ByteViewArrayDecoderDictionary {
}
}

output.views.reserve(len);

// Calculate the offset of the dictionary buffers in the output buffers
// For example if the 2nd buffer in the dictionary is the 5th buffer in the output buffers,
// then the base_buffer_idx is 5 - 2 = 3
Expand Down Expand Up @@ -679,7 +696,7 @@ impl ByteViewArrayDecoderDelta {

/// Check that `val` is a valid UTF-8 sequence
pub fn check_valid_utf8(val: &[u8]) -> Result<()> {
match std::str::from_utf8(val) {
match simdutf8::basic::from_utf8(val) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
Expand Down
36 changes: 24 additions & 12 deletions parquet/src/arrow/array_reader/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -413,9 +413,12 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();
let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc,
None,
)
.unwrap();

// Read first 50 values, which are all from the first column chunk
let array = array_reader.next_batch(50).unwrap();
Expand Down Expand Up @@ -619,9 +622,12 @@ mod tests {

let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();
let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc,
None,
)
.unwrap();

let mut accu_len: usize = 0;

Expand Down Expand Up @@ -695,9 +701,12 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();
let mut array_reader = PrimitiveArrayReader::<Int32Type>::new(
Box::new(page_iterator),
column_desc,
None,
)
.unwrap();

// read data from the reader
// the data type is decimal(8,2)
Expand Down Expand Up @@ -754,9 +763,12 @@ mod tests {
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();
let mut array_reader = PrimitiveArrayReader::<Int64Type>::new(
Box::new(page_iterator),
column_desc,
None,
)
.unwrap();

// read data from the reader
// the data type is decimal(18,4)
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/array_reader/struct_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

/// Implementation of struct array reader.
pub struct StructArrayReader {
children: Vec<Box<dyn ArrayReader>>,
pub children: Vec<Box<dyn ArrayReader>>,
data_type: ArrowType,
struct_def_level: i16,
struct_rep_level: i16,
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_reader/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ where
/// [`RowSelection`]: crate::arrow::arrow_reader::RowSelection
pub struct RowFilter {
/// A list of [`ArrowPredicate`]
pub(crate) predicates: Vec<Box<dyn ArrowPredicate>>,
pub predicates: Vec<Box<dyn ArrowPredicate>>,
}

impl RowFilter {
Expand Down
Loading
Loading