diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs
index fc37ebfb4510..2d23ad8510f9 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -96,6 +96,7 @@ macro_rules! downcast_op {
 struct FallbackEncoder {
     encoder: FallbackEncoderImpl,
     num_values: usize,
+    variable_length_bytes: i64,
 }
 
 /// The fallback encoder in use
@@ -152,6 +153,7 @@ impl FallbackEncoder {
         Ok(Self {
             encoder,
             num_values: 0,
+            variable_length_bytes: 0,
         })
     }
 
@@ -168,7 +170,8 @@ impl FallbackEncoder {
                     let value = values.value(*idx);
                     let value = value.as_ref();
                     buffer.extend_from_slice((value.len() as u32).as_bytes());
-                    buffer.extend_from_slice(value)
+                    buffer.extend_from_slice(value);
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
             FallbackEncoderImpl::DeltaLength { buffer, lengths } => {
@@ -177,6 +180,7 @@ impl FallbackEncoder {
                     let value = value.as_ref();
                     lengths.put(&[value.len() as i32]).unwrap();
                     buffer.extend_from_slice(value);
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
             FallbackEncoderImpl::Delta {
@@ -205,6 +209,7 @@ impl FallbackEncoder {
                     buffer.extend_from_slice(&value[prefix_length..]);
                     prefix_lengths.put(&[prefix_length as i32]).unwrap();
                     suffix_lengths.put(&[suffix_length as i32]).unwrap();
+                    self.variable_length_bytes += value.len() as i64;
                 }
             }
         }
@@ -269,12 +274,17 @@ impl FallbackEncoder {
             }
         };
 
+        // Capture value of variable_length_bytes and reset for next page
+        let variable_length_bytes = Some(self.variable_length_bytes);
+        self.variable_length_bytes = 0;
+
         Ok(DataPageValues {
             buf: buf.into(),
             num_values: std::mem::take(&mut self.num_values),
             encoding,
             min_value,
             max_value,
+            variable_length_bytes,
         })
     }
 }
@@ -321,6 +331,7 @@ impl Storage for ByteArrayStorage {
 struct DictEncoder {
     interner: Interner<ByteArrayStorage>,
     indices: Vec<u64>,
+    variable_length_bytes: i64,
 }
 
 impl DictEncoder {
@@ -336,6 +347,7 @@ impl DictEncoder {
             let value = values.value(*idx);
             let interned = self.interner.intern(value.as_ref());
             self.indices.push(interned);
+            self.variable_length_bytes += value.as_ref().len() as i64;
         }
     }
 
@@ -384,12 +396,17 @@ impl DictEncoder {
 
         self.indices.clear();
 
+        // Capture value of variable_length_bytes and reset for next page
+        let variable_length_bytes = Some(self.variable_length_bytes);
+        self.variable_length_bytes = 0;
+
         DataPageValues {
             buf: encoder.consume().into(),
             num_values,
             encoding: Encoding::RLE_DICTIONARY,
             min_value,
             max_value,
+            variable_length_bytes,
         }
     }
 }
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 9224ea3f68a8..4a3489a2084d 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -20,7 +20,9 @@ use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::ParquetMetaData;
 use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{acc_range, decode_column_index, decode_offset_index};
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_page_locations,
+};
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use futures::FutureExt;
@@ -177,7 +179,9 @@ impl<F: MetadataFetch> MetadataLoader<F> {
                 x.columns()
                     .iter()
                     .map(|c| match c.offset_index_range() {
-                        Some(r) => decode_offset_index(&data[r.start - offset..r.end - offset]),
+                        Some(r) => {
+                            decode_page_locations(&data[r.start - offset..r.end - offset])
+                        }
                         None => Err(general_err!("missing offset index")),
                     })
                     .collect::<Result<Vec<_>>>()
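Taken together, the encoder changes above all follow one pattern: each value's pre-encoding byte length is added to a running `variable_length_bytes` counter as it is written, and the counter is captured and reset whenever a page is flushed. A minimal standalone sketch of that accumulate-and-reset pattern (the type and methods here are illustrative, not part of the parquet crate):

```rust
// Illustrative only: `PageBytesCounter` is not part of the parquet crate.
struct PageBytesCounter {
    variable_length_bytes: i64,
}

impl PageBytesCounter {
    fn put(&mut self, value: &[u8]) {
        // Mirrors FallbackEncoder/DictEncoder: count the unencoded length.
        self.variable_length_bytes += value.len() as i64;
    }

    fn flush_page(&mut self) -> Option<i64> {
        // Capture the per-page total and reset for the next page.
        Some(std::mem::take(&mut self.variable_length_bytes))
    }
}

fn main() {
    let mut counter = PageBytesCounter { variable_length_bytes: 0 };
    counter.put(b"hello");
    counter.put(b"parquet");
    assert_eq!(counter.flush_page(), Some(12)); // 5 + 7, regardless of encoding
    assert_eq!(counter.flush_page(), Some(0)); // counter was reset
}
```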
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index e4205b7ef2ce..5a790fa6aff0 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -1538,6 +1538,7 @@ mod tests {
             vec![row_group_meta],
             None,
             Some(vec![offset_index.clone()]),
+            None,
         );
 
         let metadata = Arc::new(metadata);
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index b6c8212608b8..9d01c09040de 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -63,6 +63,7 @@ pub struct DataPageValues<T> {
     pub encoding: Encoding,
     pub min_value: Option<T>,
     pub max_value: Option<T>,
+    pub variable_length_bytes: Option<i64>,
 }
 
 /// A generic encoder of [`ColumnValues`] to data and dictionary pages used by
@@ -131,6 +132,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
     min_value: Option<T::T>,
     max_value: Option<T::T>,
     bloom_filter: Option<Sbbf>,
+    variable_length_bytes: Option<i64>,
 }
 
 impl<T: DataType> ColumnValueEncoderImpl<T> {
@@ -150,6 +152,10 @@ impl<T: DataType> ColumnValueEncoderImpl<T> {
                 update_min(&self.descr, &min, &mut self.min_value);
                 update_max(&self.descr, &max, &mut self.max_value);
             }
+
+            if let Some(var_bytes) = T::T::variable_length_bytes(slice) {
+                *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+            }
         }
 
         // encode the values into bloom filter if enabled
@@ -203,6 +209,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             bloom_filter,
             min_value: None,
             max_value: None,
+            variable_length_bytes: None,
         })
     }
 
@@ -296,6 +303,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
             num_values: std::mem::take(&mut self.num_values),
             min_value: self.min_value.take(),
             max_value: self.max_value.take(),
+            variable_length_bytes: self.variable_length_bytes.take(),
         })
     }
 }
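The `Option<i64>` plumbing in `ColumnValueEncoderImpl` is deliberate: `variable_length_bytes()` returns `None` for fixed-length physical types, so the statistic is only ever materialized for BYTE_ARRAY columns. A small self-contained sketch of the `get_or_insert` accumulation used above (the free function is illustrative):

```rust
// Illustrative only: models the Option<i64> accumulation in write_slice().
fn accumulate(total: &mut Option<i64>, batch: Option<i64>) {
    if let Some(var_bytes) = batch {
        *total.get_or_insert(0) += var_bytes;
    }
}

fn main() {
    // Fixed-length types report None, so the statistic never materializes.
    let mut int32_col: Option<i64> = None;
    accumulate(&mut int32_col, None);
    assert_eq!(int32_col, None);

    // BYTE_ARRAY batches report Some(n) and sum across writes.
    let mut byte_array_col: Option<i64> = None;
    accumulate(&mut byte_array_col, Some(10));
    accumulate(&mut byte_array_col, Some(5));
    assert_eq!(byte_array_col, Some(15));
}
```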
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 8594e59714dc..0e227a157d06 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -192,7 +192,8 @@ struct PageMetrics {
 }
 
 // Metrics per column writer
-struct ColumnMetrics<T> {
+#[derive(Default)]
+struct ColumnMetrics<T: Default> {
     total_bytes_written: u64,
     total_rows_written: u64,
     total_uncompressed_size: u64,
@@ -204,6 +205,20 @@ struct ColumnMetrics<T: Default> {
     max_column_value: Option<T>,
     num_column_nulls: u64,
     column_distinct_count: Option<u64>,
+    variable_length_bytes: Option<i64>,
+}
+
+impl<T: Default> ColumnMetrics<T> {
+    fn new() -> Self {
+        Default::default()
+    }
+
+    /// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
+    fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
+        if let Some(var_bytes) = variable_length_bytes {
+            *self.variable_length_bytes.get_or_insert(0) += var_bytes;
+        }
+    }
 }
 
 /// Typed column writer for a primitive column.
@@ -276,19 +291,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 num_buffered_rows: 0,
                 num_page_nulls: 0,
             },
-            column_metrics: ColumnMetrics {
-                total_bytes_written: 0,
-                total_rows_written: 0,
-                total_uncompressed_size: 0,
-                total_compressed_size: 0,
-                total_num_values: 0,
-                dictionary_page_offset: None,
-                data_page_offset: None,
-                min_column_value: None,
-                max_column_value: None,
-                num_column_nulls: 0,
-                column_distinct_count: None,
-            },
+            column_metrics: ColumnMetrics::<E::T>::new(),
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
@@ -634,7 +637,11 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     }
 
     /// Update the column index and offset index when adding the data page
-    fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
+    fn update_column_offset_index(
+        &mut self,
+        page_statistics: Option<&ValueStatistics<E::T>>,
+        page_variable_length_bytes: Option<i64>,
+    ) {
         // update the column index
         let null_page =
             (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
@@ -708,6 +715,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         // update the offset index
         self.offset_index_builder
             .append_row_count(self.page_metrics.num_buffered_rows as i64);
+
+        self.offset_index_builder
+            .append_unencoded_byte_array_data_bytes(page_variable_length_bytes);
     }
 
     /// Determine if we should allow truncating min/max values for this column's statistics
@@ -783,7 +793,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         };
 
         // update column and offset index
-        self.update_column_offset_index(page_statistics.as_ref());
+        self.update_column_offset_index(
+            page_statistics.as_ref(),
+            values_data.variable_length_bytes,
+        );
+
+        // Update variable_length_bytes in column_metrics
+        self.column_metrics
+            .update_variable_length_bytes(values_data.variable_length_bytes);
+
         let page_statistics = page_statistics.map(Statistics::from);
 
         let compressed_page = match self.props.writer_version() {
@@ -993,7 +1011,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             stats => stats,
         };
 
-        builder = builder.set_statistics(statistics);
+        builder = builder
+            .set_statistics(statistics)
+            .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
     }
 
     let metadata = builder.build()?;
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index b85a75cfd410..01e92115c45b 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -644,6 +644,13 @@ pub(crate) mod private {
             (std::mem::size_of::<Self>(), 1)
         }
 
+        /// Return the number of variable length bytes in a given slice of data
+        ///
+        /// Returns the sum of lengths for BYTE_ARRAY data, and None for all other data types
+        fn variable_length_bytes(_: &[Self]) -> Option<i64> {
+            None
+        }
+
         /// Return the value as i64 if possible
         ///
         /// This is essentially the same as `std::convert::TryInto` but can't be
@@ -956,6 +963,10 @@ pub(crate) mod private {
             Ok(num_values)
         }
 
+        fn variable_length_bytes(values: &[Self]) -> Option<i64> {
+            Some(values.iter().map(|x| x.len() as i64).sum())
+        }
+
        fn skip(decoder: &mut PlainDecoderDetails, num_values: usize) -> Result<usize> {
             let data = decoder
                 .data
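For intuition on what the new trait method reports: it is the sum of the value lengths only, before any encoding, so PLAIN's 4-byte length prefixes and any compression are excluded. A hedged sketch (a free function stands in for the `ByteArray` impl above):

```rust
// Illustrative only: a free function standing in for the ByteArray impl.
fn variable_length_bytes(values: &[&[u8]]) -> Option<i64> {
    Some(values.iter().map(|x| x.len() as i64).sum())
}

fn main() {
    let values: Vec<&[u8]> = vec![b"abc", b"", b"de"];
    // 3 + 0 + 2 = 5 bytes of value data. The PLAIN-encoded size would be
    // 5 + 3 * 4 = 17 bytes because of the per-value u32 length prefixes.
    assert_eq!(variable_length_bytes(&values), Some(5));
}
```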
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 57b2f7eec0c2..a1b40a8de95a 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -97,6 +97,7 @@ impl HeapSize for ColumnChunkMetaData {
             + self.compression.heap_size()
             + self.statistics.heap_size()
             + self.encoding_stats.heap_size()
+            + self.unencoded_byte_array_data_bytes.heap_size()
     }
 }
 
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 39b6de41fa58..d86b1ce572fb 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -36,7 +36,7 @@ use std::sync::Arc;
 
 use crate::format::{
     BoundaryOrder, ColumnChunk, ColumnIndex, ColumnMetaData, OffsetIndex, PageLocation, RowGroup,
-    SortingColumn,
+    SizeStatistics, SortingColumn,
 };
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
@@ -96,6 +96,8 @@ pub struct ParquetMetaData {
     column_index: Option<ParquetColumnIndex>,
     /// Offset index for each page in each column chunk
     offset_index: Option<ParquetOffsetIndex>,
+    /// `unencoded_byte_array_data_bytes` from the offset index
+    unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
 }
 
 impl ParquetMetaData {
@@ -107,6 +109,7 @@ impl ParquetMetaData {
             row_groups,
             column_index: None,
             offset_index: None,
+            unencoded_byte_array_data_bytes: None,
         }
     }
 
@@ -117,12 +120,14 @@ impl ParquetMetaData {
         row_groups: Vec<RowGroupMetaData>,
         column_index: Option<ParquetColumnIndex>,
         offset_index: Option<ParquetOffsetIndex>,
+        unencoded_byte_array_data_bytes: Option<Vec<Vec<Option<Vec<i64>>>>>,
     ) -> Self {
         ParquetMetaData {
             file_metadata,
             row_groups,
             column_index,
             offset_index,
+            unencoded_byte_array_data_bytes,
         }
     }
 
@@ -179,6 +184,19 @@ impl ParquetMetaData {
         self.offset_index.as_ref()
     }
 
+    /// Returns `unencoded_byte_array_data_bytes` from the offset indexes in this file, if loaded
+    ///
+    /// This value represents the total decoded (unencoded) size of BYTE_ARRAY data in this
+    /// file, which can be useful for allocating an appropriately sized output buffer.
+    ///
+    /// Returns `None` if the parquet file does not have an `OffsetIndex` or
+    /// [ArrowReaderOptions::with_page_index] was set to false.
+    ///
+    /// [ArrowReaderOptions::with_page_index]: https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderOptions.html#method.with_page_index
+    pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<Vec<Option<Vec<i64>>>>> {
+        self.unencoded_byte_array_data_bytes.as_ref()
+    }
+
     /// Estimate of the bytes allocated to store `ParquetMetadata`
     ///
     /// # Notes:
@@ -199,6 +217,7 @@ impl ParquetMetaData {
             + self.row_groups.heap_size()
             + self.column_index.heap_size()
             + self.offset_index.heap_size()
+            + self.unencoded_byte_array_data_bytes.heap_size()
     }
 
     /// Override the column index
@@ -543,6 +562,7 @@ pub struct ColumnChunkMetaData {
     offset_index_length: Option<i32>,
     column_index_offset: Option<i64>,
     column_index_length: Option<i32>,
+    unencoded_byte_array_data_bytes: Option<i64>,
 }
 
 /// Represents common operations for a column chunk.
@@ -695,6 +715,14 @@ impl ColumnChunkMetaData {
         Some(offset..(offset + length))
     }
 
+    /// Returns the number of bytes of variable length data after decoding.
+    ///
+    /// Only set for BYTE_ARRAY columns. This field may not be set by older
+    /// writers.
+    pub fn unencoded_byte_array_data_bytes(&self) -> Option<i64> {
+        self.unencoded_byte_array_data_bytes
+    }
+
     /// Method to convert from Thrift.
     pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
         if cc.meta_data.is_none() {
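With the new chunk-level accessor, a reader can estimate the decoded size of a string column without loading the page index at all. A hedged sketch of possible caller code (the file path is assumed; it relies only on accessors added in this diff plus existing reader APIs):

```rust
use std::fs::File;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed path; any parquet file written by this branch works.
    let reader = SerializedFileReader::new(File::open("strings.parquet")?)?;

    // Sum the new chunk-level statistic for column 0 across row groups.
    // Summing Option<i64> yields None if any chunk lacks the field.
    let total: Option<i64> = reader
        .metadata()
        .row_groups()
        .iter()
        .map(|rg| rg.column(0).unencoded_byte_array_data_bytes())
        .sum();
    println!("decoded BYTE_ARRAY bytes in column 0: {total:?}");
    Ok(())
}
```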
@@ -732,6 +760,12 @@ impl ColumnChunkMetaData {
         let offset_index_length = cc.offset_index_length;
         let column_index_offset = cc.column_index_offset;
         let column_index_length = cc.column_index_length;
+        let unencoded_byte_array_data_bytes = if let Some(size_stats) = col_metadata.size_statistics
+        {
+            size_stats.unencoded_byte_array_data_bytes
+        } else {
+            None
+        };
 
         let result = ColumnChunkMetaData {
             column_descr,
@@ -753,6 +787,7 @@ impl ColumnChunkMetaData {
             offset_index_length,
             column_index_offset,
             column_index_length,
+            unencoded_byte_array_data_bytes,
         };
         Ok(result)
     }
@@ -776,6 +811,16 @@ impl ColumnChunkMetaData {
 
     /// Method to convert to Thrift `ColumnMetaData`
     pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
+        let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() {
+            Some(SizeStatistics {
+                unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
+                repetition_level_histogram: None,
+                definition_level_histogram: None,
+            })
+        } else {
+            None
+        };
+
         ColumnMetaData {
             type_: self.column_type().into(),
             encodings: self.encodings().iter().map(|&v| v.into()).collect(),
@@ -795,7 +840,7 @@ impl ColumnChunkMetaData {
                 .map(|vec| vec.iter().map(page_encoding_stats::to_thrift).collect()),
             bloom_filter_offset: self.bloom_filter_offset,
             bloom_filter_length: self.bloom_filter_length,
-            size_statistics: None,
+            size_statistics,
         }
     }
 
@@ -831,6 +876,7 @@ impl ColumnChunkMetaDataBuilder {
             offset_index_length: None,
             column_index_offset: None,
             column_index_length: None,
+            unencoded_byte_array_data_bytes: None,
         })
     }
 
@@ -942,6 +988,12 @@ impl ColumnChunkMetaDataBuilder {
         self
     }
 
+    /// Sets the optional total size of variable length (BYTE_ARRAY) data in bytes.
+    pub fn set_unencoded_byte_array_data_bytes(mut self, value: Option<i64>) -> Self {
+        self.0.unencoded_byte_array_data_bytes = value;
+        self
+    }
+
     /// Builds column chunk metadata.
     pub fn build(self) -> Result<ColumnChunkMetaData> {
         Ok(self.0)
     }
@@ -1021,6 +1073,7 @@ pub struct OffsetIndexBuilder {
     offset_array: Vec<i64>,
     compressed_page_size_array: Vec<i32>,
     first_row_index_array: Vec<i64>,
+    unencoded_byte_array_data_bytes_array: Option<Vec<i64>>,
     current_first_row_index: i64,
 }
 
@@ -1036,6 +1089,7 @@ impl OffsetIndexBuilder {
             offset_array: Vec::new(),
             compressed_page_size_array: Vec::new(),
             first_row_index_array: Vec::new(),
+            unencoded_byte_array_data_bytes_array: None,
             current_first_row_index: 0,
         }
     }
@@ -1051,6 +1105,17 @@ impl OffsetIndexBuilder {
         self.compressed_page_size_array.push(compressed_page_size);
     }
 
+    pub fn append_unencoded_byte_array_data_bytes(
+        &mut self,
+        unencoded_byte_array_data_bytes: Option<i64>,
+    ) {
+        if let Some(val) = unencoded_byte_array_data_bytes {
+            self.unencoded_byte_array_data_bytes_array
+                .get_or_insert(Vec::new())
+                .push(val);
+        }
+    }
+
     /// Build and get the thrift metadata of offset index
     pub fn build_to_thrift(self) -> OffsetIndex {
         let locations = self
@@ -1060,7 +1125,7 @@ impl OffsetIndexBuilder {
             .zip(self.first_row_index_array.iter())
             .map(|((offset, size), row_index)| PageLocation::new(*offset, *size, *row_index))
             .collect::<Vec<_>>();
-        OffsetIndex::new(locations, None)
+        OffsetIndex::new(locations, self.unencoded_byte_array_data_bytes_array)
     }
 }
 
@@ -1211,6 +1276,7 @@ mod tests {
             .set_offset_index_length(Some(25))
             .set_column_index_offset(Some(8000))
             .set_column_index_length(Some(25))
+            .set_unencoded_byte_array_data_bytes(Some(2000))
             .build()
             .unwrap();
 
@@ -1298,7 +1364,7 @@ mod tests {
             column_orders,
         );
         let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta.clone());
-        let base_expected_size = 1320;
+        let base_expected_size = 1376;
         assert_eq!(parquet_meta.memory_size(), base_expected_size);
 
         let mut column_index = ColumnIndexBuilder::new();
@@ -1315,9 +1381,10 @@ mod tests {
                 vec![PageLocation::new(1, 2, 3)],
                 vec![PageLocation::new(1, 2, 3)],
             ]]),
+            Some(vec![vec![Some(vec![10, 20, 30])]]),
         );
 
-        let bigger_expected_size = 2304;
+        let bigger_expected_size = 2464;
         // more set fields means more memory usage
         assert!(bigger_expected_size > base_expected_size);
         assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index 2ddf826fb022..7959cb95c052 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -22,6 +22,7 @@ use crate::data_type::Int96;
 use crate::errors::ParquetError;
 use crate::file::metadata::ColumnChunkMetaData;
 use crate::file::page_index::index::{Index, NativeIndex};
+use crate::file::page_index::offset_index::OffsetIndexMetaData;
 use crate::file::reader::ChunkReader;
 use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
 use crate::thrift::{TCompactSliceInputProtocol, TSerializable};
@@ -45,9 +46,9 @@ pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Opt
 /// Returns an empty vector if this row group does not contain a
 /// [`ColumnIndex`].
 ///
-/// See [Column Index Documentation] for more details.
+/// See [Page Index Documentation] for more details.
 ///
-/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
 pub fn read_columns_indexes<R: ChunkReader>(
     reader: &R,
     chunks: &[ColumnChunkMetaData],
@@ -81,9 +82,9 @@ pub fn read_columns_indexes(
 /// Return an empty vector if this row group does not contain an
 /// [`OffsetIndex`].
 ///
-/// See [Column Index Documentation] for more details.
+/// See [Page Index Documentation] for more details.
 ///
-/// [Column Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
 pub fn read_pages_locations<R: ChunkReader>(
     reader: &R,
     chunks: &[ColumnChunkMetaData],
@@ -100,6 +101,42 @@ pub fn read_pages_locations<R: ChunkReader>(
     let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
     let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
 
+    chunks
+        .iter()
+        .map(|c| match c.offset_index_range() {
+            Some(r) => decode_page_locations(get(r)),
+            None => Err(general_err!("missing offset index")),
+        })
+        .collect()
+}
+
+/// Reads per-column [`OffsetIndexMetaData`] for all columns of a row group by
+/// decoding [`OffsetIndex`].
+///
+/// Returns a vector of `offset_index[column_number]`.
+///
+/// Returns an empty vector if this row group does not contain an
+/// [`OffsetIndex`].
+///
+/// See [Page Index Documentation] for more details.
+///
+/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
+pub fn read_offset_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<OffsetIndexMetaData>, ParquetError> {
+    let fetch = chunks
+        .iter()
+        .fold(None, |range, c| acc_range(range, c.offset_index_range()));
+
+    let fetch = match fetch {
+        Some(r) => r,
+        None => return Ok(vec![]),
+    };
+
+    let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
+    let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];
+
     chunks
         .iter()
         .map(|c| match c.offset_index_range() {
@@ -109,7 +146,13 @@ pub fn read_pages_locations<R: ChunkReader>(
         .collect()
 }
 
-pub(crate) fn decode_offset_index(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
+pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
+    let mut prot = TCompactSliceInputProtocol::new(data);
+    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+    OffsetIndexMetaData::try_new(offset)
+}
+
+pub(crate) fn decode_page_locations(data: &[u8]) -> Result<Vec<PageLocation>, ParquetError> {
     let mut prot = TCompactSliceInputProtocol::new(data);
     let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
     Ok(offset.page_locations)
diff --git a/parquet/src/file/page_index/mod.rs b/parquet/src/file/page_index/mod.rs
index 9372645d76ee..a8077896db34 100644
--- a/parquet/src/file/page_index/mod.rs
+++ b/parquet/src/file/page_index/mod.rs
@@ -21,3 +21,4 @@
 
 pub mod index;
 pub mod index_reader;
+pub mod offset_index;
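To illustrate the split introduced here: `read_pages_locations` keeps its old shape, while `read_offset_indexes` surfaces the extra size data. A hedged end-to-end sketch, assuming `data.parquet` exists and was written with an offset index (otherwise the returned vectors are empty):

```rust
use std::fs::File;
use parquet::file::page_index::index_reader;
use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let reader = SerializedFileReader::new(file.try_clone()?)?;
    let rg = reader.metadata().row_group(0);

    // Pre-existing entry point: page locations only.
    let locations = index_reader::read_pages_locations(&file, rg.columns())?;
    println!("column 0 has {} pages", locations[0].len());

    // New entry point: locations plus optional unencoded BYTE_ARRAY sizes.
    for index in index_reader::read_offset_indexes(&file, rg.columns())? {
        println!(
            "{} pages, unencoded sizes: {:?}",
            index.page_locations().len(),
            index.unencoded_byte_array_data_bytes()
        );
    }
    Ok(())
}
```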
diff --git a/parquet/src/file/page_index/offset_index.rs b/parquet/src/file/page_index/offset_index.rs
new file mode 100644
index 000000000000..9138620e3fcd
--- /dev/null
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`OffsetIndexMetaData`] structure holding decoded [`OffsetIndex`] information
+
+use crate::errors::ParquetError;
+use crate::format::{OffsetIndex, PageLocation};
+
+/// [`OffsetIndex`] information for a column chunk. Contains offsets and sizes for each page
+/// in the chunk. Optionally stores fully decoded page sizes for BYTE_ARRAY columns.
+#[derive(Debug, Clone, PartialEq)]
+pub struct OffsetIndexMetaData {
+    pub page_locations: Vec<PageLocation>,
+    pub unencoded_byte_array_data_bytes: Option<Vec<i64>>,
+}
+
+impl OffsetIndexMetaData {
+    /// Creates a new [`OffsetIndexMetaData`] from an [`OffsetIndex`].
+    pub(crate) fn try_new(index: OffsetIndex) -> Result<Self, ParquetError> {
+        Ok(Self {
+            page_locations: index.page_locations,
+            unencoded_byte_array_data_bytes: index.unencoded_byte_array_data_bytes,
+        })
+    }
+
+    /// Vector of [`PageLocation`] objects, one per page in the chunk.
+    pub fn page_locations(&self) -> &Vec<PageLocation> {
+        &self.page_locations
+    }
+
+    /// Optional vector of unencoded page sizes, one per page in the chunk. Only defined
+    /// for BYTE_ARRAY columns.
+    pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<i64>> {
+        self.unencoded_byte_array_data_bytes.as_ref()
+    }
+}
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index ac7d2d287488..65b6ebf2ec98 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -211,12 +211,22 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         if options.enable_page_index {
             let mut columns_indexes = vec![];
             let mut offset_indexes = vec![];
+            let mut unenc_byte_sizes = vec![];
             for rg in &mut filtered_row_groups {
                 let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
-                let offset_index = index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
+                let offset_index = index_reader::read_offset_indexes(&chunk_reader, rg.columns())?;
+
+                // split offset_index into two vectors to avoid breaking the API
+                let mut page_locations = vec![];
+                let mut unenc_bytes = vec![];
+                offset_index.into_iter().for_each(|index| {
+                    page_locations.push(index.page_locations);
+                    unenc_bytes.push(index.unencoded_byte_array_data_bytes);
+                });
 
                 columns_indexes.push(column_index);
-                offset_indexes.push(offset_index);
+                offset_indexes.push(page_locations);
+                unenc_byte_sizes.push(unenc_bytes);
             }
 
             Ok(Self {
@@ -226,6 +236,7 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
                     filtered_row_groups,
                     Some(columns_indexes),
                     Some(offset_indexes),
+                    Some(unenc_byte_sizes),
                 )),
                 props: Arc::new(options.props),
             })
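On the read side, the new statistics flow through `SerializedFileReader` whenever the page index is enabled. A sketch of how a caller might retrieve the per-page sizes (this mirrors the round-trip test added below; the file path is assumed):

```rust
use std::fs::File;
use parquet::file::reader::FileReader;
use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let options = ReadOptionsBuilder::new().with_page_index().build();
    let reader = SerializedFileReader::new_with_options(file, options)?;

    // The new accessor is indexed [row_group][column] and holds one entry
    // per page (None for columns without BYTE_ARRAY data).
    if let Some(sizes) = reader.metadata().unencoded_byte_array_data_bytes() {
        println!("row group 0, column 0 page sizes: {:?}", sizes[0][0]);
    }
    Ok(())
}
```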
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index eb633f31c477..b109a2da8eb0 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -659,7 +659,8 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
             .set_total_uncompressed_size(metadata.uncompressed_size())
             .set_num_values(metadata.num_values())
             .set_data_page_offset(map_offset(src_data_offset))
-            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset));
+            .set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
+            .set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
 
         if let Some(statistics) = metadata.statistics() {
             builder = builder.set_statistics(statistics.clone())
@@ -827,7 +828,7 @@ mod tests {
     use crate::column::page::{Page, PageReader};
     use crate::column::reader::get_typed_column_reader;
     use crate::compression::{create_codec, Codec, CodecOptionsBuilder};
-    use crate::data_type::{BoolType, Int32Type};
+    use crate::data_type::{BoolType, ByteArrayType, Int32Type};
     use crate::file::page_index::index::Index;
     use crate::file::properties::EnabledStatistics;
     use crate::file::serialized_reader::ReadOptionsBuilder;
@@ -840,6 +841,7 @@ mod tests {
     use crate::record::{Row, RowAccessor};
     use crate::schema::parser::parse_message_type;
     use crate::schema::types::{ColumnDescriptor, ColumnPath};
+    use crate::util::test_common::rand_gen::RandGen;
 
     #[test]
     fn test_row_group_writer_error_not_all_columns_written() {
@@ -1851,4 +1853,82 @@ mod tests {
         let b_idx = &column_index[0][1];
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
+
+    #[test]
+    fn test_byte_array_size_statistics() {
+        let message_type = "
+            message test_schema {
+                OPTIONAL BYTE_ARRAY a (UTF8);
+            }
+        ";
+        let schema = Arc::new(parse_message_type(message_type).unwrap());
+        let data = ByteArrayType::gen_vec(32, 7);
+        let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
+        let unenc_size: i64 = data.iter().map(|x| x.len() as i64).sum();
+        let file: File = tempfile::tempfile().unwrap();
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Page)
+                .build(),
+        );
+
+        let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+        let mut row_group_writer = writer.next_row_group().unwrap();
+
+        let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+        col_writer
+            .typed::<ByteArrayType>()
+            .write_batch(&data, Some(&def_levels), None)
+            .unwrap();
+        col_writer.close().unwrap();
+        row_group_writer.close().unwrap();
+        let file_metadata = writer.close().unwrap();
+
+        assert_eq!(file_metadata.row_groups.len(), 1);
+        assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+        assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+        let meta_data = file_metadata.row_groups[0].columns[0]
+            .meta_data
+            .as_ref()
+            .unwrap();
+        assert!(meta_data.size_statistics.is_some());
+        let size_stats = meta_data.size_statistics.as_ref().unwrap();
+
+        assert!(size_stats.repetition_level_histogram.is_none());
+        assert!(size_stats.definition_level_histogram.is_none());
+        assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
+        assert_eq!(
+            unenc_size,
+            size_stats.unencoded_byte_array_data_bytes.unwrap()
+        );
+
+        // check that the read metadata is also correct
+        let options = ReadOptionsBuilder::new().with_page_index().build();
+        let reader = SerializedFileReader::new_with_options(file, options).unwrap();
+
+        let rfile_metadata = reader.metadata().file_metadata();
+        assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+        assert_eq!(reader.num_row_groups(), 1);
+        let rowgroup = reader.get_row_group(0).unwrap();
+        assert_eq!(rowgroup.num_columns(), 1);
+        let column = rowgroup.metadata().column(0);
+        assert!(column.unencoded_byte_array_data_bytes().is_some());
+        assert_eq!(
+            unenc_size,
+            column.unencoded_byte_array_data_bytes().unwrap()
+        );
+
+        assert!(reader
+            .metadata()
+            .unencoded_byte_array_data_bytes()
+            .is_some());
+        let unenc_sizes = reader.metadata().unencoded_byte_array_data_bytes().unwrap();
+        assert_eq!(unenc_sizes.len(), 1);
+        assert_eq!(unenc_sizes[0].len(), 1);
+        assert!(unenc_sizes[0][0].is_some());
+        let page_sizes = unenc_sizes[0][0].as_ref().unwrap();
+        assert_eq!(page_sizes.len(), 1);
+        assert_eq!(page_sizes[0], unenc_size);
+    }
 }