feat: Update Parquet row filtering to handle type coercion #10716

Merged · 5 commits · Jun 5, 2024
@@ -114,6 +114,7 @@ impl FileOpener for ParquetOpener {
builder.metadata(),
reorder_predicates,
&file_metrics,
Arc::clone(&schema_mapping),
);

match row_filter {
120 changes: 110 additions & 10 deletions datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -18,13 +18,15 @@
use std::collections::BTreeSet;
use std::sync::Arc;

use super::ParquetFileMetrics;
use crate::physical_plan::metrics;

use arrow::array::BooleanArray;
use arrow::datatypes::{DataType, Schema};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;

use crate::datasource::schema_adapter::SchemaMapper;
use datafusion_common::cast::as_boolean_array;
use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
@@ -34,9 +36,9 @@ use datafusion_physical_expr::expressions::{Column, Literal};
use datafusion_physical_expr::utils::reassign_predicate_columns;
use datafusion_physical_expr::{split_conjunction, PhysicalExpr};

use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
use parquet::arrow::ProjectionMask;
use parquet::file::metadata::ParquetMetaData;
use crate::physical_plan::metrics;

use super::ParquetFileMetrics;

/// This module contains utilities for enabling the pushdown of DataFusion filter predicates (which
/// can be any DataFusion `Expr` that evaluates to a `BooleanArray`) to the parquet decoder level in `arrow-rs`.
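
For orientation (not part of this diff): a pushed-down predicate ultimately implements the `ArrowPredicate` trait from `parquet`'s `arrow_reader` module, which is what `DatafusionArrowPredicate` below does. A minimal hand-written sketch, with an illustrative hard-coded column index and threshold:

```rust
use arrow::array::{Array, BooleanArray, Int64Array};
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::ArrowPredicate;
use parquet::arrow::ProjectionMask;

/// Toy predicate: keep rows where the single projected Int64 column is positive.
struct GreaterThanZero {
    /// Selects only the column(s) this predicate needs from the file
    projection: ProjectionMask,
}

impl ArrowPredicate for GreaterThanZero {
    fn projection(&self) -> &ProjectionMask {
        &self.projection
    }

    /// The decoder hands us a batch containing only the projected columns;
    /// rows that map to `false` (or null) are skipped during decoding.
    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| ArrowError::CastError("expected an Int64 column".to_string()))?;
        Ok(col.iter().map(|v| v.map(|v| v > 0)).collect())
    }
}
```
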
@@ -78,6 +80,8 @@ pub(crate) struct DatafusionArrowPredicate {
rows_filtered: metrics::Count,
/// how long was spent evaluating this predicate
time: metrics::Time,
/// used to perform type coercion while filtering rows
schema_mapping: Arc<dyn SchemaMapper>,
}

impl DatafusionArrowPredicate {
@@ -87,6 +91,7 @@ impl DatafusionArrowPredicate {
metadata: &ParquetMetaData,
rows_filtered: metrics::Count,
time: metrics::Time,
schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Self> {
let schema = Arc::new(schema.project(&candidate.projection)?);
let physical_expr = reassign_predicate_columns(candidate.expr, &schema, true)?;
@@ -108,6 +113,7 @@
),
rows_filtered,
time,
schema_mapping,
})
}
}
@@ -123,6 +129,8 @@ impl ArrowPredicate for DatafusionArrowPredicate {
false => batch.project(&self.projection)?,
};

let batch = self.schema_mapping.map_partial_batch(batch)?;

// scoped timer updates on drop
let mut timer = self.time.timer();
match self
@@ -323,6 +331,7 @@ pub fn build_row_filter(
metadata: &ParquetMetaData,
reorder_predicates: bool,
file_metrics: &ParquetFileMetrics,
schema_mapping: Arc<dyn SchemaMapper>,
) -> Result<Option<RowFilter>> {
let rows_filtered = &file_metrics.pushdown_rows_filtered;
let time = &file_metrics.pushdown_eval_time;
@@ -360,6 +369,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -372,6 +382,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -387,6 +398,7 @@
metadata,
rows_filtered.clone(),
time.clone(),
Arc::clone(&schema_mapping),
)?;

filters.push(Box::new(filter));
@@ -398,15 +410,23 @@

#[cfg(test)]
mod test {
use super::*;
use arrow::datatypes::Field;
use arrow_schema::TimeUnit::Nanosecond;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;

use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
use crate::datasource::schema_adapter::SchemaAdapterFactory;

use datafusion_common::ToDFSchema;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{cast, col, lit, Expr};
use datafusion_physical_expr::create_physical_expr;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::file::reader::{FileReader, SerializedFileReader};
use rand::prelude::*;
use datafusion_physical_plan::metrics::{Count, Time};

use super::*;

// We should ignore predicates that read non-primitive columns
#[test]
@@ -473,6 +493,86 @@
);
}

#[test]
fn test_filter_type_coercion() {
let testdata = crate::test_util::parquet_test_data();
let file = std::fs::File::open(format!("{testdata}/alltypes_plain.parquet"))
.expect("opening file");

let parquet_reader_builder =
ParquetRecordBatchReaderBuilder::try_new(file).expect("creating reader");
let metadata = parquet_reader_builder.metadata().clone();
let file_schema = parquet_reader_builder.schema().clone();

// This is the schema we would like to coerce to,
// which is different from the physical schema of the file.
let table_schema = Schema::new(vec![Field::new(
"timestamp_col",
DataType::Timestamp(Nanosecond, Some(Arc::from("UTC"))),
false,
)]);

let schema_adapter =
DefaultSchemaAdapterFactory {}.create(Arc::new(table_schema.clone()));
let (schema_mapping, _) = schema_adapter
.map_schema(&file_schema)
.expect("creating schema mapping");

let mut parquet_reader = parquet_reader_builder.build().expect("building reader");

// The Parquet file is small, so we only need one record batch
let first_rb = parquet_reader
.next()
.expect("expected record batch")
.expect("expected error free record batch");

// Test: all rows should fail the predicate
let expr = col("timestamp_col").lt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(1), Some(Arc::from("UTC"))),
));
let expr = logical2physical(&expr, &table_schema);
let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");

let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&file_schema,
&metadata,
Count::new(),
Time::new(),
Arc::clone(&schema_mapping),
)
.expect("creating filter predicate");

let filtered = row_filter.evaluate(first_rb.clone());
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![false; 8])));

// Test: all rows should pass the predicate
let expr = col("timestamp_col").gt(Expr::Literal(
ScalarValue::TimestampNanosecond(Some(0), Some(Arc::from("UTC"))),
));
let expr = logical2physical(&expr, &table_schema);
let candidate = FilterCandidateBuilder::new(expr, &file_schema, &table_schema)
.build(&metadata)
.expect("building candidate")
.expect("candidate expected");

let mut row_filter = DatafusionArrowPredicate::try_new(
candidate,
&file_schema,
&metadata,
Count::new(),
Time::new(),
schema_mapping,
)
.expect("creating filter predicate");

let filtered = row_filter.evaluate(first_rb);
assert!(matches!(filtered, Ok(a) if a == BooleanArray::from(vec![true; 8])));
}
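
The failure mode this test guards against, in isolation: the predicate literal is planned against the table schema (`TimestampNanosecond`), but the decoder hands `evaluate` batches in the file's physical types, so the comparison kernels would see mismatched types without a cast. A standalone sketch of the coercion `map_partial_batch` performs (the microsecond/nanosecond pairing and values here are illustrative, not taken from `alltypes_plain.parquet`):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, TimestampMicrosecondArray};
use arrow::compute::cast;
use arrow::datatypes::{DataType, TimeUnit};
use arrow::error::Result as ArrowResult;

fn coerce_example() -> ArrowResult<()> {
    // Suppose the file physically stores microseconds...
    let file_col: ArrayRef =
        Arc::new(TimestampMicrosecondArray::from(vec![1_000_000_i64]));

    // ...while the table schema (and the planned predicate literal) uses nanoseconds.
    let table_type = DataType::Timestamp(TimeUnit::Nanosecond, None);

    // `map_partial_batch` applies this kind of cast to each filter column
    // before the predicate runs, so the comparison sees matching types.
    let coerced = cast(&file_col, &table_type)?;
    assert_eq!(coerced.data_type(), &table_type);
    Ok(())
}
```
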

#[test]
fn test_remap_projection() {
let mut rng = thread_rng();
46 changes: 45 additions & 1 deletion datafusion/core/src/datasource/schema_adapter.rs
@@ -75,9 +75,21 @@ pub trait SchemaAdapter: Send + Sync {

/// Creates a `SchemaMapping` that can be used to cast or map the columns
/// from the file schema to the table schema.
pub trait SchemaMapper: Send + Sync {
pub trait SchemaMapper: Debug + Send + Sync {
/// Adapts a `RecordBatch` to match the `table_schema` using the stored mapping and conversions.
fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch>;

/// Adapts a [`RecordBatch`] that does not have all the columns from the
/// file schema.
///
/// This method is used when applying a filter to a subset of the columns while
/// evaluating an `ArrowPredicate`.
///
/// This method is slower than `map_batch` as it looks up columns by name.
fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch>;
}
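
A hedged usage sketch of the two entry points, mirroring the setup in the row-filter test above. The import paths assume the crate-public `datafusion::datasource::schema_adapter` module (adjust for your DataFusion version), and `batch` stands in for the projected, file-typed `RecordBatch` a predicate receives:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::datasource::schema_adapter::{
    DefaultSchemaAdapterFactory, SchemaAdapterFactory,
};
use datafusion::error::Result;

fn map_partial_example() -> Result<()> {
    // Table schema: nanosecond timestamps.
    let table_schema = Arc::new(Schema::new(vec![Field::new(
        "timestamp_col",
        DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC"))),
        false,
    )]));
    // File schema: the same column stored as microseconds.
    let file_schema = Schema::new(vec![Field::new(
        "timestamp_col",
        DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
        false,
    )]);

    let adapter = DefaultSchemaAdapterFactory {}.create(table_schema);
    let (mapping, _projection) = adapter.map_schema(&file_schema)?;

    // For a full scan batch, use `map_batch`; while filtering, the predicate's
    // projected batch may hold only some of the columns, so the by-name
    // `map_partial_batch` is needed:
    //
    //     let coerced = mapping.map_partial_batch(batch)?;

    let _ = mapping;
    Ok(())
}
```
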

#[derive(Clone, Debug, Default)]
@@ -185,6 +197,31 @@ impl SchemaMapper for SchemaMapping {
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}

fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
let batch_cols = batch.columns().to_vec();
let schema = batch.schema();

let mut cols = vec![];
let mut fields = vec![];
for (i, f) in schema.fields().iter().enumerate() {
let table_field = self.table_schema.field_with_name(f.name());
if let Ok(tf) = table_field {
cols.push(cast(&batch_cols[i], tf.data_type())?);
fields.push(tf.clone());
}
}

// Necessary to handle empty batches
let options = RecordBatchOptions::new().with_row_count(Some(batch.num_rows()));

let schema = Arc::new(Schema::new(fields));
let record_batch = RecordBatch::try_new_with_options(schema, cols, &options)?;
Ok(record_batch)
}
}

#[cfg(test)]
@@ -337,5 +374,12 @@ mod tests {

Ok(RecordBatch::try_new(schema, new_columns).unwrap())
}

fn map_partial_batch(
&self,
batch: RecordBatch,
) -> datafusion_common::Result<RecordBatch> {
self.map_batch(batch)
}
}
}