Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unencoded_byte_array_data_bytes to ParquetMetaData #6068

Merged
merged 19 commits on
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
393aea1
update to latest thrift (as of 11 Jul 2024) from parquet-format
etseidl Jul 11, 2024
1c12fb8
pass None for optional size statistics
etseidl Jul 11, 2024
53cd5fa
escape HTML tags
etseidl Jul 11, 2024
98025cc
don't need to escape brackets in arrays
etseidl Jul 11, 2024
3e52326
Merge branch 'apache:master' into update_parquet_thrift
etseidl Jul 13, 2024
6c7ca66
add support for unencoded_byte_array_data_bytes
etseidl Jul 13, 2024
a7a10aa
add comments
etseidl Jul 13, 2024
d4ccdb6
change sig of ColumnMetrics::update_variable_length_bytes()
etseidl Jul 15, 2024
affe64a
rename ParquetOffsetIndex to OffsetSizeIndex
etseidl Jul 15, 2024
2ae35a6
Merge remote-tracking branch 'origin/master' into size_stats_oidx_lite
etseidl Jul 15, 2024
5abc5f2
rename some functions
etseidl Jul 16, 2024
26b2a06
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_oidx…
etseidl Jul 16, 2024
315ea48
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_oidx…
etseidl Jul 16, 2024
37c1e06
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_oidx…
etseidl Jul 16, 2024
04f1a09
suggestion from review
etseidl Jul 17, 2024
44d1a97
add Default trait to ColumnMetrics as suggested in review
etseidl Jul 17, 2024
c53a4d7
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_oidx…
etseidl Jul 17, 2024
d23f291
rename OffsetSizeIndex to OffsetIndexMetaData
etseidl Jul 19, 2024
8a5f051
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_oidx…
etseidl Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ macro_rules! downcast_op {
struct FallbackEncoder {
encoder: FallbackEncoderImpl,
num_values: usize,
variable_length_bytes: i64,
}

/// The fallback encoder in use
Expand Down Expand Up @@ -152,6 +153,7 @@ impl FallbackEncoder {
Ok(Self {
encoder,
num_values: 0,
variable_length_bytes: 0,
})
}

Expand All @@ -168,7 +170,8 @@ impl FallbackEncoder {
let value = values.value(*idx);
let value = value.as_ref();
buffer.extend_from_slice((value.len() as u32).as_bytes());
buffer.extend_from_slice(value)
buffer.extend_from_slice(value);
self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
Expand All @@ -177,6 +180,7 @@ impl FallbackEncoder {
let value = value.as_ref();
lengths.put(&[value.len() as i32]).unwrap();
buffer.extend_from_slice(value);
self.variable_length_bytes += value.len() as i64;
}
}
FallbackEncoderImpl::Delta {
Expand Down Expand Up @@ -205,6 +209,7 @@ impl FallbackEncoder {
buffer.extend_from_slice(&value[prefix_length..]);
prefix_lengths.put(&[prefix_length as i32]).unwrap();
suffix_lengths.put(&[suffix_length as i32]).unwrap();
self.variable_length_bytes += value.len() as i64;
}
}
}
Expand Down Expand Up @@ -269,12 +274,17 @@ impl FallbackEncoder {
}
};

// Capture value of variable_length_bytes and reset for next page
let variable_length_bytes = Some(self.variable_length_bytes);
self.variable_length_bytes = 0;

Ok(DataPageValues {
buf: buf.into(),
num_values: std::mem::take(&mut self.num_values),
encoding,
min_value,
max_value,
variable_length_bytes,
})
}
}
Expand Down Expand Up @@ -321,6 +331,7 @@ impl Storage for ByteArrayStorage {
struct DictEncoder {
interner: Interner<ByteArrayStorage>,
indices: Vec<u64>,
variable_length_bytes: i64,
}

impl DictEncoder {
Expand All @@ -336,6 +347,7 @@ impl DictEncoder {
let value = values.value(*idx);
let interned = self.interner.intern(value.as_ref());
self.indices.push(interned);
self.variable_length_bytes += value.as_ref().len() as i64;
}
}

Expand Down Expand Up @@ -384,12 +396,17 @@ impl DictEncoder {

self.indices.clear();

// Capture value of variable_length_bytes and reset for next page
let variable_length_bytes = Some(self.variable_length_bytes);
self.variable_length_bytes = 0;

DataPageValues {
buf: encoder.consume().into(),
num_values,
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
variable_length_bytes,
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions parquet/src/arrow/async_reader/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
use crate::file::page_index::index_reader::{
acc_range, decode_column_index, decode_page_locations,
};
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::FutureExt;
Expand Down Expand Up @@ -177,7 +179,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
x.columns()
.iter()
.map(|c| match c.offset_index_range() {
Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
Some(r) => {
decode_page_locations(&data[r.start - offset..r.end - offset])
}
None => Err(general_err!("missing offset index")),
})
.collect::<Result<Vec<_>>>()
Expand Down
1 change: 1 addition & 0 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1538,6 +1538,7 @@ mod tests {
vec![row_group_meta],
None,
Some(vec![offset_index.clone()]),
None,
);

let metadata = Arc::new(metadata);
Expand Down
8 changes: 8 additions & 0 deletions parquet/src/column/writer/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ pub struct DataPageValues<T> {
pub encoding: Encoding,
pub min_value: Option<T>,
pub max_value: Option<T>,
pub variable_length_bytes: Option<i64>,
}

/// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
Expand Down Expand Up @@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
min_value: Option<T::T>,
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
variable_length_bytes: Option<i64>,
}

impl<T: DataType> ColumnValueEncoderImpl<T> {
Expand All @@ -150,6 +152,10 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
update_min(&self.descr, &min, &mut self.min_value);
update_max(&self.descr, &max, &mut self.max_value);
}

if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
*self.variable_length_bytes.get_or_insert(0) += var_bytes;
}
}

