Skip to content

Commit

Permalink
feat(rust,py): detail write_parquet statistics parameter
Browse files Browse the repository at this point in the history
Adds additional control over which statistics are written into Parquet files
through the `write_parquet` parameter `statistics`.

It is now possible to specify `"full"` to also attempt to add the
`distinct_count` statistic (currently only added for `Booleans`). It is also
possible to give a `dict[str, bool]` to specify individual statistics `min`,
`max`, `distinct_count` and `null_count`.

Fixes #16441
  • Loading branch information
coastalwhite committed May 29, 2024
1 parent 0eb8384 commit 8c38cfa
Show file tree
Hide file tree
Showing 27 changed files with 448 additions and 163 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ json = [
"dtype-struct",
"csv",
]
serde = ["dep:serde", "polars-core/serde-lazy"]
serde = ["dep:serde", "polars-core/serde-lazy", "polars-parquet/serde"]
# support for arrows ipc file parsing
ipc = ["arrow/io_ipc", "arrow/io_ipc_compression"]
# support for arrows streaming ipc file parsing
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ mod writer;

pub use batched_writer::BatchedWriter;
pub use options::{BrotliLevel, GzipLevel, ParquetCompression, ParquetWriteOptions, ZstdLevel};
pub use polars_parquet::write::RowGroupIterColumns;
pub use polars_parquet::write::{RowGroupIterColumns, StatisticsOptions};
pub use writer::ParquetWriter;
4 changes: 2 additions & 2 deletions crates/polars-io/src/parquet/write/options.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use polars_error::PolarsResult;
use polars_parquet::write::{
BrotliLevel as BrotliLevelParquet, CompressionOptions, GzipLevel as GzipLevelParquet,
ZstdLevel as ZstdLevelParquet,
StatisticsOptions, ZstdLevel as ZstdLevelParquet,
};
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
Expand All @@ -12,7 +12,7 @@ pub struct ParquetWriteOptions {
/// Data page compression
pub compression: ParquetCompression,
/// Compute and write column statistics.
pub statistics: bool,
pub statistics: StatisticsOptions,
/// If `None` will be all written to a single row group.
pub row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
Expand Down
11 changes: 6 additions & 5 deletions crates/polars-io/src/parquet/write/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ use std::sync::Mutex;
use arrow::datatypes::PhysicalType;
use polars_core::prelude::*;
use polars_parquet::write::{
to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, Version, WriteOptions,
to_parquet_schema, transverse, CompressionOptions, Encoding, FileWriter, StatisticsOptions,
Version, WriteOptions,
};

use super::batched_writer::BatchedWriter;
Expand All @@ -18,7 +19,7 @@ pub struct ParquetWriter<W> {
/// Data page compression
compression: CompressionOptions,
/// Compute and write column statistics.
statistics: bool,
statistics: StatisticsOptions,
/// if `None` will be 512^2 rows
row_group_size: Option<usize>,
/// if `None` will be 1024^2 bytes
Expand All @@ -39,7 +40,7 @@ where
ParquetWriter {
writer,
compression: ParquetCompression::default().into(),
statistics: true,
statistics: StatisticsOptions::default(),
row_group_size: None,
data_page_size: None,
parallel: true,
Expand All @@ -56,7 +57,7 @@ where
}

/// Compute and write statistic
pub fn with_statistics(mut self, statistics: bool) -> Self {
pub fn with_statistics(mut self, statistics: StatisticsOptions) -> Self {
self.statistics = statistics;
self
}
Expand Down Expand Up @@ -100,7 +101,7 @@ where

fn materialize_options(&self) -> WriteOptions {
WriteOptions {
write_statistics: self.statistics,
statistics: self.statistics,
compression: self.compression,
version: Version::V1,
data_pagesize_limit: self.data_page_size,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ fn init_files() {
#[cfg(feature = "parquet")]
{
ParquetWriter::new(f)
.with_statistics(true)
.with_statistics(StatisticsOptions::full())
.finish(&mut df)
.unwrap();
}
Expand Down
39 changes: 25 additions & 14 deletions crates/polars-parquet/src/arrow/write/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::parquet::encoding::{delta_bitpacked, Encoding};
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};
use crate::write::utils::invalid_encoding;
use crate::write::Page;
use crate::write::{Page, StatisticsOptions};

pub(crate) fn encode_non_null_values<'a, I: Iterator<Item = &'a [u8]>>(
iter: I,
Expand Down Expand Up @@ -65,8 +65,8 @@ pub fn array_to_page<O: Offset>(
_ => return Err(invalid_encoding(encoding, array.data_type())),
}

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand All @@ -89,21 +89,32 @@ pub fn array_to_page<O: Offset>(
pub(crate) fn build_statistics<O: Offset>(
array: &BinaryArray<O>,
primitive_type: PrimitiveType,
options: &StatisticsOptions,
) -> ParquetStatistics {
BinaryStatistics {
primitive_type,
null_count: Some(array.null_count() as i64),
null_count: options.null_count.then_some(array.null_count() as i64),
distinct_count: None,
max_value: array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
min_value: array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
max_value: options
.max_value
.then(|| {
array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
min_value: options
.min_value
.then(|| {
array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
}
.serialize()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ where

encode_plain(array, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand Down
39 changes: 25 additions & 14 deletions crates/polars-parquet/src/arrow/write/binview/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::parquet::statistics::{BinaryStatistics, ParquetStatistics};
use crate::read::schema::is_nullable;
use crate::write::binary::{encode_non_null_values, ord_binary};
use crate::write::utils::invalid_encoding;
use crate::write::{utils, Encoding, Page, WriteOptions};
use crate::write::{utils, Encoding, Page, StatisticsOptions, WriteOptions};

pub(crate) fn encode_plain(array: &BinaryViewArray, buffer: &mut Vec<u8>) {
let capacity =
Expand Down Expand Up @@ -56,8 +56,8 @@ pub fn array_to_page(
_ => return Err(invalid_encoding(encoding, array.data_type())),
}

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand All @@ -81,21 +81,32 @@ pub fn array_to_page(
pub(crate) fn build_statistics(
array: &BinaryViewArray,
primitive_type: PrimitiveType,
options: &StatisticsOptions,
) -> ParquetStatistics {
BinaryStatistics {
primitive_type,
null_count: Some(array.null_count() as i64),
null_count: options.null_count.then_some(array.null_count() as i64),
distinct_count: None,
max_value: array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
min_value: array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec()),
max_value: options
.max_value
.then(|| {
array
.iter()
.flatten()
.max_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
min_value: options
.min_value
.then(|| {
array
.iter()
.flatten()
.min_by(|x, y| ord_binary(x, y))
.map(|x| x.to_vec())
})
.flatten(),
}
.serialize()
}
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/binview/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ pub fn array_to_page(

encode_plain(array, &mut buffer);

let statistics = if options.write_statistics {
Some(build_statistics(array, type_.clone()))
let statistics = if options.has_statistics() {
Some(build_statistics(array, type_.clone(), &options.statistics))
} else {
None
};
Expand Down
38 changes: 31 additions & 7 deletions crates/polars-parquet/src/arrow/write/boolean/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::parquet::encoding::Encoding;
use crate::parquet::page::DataPage;
use crate::parquet::schema::types::PrimitiveType;
use crate::parquet::statistics::{BooleanStatistics, ParquetStatistics};
use crate::write::StatisticsOptions;

fn encode(iterator: impl Iterator<Item = bool>, buffer: &mut Vec<u8>) -> PolarsResult<()> {
// encode values using bitpacking
Expand Down Expand Up @@ -59,8 +60,8 @@ pub fn array_to_page(

encode_plain(array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Some(build_statistics(array))
let statistics = if options.has_statistics() {
Some(build_statistics(array, &options.statistics))
} else {
None
};
Expand All @@ -79,12 +80,35 @@ pub fn array_to_page(
)
}

pub(super) fn build_statistics(array: &BooleanArray) -> ParquetStatistics {
fn distinct_bools(array: &BooleanArray) -> i64 {
let mut seen = 0b00i64;

for value in array.non_null_values_iter() {
seen |= 1 << i64::from(value);

if seen == 0b11 {
return 2;
}
}

seen.count_ones() as i64
}

pub(super) fn build_statistics(
array: &BooleanArray,
options: &StatisticsOptions,
) -> ParquetStatistics {
BooleanStatistics {
null_count: Some(array.null_count() as i64),
distinct_count: None,
max_value: array.iter().flatten().max(),
min_value: array.iter().flatten().min(),
null_count: options.null_count.then(|| array.null_count() as i64),
distinct_count: options.distinct_count.then(|| distinct_bools(array)),
max_value: options
.max_value
.then(|| array.iter().flatten().max())
.flatten(),
min_value: options
.min_value
.then(|| array.iter().flatten().min())
.flatten(),
}
.serialize()
}
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/write/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ pub fn array_to_page(

encode_plain(array, is_optional, &mut buffer)?;

let statistics = if options.write_statistics {
Some(build_statistics(array))
let statistics = if options.has_statistics() {
Some(build_statistics(array, &options.statistics))
} else {
None
};
Expand Down
48 changes: 36 additions & 12 deletions crates/polars-parquet/src/arrow/write/dictionary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,12 @@ macro_rules! dyn_prim {

let buffer = primitive_encode_plain::<$from, $to>(values, false, vec![]);

let stats: Option<ParquetStatistics> = if $options.write_statistics {
let mut stats = primitive_build_statistics::<$from, $to>(values, $type_.clone());
let stats: Option<ParquetStatistics> = if !$options.statistics.is_empty() {
let mut stats = primitive_build_statistics::<$from, $to>(
values,
$type_.clone(),
&$options.statistics,
);
stats.null_count = Some($array.null_count() as i64);
Some(stats.serialize())
} else {
Expand Down Expand Up @@ -240,8 +244,12 @@ pub fn array_to_pages<K: DictionaryKey>(

let mut buffer = vec![];
binary_encode_plain::<i64>(array, &mut buffer);
let stats = if options.write_statistics {
Some(binary_build_statistics(array, type_.clone()))
let stats = if options.has_statistics() {
Some(binary_build_statistics(
array,
type_.clone(),
&options.statistics,
))
} else {
None
};
Expand All @@ -256,8 +264,12 @@ pub fn array_to_pages<K: DictionaryKey>(
let mut buffer = vec![];
binview::encode_plain(array, &mut buffer);

let stats = if options.write_statistics {
Some(binview::build_statistics(array, type_.clone()))
let stats = if options.has_statistics() {
Some(binview::build_statistics(
array,
type_.clone(),
&options.statistics,
))
} else {
None
};
Expand All @@ -273,8 +285,12 @@ pub fn array_to_pages<K: DictionaryKey>(
let mut buffer = vec![];
binview::encode_plain(&array, &mut buffer);

let stats = if options.write_statistics {
Some(binview::build_statistics(&array, type_.clone()))
let stats = if options.has_statistics() {
Some(binview::build_statistics(
&array,
type_.clone(),
&options.statistics,
))
} else {
None
};
Expand All @@ -285,8 +301,12 @@ pub fn array_to_pages<K: DictionaryKey>(

let mut buffer = vec![];
binary_encode_plain::<i64>(values, &mut buffer);
let stats = if options.write_statistics {
Some(binary_build_statistics(values, type_.clone()))
let stats = if options.has_statistics() {
Some(binary_build_statistics(
values,
type_.clone(),
&options.statistics,
))
} else {
None
};
Expand All @@ -296,8 +316,12 @@ pub fn array_to_pages<K: DictionaryKey>(
let mut buffer = vec![];
let array = array.values().as_any().downcast_ref().unwrap();
fixed_binary_encode_plain(array, false, &mut buffer);
let stats = if options.write_statistics {
let stats = fixed_binary_build_statistics(array, type_.clone());
let stats = if options.has_statistics() {
let stats = fixed_binary_build_statistics(
array,
type_.clone(),
&options.statistics,
);
Some(stats.serialize())
} else {
None
Expand Down
Loading

0 comments on commit 8c38cfa

Please sign in to comment.