From 235b04461cbf7ffe9be77fcca4c316fb3b58306f Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Wed, 29 May 2024 10:37:38 -0400 Subject: [PATCH 1/4] test: Add a failing test to show the lack of type coercion in row filters --- .../physical_plan/parquet/row_filter.rs | 93 +++++++++++++++++-- 1 file changed, 83 insertions(+), 10 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 5f89ff087f70..69fc140f49a2 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -18,13 +18,14 @@ use std::collections::BTreeSet; use std::sync::Arc; -use super::ParquetFileMetrics; -use crate::physical_plan::metrics; - use arrow::array::BooleanArray; use arrow::datatypes::{DataType, Schema}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; +use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; +use parquet::arrow::ProjectionMask; +use parquet::file::metadata::ParquetMetaData; + use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, @@ -34,9 +35,9 @@ use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; -use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; -use parquet::arrow::ProjectionMask; -use parquet::file::metadata::ParquetMetaData; +use crate::physical_plan::metrics; + +use super::ParquetFileMetrics; /// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which /// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`. @@ -398,15 +399,22 @@ pub fn build_row_filter( #[cfg(test)] mod test { - use super::*; + use arrow::compute::kernels::cast_utils::Parser; use arrow::datatypes::Field; + use arrow_array::types::TimestampNanosecondType; + use arrow_array::TimestampNanosecondArray; + use arrow_schema::TimeUnit::Nanosecond; + use parquet::arrow::parquet_to_arrow_schema; + use parquet::file::reader::{FileReader, SerializedFileReader}; + use rand::prelude::*; + use datafusion_common::ToDFSchema; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{cast, col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; - use parquet::arrow::parquet_to_arrow_schema; - use parquet::file::reader::{FileReader, SerializedFileReader}; - use rand::prelude::*; + use datafusion_physical_plan::metrics::{Count, Time}; + + use super::*; // We should ignore predicate that read non-primitive columns #[test] @@ -473,6 +481,71 @@ mod test { ); } + #[test] + fn test_filter_type_coercion() { + let testdata = crate::test_util::parquet_test_data(); + let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) + .expect("opening file"); + + let reader = SerializedFileReader::new(file).expect("creating reader"); + let metadata = reader.metadata(); + let file_schema = + parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) + .expect("parsing schema"); + + // This is the schema we would like to coerce to, + // which is different from the physical schema of the file. 
+ let table_schema = Schema::new(vec![Field::new( + "timestamp_col", + DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))), + false, + )]); + + let expr = col("timestamp_col").eq(Expr::Literal( + ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))), + )); + let expr = logical2physical(&expr, &table_schema); + let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) + .build(metadata) + .expect("building candidate") + .expect("candidate expected"); + + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + &file_schema, + metadata, + Count::new(), + Time::new(), + ) + .expect("creating filter predicate"); + + // Create some fake data as if it was from the parquet file + let ts_array = TimestampNanosecondArray::new( + vec![TimestampNanosecondType::parse("2020-01-01T00:00:00") + .expect("should parse")] + .into(), + None, + ); + // We need a matching schema to create a record batch + let batch_schema = Schema::new(vec![Field::new( + "timestamp", + DataType::Timestamp(Nanosecond, None), + false, + )]); + + let record_batch = + RecordBatch::try_new(Arc::new(batch_schema), vec![Arc::new(ts_array)]) + .expect("creating record batch"); + + let filtered = row_filter.evaluate(record_batch); + + let message = String::from("Error evaluating filter predicate: ArrowError(InvalidArgumentError(\"Invalid comparison operation: Timestamp(Nanosecond, None) == Timestamp(Nanosecond, Some(\\\"UTC\\\"))\"), None)"); + assert!(matches!(filtered, Err(ArrowError::ComputeError(msg)) if message == msg)); + + // This currently fails (and should replace the above assert once passing) + // assert!(matches!(filtered, Ok(_))); + } + #[test] fn test_remap_projection() { let mut rng = thread_rng(); From 484b2fd87492b071ecb614e320b23eee7d4ecca4 Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Wed, 29 May 2024 15:52:46 -0400 Subject: [PATCH 2/4] feat: update parquet row filter to handle type coercion --- .../datasource/physical_plan/parquet/mod.rs | 1 + .../physical_plan/parquet/row_filter.rs | 29 +++++++++++--- .../core/src/datasource/schema_adapter.rs | 38 ++++++++++++++++++- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 6655125ea876..b6ec18c9a19a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -621,6 +621,7 @@ impl FileOpener for ParquetOpener { builder.metadata(), reorder_predicates, &file_metrics, + Arc::clone(&schema_mapping), ); match row_filter { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 69fc140f49a2..2c00b45fa297 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -26,6 +26,7 @@ use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; use parquet::arrow::ProjectionMask; use parquet::file::metadata::ParquetMetaData; +use crate::datasource::schema_adapter::SchemaMapper; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, @@ -79,6 +80,8 @@ pub(crate) struct DatafusionArrowPredicate { rows_filtered: metrics::Count, /// how long was spent evaluating this predicate time: metrics::Time, + /// used to 
perform type coercion while filtering rows + schema_mapping: Arc, } impl DatafusionArrowPredicate { @@ -88,6 +91,7 @@ impl DatafusionArrowPredicate { metadata: &ParquetMetaData, rows_filtered: metrics::Count, time: metrics::Time, + schema_mapping: Arc, ) -> Result { let schema = Arc::new(schema.project(&candidate.projection)?); let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?; @@ -109,6 +113,7 @@ impl DatafusionArrowPredicate { ), rows_filtered, time, + schema_mapping, }) } } @@ -124,6 +129,8 @@ impl ArrowPredicate for DatafusionArrowPredicate { false => batch.project(&self.projection)?, }; + let batch = self.schema_mapping.map_partial_batch(batch)?; + // scoped timer updates on drop let mut timer = self.time.timer(); match self @@ -324,6 +331,7 @@ pub fn build_row_filter( metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, + schema_mapping: Arc, ) -> Result> { let rows_filtered = &file_metrics.pushdown_rows_filtered; let time = &file_metrics.pushdown_eval_time; @@ -361,6 +369,7 @@ pub fn build_row_filter( metadata, rows_filtered.clone(), time.clone(), + Arc::clone(&schema_mapping), )?; filters.push(Box::new(filter)); @@ -373,6 +382,7 @@ pub fn build_row_filter( metadata, rows_filtered.clone(), time.clone(), + Arc::clone(&schema_mapping), )?; filters.push(Box::new(filter)); @@ -388,6 +398,7 @@ pub fn build_row_filter( metadata, rows_filtered.clone(), time.clone(), + Arc::clone(&schema_mapping), )?; filters.push(Box::new(filter)); @@ -408,6 +419,9 @@ mod test { use parquet::file::reader::{FileReader, SerializedFileReader}; use rand::prelude::*; + use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory; + use crate::datasource::schema_adapter::SchemaAdapterFactory; + use datafusion_common::ToDFSchema; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{cast, col, lit, Expr}; @@ -510,12 +524,19 @@ mod test { .expect("building candidate") .expect("candidate expected"); + let schema_adapter = DefaultSchemaAdapterFactory{}.create(Arc::new(table_schema)); + let (schema_mapping, _) = schema_adapter + .map_schema(&file_schema) + .expect("creating schema mapping"); + + let mut row_filter = DatafusionArrowPredicate::try_new( candidate, &file_schema, metadata, Count::new(), Time::new(), + schema_mapping, ) .expect("creating filter predicate"); @@ -528,7 +549,7 @@ mod test { ); // We need a matching schema to create a record batch let batch_schema = Schema::new(vec![Field::new( - "timestamp", + "timestamp_col", DataType::Timestamp(Nanosecond, None), false, )]); @@ -539,11 +560,7 @@ mod test { let filtered = row_filter.evaluate(record_batch); - let message = String::from("Error evaluating filter predicate: ArrowError(InvalidArgumentError(\"Invalid comparison operation: Timestamp(Nanosecond, None) == Timestamp(Nanosecond, Some(\\\"UTC\\\"))\"), None)"); - assert!(matches!(filtered, Err(ArrowError::ComputeError(msg)) if message == msg)); - - // This currently fails (and should replace the above assert once passing) - // assert!(matches!(filtered, Ok(_))); + assert!(matches!(filtered, Ok(_))); } #[test] diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index 1838a3354b9c..c1cb8140efb7 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -75,9 +75,16 @@ pub trait SchemaAdapter: Send + Sync { /// Creates a `SchemaMapping` that can be used to cast or map the columns /// from the 
file schema to the table schema. -pub trait SchemaMapper: Send + Sync { +pub trait SchemaMapper: Debug + Send + Sync { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; + + /// Adapts a `RecordBatch` that does not have all the columns (as defined in the schema). + /// This method is slower than `map_batch` and should only be used when explicitly needed. + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result; } #[derive(Clone, Debug, Default)] @@ -185,6 +192,31 @@ impl SchemaMapper for SchemaMapping { let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } + + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { + let batch_cols = batch.columns().to_vec(); + let schema = batch.schema(); + + let mut cols = vec![]; + let mut fields = vec![]; + for (i, f) in schema.fields().iter().enumerate() { + let table_field = self.table_schema.field_with_name(f.name()); + if let Ok(tf) = table_field { + cols.push(cast(&batch_cols[i], tf.data_type())?); + fields.push(tf.clone()); + } + } + + // Necessary to handle empty batches + let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + + let schema = Arc::new(Schema::new(fields)); + let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; + Ok(record_batch) + } } #[cfg(test)] @@ -339,5 +371,9 @@ mod tests { Ok(RecordBatch::try_new(schema, new_columns).unwrap()) } + + fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result { + self.map_batch(batch) + } } } From c976ad1dfdc000f53832fabd4724c13c12a1aadd Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Thu, 30 May 2024 11:20:20 -0400 Subject: [PATCH 3/4] chore: lint/fmt --- .../core/src/datasource/physical_plan/parquet/row_filter.rs | 6 +++--- datafusion/core/src/datasource/schema_adapter.rs | 5 ++++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 2c00b45fa297..1d0826664ea6 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -524,12 +524,12 @@ mod test { .expect("building candidate") .expect("candidate expected"); - let schema_adapter = DefaultSchemaAdapterFactory{}.create(Arc::new(table_schema)); + let schema_adapter = + DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema)); let (schema_mapping, _) = schema_adapter .map_schema(&file_schema) .expect("creating schema mapping"); - let mut row_filter = DatafusionArrowPredicate::try_new( candidate, &file_schema, @@ -560,7 +560,7 @@ mod test { let filtered = row_filter.evaluate(record_batch); - assert!(matches!(filtered, Ok(_))); + assert!(filtered.is_ok()); } #[test] diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index c1cb8140efb7..8b90309e9425 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -372,7 +372,10 @@ mod tests { Ok(RecordBatch::try_new(schema, new_columns).unwrap()) } - fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result { + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { self.map_batch(batch) } } 
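The coercion that the second patch wires into `DatafusionArrowPredicate::evaluate` amounts to casting each column of the decoded batch to the type the table schema declares before the predicate runs. The sketch below (not part of these patches; the helper name and the hard-coded target type are assumptions for illustration) shows that step in isolation with arrow's `cast` kernel, mirroring the timestamp case exercised by the test: a column decoded as `Timestamp(Nanosecond, None)` is cast to the table type `Timestamp(Nanosecond, Some("UTC"))` so that the comparison kernels accept it.

// Standalone sketch of the coercion applied before predicate evaluation.
// `coerce_to_table_type` and the hard-coded UTC target are illustrative only;
// in the patches the target type comes from the table schema via the
// `SchemaMapper`, which resolves columns by name.
use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampNanosecondArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};
use arrow::error::Result;

fn coerce_to_table_type(file_col: &ArrayRef) -> Result<ArrayRef> {
    // Cast the column decoded from the parquet file to the type the table
    // schema declares (here: UTC nanosecond timestamps).
    cast(
        file_col,
        &DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC"))),
    )
}

fn main() -> Result<()> {
    // A column as decoded from the file: nanosecond timestamps, no timezone.
    let file_col: ArrayRef = Arc::new(TimestampNanosecondArray::from(vec![0_i64, 1, 2]));
    let table_col = coerce_to_table_type(&file_col)?;
    assert_eq!(
        table_col.data_type(),
        &DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC")))
    );
    Ok(())
}

Inside the engine this cast happens in `SchemaMapping::map_partial_batch`, which looks columns up by name and simply drops any column that does not appear in the table schema; that is why it is slower than `map_batch` but tolerant of the partial batches handed to an `ArrowPredicate`.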
From f22b915cdbba956e2cc7dd9fd5a7fbebb6643316 Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Mon, 3 Jun 2024 15:47:47 -0400 Subject: [PATCH 4/4] chore: test improvements and cleanup --- .../physical_plan/parquet/row_filter.rs | 82 +++++++++++-------- .../core/src/datasource/schema_adapter.rs | 9 +- 2 files changed, 53 insertions(+), 38 deletions(-) diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 1d0826664ea6..18c6c51d2865 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -410,11 +410,9 @@ pub fn build_row_filter( #[cfg(test)] mod test { - use arrow::compute::kernels::cast_utils::Parser; use arrow::datatypes::Field; - use arrow_array::types::TimestampNanosecondType; - use arrow_array::TimestampNanosecondArray; use arrow_schema::TimeUnit::Nanosecond; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::reader::{FileReader, SerializedFileReader}; use rand::prelude::*; @@ -501,11 +499,10 @@ mod test { let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) .expect("opening file"); - let reader = SerializedFileReader::new(file).expect("creating reader"); - let metadata = reader.metadata(); - let file_schema = - parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), None) - .expect("parsing schema"); + let parquet_reader_builder = + ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader"); + let metadata = parquet_reader_builder.metadata().clone(); + let file_schema = parquet_reader_builder.schema().clone(); // This is the schema we would like to coerce to, // which is different from the physical schema of the file. 
@@ -515,52 +512,65 @@ mod test { false, )]); - let expr = col("timestamp_col").eq(Expr::Literal( + let schema_adapter = + DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema.clone())); + let (schema_mapping, _) = schema_adapter + .map_schema(&file_schema) + .expect("creating schema mapping"); + + let mut parquet_reader = parquet_reader_builder.build().expect("building reader"); + + // Parquet file is small, we only need 1 recordbatch + let first_rb = parquet_reader + .next() + .expect("expected record batch") + .expect("expected error free record batch"); + + // Test all should fail + let expr = col("timestamp_col").lt(Expr::Literal( ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))), )); let expr = logical2physical(&expr, &table_schema); let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) - .build(metadata) + .build(&metadata) .expect("building candidate") .expect("candidate expected"); - let schema_adapter = - DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema)); - let (schema_mapping, _) = schema_adapter - .map_schema(&file_schema) - .expect("creating schema mapping"); - let mut row_filter = DatafusionArrowPredicate::try_new( candidate, &file_schema, - metadata, + &metadata, Count::new(), Time::new(), - schema_mapping, + Arc::clone(&schema_mapping), ) .expect("creating filter predicate"); - // Create some fake data as if it was from the parquet file - let ts_array = TimestampNanosecondArray::new( - vec![TimestampNanosecondType::parse("2020-01-01T00:00:00") - .expect("should parse")] - .into(), - None, - ); - // We need a matching schema to create a record batch - let batch_schema = Schema::new(vec![Field::new( - "timestamp_col", - DataType::Timestamp(Nanosecond, None), - false, - )]); + let filtered = row_filter.evaluate(first_rb.clone()); + assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8]))); - let record_batch = - RecordBatch::try_new(Arc::new(batch_schema), vec![Arc::new(ts_array)]) - .expect("creating record batch"); + // Test all should pass + let expr = col("timestamp_col").gt(Expr::Literal( + ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))), + )); + let expr = logical2physical(&expr, &table_schema); + let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); - let filtered = row_filter.evaluate(record_batch); + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + &file_schema, + &metadata, + Count::new(), + Time::new(), + schema_mapping, + ) + .expect("creating filter predicate"); - assert!(filtered.is_ok()); + let filtered = row_filter.evaluate(first_rb); + assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8]))); } #[test] diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index 98a0ea8bf6c0..e8b64e90900c 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -79,8 +79,13 @@ pub trait SchemaMapper: Debug + Send + Sync { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; - /// Adapts a `RecordBatch` that does not have all the columns (as defined in the schema). - /// This method is slower than `map_batch` and should only be used when explicitly needed. 
+    /// Adapts a [`RecordBatch`] that does not have all the columns from the
+    /// file schema.
+    ///
+    /// This method is used when applying a filter to a subset of the columns
+    /// while evaluating an `ArrowPredicate`.
+    ///
+    /// This method is slower than `map_batch` as it looks up columns by name.
     fn map_partial_batch(
         &self,
         batch: RecordBatch,