Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for level histograms added in PARQUET-2261 to ParquetMetaData #6105

Merged
merged 27 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
741bbf6
bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
BugenZhao Jul 16, 2024
8f76248
Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily acciden…
XiangpengHao Jul 16, 2024
bb5f12b
Make display of interval types more pretty (#6006)
Rachelint Jul 16, 2024
756b1fb
Update snafu (#5930)
Jesse-Bakker Jul 16, 2024
fe04e09
Update Parquet thrift generated structures (#6045)
etseidl Jul 16, 2024
2e7f7ef
Revert "Revert "Write Bloom filters between row groups instead of the…
alamb Jul 16, 2024
effccc1
Revert "Update snafu (#5930)" (#6069)
alamb Jul 16, 2024
649d09d
Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
crepererum Jul 17, 2024
05e681d
remove repeated codes to make the codes more concise. (#6080)
Rachelint Jul 18, 2024
e40b311
Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
etseidl Jul 19, 2024
41b8f66
deprecate read_page_locations
etseidl Jul 19, 2024
c4c0a4d
add level histograms to metadata
etseidl Jul 19, 2024
1507c46
add to_thrift() to OffsetIndexMetaData
etseidl Jul 22, 2024
23cc77e
Merge branch 'deprecate_read_page_locations' into size_stats_histograms
etseidl Jul 22, 2024
81c34ac
Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
dependabot[bot] Jul 23, 2024
3bc9987
Deprecate read_page_locations() and simplify offset index in `Parquet…
etseidl Jul 23, 2024
8369436
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_hist…
etseidl Jul 23, 2024
bf4dadb
move valid test into ColumnIndexBuilder::append_histograms
etseidl Jul 25, 2024
4002464
move update_histogram() inside ColumnMetrics
etseidl Jul 25, 2024
095130f
Merge remote-tracking branch 'apache/master' into 53.0.0-dev
alamb Jul 25, 2024
a6353d1
Update parquet/src/column/writer/mod.rs
alamb Jul 25, 2024
f892613
Merge remote-tracking branch 'origin/53.0.0-dev' into size_stats_hist…
etseidl Jul 25, 2024
978264d
Implement LevelHistograms as a struct
alamb Jul 26, 2024
642c7d4
Merge remote-tracking branch 'origin/master' into size_stats_histograms
etseidl Jul 26, 2024
9582e0d
Merge pull request #1 from alamb/alamb/level_histogram_structs
etseidl Jul 26, 2024
70af5dd
formatting
etseidl Jul 26, 2024
e774bbe
fix error in docs
etseidl Jul 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 126 additions & 11 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::file::{
Expand Down Expand Up @@ -189,6 +189,54 @@ struct PageMetrics {
num_buffered_values: u32,
num_buffered_rows: u32,
num_page_nulls: u64,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
fn new() -> Self {
Default::default()
}

/// Initialize the repetition level histogram
fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
self.repetition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Initialize the definition level histogram
fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
self.definition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Resets the state of this `PageMetrics` to the initial state.
/// If histograms have been initialized their contents will be reset to zero.
fn new_page(&mut self) {
self.num_buffered_values = 0;
self.num_buffered_rows = 0;
self.num_page_nulls = 0;
self.repetition_level_histogram
.as_mut()
.map(LevelHistogram::reset);
self.definition_level_histogram
.as_mut()
.map(LevelHistogram::reset);
}

/// Updates histogram values using provided repetition levels
fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
if let Some(ref mut rep_hist) = self.repetition_level_histogram {
rep_hist.update_from_levels(levels);
}
}

/// Updates histogram values using provided definition levels
fn update_definition_level_histogram(&mut self, levels: &[i16]) {
if let Some(ref mut def_hist) = self.definition_level_histogram {
def_hist.update_from_levels(levels);
}
}
}

// Metrics per column writer
Expand All @@ -206,13 +254,50 @@ struct ColumnMetrics<T: Default> {
num_column_nulls: u64,
column_distinct_count: Option<u64>,
variable_length_bytes: Option<i64>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
fn new() -> Self {
Default::default()
}

/// Initialize the repetition level histogram
fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
self.repetition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Initialize the definition level histogram
fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
self.definition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Sum `page_histogram` into `chunk_histogram`
fn update_histogram(
chunk_histogram: &mut Option<LevelHistogram>,
page_histogram: &Option<LevelHistogram>,
) {
if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
chunk_hist.add(page_hist);
}
}

/// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
/// page histograms are not initialized.
fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
ColumnMetrics::<T>::update_histogram(
&mut self.definition_level_histogram,
&page_metrics.definition_level_histogram,
);
ColumnMetrics::<T>::update_histogram(
&mut self.repetition_level_histogram,
&page_metrics.repetition_level_histogram,
);
}

/// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
if let Some(var_bytes) = variable_length_bytes {
Expand Down Expand Up @@ -275,6 +360,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Used for level information
encodings.insert(Encoding::RLE);

let mut page_metrics = PageMetrics::new();
let mut column_metrics = ColumnMetrics::<E::T>::new();

// Initialize level histograms if collecting page or chunk statistics
if statistics_enabled != EnabledStatistics::None {
page_metrics = page_metrics
.with_repetition_level_histogram(descr.max_rep_level())
.with_definition_level_histogram(descr.max_def_level());
column_metrics = column_metrics
.with_repetition_level_histogram(descr.max_rep_level())
.with_definition_level_histogram(descr.max_def_level())
}

// Disable column_index_builder if not collecting page statistics.
let mut column_index_builder = ColumnIndexBuilder::new();
if statistics_enabled != EnabledStatistics::Page {
Expand All @@ -292,12 +390,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
def_levels_sink: vec![],
rep_levels_sink: vec![],
data_pages: VecDeque::new(),
page_metrics: PageMetrics {
num_buffered_values: 0,
num_buffered_rows: 0,
num_page_nulls: 0,
},
column_metrics: ColumnMetrics::<E::T>::new(),
page_metrics,
column_metrics,
column_index_builder,
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
Expand Down Expand Up @@ -547,6 +641,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
}

// Update histogram
self.page_metrics.update_definition_level_histogram(levels);

self.def_levels_sink.extend_from_slice(levels);
values_to_write
} else {
Expand Down Expand Up @@ -575,6 +672,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.page_metrics.num_buffered_rows += (level == 0) as u32
}

// Update histogram
self.page_metrics.update_repetition_level_histogram(levels);

self.rep_levels_sink.extend_from_slice(levels);
} else {
// Each value is exactly one row.
Expand Down Expand Up @@ -718,7 +818,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
}
}
// update the offset index

// Append page histograms to the `ColumnIndex` histograms
self.column_index_builder.append_histograms(
&self.page_metrics.repetition_level_histogram,
&self.page_metrics.definition_level_histogram,
);

// Update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);

Expand Down Expand Up @@ -804,7 +911,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
values_data.variable_length_bytes,
);

// Update variable_length_bytes in column_metrics
// Update histograms and variable_length_bytes in column_metrics
self.column_metrics
.update_from_page_metrics(&self.page_metrics);
self.column_metrics
.update_variable_length_bytes(values_data.variable_length_bytes);

Expand Down Expand Up @@ -911,7 +1020,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Reset state.
self.rep_levels_sink.clear();
self.def_levels_sink.clear();
self.page_metrics = PageMetrics::default();
self.page_metrics.new_page();

Ok(())
}
Expand Down Expand Up @@ -1019,7 +1128,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

builder = builder
.set_statistics(statistics)
.set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
.set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
.set_repetition_level_histogram(
self.column_metrics.repetition_level_histogram.take(),
)
.set_definition_level_histogram(
self.column_metrics.definition_level_histogram.take(),
);
}

let metadata = builder.build()?;
Expand Down
2 changes: 2 additions & 0 deletions parquet/src/file/metadata/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ impl HeapSize for ColumnChunkMetaData {
+ self.statistics.heap_size()
+ self.encoding_stats.heap_size()
+ self.unencoded_byte_array_data_bytes.heap_size()
+ self.repetition_level_histogram.heap_size()
+ self.definition_level_histogram.heap_size()
}
}

Expand Down
Loading
Loading