From d3642a621c6603ce7e103df09d6ee8e64d6aeb1d Mon Sep 17 00:00:00 2001
From: Michael Maletich
Date: Sun, 21 Jul 2024 00:54:38 -0500
Subject: [PATCH] feat(rust,python): cast each parquet file to delta schema
 (#2615)

# Description

By casting each record batch read from Parquet to the Delta schema, DataFusion
can read tables whose underlying Parquet files can be cast to the desired
schema. Fixes:

- Errors querying data where some Parquet files are missing columns that were
  added later through schema migration. This includes nested columns of
  structs that appear in maps, lists, or as children of other structs.
- Maps and lists written with different element names.
- Timestamps of different units.
- Any other cast supported by arrow-cast.

This is possible now that DataFusion exposes a `SchemaAdapter` which can be
overridden. Note that, as a result, delta-rs reads all timestamps with
microsecond precision, matching the Delta protocol.
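To illustrate the user-visible effect, here is a condensed sketch of the new
`delta_scan_supports_missing_columns` test added below: two appends with
different but mergeable schemas, then one query that succeeds because each
file's batches are cast to the table schema at scan time. Crate paths assume
`deltalake-core` inside this repo (downstream users would go through the
`deltalake` re-exports), and a tokio runtime is assumed:

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema as ArrowSchema};
use datafusion::prelude::SessionContext;
use deltalake_core::operations::write::SchemaMode;
use deltalake_core::protocol::SaveMode;
use deltalake_core::DeltaOps;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // First file: only `col_1`.
    let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
        "col_1",
        DataType::Utf8,
        true,
    )]));
    let batch1 = RecordBatch::try_new(
        schema1,
        vec![Arc::new(StringArray::from(vec![Some("A"), Some("B")]))],
    )?;

    // Second file: `col_1` plus a `col_2` added by schema evolution.
    let schema2 = Arc::new(ArrowSchema::new(vec![
        Field::new("col_1", DataType::Utf8, true),
        Field::new("col_2", DataType::Utf8, true),
    ]));
    let batch2 = RecordBatch::try_new(
        schema2,
        vec![
            Arc::new(StringArray::from(vec![Some("E")])),
            Arc::new(StringArray::from(vec![Some("E2")])),
        ],
    )?;

    let table = DeltaOps::new_in_memory()
        .write(vec![batch1])
        .with_save_mode(SaveMode::Append)
        .await?;
    let table = DeltaOps(table)
        .write(vec![batch2])
        .with_schema_mode(SchemaMode::Merge)
        .with_save_mode(SaveMode::Append)
        .await?;

    // Previously this scan could fail because the first file has no `col_2`;
    // the schema adapter now casts that file's batches to the table schema,
    // filling the missing column with nulls.
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(table))?;
    let batches = ctx.sql("SELECT col_1, col_2 FROM t").await?.collect().await?;
    println!("{batches:?}");
    Ok(())
}
```

The same mechanism is intended to cover the nested-struct, map/list
element-name, and timestamp-unit cases listed above, since `cast_record_batch`
drives them all through arrow-cast.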
# Related Issue(s)

- This makes solving #2478 and #2341 just a matter of extending the delta-rs
  cast logic.

---------

Co-authored-by: Alex Wilcoxson
---
 .../src/delta_datafusion/find_files/mod.rs    |   2 +-
 crates/core/src/delta_datafusion/mod.rs       | 260 ++++++++++++++----
 .../src/delta_datafusion/schema_adapter.rs    |  68 +++++
 crates/core/src/operations/constraints.rs     |   2 +-
 crates/core/src/operations/delete.rs          |   2 +-
 crates/core/src/operations/update.rs          |   2 +-
 crates/core/src/operations/write.rs           |   2 +-
 crates/core/tests/integration_datafusion.rs   |  15 +-
 8 files changed, 290 insertions(+), 63 deletions(-)
 create mode 100644 crates/core/src/delta_datafusion/schema_adapter.rs

diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs
index 347925f31f..f237a4ac8e 100644
--- a/crates/core/src/delta_datafusion/find_files/mod.rs
+++ b/crates/core/src/delta_datafusion/find_files/mod.rs
@@ -148,7 +148,7 @@ async fn scan_table_by_files(
     // Add path column
     used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
 
-    let scan = DeltaScanBuilder::new(&snapshot, log_store, &state)
+    let scan = DeltaScanBuilder::new(&snapshot, log_store)
         .with_filter(Some(expression.clone()))
         .with_projection(Some(&used_columns))
         .with_scan_config(scan_config)
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 860a02be56..fecc6f3f03 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -39,7 +39,7 @@ use arrow_cast::display::array_value_to_string;
 use arrow_schema::Field;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
-use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat};
+use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
@@ -65,7 +65,6 @@ use datafusion_common::{
 use datafusion_expr::logical_plan::CreateExternalTable;
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
-use datafusion_physical_expr::PhysicalExpr;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use datafusion_sql::planner::ParserOptions;
@@ -77,6 +76,7 @@ use serde::{Deserialize, Serialize};
 use url::Url;
 
 use crate::delta_datafusion::expr::parse_predicate_expression;
+use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
 use crate::logstore::LogStoreRef;
@@ -93,6 +93,7 @@ pub mod logical;
 pub mod physical;
 
 mod find_files;
+mod schema_adapter;
 
 impl From<DeltaTableError> for DataFusionError {
     fn from(err: DeltaTableError) -> Self {
@@ -451,7 +452,6 @@ pub(crate) struct DeltaScanBuilder<'a> {
     snapshot: &'a DeltaTableState,
     log_store: LogStoreRef,
     filter: Option<Expr>,
-    state: &'a SessionState,
     projection: Option<&'a Vec<usize>>,
     limit: Option<usize>,
     files: Option<&'a [Add]>,
@@ -460,16 +460,11 @@
 }
 
 impl<'a> DeltaScanBuilder<'a> {
-    pub fn new(
-        snapshot: &'a DeltaTableState,
-        log_store: LogStoreRef,
-        state: &'a SessionState,
-    ) -> Self {
+    pub fn new(snapshot: &'a DeltaTableState, log_store: LogStoreRef) -> Self {
         DeltaScanBuilder {
             snapshot,
             log_store,
             filter: None,
-            state,
             files: None,
             projection: None,
             limit: None,
@@ -517,11 +512,7 @@ impl<'a> DeltaScanBuilder<'a> {
 
         let schema = match self.schema {
             Some(schema) => schema,
-            None => {
-                self.snapshot
-                    .physical_arrow_schema(self.log_store.object_store())
-                    .await?
-            }
+            None => self.snapshot.arrow_schema()?,
         };
 
         let logical_schema = df_logical_schema(self.snapshot, &config)?;
@@ -635,32 +626,27 @@ impl<'a> DeltaScanBuilder<'a> {
             .datafusion_table_statistics()
             .unwrap_or(Statistics::new_unknown(&schema));
 
+        let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
+            object_store_url: self.log_store.object_store_url(),
+            file_schema,
+            file_groups: file_groups.into_values().collect(),
+            statistics: stats,
+            projection: self.projection.cloned(),
+            limit: self.limit,
+            table_partition_cols,
+            output_ordering: vec![],
+        })
+        .with_schema_adapter_factory(Arc::new(DeltaSchemaAdapterFactory {}));
+
         // Sometimes (i.e Merge) we want to prune files that don't make the
         // filter and read the entire contents for files that do match the
         // filter
-        let parquet_pushdown = if config.enable_parquet_pushdown {
-            logical_filter.clone()
-        } else {
-            None
+        if let Some(predicate) = logical_filter {
+            if config.enable_parquet_pushdown {
+                exec_plan_builder = exec_plan_builder.with_predicate(predicate);
+            }
         };
 
-        let scan = ParquetFormat::new()
-            .create_physical_plan(
-                self.state,
-                FileScanConfig {
-                    object_store_url: self.log_store.object_store_url(),
-                    file_schema,
-                    file_groups: file_groups.into_values().collect(),
-                    statistics: stats,
-                    projection: self.projection.cloned(),
-                    limit: self.limit,
-                    table_partition_cols,
-                    output_ordering: vec![],
-                },
-                parquet_pushdown.as_ref(),
-            )
-            .await?;
-
         let metrics = ExecutionPlanMetricsSet::new();
         MetricBuilder::new(&metrics)
             .global_counter("files_scanned")
@@ -671,7 +657,7 @@ impl<'a> DeltaScanBuilder<'a> {
 
         Ok(DeltaScan {
             table_uri: ensure_table_uri(self.log_store.root_uri())?.as_str().into(),
-            parquet_scan: scan,
+            parquet_scan: exec_plan_builder.build_arc(),
             config,
             logical_schema,
             metrics,
@@ -712,7 +698,7 @@ impl TableProvider for DeltaTable {
         register_store(self.log_store(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());
 
-        let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session)
+        let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store())
            .with_projection(projection)
            .with_limit(limit)
            .with_filter(filter_expr)
@@ -793,7 +779,7 @@ impl TableProvider for DeltaTableProvider {
         register_store(self.log_store.clone(), session.runtime_env().clone());
         let filter_expr = conjunction(filters.iter().cloned());
 
-        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
+        let scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone())
             .with_projection(projection)
             .with_limit(limit)
             .with_filter(filter_expr)
@@ -1514,7 +1500,7 @@ pub(crate) async fn find_files_scan<'a>(
     // Add path column
     used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store)
         .with_filter(Some(expression.clone()))
         .with_projection(Some(&used_columns))
         .with_scan_config(scan_config)
@@ -1749,6 +1735,7 @@ impl From for DeltaColumn {
 
 #[cfg(test)]
 mod tests {
+    use crate::operations::write::SchemaMode;
     use crate::writer::test_utils::get_delta_schema;
     use arrow::array::StructArray;
     use arrow::datatypes::{DataType, Field, Schema};
@@ -1758,6 +1745,7 @@ mod tests {
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor};
     use datafusion_expr::lit;
+    use datafusion_physical_expr::PhysicalExpr;
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf;
     use object_store::path::Path;
@@ -2153,9 +2141,184 @@ mod tests {
         */
     }
 
+    #[tokio::test]
+    async fn delta_scan_supports_missing_columns() {
+        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Utf8,
+            true,
+        )]));
+
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(arrow::array::StringArray::from(vec![
+                Some("A"),
+                Some("B"),
+            ]))],
+        )
+        .unwrap();
+
+        let schema2 = Arc::new(ArrowSchema::new(vec![
+            Field::new("col_1", DataType::Utf8, true),
+            Field::new("col_2", DataType::Utf8, true),
+        ]));
+
+        let batch2 = RecordBatch::try_new(
+            schema2.clone(),
+            vec![
+                Arc::new(arrow::array::StringArray::from(vec![
+                    Some("E"),
+                    Some("F"),
+                    Some("G"),
+                ])),
+                Arc::new(arrow::array::StringArray::from(vec![
+                    Some("E2"),
+                    Some("F2"),
+                    Some("G2"),
+                ])),
+            ],
+        )
+        .unwrap();
+
+        let table = crate::DeltaOps::new_in_memory()
+            .write(vec![batch2])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let table = crate::DeltaOps(table)
+            .write(vec![batch1])
+            .with_schema_mode(SchemaMode::Merge)
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+        let log = table.log_store();
+
+        let provider =
+            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+        let ctx: SessionContext = DeltaSessionContext::default().into();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+-------+-------+",
+            "| col_1 | col_2 |",
+            "+-------+-------+",
+            "| A     |       |",
+            "| B     |       |",
+            "| E     | E2    |",
+            "| F     | F2    |",
+            "| G     | G2    |",
+            "+-------+-------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
+    #[tokio::test]
+    async fn delta_scan_supports_nested_missing_columns() {
+        let column1_schema1: arrow::datatypes::Fields =
+            vec![Field::new("col_1a", DataType::Utf8, true)].into();
+        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Struct(column1_schema1.clone()),
+            true,
+        )]));
+
+        let batch1 = RecordBatch::try_new(
+            schema1.clone(),
+            vec![Arc::new(StructArray::new(
+                column1_schema1,
+                vec![Arc::new(arrow::array::StringArray::from(vec![
+                    Some("A"),
+                    Some("B"),
+                ]))],
+                None,
+            ))],
+        )
+        .unwrap();
+
+        let column1_schema2: arrow::datatypes::Fields = vec![
+            Field::new("col_1a", DataType::Utf8, true),
+            Field::new("col_1b", DataType::Utf8, true),
+        ]
+        .into();
+        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
+            "col_1",
+            DataType::Struct(column1_schema2.clone()),
+            true,
+        )]));
+
+        let batch2 = RecordBatch::try_new(
+            schema2.clone(),
+            vec![Arc::new(StructArray::new(
+                column1_schema2,
+                vec![
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        Some("E"),
+                        Some("F"),
+                        Some("G"),
+                    ])),
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        Some("E2"),
+                        Some("F2"),
+                        Some("G2"),
+                    ])),
+                ],
+                None,
+            ))],
+        )
+        .unwrap();
+
+        let table = crate::DeltaOps::new_in_memory()
+            .write(vec![batch1])
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let table = crate::DeltaOps(table)
+            .write(vec![batch2])
+            .with_schema_mode(SchemaMode::Merge)
+            .with_save_mode(crate::protocol::SaveMode::Append)
+            .await
+            .unwrap();
+
+        let config = DeltaScanConfigBuilder::new()
+            .build(table.snapshot().unwrap())
+            .unwrap();
+        let log = table.log_store();
+
+        let provider =
+            DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap();
+        let ctx: SessionContext = DeltaSessionContext::default().into();
+        ctx.register_table("test", Arc::new(provider)).unwrap();
+
+        let df = ctx
+            .sql("select col_1.col_1a, col_1.col_1b from test")
+            .await
+            .unwrap();
+        let actual = df.collect().await.unwrap();
+        let expected = vec![
+            "+--------------------+--------------------+",
+            "| test.col_1[col_1a] | test.col_1[col_1b] |",
+            "+--------------------+--------------------+",
+            "| A                  |                    |",
+            "| B                  |                    |",
+            "| E                  | E2                 |",
+            "| F                  | F2                 |",
+            "| G                  | G2                 |",
+            "+--------------------+--------------------+",
+        ];
+        assert_batches_sorted_eq!(&expected, &actual);
+    }
+
     #[tokio::test]
     async fn test_multiple_predicate_pushdown() {
-        use crate::{datafusion::prelude::SessionContext, DeltaTableBuilder};
+        use crate::datafusion::prelude::SessionContext;
         let schema = Arc::new(ArrowSchema::new(vec![
             Field::new("moDified", DataType::Utf8, true),
             Field::new("id", DataType::Utf8, true),
@@ -2198,7 +2361,6 @@ mod tests {
     #[tokio::test]
     async fn test_delta_scan_builder_no_scan_config() {
-        use crate::datafusion::prelude::SessionContext;
         let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
         let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
         let table = crate::DeltaOps::new_in_memory()
             .write(vec![batch])
@@ -2207,13 +2369,11 @@ mod tests {
             .await
             .unwrap();
 
-        let ctx = SessionContext::new();
-        let scan =
-            DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &ctx.state())
-                .with_filter(Some(col("a").eq(lit("s"))))
-                .build()
-                .await
-                .unwrap();
+        let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store())
+            .with_filter(Some(col("a").eq(lit("s"))))
+            .build()
+            .await
+            .unwrap();
 
         let mut visitor = ParquetPredicateVisitor::default();
         visit_execution_plan(&scan, &mut visitor).unwrap();
@@ -2227,7 +2387,6 @@ mod tests {
     #[tokio::test]
     async fn test_delta_scan_builder_scan_config_disable_pushdown() {
-        use crate::datafusion::prelude::SessionContext;
         let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
         let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
         let table = crate::DeltaOps::new_in_memory()
             .write(vec![batch.clone()])
@@ -2236,9 +2395,8 @@ mod tests {
             .await
             .unwrap();
 
-        let ctx = SessionContext::new();
         let snapshot = table.snapshot().unwrap();
-        let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &ctx.state())
+        let scan = DeltaScanBuilder::new(snapshot, table.log_store())
             .with_filter(Some(col("a").eq(lit("s"))))
             .with_scan_config(
                 DeltaScanConfigBuilder::new()
diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs
new file mode 100644
index 0000000000..ce331a7fea
--- /dev/null
+++ b/crates/core/src/delta_datafusion/schema_adapter.rs
@@ -0,0 +1,68 @@
+use crate::operations::cast::cast_record_batch;
+use arrow_array::RecordBatch;
+use arrow_schema::{Schema, SchemaRef};
+use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
+use std::fmt::Debug;
+use std::sync::Arc;
+
+/// A schema adapter factory that casts record batches read from Parquet so
+/// they meet Delta Lake conventions.
+#[derive(Debug)]
+pub(crate) struct DeltaSchemaAdapterFactory {}
+
+impl SchemaAdapterFactory for DeltaSchemaAdapterFactory {
+    fn create(&self, schema: SchemaRef) -> Box<dyn SchemaAdapter> {
+        Box::new(DeltaSchemaAdapter {
+            table_schema: schema,
+        })
+    }
+}
+
+pub(crate) struct DeltaSchemaAdapter {
+    /// Schema for the table
+    table_schema: SchemaRef,
+}
+
+impl SchemaAdapter for DeltaSchemaAdapter {
+    fn map_column_index(&self, index: usize, file_schema: &Schema) -> Option<usize> {
+        let field = self.table_schema.field(index);
+        Some(file_schema.fields.find(field.name())?.0)
+    }
+
+    fn map_schema(
+        &self,
+        file_schema: &Schema,
+    ) -> datafusion_common::Result<(Arc<dyn SchemaMapper>, Vec<usize>)> {
+        let mut projection = Vec::with_capacity(file_schema.fields().len());
+
+        for (file_idx, file_field) in file_schema.fields.iter().enumerate() {
+            if self.table_schema.fields().find(file_field.name()).is_some() {
+                projection.push(file_idx);
+            }
+        }
+
+        Ok((
+            Arc::new(SchemaMapping {
+                table_schema: self.table_schema.clone(),
+            }),
+            projection,
+        ))
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct SchemaMapping {
+    table_schema: SchemaRef,
+}
+
+impl SchemaMapper for SchemaMapping {
+    fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
+        Ok(record_batch)
+    }
+
+    fn map_partial_batch(&self, batch: RecordBatch) -> datafusion_common::Result<RecordBatch> {
+        let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?;
+        Ok(record_batch)
+    }
+}
diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs
index e5d356f81c..246541ccc1 100644
--- a/crates/core/src/operations/constraints.rs
+++ b/crates/core/src/operations/constraints.rs
@@ -114,7 +114,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
             session.state()
         });
 
-        let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &state)
+        let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone())
             .build()
             .await?;
 
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 56aa9ef98b..ac0b616ef3 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -139,7 +139,7 @@ async fn excute_non_empty_expr(
 
     let table_partition_cols = snapshot.metadata().partition_columns.clone();
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
         .with_files(rewrite)
         .build()
         .await?;
 
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 2f30f4aa8a..cd13698bed 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -236,7 +236,7 @@ async fn execute(
 
     // For each rewrite evaluate the predicate and then modify each expression
     // to either compute the new value or obtain the old one then write these batches
-    let scan = DeltaScanBuilder::new(&snapshot, log_store.clone(), &state)
+    let scan = DeltaScanBuilder::new(&snapshot, log_store.clone())
         .with_files(&candidates.candidates)
         .build()
         .await?;
diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs
index 0606707c19..fc3dfe44c7 100644
--- a/crates/core/src/operations/write.rs
+++ b/crates/core/src/operations/write.rs
@@ -511,7 +511,7 @@ async fn execute_non_empty_expr(
     let input_schema = snapshot.input_schema()?;
     let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store.clone())
         .with_files(rewrite)
         .build()
         .await?;
 
diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs
index 27f942b581..ea83bce29e 100644
--- a/crates/core/tests/integration_datafusion.rs
+++ b/crates/core/tests/integration_datafusion.rs
@@ -941,13 +941,14 @@ mod local {
 
         let batches = ctx.sql("SELECT * FROM demo").await?.collect().await?;
 
+        // Without a schema defined for the SELECT, timestamps default to microsecond precision, UTC
         let expected = vec![
-            "+-------------------------------+---------------------+------------+",
-            "| BIG_DATE                      | NORMAL_DATE         | SOME_VALUE |",
-            "+-------------------------------+---------------------+------------+",
-            "| 1816-03-28T05:56:08.066277376 | 2022-02-01T00:00:00 | 2          |",
-            "| 1816-03-29T05:56:08.066277376 | 2022-01-01T00:00:00 | 1          |",
-            "+-------------------------------+---------------------+------------+",
+            "+-----------------------------+----------------------+------------+",
+            "| BIG_DATE                    | NORMAL_DATE          | SOME_VALUE |",
+            "+-----------------------------+----------------------+------------+",
+            "| 1816-03-28T05:56:08.066278Z | 2022-02-01T00:00:00Z | 2          |",
+            "| 1816-03-29T05:56:08.066278Z | 2022-01-01T00:00:00Z | 1          |",
+            "+-----------------------------+----------------------+------------+",
         ];
         assert_batches_sorted_eq!(&expected, &batches);
 
@@ -1154,7 +1155,7 @@ mod local {
             .unwrap();
         let batch = batches.pop().unwrap();
 
-        let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int32, true)]);
+        let expected_schema = Schema::new(vec![Field::new("id", ArrowDataType::Int64, false)]);
         assert_eq!(batch.schema().as_ref(), &expected_schema);
         Ok(())
     }