Add unencoded_byte_array_data_bytes to ParquetMetaData (#6068)
* update to latest thrift (as of 11 Jul 2024) from parquet-format

* pass None for optional size statistics

* escape HTML tags

* don't need to escape brackets in arrays

* add support for unencoded_byte_array_data_bytes

* add comments

* change sig of ColumnMetrics::update_variable_length_bytes()

* rename ParquetOffsetIndex to OffsetSizeIndex

* rename some functions

* suggestion from review

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>

* add Default trait to ColumnMetrics as suggested in review

* rename OffsetSizeIndex to OffsetIndexMetaData

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
etseidl and alamb authored Jul 19, 2024
1 parent 05e681d commit e40b311
Showing 13 changed files with 349 additions and 34 deletions.
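
Taken together, the changes thread a per-page count of unencoded BYTE_ARRAY bytes from the encoders up into `ColumnChunkMetaData` and the offset index. As a hedged sketch of how this surfaces to readers, assuming a `ColumnChunkMetaData::unencoded_byte_array_data_bytes()` accessor mirroring the `set_unencoded_byte_array_data_bytes` builder call in the diff below:

```rust
use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

// Sketch: print the unencoded BYTE_ARRAY sizes recorded per column chunk.
// The accessor name is an assumption based on the builder setter added here.
fn print_unencoded_sizes(path: &str) -> parquet::errors::Result<()> {
    let reader = SerializedFileReader::new(File::open(path)?)?;
    for row_group in reader.metadata().row_groups() {
        for column in row_group.columns() {
            // `None` for fixed-width columns, `Some(total)` for BYTE_ARRAY columns
            println!(
                "{}: {:?}",
                column.column_path(),
                column.unencoded_byte_array_data_bytes()
            );
        }
    }
    Ok(())
}
```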
19 changes: 18 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
@@ -96,6 +96,7 @@ macro_rules! downcast_op {
 struct FallbackEncoder {
     encoder: FallbackEncoderImpl,
     num_values: usize,
+    variable_length_bytes: i64,
 }
 
 /// The fallback encoder in use
@@ -152,6 +153,7 @@ impl FallbackEncoder {
         Ok(Self {
             encoder,
             num_values: 0,
+            variable_length_bytes: 0,
         })
     }
 
@@ -168,7 +170,8 @@ impl FallbackEncoder {
                     let value = values.value(*idx);
                     let value = value.as_ref();
                     buffer.extend_from_slice((value.len() as u32).as_bytes());
-                    buffer.extend_from_slice(value)
+                    buffer.extend_from_slice(value);
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
             FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
@@ -177,6 +180,7 @@
                     let value = value.as_ref();
                     lengths.put(&[value.len() as i32]).unwrap();
                     buffer.extend_from_slice(value);
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
             FallbackEncoderImpl::Delta {
@@ -205,6 +209,7 @@ impl FallbackEncoder {
                     buffer.extend_from_slice(&value[prefix_length..]);
                     prefix_lengths.put(&[prefix_length as i32]).unwrap();
                     suffix_lengths.put(&[suffix_length as i32]).unwrap();
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
         }
@@ -269,12 +274,17 @@ impl FallbackEncoder {
             }
         };
 
+        // Capture value of variable_length_bytes and reset for next page
+        let variable_length_bytes = Some(self.variable_length_bytes);
+        self.variable_length_bytes = 0;
+
         Ok(DataPageValues {
             buf: buf.into(),
             num_values: std::mem::take(&mut self.num_values),
             encoding,
             min_value,
             max_value,
+            variable_length_bytes,
         })
     }
 }
@@ -321,6 +331,7 @@ impl Storage for ByteArrayStorage {
 struct DictEncoder {
     interner: Interner<ByteArrayStorage>,
     indices: Vec<u64>,
+    variable_length_bytes: i64,
 }
 
 impl DictEncoder {
@@ -336,6 +347,7 @@ impl DictEncoder {
             let value = values.value(*idx);
             let interned = self.interner.intern(value.as_ref());
             self.indices.push(interned);
+            self.variable_length_bytes += value.as_ref().len() as i64;
         }
     }
 
@@ -384,12 +396,17 @@ impl DictEncoder {
 
         self.indices.clear();
 
+        // Capture value of variable_length_bytes and reset for next page
+        let variable_length_bytes = Some(self.variable_length_bytes);
+        self.variable_length_bytes = 0;
+
         DataPageValues {
             buf: encoder.consume().into(),
             num_values,
             encoding: Encoding::RLE_DICTIONARY,
             min_value,
             max_value,
+            variable_length_bytes,
         }
     }
 }
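Both encoders above follow the same page-lifecycle pattern: accumulate unencoded value lengths as values arrive, then capture and reset the counter when a page is flushed. A minimal standalone sketch of that pattern (the type here is illustrative, not the crate's):

```rust
// Illustrative only: the accumulate-then-reset pattern used by both
// `FallbackEncoder` and `DictEncoder` above.
#[derive(Default)]
struct PageByteCounter {
    variable_length_bytes: i64,
}

impl PageByteCounter {
    fn write(&mut self, value: &[u8]) {
        // every BYTE_ARRAY value contributes its unencoded length
        self.variable_length_bytes += value.len() as i64;
    }

    fn flush_data_page(&mut self) -> Option<i64> {
        // capture the running total for this page and reset for the next
        Some(std::mem::take(&mut self.variable_length_bytes))
    }
}
```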
8 changes: 6 additions & 2 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -20,7 +20,9 @@ use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_page_locations,
+};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::FutureExt;
@@ -177,7 +179,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                         x.columns()
                             .iter()
                             .map(|c| match c.offset_index_range() {
-                                Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
+                                Some(r) => {
+                                    decode_page_locations(&data[r.start - offset..r.end - offset])
+                                }
                                 None => Err(general_err!("missing offset index")),
                             })
                             .collect::<Result<Vec<_>>>()
1 change: 1 addition & 0 deletions parquet/src/arrow/async_reader/mod.rs
@@ -1538,6 +1538,7 @@ mod tests {
             vec![row_group_meta],
             None,
             Some(vec![offset_index.clone()]),
+            None,
         );
 
         let metadata = Arc::new(metadata);
8 changes: 8 additions & 0 deletions parquet/src/column/writer/encoder.rs
@@ -63,6 +63,7 @@ pub struct DataPageValues<T> {
     pub encoding: Encoding,
     pub min_value: Option<T>,
     pub max_value: Option<T>,
+    pub variable_length_bytes: Option<i64>,
 }
 
/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
Expand Down Expand Up @@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
min_value: Option<T::T>,
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
variable_length_bytes: Option<i64>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -150,6 +152,10 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
                 update_min(&self.descr, &min, &mut self.min_value);
                 update_max(&self.descr, &max, &mut self.max_value);
             }
+
+            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
+                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+            }
         }
 
         // encode the values into bloom filter if enabled
@@ -203,6 +209,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             bloom_filter,
             min_value: None,
             max_value: None,
+            variable_length_bytes: None,
         })
     }
 
@@ -296,6 +303,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             num_values: std::mem::take(&mut self.num_values),
             min_value: self.min_value.take(),
             max_value: self.max_value.take(),
+            variable_length_bytes: self.variable_length_bytes.take(),
         })
     }
 }
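The `Option<i64>` accumulation via `get_or_insert(0)` keeps the running total `None` for columns that never report variable-length data, so fixed-width columns don't end up with a misleading `Some(0)`. A toy demonstration of that behavior:

```rust
// Toy demonstration of the `get_or_insert(0)` accumulation used above.
fn accumulate(total: &mut Option<i64>, page_bytes: Option<i64>) {
    if let Some(var_bytes) = page_bytes {
        *total.get_or_insert(0) += var_bytes;
    }
}

fn main() {
    let mut total = None;
    accumulate(&mut total, None); // fixed-width page: still None
    accumulate(&mut total, Some(10)); // first BYTE_ARRAY page: Some(10)
    accumulate(&mut total, Some(5)); // subsequent page: Some(15)
    assert_eq!(total, Some(15));
}
```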
54 changes: 37 additions & 17 deletions parquet/src/column/writer/mod.rs
@@ -192,7 +192,8 @@ struct PageMetrics {
 }
 
 // Metrics per column writer
-struct ColumnMetrics<T> {
+#[derive(Default)]
+struct ColumnMetrics<T: Default> {
     total_bytes_written: u64,
     total_rows_written: u64,
     total_uncompressed_size: u64,
@@ -204,6 +205,20 @@ struct ColumnMetrics<T> {
     max_column_value: Option<T>,
     num_column_nulls: u64,
     column_distinct_count: Option<u64>,
+    variable_length_bytes: Option<i64>,
 }
 
+impl<T: Default> ColumnMetrics<T> {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
+    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
+        if let Some(var_bytes) = variable_length_bytes {
+            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+        }
+    }
+}
+
 /// Typed column writer for a primitive column.
@@ -276,19 +291,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_buffered_rows: 0,
                 num_page_nulls: 0,
             },
-            column_metrics: ColumnMetrics {
-                total_bytes_written: 0,
-                total_rows_written: 0,
-                total_uncompressed_size: 0,
-                total_compressed_size: 0,
-                total_num_values: 0,
-                dictionary_page_offset: None,
-                data_page_offset: None,
-                min_column_value: None,
-                max_column_value: None,
-                num_column_nulls: 0,
-                column_distinct_count: None,
-            },
+            column_metrics: ColumnMetrics::<E::T>::new(),
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -634,7 +637,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     }
 
     /// Update the column index and offset index when adding the data page
-    fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
+    fn update_column_offset_index(
+        &mut self,
+        page_statistics: Option<&ValueStatistics<E::T>>,
+        page_variable_length_bytes: Option<i64>,
+    ) {
         // update the column index
         let null_page =
             (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
@@ -708,6 +715,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // update the offset index
         self.offset_index_builder
             .append_row_count(self.page_metrics.num_buffered_rows as i64);
+
+        self.offset_index_builder
+            .append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
     }
 
     /// Determine if we should allow truncating min/max values for this column's statistics
@@ -783,7 +793,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         };
 
         // update column and offset index
-        self.update_column_offset_index(page_statistics.as_ref());
+        self.update_column_offset_index(
+            page_statistics.as_ref(),
+            values_data.variable_length_bytes,
+        );
+
+        // Update variable_length_bytes in column_metrics
+        self.column_metrics
+            .update_variable_length_bytes(values_data.variable_length_bytes);
 
         let page_statistics = page_statistics.map(Statistics::from);
 
         let compressed_page = match self.props.writer_version() {
@@ -993,7 +1011,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             stats => stats,
         };
 
-        builder = builder.set_statistics(statistics);
+        builder = builder
+            .set_statistics(statistics)
+            .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
     }
 
     let metadata = builder.build()?;
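Replacing the hand-written initializer with `ColumnMetrics::<E::T>::new()` relies on `#[derive(Default)]`; the derived impl is itself bounded on `T: Default`, which is why the type parameter gains that bound. A small sketch of the same construction (the struct here is a stand-in, not the crate's):

```rust
// Sketch of the `#[derive(Default)]` construction adopted above: the derived
// `Default` impl requires `T: Default`, so the bound appears on the struct too.
#[derive(Default)]
struct Metrics<T: Default> {
    total_rows_written: u64,
    min_column_value: Option<T>,
    variable_length_bytes: Option<i64>,
}

impl<T: Default> Metrics<T> {
    fn new() -> Self {
        Default::default()
    }
}

fn main() {
    let m = Metrics::<i64>::new();
    assert_eq!(m.total_rows_written, 0);
    assert_eq!(m.min_column_value, None);
    assert_eq!(m.variable_length_bytes, None);
}
```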
11 changes: 11 additions & 0 deletions parquet/src/data_type.rs
@@ -644,6 +644,13 @@ pub(crate) mod private {
             (std::mem::size_of::<Self>(), 1)
         }
 
+        /// Return the number of variable length bytes in a given slice of data
+        ///
+        /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
+        fn variable_length_bytes(_: &[Self]) -> Option<i64> {
+            None
+        }
+
         /// Return the value as i64 if possible
         ///
         /// This is essentially the same as `std::convert::TryInto<i64>` but can't be
@@ -956,6 +963,10 @@ pub(crate) mod private {
             Ok(num_values)
         }
 
+        fn variable_length_bytes(values: &[Self]) -> Option<i64> {
+            Some(values.iter().map(|x| x.len() as i64).sum())
+        }
+
         fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
             let data = decoder
                 .data
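The trait's default implementation returns `None`, so only BYTE_ARRAY (which overrides it, as shown above) contributes a total. A hedged sketch of the override's arithmetic, using plain byte slices in place of the crate's `ByteArray` type:

```rust
// Sketch of the BYTE_ARRAY override's arithmetic with `&[u8]` stand-ins.
fn variable_length_bytes(values: &[&[u8]]) -> Option<i64> {
    Some(values.iter().map(|v| v.len() as i64).sum())
}

fn main() {
    let values: [&[u8]; 3] = [b"a", b"bc", b"def"];
    // 1 + 2 + 3 unencoded bytes
    assert_eq!(variable_length_bytes(&values), Some(6));
}
```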
1 change: 1 addition & 0 deletions parquet/src/file/metadata/memory.rs
@@ -97,6 +97,7 @@ impl HeapSize for ColumnChunkMetaData {
         + self.compression.heap_size()
         + self.statistics.heap_size()
         + self.encoding_stats.heap_size()
+        + self.unencoded_byte_array_data_bytes.heap_size()
     }
 }
 