Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement LevelHistograms as a struct #1

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 19 additions & 44 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::file::{
Expand Down Expand Up @@ -183,25 +183,14 @@ pub struct ColumnCloseResult {
pub offset_index: Option<OffsetIndex>,
}

/// Creates a vector to hold level histogram data. Length will be `max_level + 1`.
/// Because histograms are not necessary when `max_level == 0`, this will return
/// `None` in that case.
fn new_histogram(max_level: i16) -> Option<Vec<i64>> {
if max_level > 0 {
Some(vec![0; max_level as usize + 1])
} else {
None
}
}

// Metrics per page
#[derive(Default)]
struct PageMetrics {
num_buffered_values: u32,
num_buffered_rows: u32,
num_page_nulls: u64,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

impl PageMetrics {
Expand All @@ -211,50 +200,37 @@ impl PageMetrics {

/// Initialize the repetition level histogram
fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
self.repetition_level_histogram = new_histogram(max_level);
self.repetition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Initialize the definition level histogram
fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
self.definition_level_histogram = new_histogram(max_level);
self.definition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Sets all elements of `histogram` to 0
fn reset_histogram(histogram: &mut Option<Vec<i64>>) {
if let Some(ref mut hist) = histogram {
for v in hist {
*v = 0
}
}
}

/// Resets the state of this `PageMetrics` to the initial state.
/// If histograms have been initialized their contents will be reset to zero.
fn new_page(&mut self) {
self.num_buffered_values = 0;
self.num_buffered_rows = 0;
self.num_page_nulls = 0;
PageMetrics::reset_histogram(&mut self.repetition_level_histogram);
PageMetrics::reset_histogram(&mut self.definition_level_histogram);
self.repetition_level_histogram.as_mut().map(LevelHistogram::reset);
self.definition_level_histogram.as_mut().map(LevelHistogram::reset);
Comment on lines +219 to +220
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So much nicer than what I was doing 😮

    if let Some(hist) = self.repetition_level_histogram.as_mut() {
        hist.reset()
    }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think your way is pretty nice too -- this is borderline too functional for my taste, but it seemed to be ok :)

}

/// Updates histogram values using provided repetition levels
fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
if let Some(ref mut rep_hist) = self.repetition_level_histogram {
for &level in levels {
rep_hist[level as usize] += 1;
}
rep_hist.update_from_levels(levels);
}
}

/// Updates histogram values using provided definition levels
fn update_definition_level_histogram(&mut self, levels: &[i16]) {
if let Some(ref mut def_hist) = self.definition_level_histogram {
for &level in levels {
def_hist[level as usize] += 1;
}
def_hist.update_from_levels(levels);
}
}
}
Expand All @@ -274,8 +250,8 @@ struct ColumnMetrics<T: Default> {
num_column_nulls: u64,
column_distinct_count: Option<u64>,
variable_length_bytes: Option<i64>,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

impl<T: Default> ColumnMetrics<T> {
Expand All @@ -285,24 +261,23 @@ impl<T: Default> ColumnMetrics<T> {

/// Initialize the repetition level histogram
fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
self.repetition_level_histogram = new_histogram(max_level);
self.repetition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Initialize the definition level histogram
fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
self.definition_level_histogram = new_histogram(max_level);
self.definition_level_histogram = LevelHistogram::try_new(max_level);
self
}

/// Sum `page_histogram` into `chunk_histogram`
fn update_histogram(chunk_histogram: &mut Option<Vec<i64>>, page_histogram: &Option<Vec<i64>>) {
if page_histogram.is_some() && chunk_histogram.is_some() {
let chunk_hist = chunk_histogram.as_mut().unwrap();
let page_hist = page_histogram.as_ref().unwrap();
for i in 0..page_hist.len() {
chunk_hist[i] += page_hist[i]
}
fn update_histogram(
chunk_histogram: &mut Option<LevelHistogram>,
page_histogram: &Option<LevelHistogram>,
) {
if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
chunk_hist.add(page_hist);
Comment on lines +279 to +280
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I tried this and it didn't work. Thanks. My approach was to have add take an Option<LevelHistogram>.

}
}

Expand Down
149 changes: 135 additions & 14 deletions parquet/src/file/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -557,8 +557,114 @@ pub struct ColumnChunkMetaData {
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
unencoded_byte_array_data_bytes: Option<i64>,
repetition_level_histogram: Option<Vec<i64>>,
definition_level_histogram: Option<Vec<i64>>,
repetition_level_histogram: Option<LevelHistogram>,
definition_level_histogram: Option<LevelHistogram>,
}

/// Histograms for repetition and definition levels.
///
/// Each histogram is a vector of length `max_level + 1`. The value at index `i` is the number of
/// values at level `i`.
///
/// For example, `vec[0]` is the number of values with level 0, `vec[1]` is the
/// number of values with level 1, and so on.
///
#[derive(Debug, Clone, PartialEq)]
pub struct LevelHistogram {
// Per-level counts; the index into the vector is the level.
inner: Vec<i64>,
}

impl LevelHistogram {
/// Creates a new level histogram with every count initialized to 0.
///
/// Length will be `max_level + 1`.
///
/// Returns `None` when `max_level` is not positive (histograms are not necessary when
/// `max_level == 0`)
pub fn try_new(max_level: i16) -> Option<Self> {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I struggled with this name, as the consensus seemed to be that try_new returns a Result. I wound up opting for maybe_new, but I prefer this!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah maybe there is a better convention but I think this is pretty standard in my experience

if max_level > 0 {
Some(Self {
inner: vec![0; max_level as usize + 1],
})
} else {
None
}
}
/// Returns a reference to the histogram's values.
///
/// Element `i` of the returned slice is the count recorded for level `i`.
pub fn values(&self) -> &[i64] {
&self.inner
}

/// Consumes the histogram and returns the inner vector of per-level counts.
pub fn into_inner(self) -> Vec<i64> {
self.inner
}

/// Returns the histogram value at the given index.
///
/// The value at `index` is the number of values with level `index`. For example,
/// `get(1)` returns the number of values with level 1.
///
/// Returns `None` if the index is out of bounds.
pub fn get(&self, index: usize) -> Option<i64> {
self.inner.get(index).copied()
}

/// Accumulates the counts of `other` into this histogram, level by level.
///
/// # Panics
/// If the histograms have different lengths
pub fn add(&mut self, other: &Self) {
    assert_eq!(self.len(), other.len());
    self.inner
        .iter_mut()
        .zip(&other.inner)
        .for_each(|(total, count)| *total += count);
}

/// Returns the length of the histogram, i.e. the number of levels it tracks
/// (`max_level + 1` for a histogram created by [`Self::try_new`]).
pub fn len(&self) -> usize {
self.inner.len()
}

/// Returns `true` if the histogram contains no levels.
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}

/// Zeroes the count of every level; the histogram's length is unchanged.
pub fn reset(&mut self) {
    self.inner.fill(0);
}

/// Updates histogram values using the provided levels (repetition or definition),
/// incrementing the count at index `level` once for each entry in `levels`.
///
/// # Panics
/// if any of the levels is greater than or equal to the length of the histogram
/// (for a histogram created by [`Self::try_new`], greater than the `max_level`
/// supplied to it). A negative level also panics, because the cast to `usize`
/// produces an out-of-range index.
pub fn update_from_levels(&mut self, levels: &[i16]) {
for &level in levels {
self.inner[level as usize] += 1;
}
}
}

impl From<Vec<i64>> for LevelHistogram {
fn from(inner: Vec<i64>) -> Self {
Self { inner }
}
}

impl From<LevelHistogram> for Vec<i64> {
    /// Unwraps the histogram into its underlying vector of per-level counts.
    fn from(histogram: LevelHistogram) -> Self {
        histogram.inner
    }
}
Comment on lines +652 to +662
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So much nicer. Thank you!


// Heap usage of a histogram is whatever its inner `Vec<i64>` reports.
impl HeapSize for LevelHistogram {
fn heap_size(&self) -> usize {
self.inner.heap_size()
}
}

/// Represents common operations for a column chunk.
Expand Down Expand Up @@ -724,7 +830,7 @@ impl ColumnChunkMetaData {
/// The returned value `vec[i]` is how many values are at repetition level `i`. For example,
/// `vec[0]` indicates how many rows the page contains.
/// This field may not be set by older writers.
pub fn repetition_level_histogram(&self) -> Option<&Vec<i64>> {
pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
self.repetition_level_histogram.as_ref()
}

Expand All @@ -733,7 +839,7 @@ impl ColumnChunkMetaData {
/// The returned value `vec[i]` is how many values are at definition level `i`. For example,
/// `vec[max_definition_level]` indicates how many non-null values are present in the page.
/// This field may not be set by older writers.
pub fn definition_level_histogram(&self) -> Option<&Vec<i64>> {
pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
self.definition_level_histogram.as_ref()
}

Expand Down Expand Up @@ -788,6 +894,9 @@ impl ColumnChunkMetaData {
(None, None, None)
};

let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
Comment on lines +897 to +898
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️


let result = ColumnChunkMetaData {
column_descr,
encodings,
Expand Down Expand Up @@ -838,10 +947,20 @@ impl ColumnChunkMetaData {
|| self.repetition_level_histogram.is_some()
|| self.definition_level_histogram.is_some()
{
let repetition_level_histogram = self
.repetition_level_histogram
.as_ref()
.map(|hist| hist.clone().into_inner());

let definition_level_histogram = self
.definition_level_histogram
.as_ref()
.map(|hist| hist.clone().into_inner());

Some(SizeStatistics {
unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
repetition_level_histogram: self.repetition_level_histogram.clone(),
definition_level_histogram: self.definition_level_histogram.clone(),
repetition_level_histogram,
definition_level_histogram,
})
} else {
None
Expand Down Expand Up @@ -1023,13 +1142,13 @@ impl ColumnChunkMetaDataBuilder {
}

/// Sets optional repetition level histogram
pub fn set_repetition_level_histogram(mut self, value: Option<Vec<i64>>) -> Self {
pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
self.0.repetition_level_histogram = value;
self
}

/// Sets optional definition level histogram
pub fn set_definition_level_histogram(mut self, value: Option<Vec<i64>>) -> Self {
pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
self.0.definition_level_histogram = value;
self
}
Expand All @@ -1049,7 +1168,9 @@ pub struct ColumnIndexBuilder {
max_values: Vec<Vec<u8>>,
null_counts: Vec<i64>,
boundary_order: BoundaryOrder,
/// contains the concatenation of the histograms of all pages
repetition_level_histograms: Option<Vec<i64>>,
/// contains the concatenation of the histograms of all pages
definition_level_histograms: Option<Vec<i64>>,
/// Is the information in the builder valid?
///
Expand Down Expand Up @@ -1099,21 +1220,21 @@ impl ColumnIndexBuilder {
/// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state.
pub fn append_histograms(
&mut self,
repetition_level_histogram: &Option<Vec<i64>>,
definition_level_histogram: &Option<Vec<i64>>,
repetition_level_histogram: &Option<LevelHistogram>,
definition_level_histogram: &Option<LevelHistogram>,
) {
if !self.valid {
return;
}
if let Some(ref rep_lvl_hist) = repetition_level_histogram {
let hist = self.repetition_level_histograms.get_or_insert(Vec::new());
hist.reserve(rep_lvl_hist.len());
hist.extend(rep_lvl_hist);
hist.extend(rep_lvl_hist.values());
}
if let Some(ref def_lvl_hist) = definition_level_histogram {
let hist = self.definition_level_histograms.get_or_insert(Vec::new());
hist.reserve(def_lvl_hist.len());
hist.extend(def_lvl_hist);
hist.extend(def_lvl_hist.values());
}
}

Expand Down Expand Up @@ -1358,8 +1479,8 @@ mod tests {
.set_column_index_offset(Some(8000))
.set_column_index_length(Some(25))
.set_unencoded_byte_array_data_bytes(Some(2000))
.set_repetition_level_histogram(Some(vec![100, 100]))
.set_definition_level_histogram(Some(vec![0, 200]))
.set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
.set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
.build()
.unwrap();

Expand Down
Loading