diff --git a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs index 3aec1e1d2037..5fb21975df4a 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/opener.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/opener.rs @@ -114,6 +114,7 @@ impl FileOpener for ParquetOpener { builder.metadata(), reorder_predicates, &file_metrics, + Arc::clone(&schema_mapping), ); match row_filter { diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs index 5f89ff087f70..18c6c51d2865 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs @@ -18,13 +18,15 @@ use std::collections::BTreeSet; use std::sync::Arc; -use super::ParquetFileMetrics; -use crate::physical_plan::metrics; - use arrow::array::BooleanArray; use arrow::datatypes::{DataType, Schema}; use arrow::error::{ArrowError, Result as ArrowResult}; use arrow::record_batch::RecordBatch; +use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; +use parquet::arrow::ProjectionMask; +use parquet::file::metadata::ParquetMetaData; + +use crate::datasource::schema_adapter::SchemaMapper; use datafusion_common::cast::as_boolean_array; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter, @@ -34,9 +36,9 @@ use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::utils::reassign_predicate_columns; use datafusion_physical_expr::{split_conjunction, PhysicalExpr}; -use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter}; -use parquet::arrow::ProjectionMask; -use parquet::file::metadata::ParquetMetaData; +use crate::physical_plan::metrics; + +use super::ParquetFileMetrics; /// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which /// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`. @@ -78,6 +80,8 @@ pub(crate) struct DatafusionArrowPredicate { rows_filtered: metrics::Count, /// how long was spent evaluating this predicate time: metrics::Time, + /// used to perform type coercion while filtering rows + schema_mapping: Arc, } impl DatafusionArrowPredicate { @@ -87,6 +91,7 @@ impl DatafusionArrowPredicate { metadata: &ParquetMetaData, rows_filtered: metrics::Count, time: metrics::Time, + schema_mapping: Arc, ) -> Result { let schema = Arc::new(schema.project(&candidate.projection)?); let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?; @@ -108,6 +113,7 @@ impl DatafusionArrowPredicate { ), rows_filtered, time, + schema_mapping, }) } } @@ -123,6 +129,8 @@ impl ArrowPredicate for DatafusionArrowPredicate { false => batch.project(&self.projection)?, }; + let batch = self.schema_mapping.map_partial_batch(batch)?; + // scoped timer updates on drop let mut timer = self.time.timer(); match self @@ -323,6 +331,7 @@ pub fn build_row_filter( metadata: &ParquetMetaData, reorder_predicates: bool, file_metrics: &ParquetFileMetrics, + schema_mapping: Arc, ) -> Result> { let rows_filtered = &file_metrics.pushdown_rows_filtered; let time = &file_metrics.pushdown_eval_time; @@ -360,6 +369,7 @@ pub fn build_row_filter( metadata, rows_filtered.clone(), time.clone(), + Arc::clone(&schema_mapping), )?; filters.push(Box::new(filter)); @@ -372,6 +382,7 @@ pub fn build_row_filter( metadata, rows_filtered.clone(), time.clone(), + Arc::clone(&schema_mapping), )?; filters.push(Box::new(filter)); @@ -387,6 +398,7 @@ pub fn build_row_filter( metadata, rows_filtered.clone(), time.clone(), + Arc::clone(&schema_mapping), )?; filters.push(Box::new(filter)); @@ -398,15 +410,23 @@ pub fn build_row_filter( #[cfg(test)] mod test { - use super::*; use arrow::datatypes::Field; + use arrow_schema::TimeUnit::Nanosecond; + use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; + use parquet::arrow::parquet_to_arrow_schema; + use parquet::file::reader::{FileReader, SerializedFileReader}; + use rand::prelude::*; + + use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory; + use crate::datasource::schema_adapter::SchemaAdapterFactory; + use datafusion_common::ToDFSchema; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{cast, col, lit, Expr}; use datafusion_physical_expr::create_physical_expr; - use parquet::arrow::parquet_to_arrow_schema; - use parquet::file::reader::{FileReader, SerializedFileReader}; - use rand::prelude::*; + use datafusion_physical_plan::metrics::{Count, Time}; + + use super::*; // We should ignore predicate that read non-primitive columns #[test] @@ -473,6 +493,86 @@ mod test { ); } + #[test] + fn test_filter_type_coercion() { + let testdata = crate::test_util::parquet_test_data(); + let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet")) + .expect("opening file"); + + let parquet_reader_builder = + ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader"); + let metadata = parquet_reader_builder.metadata().clone(); + let file_schema = parquet_reader_builder.schema().clone(); + + // This is the schema we would like to coerce to, + // which is different from the physical schema of the file. + let table_schema = Schema::new(vec![Field::new( + "timestamp_col", + DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))), + false, + )]); + + let schema_adapter = + DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema.clone())); + let (schema_mapping, _) = schema_adapter + .map_schema(&file_schema) + .expect("creating schema mapping"); + + let mut parquet_reader = parquet_reader_builder.build().expect("building reader"); + + // Parquet file is small, we only need 1 recordbatch + let first_rb = parquet_reader + .next() + .expect("expected record batch") + .expect("expected error free record batch"); + + // Test all should fail + let expr = col("timestamp_col").lt(Expr::Literal( + ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))), + )); + let expr = logical2physical(&expr, &table_schema); + let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); + + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + &file_schema, + &metadata, + Count::new(), + Time::new(), + Arc::clone(&schema_mapping), + ) + .expect("creating filter predicate"); + + let filtered = row_filter.evaluate(first_rb.clone()); + assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8]))); + + // Test all should pass + let expr = col("timestamp_col").gt(Expr::Literal( + ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))), + )); + let expr = logical2physical(&expr, &table_schema); + let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema) + .build(&metadata) + .expect("building candidate") + .expect("candidate expected"); + + let mut row_filter = DatafusionArrowPredicate::try_new( + candidate, + &file_schema, + &metadata, + Count::new(), + Time::new(), + schema_mapping, + ) + .expect("creating filter predicate"); + + let filtered = row_filter.evaluate(first_rb); + assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8]))); + } + #[test] fn test_remap_projection() { let mut rng = thread_rng(); diff --git a/datafusion/core/src/datasource/schema_adapter.rs b/datafusion/core/src/datasource/schema_adapter.rs index 77fde608fd05..e8b64e90900c 100644 --- a/datafusion/core/src/datasource/schema_adapter.rs +++ b/datafusion/core/src/datasource/schema_adapter.rs @@ -75,9 +75,21 @@ pub trait SchemaAdapter: Send + Sync { /// Creates a `SchemaMapping` that can be used to cast or map the columns /// from the file schema to the table schema. -pub trait SchemaMapper: Send + Sync { +pub trait SchemaMapper: Debug + Send + Sync { /// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions. fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result; + + /// Adapts a [`RecordBatch`] that does not have all the columns from the + /// file schema. + /// + /// This method is used when applying a filter to a subset of the columns during + /// an `ArrowPredicate`. + /// + /// This method is slower than `map_batch` as it looks up columns by name. + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result; } #[derive(Clone, Debug, Default)] @@ -185,6 +197,31 @@ impl SchemaMapper for SchemaMapping { let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; Ok(record_batch) } + + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { + let batch_cols = batch.columns().to_vec(); + let schema = batch.schema(); + + let mut cols = vec![]; + let mut fields = vec![]; + for (i, f) in schema.fields().iter().enumerate() { + let table_field = self.table_schema.field_with_name(f.name()); + if let Ok(tf) = table_field { + cols.push(cast(&batch_cols[i], tf.data_type())?); + fields.push(tf.clone()); + } + } + + // Necessary to handle empty batches + let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows())); + + let schema = Arc::new(Schema::new(fields)); + let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?; + Ok(record_batch) + } } #[cfg(test)] @@ -337,5 +374,12 @@ mod tests { Ok(RecordBatch::try_new(schema, new_columns).unwrap()) } + + fn map_partial_batch( + &self, + batch: RecordBatch, + ) -> datafusion_common::Result { + self.map_batch(batch) + } } }