// encode the values into bloom filter if enabled
Expand Down Expand Up @@ -203,6 +209,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
bloom_filter,
min_value: None,
max_value: None,
variable_length_bytes: None,
})
}

Expand Down Expand Up @@ -296,6 +303,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
num_values: std::mem::take(&mut self.num_values),
min_value: self.min_value.take(),
max_value: self.max_value.take(),
variable_length_bytes: self.variable_length_bytes.take(),
})
}
}
Expand Down
64 changes: 48 additions & 16 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,33 @@ struct ColumnMetrics<T> {
max_column_value: Option<T>,
num_column_nulls: u64,
column_distinct_count: Option<u64>,
variable_length_bytes: Option<i64>,
}

impl<T> ColumnMetrics<T> {
fn new() -> Self {
alamb marked this conversation as resolved.
Show resolved Hide resolved
ColumnMetrics {
total_bytes_written: 0,
total_rows_written: 0,
total_uncompressed_size: 0,
total_compressed_size: 0,
total_num_values: 0,
dictionary_page_offset: None,
data_page_offset: None,
min_column_value: None,
max_column_value: None,
num_column_nulls: 0,
column_distinct_count: None,
variable_length_bytes: None,
}
}

/// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
if let Some(var_bytes) = variable_length_bytes {
*self.variable_length_bytes.get_or_insert(0) += var_bytes;
}
}
}

/// Typed column writer for a primitive column.
Expand Down Expand Up @@ -276,19 +303,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
num_buffered_rows: 0,
num_page_nulls: 0,
},
column_metrics: ColumnMetrics {
total_bytes_written: 0,
total_rows_written: 0,
total_uncompressed_size: 0,
total_compressed_size: 0,
total_num_values: 0,
dictionary_page_offset: None,
data_page_offset: None,
min_column_value: None,
max_column_value: None,
num_column_nulls: 0,
column_distinct_count: None,
},
column_metrics: ColumnMetrics::<E::T>::new(),
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
Expand Down Expand Up @@ -634,7 +649,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
fn update_column_offset_index(
&mut self,
page_statistics: Option<&ValueStatistics<E::T>>,
page_variable_length_bytes: Option<i64>,
) {
// update the column index
let null_page =
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
Expand Down Expand Up @@ -708,6 +727,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);

self.offset_index_builder
.append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
}

/// Determine if we should allow truncating min/max values for this column's statistics
Expand Down Expand Up @@ -783,7 +805,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};

// update column and offset index
self.update_column_offset_index(page_statistics.as_ref());
self.update_column_offset_index(
page_statistics.as_ref(),
values_data.variable_length_bytes,
);

// Update variable_length_bytes in column_metrics
self.column_metrics
.update_variable_length_bytes(values_data.variable_length_bytes);

let page_statistics = page_statistics.map(Statistics::from);

let compressed_page = match self.props.writer_version() {
Expand Down Expand Up @@ -993,7 +1023,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
stats => stats,
};

builder = builder.set_statistics(statistics);
builder = builder
.set_statistics(statistics)
.set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
}

let metadata = builder.build()?;
Expand Down
11 changes: 11 additions & 0 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,13 @@ pub(crate) mod private {
(std::mem::size_of::<Self>(), 1)
}

/// Return the number of variable length bytes in a given slice of data
///
/// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
fn variable_length_bytes(_: &[Self]) -> Option<i64> {
None
}

/// Return the value as i64 if possible
///
/// This is essentially the same as `std::convert::TryInto<i64>` but can't be
Expand Down Expand Up @@ -956,6 +963,10 @@ pub(crate) mod private {
Ok(num_values)
}

fn variable_length_bytes(values: &[Self]) -> Option<i64> {
Some(values.iter().map(|x| x.len() as i64).sum())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double checked and this is a &[ByteArray] so it is summing up the lengths of the buffers, not the lengths of the individual strings 👍

}

fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
let data = decoder
.data
Expand Down
1 change: 1 addition & 0 deletions parquet/src/file/metadata/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ impl HeapSize for ColumnChunkMetaData {
+ self.compression.heap_size()
+ self.statistics.heap_size()
+ self.encoding_stats.heap_size()
+ self.unencoded_byte_array_data_bytes.heap_size()
}
}

Expand Down
Loading
Loading