Skip to content

Commit

Permalink
Add configuration option to StatisticsConverter to control interpre…
Browse files Browse the repository at this point in the history
…tation of missing null counts in Parquet statistics (apache#6485)

* Write null counts in parquet files when they are present

* revert writing of Some(0)

* fix docstring

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
etseidl and alamb authored Oct 1, 2024
1 parent e538289 commit 2d67c4c
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 5 deletions.
34 changes: 32 additions & 2 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,7 @@ where
///
/// # Schemas
///
/// The converter ues the schema of the Parquet file and the Arrow schema to
/// The converter uses the schema of the Parquet file and the Arrow schema to
/// convert the underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files and this structure handles mapping them to the `i128`
Expand All @@ -1175,6 +1175,8 @@ pub struct StatisticsConverter<'a> {
parquet_column_index: Option<usize>,
/// The field (with data type) of the column in the Arrow schema
arrow_field: &'a Field,
/// treat missing null_counts as 0 nulls
missing_null_counts_as_zero: bool,
}

impl<'a> StatisticsConverter<'a> {
Expand All @@ -1191,6 +1193,23 @@ impl<'a> StatisticsConverter<'a> {
self.arrow_field
}

/// Set the statistics converter to treat missing null counts as missing
///
/// By default, the converter will treat missing null counts as though
/// the null count is known to be `0`.
///
/// Note that parquet files written by parquet-rs currently do not store
/// null counts even when it is known there are zero nulls, and the reader
/// will return 0 for the null counts in that instance. This behavior may
/// change in a future release.
///
/// Both parquet-java and parquet-cpp store null counts as 0 when there are
/// no nulls, and don't write unknown values to the null count field.
pub fn with_missing_null_counts_as_zero(mut self, missing_null_counts_as_zero: bool) -> Self {
self.missing_null_counts_as_zero = missing_null_counts_as_zero;
self
}

/// Returns a [`UInt64Array`] with row counts for each row group
///
/// # Return Value
Expand Down Expand Up @@ -1284,6 +1303,7 @@ impl<'a> StatisticsConverter<'a> {
Ok(Self {
parquet_column_index: parquet_index,
arrow_field,
missing_null_counts_as_zero: true,
})
}

Expand Down Expand Up @@ -1382,7 +1402,15 @@ impl<'a> StatisticsConverter<'a> {
let null_counts = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics())
.map(|s| s.and_then(|s| s.null_count_opt()));
.map(|s| {
s.and_then(|s| {
if self.missing_null_counts_as_zero {
Some(s.null_count_opt().unwrap_or(0))
} else {
s.null_count_opt()
}
})
});
Ok(UInt64Array::from_iter(null_counts))
}

Expand Down Expand Up @@ -1593,3 +1621,5 @@ impl<'a> StatisticsConverter<'a> {
new_null_array(data_type, num_row_groups)
}
}

// See tests in parquet/tests/arrow_reader/statistics.rs
67 changes: 64 additions & 3 deletions parquet/tests/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::default::Default;
use std::fs::File;
use std::sync::Arc;

use super::make_test_file_rg;
use super::{struct_array, Scenario};
use arrow::compute::kernels::cast_utils::Parser;
use arrow::datatypes::{
Expand All @@ -37,16 +38,17 @@ use arrow_array::{
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
use half::f16;
use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{
ArrowReaderBuilder, ArrowReaderOptions, ParquetRecordBatchReaderBuilder,
};
use parquet::arrow::ArrowWriter;
use parquet::file::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use parquet::file::properties::{EnabledStatistics, WriterProperties};

use super::make_test_file_rg;
use parquet::file::statistics::{Statistics, ValueStatistics};
use parquet::schema::types::{SchemaDescPtr, SchemaDescriptor};

#[derive(Debug, Default, Clone)]
struct Int64Case {
Expand Down Expand Up @@ -2139,6 +2141,65 @@ async fn test_missing_statistics() {
.run();
}

#[test]
fn missing_null_counts_as_zero() {
let min = None;
let max = None;
let distinct_count = None;
let null_count = None; // NB: no null count
let is_min_max_deprecated = false;
let stats = Statistics::Boolean(ValueStatistics::new(
min,
max,
distinct_count,
null_count,
is_min_max_deprecated,
));
let (arrow_schema, parquet_schema) = bool_arrow_and_parquet_schema();

let column_chunk = ColumnChunkMetaData::builder(parquet_schema.column(0))
.set_statistics(stats)
.build()
.unwrap();
let metadata = RowGroupMetaData::builder(parquet_schema.clone())
.set_column_metadata(vec![column_chunk])
.build()
.unwrap();

let converter = StatisticsConverter::try_new("b", &arrow_schema, &parquet_schema).unwrap();

// by default null count should be 0
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![Some(0)])
);

// if we disable missing null counts as zero flag null count will be None
let converter = converter.with_missing_null_counts_as_zero(false);
assert_eq!(
converter.row_group_null_counts([&metadata]).unwrap(),
UInt64Array::from_iter(vec![None])
);
}

/// return an Arrow schema and corresponding Parquet SchemaDescriptor for
/// a schema with a single boolean column "b"
fn bool_arrow_and_parquet_schema() -> (SchemaRef, SchemaDescPtr) {
let arrow_schema = Arc::new(Schema::new(vec![Field::new("b", DataType::Boolean, true)]));
use parquet::schema::types::Type as ParquetType;
let parquet_schema = ParquetType::group_type_builder("schema")
.with_fields(vec![Arc::new(
ParquetType::primitive_type_builder("a", parquet::basic::Type::INT32)
.build()
.unwrap(),
)])
.build()
.unwrap();

let parquet_schema = Arc::new(SchemaDescriptor::new(Arc::new(parquet_schema)));
(arrow_schema, parquet_schema)
}

/////// NEGATIVE TESTS ///////
// column not found
#[tokio::test]
Expand Down

0 comments on commit 2d67c4c

Please sign in to comment.