diff --git a/parquet-testing b/parquet-testing index 1ba34478f535..9b48ff4f94dc 160000 --- a/parquet-testing +++ b/parquet-testing @@ -1 +1 @@ -Subproject commit 1ba34478f535c89382263c42c675a9af4f57f2dd +Subproject commit 9b48ff4f94dc5e89592d46a119884dbb88100884 diff --git a/parquet/src/arrow/arrow_reader/statistics.rs b/parquet/src/arrow/arrow_reader/statistics.rs index c42f92838c8c..369ea4a47e57 100644 --- a/parquet/src/arrow/arrow_reader/statistics.rs +++ b/parquet/src/arrow/arrow_reader/statistics.rs @@ -17,6 +17,8 @@ //! [`StatisticsConverter`] to convert statistics in parquet format to arrow [`ArrayRef`]. +//! Note that all the corresponding tests are in +//! `arrow-rs/parquet/tests/arrow_reader/statistics.rs`. use crate::arrow::buffer::bit_util::sign_extend_be; use crate::arrow::parquet_column; use crate::data_type::{ByteArray, FixedLenByteArray}; @@ -1568,1130 +1570,3 @@ impl<'a> StatisticsConverter<'a> { new_null_array(data_type, num_row_groups) } } - -#[cfg(test)] -mod test { - use super::*; - use crate::arrow::arrow_reader::ArrowReaderBuilder; - use crate::arrow::arrow_writer::ArrowWriter; - use crate::file::metadata::{ParquetMetaData, RowGroupMetaData}; - use crate::file::properties::{EnabledStatistics, WriterProperties}; - use arrow::compute::kernels::cast_utils::Parser; - use arrow::datatypes::{i256, Date32Type, Date64Type}; - use arrow::util::test_util::parquet_test_data; - use arrow_array::{ - new_empty_array, new_null_array, Array, ArrayRef, BinaryArray, BinaryViewArray, - BooleanArray, Date32Array, Date64Array, Decimal128Array, Decimal256Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray, RecordBatch, - StringArray, StringViewArray, StructArray, TimestampNanosecondArray, - }; - use arrow_schema::{DataType, Field, SchemaRef}; - use bytes::Bytes; - use std::path::PathBuf; - use std::sync::Arc; - // TODO error cases (with parquet statistics that are mismatched in expected type) - - #[test] - fn roundtrip_empty() { - let empty_bool_array = new_empty_array(&DataType::Boolean); - Test { - input: empty_bool_array.clone(), - expected_min: empty_bool_array.clone(), - expected_max: empty_bool_array.clone(), - } - .run() - } - - #[test] - fn roundtrip_bool() { - Test { - input: bool_array([ - // row group 1 - Some(true), - None, - Some(true), - // row group 2 - Some(true), - Some(false), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: bool_array([Some(true), Some(false), None]), - expected_max: bool_array([Some(true), Some(true), None]), - } - .run() - } - - #[test] - fn roundtrip_int32() { - Test { - input: i32_array([ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(0), - Some(5), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: i32_array([Some(1), Some(0), None]), - expected_max: i32_array([Some(3), Some(5), None]), - } - .run() - } - - #[test] - fn roundtrip_int64() { - Test { - input: i64_array([ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(0), - Some(5), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: i64_array([Some(1), Some(0), None]), - expected_max: i64_array(vec![Some(3), Some(5), None]), - } - .run() - } - - #[test] - fn roundtrip_f32() { - Test { - input: f32_array([ - // row group 1 - Some(1.0), - None, - Some(3.0), - // row group 2 - Some(-1.0), - Some(5.0), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: f32_array([Some(1.0), Some(-1.0), None]), - expected_max:
f32_array([Some(3.0), Some(5.0), None]), - } - .run() - } - - #[test] - fn roundtrip_f64() { - Test { - input: f64_array([ - // row group 1 - Some(1.0), - None, - Some(3.0), - // row group 2 - Some(-1.0), - Some(5.0), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: f64_array([Some(1.0), Some(-1.0), None]), - expected_max: f64_array([Some(3.0), Some(5.0), None]), - } - .run() - } - - #[test] - fn roundtrip_timestamp() { - Test { - input: timestamp_seconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - None, - ), - expected_min: timestamp_seconds_array([Some(1), Some(5), None], None), - expected_max: timestamp_seconds_array([Some(3), Some(9), None], None), - } - .run(); - - Test { - input: timestamp_milliseconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - None, - ), - expected_min: timestamp_milliseconds_array([Some(1), Some(5), None], None), - expected_max: timestamp_milliseconds_array([Some(3), Some(9), None], None), - } - .run(); - - Test { - input: timestamp_microseconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - None, - ), - expected_min: timestamp_microseconds_array([Some(1), Some(5), None], None), - expected_max: timestamp_microseconds_array([Some(3), Some(9), None], None), - } - .run(); - - Test { - input: timestamp_nanoseconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - None, - ), - expected_min: timestamp_nanoseconds_array([Some(1), Some(5), None], None), - expected_max: timestamp_nanoseconds_array([Some(3), Some(9), None], None), - } - .run() - } - - #[test] - fn roundtrip_timestamp_timezoned() { - Test { - input: timestamp_seconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - Some("UTC"), - ), - expected_min: timestamp_seconds_array([Some(1), Some(5), None], Some("UTC")), - expected_max: timestamp_seconds_array([Some(3), Some(9), None], Some("UTC")), - } - .run(); - - Test { - input: timestamp_milliseconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - Some("UTC"), - ), - expected_min: timestamp_milliseconds_array([Some(1), Some(5), None], Some("UTC")), - expected_max: timestamp_milliseconds_array([Some(3), Some(9), None], Some("UTC")), - } - .run(); - - Test { - input: timestamp_microseconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - Some("UTC"), - ), - expected_min: timestamp_microseconds_array([Some(1), Some(5), None], Some("UTC")), - expected_max: timestamp_microseconds_array([Some(3), Some(9), None], Some("UTC")), - } - .run(); - - Test { - input: timestamp_nanoseconds_array( - [ - // row group 1 - Some(1), - None, - Some(3), - // row group 2 - Some(9), - Some(5), - None, - // row group 3 - None, - None, - None, - ], - Some("UTC"), - ), - expected_min: timestamp_nanoseconds_array([Some(1), Some(5), None], Some("UTC")), - expected_max: timestamp_nanoseconds_array([Some(3), Some(9), None], Some("UTC")), - } - 
.run() - } - - #[test] - fn roundtrip_decimal() { - Test { - input: Arc::new( - Decimal128Array::from(vec![ - // row group 1 - Some(100), - None, - Some(22000), - // row group 2 - Some(500000), - Some(330000), - None, - // row group 3 - None, - None, - None, - ]) - .with_precision_and_scale(9, 2) - .unwrap(), - ), - expected_min: Arc::new( - Decimal128Array::from(vec![Some(100), Some(330000), None]) - .with_precision_and_scale(9, 2) - .unwrap(), - ), - expected_max: Arc::new( - Decimal128Array::from(vec![Some(22000), Some(500000), None]) - .with_precision_and_scale(9, 2) - .unwrap(), - ), - } - .run(); - - Test { - input: Arc::new( - Decimal256Array::from(vec![ - // row group 1 - Some(i256::from(100)), - None, - Some(i256::from(22000)), - // row group 2 - Some(i256::MAX), - Some(i256::MIN), - None, - // row group 3 - None, - None, - None, - ]) - .with_precision_and_scale(76, 76) - .unwrap(), - ), - expected_min: Arc::new( - Decimal256Array::from(vec![Some(i256::from(100)), Some(i256::MIN), None]) - .with_precision_and_scale(76, 76) - .unwrap(), - ), - expected_max: Arc::new( - Decimal256Array::from(vec![Some(i256::from(22000)), Some(i256::MAX), None]) - .with_precision_and_scale(76, 76) - .unwrap(), - ), - } - .run() - } - - #[test] - fn roundtrip_utf8() { - Test { - input: utf8_array([ - // row group 1 - Some("A"), - None, - Some("Q"), - // row group 2 - Some("ZZ"), - Some("AA"), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: utf8_array([Some("A"), Some("AA"), None]), - expected_max: utf8_array([Some("Q"), Some("ZZ"), None]), - } - .run() - } - - #[test] - fn roundtrip_string_view() { - Test { - input: string_view_array([ - // row group 1 - Some("A"), - None, - Some("Q"), - // row group 2 - Some("ZZ"), - Some("A_longerthan12"), - None, - // row group 3 - Some("A_longerthan12"), - None, - None, - ]), - expected_min: string_view_array([ - Some("A"), - Some("A_longerthan12"), - Some("A_longerthan12"), - ]), - expected_max: string_view_array([Some("Q"), Some("ZZ"), Some("A_longerthan12")]), - } - .run() - } - - #[test] - fn roundtrip_binary_view() { - let input: Vec> = vec![ - // row group 1 - Some(b"A"), - None, - Some(b"Q"), - // row group 2 - Some(b"ZZ"), - Some(b"A_longerthan12"), - None, - // row group 3 - Some(b"A_longerthan12"), - None, - None, - ]; - - let expected_min: Vec> = - vec![Some(b"A"), Some(b"A_longerthan12"), Some(b"A_longerthan12")]; - let expected_max: Vec> = - vec![Some(b"Q"), Some(b"ZZ"), Some(b"A_longerthan12")]; - - let array = binary_view_array(input); - - Test { - input: array, - expected_min: binary_view_array(expected_min), - expected_max: binary_view_array(expected_max), - } - .run() - } - - #[test] - fn roundtrip_struct() { - let mut test = Test { - input: struct_array(vec![ - // row group 1 - (Some(true), Some(1)), - (None, None), - (Some(true), Some(3)), - // row group 2 - (Some(true), Some(0)), - (Some(false), Some(5)), - (None, None), - // row group 3 - (None, None), - (None, None), - (None, None), - ]), - expected_min: struct_array(vec![ - (Some(true), Some(1)), - (Some(true), Some(0)), - (None, None), - ]), - - expected_max: struct_array(vec![ - (Some(true), Some(3)), - (Some(true), Some(0)), - (None, None), - ]), - }; - // Due to https://github.com/apache/datafusion/issues/8334, - // statistics for struct arrays are not supported - test.expected_min = new_null_array(test.input.data_type(), test.expected_min.len()); - test.expected_max = new_null_array(test.input.data_type(), test.expected_min.len()); - test.run() - } - - #[test] - 
fn roundtrip_binary() { - Test { - input: Arc::new(BinaryArray::from_opt_vec(vec![ - // row group 1 - Some(b"A"), - None, - Some(b"Q"), - // row group 2 - Some(b"ZZ"), - Some(b"AA"), - None, - // row group 3 - None, - None, - None, - ])), - expected_min: Arc::new(BinaryArray::from_opt_vec(vec![ - Some(b"A"), - Some(b"AA"), - None, - ])), - expected_max: Arc::new(BinaryArray::from_opt_vec(vec![ - Some(b"Q"), - Some(b"ZZ"), - None, - ])), - } - .run() - } - - #[test] - fn roundtrip_date32() { - Test { - input: date32_array(vec![ - // row group 1 - Some("2021-01-01"), - None, - Some("2021-01-03"), - // row group 2 - Some("2021-01-01"), - Some("2021-01-05"), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: date32_array(vec![Some("2021-01-01"), Some("2021-01-01"), None]), - expected_max: date32_array(vec![Some("2021-01-03"), Some("2021-01-05"), None]), - } - .run() - } - - #[test] - fn roundtrip_date64() { - Test { - input: date64_array(vec![ - // row group 1 - Some("2021-01-01"), - None, - Some("2021-01-03"), - // row group 2 - Some("2021-01-01"), - Some("2021-01-05"), - None, - // row group 3 - None, - None, - None, - ]), - expected_min: date64_array(vec![Some("2021-01-01"), Some("2021-01-01"), None]), - expected_max: date64_array(vec![Some("2021-01-03"), Some("2021-01-05"), None]), - } - .run() - } - - #[test] - fn roundtrip_large_binary_array() { - let input: Vec> = vec![ - // row group 1 - Some(b"A"), - None, - Some(b"Q"), - // row group 2 - Some(b"ZZ"), - Some(b"AA"), - None, - // row group 3 - None, - None, - None, - ]; - - let expected_min: Vec> = vec![Some(b"A"), Some(b"AA"), None]; - let expected_max: Vec> = vec![Some(b"Q"), Some(b"ZZ"), None]; - - Test { - input: large_binary_array(input), - expected_min: large_binary_array(expected_min), - expected_max: large_binary_array(expected_max), - } - .run(); - } - - #[test] - fn struct_and_non_struct() { - // Ensures that statistics for an array that appears *after* a struct - // array are not wrong - let struct_col = struct_array(vec![ - // row group 1 - (Some(true), Some(1)), - (None, None), - (Some(true), Some(3)), - ]); - let int_col = i32_array([Some(100), Some(200), Some(300)]); - let expected_min = i32_array([Some(100)]); - let expected_max = i32_array(vec![Some(300)]); - - // use a name that shadows a name in the struct column - match struct_col.data_type() { - DataType::Struct(fields) => { - assert_eq!(fields.get(1).unwrap().name(), "int_col") - } - _ => panic!("unexpected data type for struct column"), - }; - - let input_batch = - RecordBatch::try_from_iter([("struct_col", struct_col), ("int_col", int_col)]).unwrap(); - - let schema = input_batch.schema(); - - let metadata = parquet_metadata(schema.clone(), input_batch); - let parquet_schema = metadata.file_metadata().schema_descr(); - - // read the int_col statistics - let (idx, _) = parquet_column(parquet_schema, &schema, "int_col").unwrap(); - assert_eq!(idx, 2); - - let row_groups = metadata.row_groups(); - let converter = StatisticsConverter::try_new("int_col", &schema, parquet_schema).unwrap(); - - let min = converter.row_group_mins(row_groups.iter()).unwrap(); - assert_eq!( - &min, - &expected_min, - "Min. Statistics\n\n{}\n\n", - DisplayStats(row_groups) - ); - - let max = converter.row_group_maxes(row_groups.iter()).unwrap(); - assert_eq!( - &max, - &expected_max, - "Max. 
Statistics\n\n{}\n\n", - DisplayStats(row_groups) - ); - } - - #[test] - fn nan_in_stats() { - // /parquet-testing/data/nan_in_stats.parquet - // row_groups: 1 - // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - - TestFile::new("nan_in_stats.parquet") - .with_column(ExpectedColumn { - name: "x", - expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])), - expected_max: Arc::new(Float64Array::from(vec![Some(f64::NAN)])), - }) - .run(); - } - - #[test] - fn alltypes_plain() { - // /parquet-testing/data/datapage_v1-snappy-compressed-checksum.parquet - // row_groups: 1 - // (has no statistics) - TestFile::new("alltypes_plain.parquet") - // No column statistics should be read as NULL, but with the right type - .with_column(ExpectedColumn { - name: "id", - expected_min: i32_array([None]), - expected_max: i32_array([None]), - }) - .with_column(ExpectedColumn { - name: "bool_col", - expected_min: bool_array([None]), - expected_max: bool_array([None]), - }) - .run(); - } - - #[test] - fn alltypes_tiny_pages() { - // /parquet-testing/data/alltypes_tiny_pages.parquet - // row_groups: 1 - // "id": Int32({min: Some(0), max: Some(7299), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "bool_col": Boolean({min: Some(false), max: Some(true), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "tinyint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "smallint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "int_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "bigint_col": Int64({min: Some(0), max: Some(90), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "float_col": Float({min: Some(0.0), max: Some(9.9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "double_col": Double({min: Some(0.0), max: Some(90.89999999999999), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "date_string_col": ByteArray({min: Some(ByteArray { data: "01/01/09" }), max: Some(ByteArray { data: "12/31/10" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "string_col": ByteArray({min: Some(ByteArray { data: "0" }), max: Some(ByteArray { data: "9" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "timestamp_col": Int96({min: None, max: None, distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) - // "year": Int32({min: Some(2009), max: Some(2010), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - // "month": Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) - TestFile::new("alltypes_tiny_pages.parquet") - .with_column(ExpectedColumn { - name: "id", - expected_min: i32_array([Some(0)]), - expected_max: i32_array([Some(7299)]), - }) - 
.with_column(ExpectedColumn { - name: "bool_col", - expected_min: bool_array([Some(false)]), - expected_max: bool_array([Some(true)]), - }) - .with_column(ExpectedColumn { - name: "tinyint_col", - expected_min: i8_array([Some(0)]), - expected_max: i8_array([Some(9)]), - }) - .with_column(ExpectedColumn { - name: "smallint_col", - expected_min: i16_array([Some(0)]), - expected_max: i16_array([Some(9)]), - }) - .with_column(ExpectedColumn { - name: "int_col", - expected_min: i32_array([Some(0)]), - expected_max: i32_array([Some(9)]), - }) - .with_column(ExpectedColumn { - name: "bigint_col", - expected_min: i64_array([Some(0)]), - expected_max: i64_array([Some(90)]), - }) - .with_column(ExpectedColumn { - name: "float_col", - expected_min: f32_array([Some(0.0)]), - expected_max: f32_array([Some(9.9)]), - }) - .with_column(ExpectedColumn { - name: "double_col", - expected_min: f64_array([Some(0.0)]), - expected_max: f64_array([Some(90.89999999999999)]), - }) - .with_column(ExpectedColumn { - name: "date_string_col", - expected_min: utf8_array([Some("01/01/09")]), - expected_max: utf8_array([Some("12/31/10")]), - }) - .with_column(ExpectedColumn { - name: "string_col", - expected_min: utf8_array([Some("0")]), - expected_max: utf8_array([Some("9")]), - }) - // File has no min/max for timestamp_col - .with_column(ExpectedColumn { - name: "timestamp_col", - expected_min: timestamp_nanoseconds_array([None], None), - expected_max: timestamp_nanoseconds_array([None], None), - }) - .with_column(ExpectedColumn { - name: "year", - expected_min: i32_array([Some(2009)]), - expected_max: i32_array([Some(2010)]), - }) - .with_column(ExpectedColumn { - name: "month", - expected_min: i32_array([Some(1)]), - expected_max: i32_array([Some(12)]), - }) - .run(); - } - - #[test] - fn fixed_length_decimal_legacy() { - // /parquet-testing/data/fixed_length_decimal_legacy.parquet - // row_groups: 1 - // "value": FixedLenByteArray({min: Some(FixedLenByteArray(ByteArray { data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })), max: Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })), distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) - - TestFile::new("fixed_length_decimal_legacy.parquet") - .with_column(ExpectedColumn { - name: "value", - expected_min: Arc::new( - Decimal128Array::from(vec![Some(200)]) - .with_precision_and_scale(13, 2) - .unwrap(), - ), - expected_max: Arc::new( - Decimal128Array::from(vec![Some(2400)]) - .with_precision_and_scale(13, 2) - .unwrap(), - ), - }) - .run(); - } - - const ROWS_PER_ROW_GROUP: usize = 3; - - /// Writes the input batch into a parquet file, with every every three rows as - /// their own row group, and compares the min/maxes to the expected values - struct Test { - input: ArrayRef, - expected_min: ArrayRef, - expected_max: ArrayRef, - } - - impl Test { - fn run(self) { - let Self { - input, - expected_min, - expected_max, - } = self; - - let input_batch = RecordBatch::try_from_iter([("c1", input)]).unwrap(); - - let schema = input_batch.schema(); - - let metadata = parquet_metadata(schema.clone(), input_batch); - let parquet_schema = metadata.file_metadata().schema_descr(); - - let row_groups = metadata.row_groups(); - - for field in schema.fields() { - if field.data_type().is_nested() { - let lookup = parquet_column(parquet_schema, &schema, field.name()); - assert_eq!(lookup, None); - continue; - } - - let converter = - StatisticsConverter::try_new(field.name(), &schema, parquet_schema).unwrap(); - - 
assert_eq!(converter.arrow_field, field.as_ref()); - - let mins = converter.row_group_mins(row_groups.iter()).unwrap(); - assert_eq!( - &mins, - &expected_min, - "Min. Statistics\n\n{}\n\n", - DisplayStats(row_groups) - ); - - let maxes = converter.row_group_maxes(row_groups.iter()).unwrap(); - assert_eq!( - &maxes, - &expected_max, - "Max. Statistics\n\n{}\n\n", - DisplayStats(row_groups) - ); - } - } - } - - /// Write the specified batches out as parquet and return the metadata - fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc<ParquetMetaData> { - let props = WriterProperties::builder() - .set_statistics_enabled(EnabledStatistics::Chunk) - .set_max_row_group_size(ROWS_PER_ROW_GROUP) - .build(); - - let mut buffer = Vec::new(); - let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); - writer.write(&batch).unwrap(); - writer.close().unwrap(); - - let reader = ArrowReaderBuilder::try_new(Bytes::from(buffer)).unwrap(); - reader.metadata().clone() - } - - /// Formats the statistics nicely for display - struct DisplayStats<'a>(&'a [RowGroupMetaData]); - impl<'a> std::fmt::Display for DisplayStats<'a> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let row_groups = self.0; - writeln!(f, " row_groups: {}", row_groups.len())?; - for rg in row_groups { - for col in rg.columns() { - if let Some(statistics) = col.statistics() { - writeln!(f, " {}: {:?}", col.column_path(), statistics)?; - } - } - } - Ok(()) - } - } - - struct ExpectedColumn { - name: &'static str, - expected_min: ArrayRef, - expected_max: ArrayRef, - } - - /// Reads statistics out of the specified, and compares them to the expected values - struct TestFile { - file_name: &'static str, - expected_columns: Vec<ExpectedColumn>, - } - - impl TestFile { - fn new(file_name: &'static str) -> Self { - Self { - file_name, - expected_columns: Vec::new(), - } - } - - fn with_column(mut self, column: ExpectedColumn) -> Self { - self.expected_columns.push(column); - self - } - - /// Reads the specified parquet file and validates that the expected min/max - /// values for the specified columns are as expected.
- fn run(self) { - let path = PathBuf::from(parquet_test_data()).join(self.file_name); - let file = std::fs::File::open(path).unwrap(); - let reader = ArrowReaderBuilder::try_new(file).unwrap(); - let arrow_schema = reader.schema(); - let metadata = reader.metadata(); - let row_groups = metadata.row_groups(); - let parquet_schema = metadata.file_metadata().schema_descr(); - - for expected_column in self.expected_columns { - let ExpectedColumn { - name, - expected_min, - expected_max, - } = expected_column; - - let converter = - StatisticsConverter::try_new(name, arrow_schema, parquet_schema).unwrap(); - - // test accessors on the converter - let parquet_column_index = - parquet_column(parquet_schema, arrow_schema, name).map(|(idx, _field)| idx); - assert_eq!(converter.parquet_column_index(), parquet_column_index); - assert_eq!(converter.arrow_field().name(), name); - - let actual_min = converter.row_group_mins(row_groups.iter()).unwrap(); - assert_eq!(&expected_min, &actual_min, "column {name}"); - - let actual_max = converter.row_group_maxes(row_groups.iter()).unwrap(); - assert_eq!(&expected_max, &actual_max, "column {name}"); - } - } - } - - fn bool_array(input: impl IntoIterator<Item = Option<bool>>) -> ArrayRef { - let array: BooleanArray = input.into_iter().collect(); - Arc::new(array) - } - - fn i8_array(input: impl IntoIterator<Item = Option<i8>>) -> ArrayRef { - let array: Int8Array = input.into_iter().collect(); - Arc::new(array) - } - - fn i16_array(input: impl IntoIterator<Item = Option<i16>>) -> ArrayRef { - let array: Int16Array = input.into_iter().collect(); - Arc::new(array) - } - - fn i32_array(input: impl IntoIterator<Item = Option<i32>>) -> ArrayRef { - let array: Int32Array = input.into_iter().collect(); - Arc::new(array) - } - - fn i64_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef { - let array: Int64Array = input.into_iter().collect(); - Arc::new(array) - } - - fn f32_array(input: impl IntoIterator<Item = Option<f32>>) -> ArrayRef { - let array: Float32Array = input.into_iter().collect(); - Arc::new(array) - } - - fn f64_array(input: impl IntoIterator<Item = Option<f64>>) -> ArrayRef { - let array: Float64Array = input.into_iter().collect(); - Arc::new(array) - } - - fn timestamp_seconds_array( - input: impl IntoIterator<Item = Option<i64>>, - timzezone: Option<&str>, - ) -> ArrayRef { - let array: TimestampSecondArray = input.into_iter().collect(); - match timzezone { - Some(tz) => Arc::new(array.with_timezone(tz)), - None => Arc::new(array), - } - } - - fn timestamp_milliseconds_array( - input: impl IntoIterator<Item = Option<i64>>, - timzezone: Option<&str>, - ) -> ArrayRef { - let array: TimestampMillisecondArray = input.into_iter().collect(); - match timzezone { - Some(tz) => Arc::new(array.with_timezone(tz)), - None => Arc::new(array), - } - } - - fn timestamp_microseconds_array( - input: impl IntoIterator<Item = Option<i64>>, - timzezone: Option<&str>, - ) -> ArrayRef { - let array: TimestampMicrosecondArray = input.into_iter().collect(); - match timzezone { - Some(tz) => Arc::new(array.with_timezone(tz)), - None => Arc::new(array), - } - } - - fn timestamp_nanoseconds_array( - input: impl IntoIterator<Item = Option<i64>>, - timzezone: Option<&str>, - ) -> ArrayRef { - let array: TimestampNanosecondArray = input.into_iter().collect(); - match timzezone { - Some(tz) => Arc::new(array.with_timezone(tz)), - None => Arc::new(array), - } - } - - fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef { - let array: StringArray = input - .into_iter() - .map(|s| s.map(|s| s.to_string())) - .collect(); - Arc::new(array) - } - - // returns a struct array with columns "bool_col" and "int_col" with the specified values - fn struct_array(input: Vec<(Option<bool>,
Option<i32>)>) -> ArrayRef { - let boolean: BooleanArray = input.iter().map(|(b, _i)| b).collect(); - let int: Int32Array = input.iter().map(|(_b, i)| i).collect(); - - let nullable = true; - let struct_array = StructArray::from(vec![ - ( - Arc::new(Field::new("bool_col", DataType::Boolean, nullable)), - Arc::new(boolean) as ArrayRef, - ), - ( - Arc::new(Field::new("int_col", DataType::Int32, nullable)), - Arc::new(int) as ArrayRef, - ), - ]); - Arc::new(struct_array) - } - - fn date32_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef { - let array = Date32Array::from( - input - .into_iter() - .map(|s| Date32Type::parse(s.unwrap_or_default())) - .collect::<Vec<_>>(), - ); - Arc::new(array) - } - - fn date64_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef { - let array = Date64Array::from( - input - .into_iter() - .map(|s| Date64Type::parse(s.unwrap_or_default())) - .collect::<Vec<_>>(), - ); - Arc::new(array) - } - - fn large_binary_array<'a>(input: impl IntoIterator<Item = Option<&'a [u8]>>) -> ArrayRef { - let array = LargeBinaryArray::from(input.into_iter().collect::<Vec<Option<&[u8]>>>()); - - Arc::new(array) - } - - fn string_view_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef { - let array: StringViewArray = input - .into_iter() - .map(|s| s.map(|s| s.to_string())) - .collect(); - - Arc::new(array) - } - - fn binary_view_array(input: Vec<Option<&[u8]>>) -> ArrayRef { - let array = BinaryViewArray::from(input.into_iter().collect::<Vec<Option<&[u8]>>>()); - - Arc::new(array) - } -} diff --git a/parquet/tests/arrow_reader/statistics.rs b/parquet/tests/arrow_reader/statistics.rs index 75a73ac1309f..384be83d30e3 100644 --- a/parquet/tests/arrow_reader/statistics.rs +++ b/parquet/tests/arrow_reader/statistics.rs @@ -2195,3 +2195,430 @@ async fn test_column_non_existent() { } .run_with_schema(&schema); } + +/// The following tests were originally in `arrow-rs/parquet/src/arrow/arrow_reader/statistics.rs`. +/// Some of them were moved here to avoid confusion and duplication; they cover +/// edge cases and the data-file validations that exercise only `row_group` statistics.
+#[cfg(test)] +mod test { + use super::*; + use arrow::util::test_util::parquet_test_data; + use arrow_array::{ + new_empty_array, ArrayRef, BooleanArray, Decimal128Array, Float32Array, Float64Array, + Int16Array, Int32Array, Int64Array, Int8Array, RecordBatch, StringArray, + TimestampNanosecondArray, + }; + use arrow_schema::{DataType, SchemaRef, TimeUnit}; + use bytes::Bytes; + use parquet::arrow::parquet_column; + use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData}; + use std::path::PathBuf; + use std::sync::Arc; + // TODO error cases (with parquet statistics that are mismatched in expected type) + + #[test] + fn roundtrip_empty() { + let all_types = vec![ + DataType::Null, + DataType::Boolean, + DataType::Int8, + DataType::Int16, + DataType::Int32, + DataType::Int64, + DataType::UInt8, + DataType::UInt16, + DataType::UInt32, + DataType::UInt64, + DataType::Float16, + DataType::Float32, + DataType::Float64, + DataType::Timestamp(TimeUnit::Second, None), + DataType::Date32, + DataType::Date64, + // Skip types that don't support statistics + // DataType::Time32(Second), + // DataType::Time64(Second), + // DataType::Duration(Second), + // DataType::Interval(IntervalUnit), + DataType::Binary, + DataType::FixedSizeBinary(0), + DataType::LargeBinary, + DataType::BinaryView, + DataType::Utf8, + DataType::LargeUtf8, + DataType::Utf8View, + // DataType::List(FieldRef), + // DataType::ListView(FieldRef), + // DataType::FixedSizeList(FieldRef, i32), + // DataType::LargeList(FieldRef), + // DataType::LargeListView(FieldRef), + // DataType::Struct(Fields), + // DataType::Union(UnionFields, UnionMode), + // DataType::Dictionary(Box<DataType>, Box<DataType>), + // DataType::Decimal128(u8, i8), + // DataType::Decimal256(u8, i8), + // DataType::Map(FieldRef, bool), + // DataType::RunEndEncoded(FieldRef, FieldRef), + ]; + for data_type in all_types { + let empty_array = new_empty_array(&data_type); + Test { + input: empty_array.clone(), + expected_min: empty_array.clone(), + expected_max: empty_array, + } + .run(); + } + } + + #[test] + fn nan_in_stats() { + // /parquet-testing/data/nan_in_stats.parquet + // row_groups: 1 + // "x": Double({min: Some(1.0), max: Some(NaN), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + + TestFile::new("nan_in_stats.parquet") + .with_column(ExpectedColumn { + name: "x", + expected_min: Arc::new(Float64Array::from(vec![Some(1.0)])), + expected_max: Arc::new(Float64Array::from(vec![Some(f64::NAN)])), + }) + .run(); + } + + #[test] + fn alltypes_plain() { + // /parquet-testing/data/alltypes_plain.parquet + // row_groups: 1 + // (has no statistics) + TestFile::new("alltypes_plain.parquet") + // No column statistics should be read as NULL, but with the right type + .with_column(ExpectedColumn { + name: "id", + expected_min: i32_array([None]), + expected_max: i32_array([None]), + }) + .with_column(ExpectedColumn { + name: "bool_col", + expected_min: bool_array([None]), + expected_max: bool_array([None]), + }) + .run(); + } + + #[test] + fn alltypes_tiny_pages() { + // /parquet-testing/data/alltypes_tiny_pages.parquet + // row_groups: 1 + // "id": Int32({min: Some(0), max: Some(7299), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bool_col": Boolean({min: Some(false), max: Some(true), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "tinyint_col": Int32({min: Some(0), max: Some(9),
distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "smallint_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "int_col": Int32({min: Some(0), max: Some(9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "bigint_col": Int64({min: Some(0), max: Some(90), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "float_col": Float({min: Some(0.0), max: Some(9.9), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "double_col": Double({min: Some(0.0), max: Some(90.89999999999999), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "date_string_col": ByteArray({min: Some(ByteArray { data: "01/01/09" }), max: Some(ByteArray { data: "12/31/10" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "string_col": ByteArray({min: Some(ByteArray { data: "0" }), max: Some(ByteArray { data: "9" }), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "timestamp_col": Int96({min: None, max: None, distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + // "year": Int32({min: Some(2009), max: Some(2010), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + // "month": Int32({min: Some(1), max: Some(12), distinct_count: None, null_count: 0, min_max_deprecated: false, min_max_backwards_compatible: false}) + TestFile::new("alltypes_tiny_pages.parquet") + .with_column(ExpectedColumn { + name: "id", + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(7299)]), + }) + .with_column(ExpectedColumn { + name: "bool_col", + expected_min: bool_array([Some(false)]), + expected_max: bool_array([Some(true)]), + }) + .with_column(ExpectedColumn { + name: "tinyint_col", + expected_min: i8_array([Some(0)]), + expected_max: i8_array([Some(9)]), + }) + .with_column(ExpectedColumn { + name: "smallint_col", + expected_min: i16_array([Some(0)]), + expected_max: i16_array([Some(9)]), + }) + .with_column(ExpectedColumn { + name: "int_col", + expected_min: i32_array([Some(0)]), + expected_max: i32_array([Some(9)]), + }) + .with_column(ExpectedColumn { + name: "bigint_col", + expected_min: i64_array([Some(0)]), + expected_max: i64_array([Some(90)]), + }) + .with_column(ExpectedColumn { + name: "float_col", + expected_min: f32_array([Some(0.0)]), + expected_max: f32_array([Some(9.9)]), + }) + .with_column(ExpectedColumn { + name: "double_col", + expected_min: f64_array([Some(0.0)]), + expected_max: f64_array([Some(90.89999999999999)]), + }) + .with_column(ExpectedColumn { + name: "date_string_col", + expected_min: utf8_array([Some("01/01/09")]), + expected_max: utf8_array([Some("12/31/10")]), + }) + .with_column(ExpectedColumn { + name: "string_col", + expected_min: utf8_array([Some("0")]), + expected_max: utf8_array([Some("9")]), + }) + // File has no min/max for timestamp_col + .with_column(ExpectedColumn { + name: "timestamp_col", + expected_min: timestamp_nanoseconds_array([None], None), + expected_max: timestamp_nanoseconds_array([None], None), + }) + .with_column(ExpectedColumn { + name: "year", + expected_min: i32_array([Some(2009)]), + 
expected_max: i32_array([Some(2010)]), + }) + .with_column(ExpectedColumn { + name: "month", + expected_min: i32_array([Some(1)]), + expected_max: i32_array([Some(12)]), + }) + .run(); + } + + #[test] + fn fixed_length_decimal_legacy() { + // /parquet-testing/data/fixed_length_decimal_legacy.parquet + // row_groups: 1 + // "value": FixedLenByteArray({min: Some(FixedLenByteArray(ByteArray { data: Some(ByteBufferPtr { data: b"\0\0\0\0\0\xc8" }) })), max: Some(FixedLenByteArray(ByteArray { data: "\0\0\0\0\t`" })), distinct_count: None, null_count: 0, min_max_deprecated: true, min_max_backwards_compatible: true}) + + TestFile::new("fixed_length_decimal_legacy.parquet") + .with_column(ExpectedColumn { + name: "value", + expected_min: Arc::new( + Decimal128Array::from(vec![Some(200)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + expected_max: Arc::new( + Decimal128Array::from(vec![Some(2400)]) + .with_precision_and_scale(13, 2) + .unwrap(), + ), + }) + .run(); + } + + const ROWS_PER_ROW_GROUP: usize = 3; + + /// Writes the input batch into a parquet file, with every three rows as + /// their own row group, and compares the min/maxes to the expected values + struct Test { + input: ArrayRef, + expected_min: ArrayRef, + expected_max: ArrayRef, + } + + impl Test { + fn run(self) { + let Self { + input, + expected_min, + expected_max, + } = self; + + let input_batch = RecordBatch::try_from_iter([("c1", input)]).unwrap(); + + let schema = input_batch.schema(); + + let metadata = parquet_metadata(schema.clone(), input_batch); + let parquet_schema = metadata.file_metadata().schema_descr(); + + let row_groups = metadata.row_groups(); + + for field in schema.fields() { + if field.data_type().is_nested() { + let lookup = parquet_column(parquet_schema, &schema, field.name()); + assert_eq!(lookup, None); + continue; + } + + let converter = + StatisticsConverter::try_new(field.name(), &schema, parquet_schema).unwrap(); + + assert_eq!(converter.arrow_field(), field.as_ref()); + + let mins = converter.row_group_mins(row_groups.iter()).unwrap(); + assert_eq!( + &mins, + &expected_min, + "Min. Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + + let maxes = converter.row_group_maxes(row_groups.iter()).unwrap(); + assert_eq!( + &maxes, + &expected_max, + "Max. Statistics\n\n{}\n\n", + DisplayStats(row_groups) + ); + } + } + } + + /// Write the specified batches out as parquet and return the metadata + fn parquet_metadata(schema: SchemaRef, batch: RecordBatch) -> Arc<ParquetMetaData> { + let props = WriterProperties::builder() + .set_statistics_enabled(EnabledStatistics::Chunk) + .set_max_row_group_size(ROWS_PER_ROW_GROUP) + .build(); + + let mut buffer = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buffer, schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + writer.close().unwrap(); + + let reader = ArrowReaderBuilder::try_new(Bytes::from(buffer)).unwrap(); + reader.metadata().clone() + } + + /// Formats the statistics nicely for display + struct DisplayStats<'a>(&'a [RowGroupMetaData]); + impl<'a> std::fmt::Display for DisplayStats<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let row_groups = self.0; + writeln!(f, " row_groups: {}", row_groups.len())?; + for rg in row_groups { + for col in rg.columns() { + if let Some(statistics) = col.statistics() { + writeln!(f, " {}: {:?}", col.column_path(), statistics)?; + } + } + } + Ok(()) + } + } + + struct ExpectedColumn { + name: &'static str, + expected_min: ArrayRef, + expected_max: ArrayRef, + } + + /// Reads statistics out of the specified file, and compares them to the expected values + struct TestFile { + file_name: &'static str, + expected_columns: Vec<ExpectedColumn>, + } + + impl TestFile { + fn new(file_name: &'static str) -> Self { + Self { + file_name, + expected_columns: Vec::new(), + } + } + + fn with_column(mut self, column: ExpectedColumn) -> Self { + self.expected_columns.push(column); + self + } + + /// Reads the specified parquet file and validates that the expected min/max + /// values for the specified columns are as expected. + fn run(self) { + let path = PathBuf::from(parquet_test_data()).join(self.file_name); + let file = File::open(path).unwrap(); + let reader = ArrowReaderBuilder::try_new(file).unwrap(); + let arrow_schema = reader.schema(); + let metadata = reader.metadata(); + let row_groups = metadata.row_groups(); + let parquet_schema = metadata.file_metadata().schema_descr(); + + for expected_column in self.expected_columns { + let ExpectedColumn { + name, + expected_min, + expected_max, + } = expected_column; + + let converter = + StatisticsConverter::try_new(name, arrow_schema, parquet_schema).unwrap(); + + // test accessors on the converter + let parquet_column_index = + parquet_column(parquet_schema, arrow_schema, name).map(|(idx, _field)| idx); + assert_eq!(converter.parquet_column_index(), parquet_column_index); + assert_eq!(converter.arrow_field().name(), name); + + let actual_min = converter.row_group_mins(row_groups.iter()).unwrap(); + assert_eq!(&expected_min, &actual_min, "column {name}"); + + let actual_max = converter.row_group_maxes(row_groups.iter()).unwrap(); + assert_eq!(&expected_max, &actual_max, "column {name}"); + } + } + } + + fn bool_array(input: impl IntoIterator<Item = Option<bool>>) -> ArrayRef { + let array: BooleanArray = input.into_iter().collect(); + Arc::new(array) + } + + fn i8_array(input: impl IntoIterator<Item = Option<i8>>) -> ArrayRef { + let array: Int8Array = input.into_iter().collect(); + Arc::new(array) + } + + fn i16_array(input: impl IntoIterator<Item = Option<i16>>) -> ArrayRef { + let array: Int16Array = input.into_iter().collect(); + Arc::new(array) + } + + fn i32_array(input: impl IntoIterator<Item = Option<i32>>) -> ArrayRef { + let array: Int32Array = input.into_iter().collect(); + Arc::new(array) + } + + fn i64_array(input: impl IntoIterator<Item = Option<i64>>) -> ArrayRef { + let array: Int64Array = input.into_iter().collect(); + Arc::new(array) + } + + fn f32_array(input: impl IntoIterator<Item = Option<f32>>) -> ArrayRef { + let array: Float32Array = input.into_iter().collect(); + Arc::new(array) + } + + fn f64_array(input: impl IntoIterator<Item = Option<f64>>) -> ArrayRef { + let array: Float64Array = input.into_iter().collect(); + Arc::new(array) + } + + fn timestamp_nanoseconds_array( + input: impl IntoIterator<Item = Option<i64>>, + timezone: Option<&str>, + ) -> ArrayRef { + let array: TimestampNanosecondArray = input.into_iter().collect(); + match timezone { + Some(tz) => Arc::new(array.with_timezone(tz)), + None => Arc::new(array), + } + } + + fn utf8_array<'a>(input: impl IntoIterator<Item = Option<&'a str>>) -> ArrayRef { + let array: StringArray = input + .into_iter() + .map(|s| s.map(|s| s.to_string())) + .collect(); + Arc::new(array) + } +} diff --git a/testing b/testing index e270341fb5f3..735ae7128d57 160000 --- a/testing +++ b/testing @@ -1 +1 @@ -Subproject commit e270341fb5f3ff785410e6286cc42898e9d6a99c +Subproject commit 735ae7128d571398dd798d7ff004adebeb342883
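For reviewers who want to see the API these tests exercise in isolation, here is a minimal sketch of how `StatisticsConverter` might be used outside the test harness. It is an illustration, not part of the diff: it only relies on calls that appear above (`ArrowReaderBuilder::try_new`, `StatisticsConverter::try_new`, `row_group_mins`/`row_group_maxes`), and the file path `data.parquet` and column name `c1` are hypothetical placeholders.

```rust
// Minimal sketch: read per-row-group min/max statistics for one column as
// Arrow arrays. "data.parquet" and "c1" are placeholders, not real inputs.
use std::fs::File;

use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::ArrowReaderBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?;
    let reader = ArrowReaderBuilder::try_new(file)?;
    let arrow_schema = reader.schema().clone();
    let metadata = reader.metadata().clone();
    let parquet_schema = metadata.file_metadata().schema_descr();

    // One converter per column name; it resolves the arrow field to the
    // corresponding leaf column in the parquet schema.
    let converter = StatisticsConverter::try_new("c1", &arrow_schema, parquet_schema)?;

    // Each call returns an ArrayRef with one entry per row group, in the
    // arrow type of the column; entries are null where statistics are absent.
    let mins = converter.row_group_mins(metadata.row_groups().iter())?;
    let maxes = converter.row_group_maxes(metadata.row_groups().iter())?;
    println!("per-row-group mins: {mins:?}");
    println!("per-row-group maxes: {maxes:?}");
    Ok(())
}
```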