Skip to content

Commit

Permalink
chore: add physical_type to StatisticsConverter to account for coerce_types
Browse files Browse the repository at this point in the history
  • Loading branch information
dsgibbons committed Oct 6, 2024
1 parent e0fb77c commit 169ba01
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions parquet/src/arrow/arrow_reader/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
/// `arrow-rs/parquet/tests/arrow_reader/statistics.rs`.
use crate::arrow::buffer::bit_util::sign_extend_be;
use crate::arrow::parquet_column;
use crate::basic::Type as PhysicalType;
use crate::data_type::{ByteArray, FixedLenByteArray};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::{ParquetColumnIndex, ParquetOffsetIndex, RowGroupMetaData};
Expand Down Expand Up @@ -318,7 +319,7 @@ make_decimal_stats_iterator!(
/// data_type: The data type of the statistics (e.g. `DataType::Int32`)
/// iterator: The iterator of [`ParquetStatistics`] to extract the statistics from.
macro_rules! get_statistics {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident) => {
($stat_type_prefix: ident, $data_type: ident, $iterator: ident, $physical_type: ident) => {
paste! {
match $data_type {
DataType::Boolean => Ok(Arc::new(BooleanArray::from_iter(
Expand Down Expand Up @@ -370,9 +371,11 @@ macro_rules! get_statistics {
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Date64 if $physical_type == Some(PhysicalType::INT32) => Ok(Arc::new(Date64Array::from_iter(
[<$stat_type_prefix Int32StatsIterator>]::new($iterator)
.map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000))))),
DataType::Date64 if $physical_type == Some(PhysicalType::INT64) => Ok(Arc::new(Date64Array::from_iter(
[<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),))),
DataType::Timestamp(unit, timezone) =>{
let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
Ok(match unit {
Expand Down Expand Up @@ -486,7 +489,7 @@ macro_rules! get_statistics {
Ok(Arc::new(arr))
},
DataType::Dictionary(_, value_type) => {
[<$stat_type_prefix:lower _ statistics>](value_type, $iterator)
[<$stat_type_prefix:lower _ statistics>](value_type, $iterator, $physical_type)
},
DataType::Utf8View => {
let iterator = [<$stat_type_prefix ByteArrayStatsIterator>]::new($iterator);
Expand Down Expand Up @@ -523,6 +526,7 @@ macro_rules! get_statistics {
DataType::Map(_,_) |
DataType::Duration(_) |
DataType::Interval(_) |
DataType::Date64 | // required to cover $physical_type match guard
DataType::Null |
DataType::List(_) |
DataType::ListView(_) |
Expand Down Expand Up @@ -1054,8 +1058,9 @@ macro_rules! get_data_page_statistics {
/// Extracts the min statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
///
/// `physical_type` is the Parquet physical type of the matched column (when known).
/// It disambiguates Arrow types that can be backed by more than one physical type —
/// e.g. `Date64` coerced from an INT32-backed DATE column versus a native INT64
/// column — so the macro can pick the correct stats iterator and scaling.
fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
    data_type: &DataType,
    iterator: I,
    physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
    get_statistics!(Min, data_type, iterator, physical_type)
}

/// Extracts the max statistics from an iterator of [`ParquetStatistics`] to an [`ArrayRef`]
Expand All @@ -1064,8 +1069,9 @@ fn min_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
fn max_statistics<'a, I: Iterator<Item = Option<&'a ParquetStatistics>>>(
data_type: &DataType,
iterator: I,
physical_type: Option<PhysicalType>,
) -> Result<ArrayRef> {
get_statistics!(Max, data_type, iterator)
get_statistics!(Max, data_type, iterator, physical_type)
}

/// Extracts the min statistics from an iterator
Expand Down Expand Up @@ -1164,6 +1170,8 @@ pub struct StatisticsConverter<'a> {
arrow_field: &'a Field,
/// treat missing null_counts as 0 nulls
missing_null_counts_as_zero: bool,
/// The physical type of the matched column in the Parquet schema
physical_type: Option<PhysicalType>,
}

impl<'a> StatisticsConverter<'a> {
Expand Down Expand Up @@ -1291,6 +1299,7 @@ impl<'a> StatisticsConverter<'a> {
parquet_column_index: parquet_index,
arrow_field,
missing_null_counts_as_zero: true,
physical_type: parquet_index.map(|idx| parquet_schema.column(idx).physical_type()),
})
}

Expand Down Expand Up @@ -1333,7 +1342,7 @@ impl<'a> StatisticsConverter<'a> {
/// // get the minimum value for the column "foo" in the parquet file
/// let min_values: ArrayRef = converter
/// .row_group_mins(metadata.row_groups().iter())
/// .unwrap();
/// .unwrap();
/// // if "foo" is a Float64 value, the returned array will contain Float64 values
/// assert_eq!(min_values, Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0)])) as _);
/// ```
Expand All @@ -1350,7 +1359,7 @@ impl<'a> StatisticsConverter<'a> {
let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
min_statistics(data_type, iter)
min_statistics(data_type, iter, self.physical_type)
}

/// Extract the maximum values from row group statistics in [`RowGroupMetaData`]
Expand All @@ -1369,7 +1378,7 @@ impl<'a> StatisticsConverter<'a> {
let iter = metadatas
.into_iter()
.map(|x| x.column(parquet_index).statistics());
max_statistics(data_type, iter)
max_statistics(data_type, iter, self.physical_type)
}

/// Extract the null counts from row group statistics in [`RowGroupMetaData`]
Expand Down

0 comments on commit 169ba01

Please sign in to comment.