From bacb794b653ea7a09c666777b5c6c8f5dc1816ac Mon Sep 17 00:00:00 2001
From: David Blajda
Date: Sun, 22 Oct 2023 14:21:57 -0400
Subject: [PATCH] refactor!: update operations to use delta scan (#1639)

# Description
Recently implemented operations did not use `DeltaScan`, and `DeltaScan` itself had some gaps. These gaps would make it harder to switch towards logical plans, which is required for merge.

Gaps:
- It was not possible to include file lineage in the result
- When the subset of files to be scanned is known ahead of time, users had to reconstruct a parquet scan based on those files

The PR introduces a `DeltaScanBuilder` that allows users to specify which files to use when constructing the scan, whether the scan should be enhanced to include additional metadata columns, and a projection to apply. It also retains the previous functionality of pruning based on the provided filter when the files to scan are not provided.

`DeltaScanConfig` is also introduced, which allows users to deterministically obtain the names of any added metadata columns, or to specify those names themselves if required. The public interface of `find_files` has changed, but its functionality remains the same.

A new table provider was introduced which accepts a `DeltaScanConfig`. This is required for future merge enhancements so that unmodified files can be pruned prior to writes. (Usage sketches of the new provider and of the operations-side scan construction follow the relevant file diffs below.)

---------

Co-authored-by: Robert Pack <42610831+roeap@users.noreply.github.com>
---
 rust/Cargo.toml                  |   2 +-
 rust/src/delta_datafusion/mod.rs | 731 +++++++++++++++++++++----------
 rust/src/operations/delete.rs    |  34 +-
 rust/src/operations/merge.rs     |  30 +-
 rust/src/operations/update.rs    |  34 +-
 5 files changed, 531 insertions(+), 300 deletions(-)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 66b9c7e5b9..d3a1acca30 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -25,7 +25,7 @@ arrow-buffer = { workspace = true, optional = true }
 arrow-cast = { workspace = true, optional = true }
 arrow-ord = { workspace = true, optional = true }
 arrow-row = { workspace = true, optional = true }
-arrow-schema = { workspace = true, optional = true }
+arrow-schema = { workspace = true, optional = true, features = ["serde"] }
 arrow-select = { workspace = true, optional = true }
 parquet = { workspace = true, features = [
     "async",
diff --git a/rust/src/delta_datafusion/mod.rs b/rust/src/delta_datafusion/mod.rs
index 166996dddd..0d2df09e46 100644
--- a/rust/src/delta_datafusion/mod.rs
+++ b/rust/src/delta_datafusion/mod.rs
@@ -21,7 +21,7 @@ //!
``` use std::any::Any; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::fmt::{self, Debug}; use std::sync::Arc; @@ -32,12 +32,15 @@ use arrow::datatypes::DataType; use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, TimeUnit}; use arrow::error::ArrowError; use arrow::record_batch::RecordBatch; -use arrow_array::StringArray; +use arrow_array::types::UInt16Type; +use arrow_array::{DictionaryArray, StringArray}; use arrow_schema::Field; use async_trait::async_trait; use chrono::{NaiveDateTime, TimeZone, Utc}; use datafusion::datasource::file_format::{parquet::ParquetFormat, FileFormat}; -use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion::datasource::physical_plan::{ + wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, +}; use datafusion::datasource::provider::TableProviderFactory; use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType}; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; @@ -54,9 +57,7 @@ use datafusion::physical_plan::{ }; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion}; -use datafusion_common::{ - Column, DFSchema, DataFusionError, Result as DataFusionResult, ToDFSchema, -}; +use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema}; use datafusion_expr::expr::{ScalarFunction, ScalarUDF}; use datafusion_expr::logical_plan::CreateExternalTable; use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility}; @@ -65,6 +66,7 @@ use datafusion_physical_expr::{create_physical_expr, PhysicalExpr}; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use object_store::ObjectMeta; +use serde::{Deserialize, Serialize}; use url::Url; use crate::errors::{DeltaResult, DeltaTableError}; @@ -362,66 +364,299 @@ pub(crate) fn register_store(store: ObjectStoreRef, env: Arc) { env.register_object_store(url, store); } -/// Create a Parquet scan limited to a set of files -#[allow(clippy::too_many_arguments)] -pub(crate) async fn parquet_scan_from_actions( +pub(crate) fn logical_schema( snapshot: &DeltaTableState, + scan_config: &DeltaScanConfig, +) -> DeltaResult { + let input_schema = snapshot.input_schema()?; + let mut fields = Vec::new(); + for field in input_schema.fields.iter() { + fields.push(field.to_owned()); + } + + if let Some(file_column_name) = &scan_config.file_column_name { + fields.push(Arc::new(Field::new( + file_column_name, + arrow_schema::DataType::Utf8, + true, + ))); + } + + Ok(Arc::new(ArrowSchema::new(fields))) +} + +#[derive(Debug, Clone, Default)] +/// Used to specify if additonal metadata columns are exposed to the user +pub struct DeltaScanConfigBuilder { + /// Include the source path for each record. The name of this column is determine by `file_column_name` + include_file_column: bool, + /// Column name that contains the source path. + /// + /// If include_file_column is true and the name is None then it will be auto-generated + /// Otherwise the user provided name will be used + file_column_name: Option, +} + +impl DeltaScanConfigBuilder { + /// Construct a new instance of `DeltaScanConfigBuilder` + pub fn new() -> Self { + Self::default() + } + + /// Indicate that a column containing a records file path is included. 
+ /// Column name is generated and can be determined once this Config is built + pub fn with_file_column(mut self, include: bool) -> Self { + self.include_file_column = include; + self.file_column_name = None; + self + } + + /// Indicate that a column containing a records file path is included and column name is user defined. + pub fn with_file_column_name(mut self, name: &S) -> Self { + self.file_column_name = Some(name.to_string()); + self.include_file_column = true; + self + } + + /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing + pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult { + let input_schema = snapshot.input_schema()?; + let mut file_column_name = None; + let mut column_names: HashSet<&String> = HashSet::new(); + for field in input_schema.fields.iter() { + column_names.insert(field.name()); + } + + if self.include_file_column { + match &self.file_column_name { + Some(name) => { + if column_names.contains(name) { + return Err(DeltaTableError::Generic(format!( + "Unable to add file path column since column with name {} exits", + name + ))); + } + + file_column_name = Some(name.to_owned()) + } + None => { + let prefix = PATH_COLUMN; + let mut idx = 0; + let mut name = prefix.to_owned(); + + while column_names.contains(&name) { + idx += 1; + name = format!("{}_{}", prefix, idx); + } + + file_column_name = Some(name); + } + } + } + + Ok(DeltaScanConfig { file_column_name }) + } +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +/// Include additonal metadata columns during a [`DeltaScan`] +pub struct DeltaScanConfig { + /// Include the source path for each record + pub file_column_name: Option, +} + +#[derive(Debug)] +pub(crate) struct DeltaScanBuilder<'a> { + snapshot: &'a DeltaTableState, object_store: ObjectStoreRef, - actions: &[Add], - schema: &ArrowSchema, - filters: Option>, - state: &SessionState, - projection: Option<&Vec>, + filter: Option, + state: &'a SessionState, + projection: Option<&'a Vec>, limit: Option, -) -> DataFusionResult> { - // TODO we group files together by their partition values. If the table is partitioned - // and partitions are somewhat evenly distributed, probably not the worst choice ... - // However we may want to do some additional balancing in case we are far off from the above. - let mut file_groups: HashMap, Vec> = HashMap::new(); - for action in actions { - let part = partitioned_file_from_action(action, schema); - file_groups - .entry(part.partition_values.clone()) - .or_default() - .push(part); - } - - let table_partition_cols = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)? 
- .partition_columns - .clone(); - let file_schema = Arc::new(ArrowSchema::new( - schema - .fields() + files: Option<&'a [Add]>, + config: DeltaScanConfig, + schema: Option, +} + +impl<'a> DeltaScanBuilder<'a> { + pub fn new( + snapshot: &'a DeltaTableState, + object_store: ObjectStoreRef, + state: &'a SessionState, + ) -> Self { + DeltaScanBuilder { + snapshot, + object_store, + filter: None, + state, + files: None, + projection: None, + limit: None, + config: DeltaScanConfig::default(), + schema: None, + } + } + + pub fn with_filter(mut self, filter: Option) -> Self { + self.filter = filter; + self + } + + pub fn with_files(mut self, files: &'a [Add]) -> Self { + self.files = Some(files); + self + } + + pub fn with_projection(mut self, projection: Option<&'a Vec>) -> Self { + self.projection = projection; + self + } + + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + pub fn with_scan_config(mut self, config: DeltaScanConfig) -> Self { + self.config = config; + self + } + + pub fn with_schema(mut self, schema: SchemaRef) -> Self { + self.schema = Some(schema); + self + } + + pub async fn build(self) -> DeltaResult { + let config = self.config; + let schema = match self.schema { + Some(schema) => schema, + None => { + self.snapshot + .physical_arrow_schema(self.object_store.clone()) + .await? + } + }; + let logical_schema = logical_schema(self.snapshot, &config)?; + + let logical_schema = if let Some(used_columns) = self.projection { + let mut fields = vec![]; + for idx in used_columns { + fields.push(logical_schema.field(*idx).to_owned()); + } + Arc::new(ArrowSchema::new(fields)) + } else { + logical_schema + }; + + let logical_filter = self + .filter + .map(|expr| logical_expr_to_physical_expr(&expr, &logical_schema)); + + // Perform Pruning of files to scan + let files = match self.files { + Some(files) => files.to_owned(), + None => { + if let Some(predicate) = &logical_filter { + let pruning_predicate = + PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?; + let files_to_prune = pruning_predicate.prune(self.snapshot)?; + self.snapshot + .files() + .iter() + .zip(files_to_prune.into_iter()) + .filter_map( + |(action, keep)| { + if keep { + Some(action.to_owned()) + } else { + None + } + }, + ) + .collect() + } else { + self.snapshot.files().to_owned() + } + } + }; + + // TODO we group files together by their partition values. If the table is partitioned + // and partitions are somewhat evenly distributed, probably not the worst choice ... + // However we may want to do some additional balancing in case we are far off from the above. + let mut file_groups: HashMap, Vec> = HashMap::new(); + + for action in files.iter() { + let mut part = partitioned_file_from_action(action, &schema); + + if config.file_column_name.is_some() { + part.partition_values + .push(wrap_partition_value_in_dict(ScalarValue::Utf8(Some( + action.path.clone(), + )))); + } + + file_groups + .entry(part.partition_values.clone()) + .or_default() + .push(part); + } + + let table_partition_cols = self + .snapshot + .current_metadata() + .ok_or(DeltaTableError::NoMetadata)? 
+ .partition_columns + .clone(); + + let file_schema = Arc::new(ArrowSchema::new( + schema + .fields() + .iter() + .filter(|f| !table_partition_cols.contains(f.name())) + .cloned() + .collect::>(), + )); + + let mut table_partition_cols = table_partition_cols .iter() - .filter(|f| !table_partition_cols.contains(f.name())) - .cloned() - .collect::>(), - )); + .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) + .collect::, ArrowError>>()?; - let table_partition_cols = table_partition_cols - .iter() - .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) - .collect::, ArrowError>>()?; + if let Some(file_column_name) = &config.file_column_name { + table_partition_cols.push(( + file_column_name.clone(), + wrap_partition_type_in_dict(DataType::Utf8), + )); + } - ParquetFormat::new() - .create_physical_plan( - state, - FileScanConfig { - object_store_url: object_store.object_store_url(), - file_schema, - file_groups: file_groups.into_values().collect(), - statistics: snapshot.datafusion_table_statistics(), - projection: projection.cloned(), - limit, - table_partition_cols, - output_ordering: vec![], - infinite_source: false, - }, - filters.as_ref(), - ) - .await + let scan = ParquetFormat::new() + .create_physical_plan( + self.state, + FileScanConfig { + object_store_url: self.object_store.object_store_url(), + file_schema, + file_groups: file_groups.into_values().collect(), + statistics: self.snapshot.datafusion_table_statistics(), + projection: self.projection.cloned(), + limit: self.limit, + table_partition_cols, + output_ordering: vec![], + infinite_source: false, + }, + logical_filter.as_ref(), + ) + .await?; + + Ok(DeltaScan { + table_uri: ensure_table_uri(self.object_store.root_uri())? + .as_str() + .into(), + parquet_scan: scan, + config, + logical_schema, + }) + } } #[async_trait] @@ -453,53 +688,96 @@ impl TableProvider for DeltaTable { filters: &[Expr], limit: Option, ) -> DataFusionResult> { - let schema = self - .state - .physical_arrow_schema(self.object_store()) + register_store(self.object_store(), session.runtime_env().clone()); + let filter_expr = conjunction(filters.iter().cloned()); + + let scan = DeltaScanBuilder::new(&self.state, self.object_store(), session) + .with_projection(projection) + .with_limit(limit) + .with_filter(filter_expr) + .build() .await?; - register_store(self.object_store(), session.runtime_env().clone()); + Ok(Arc::new(scan)) + } - let filter_expr = conjunction(filters.iter().cloned()) - .map(|expr| logical_expr_to_physical_expr(&expr, &schema)); + fn supports_filter_pushdown( + &self, + _filter: &Expr, + ) -> DataFusionResult { + Ok(TableProviderFilterPushDown::Inexact) + } - let actions = if let Some(predicate) = &filter_expr { - let pruning_predicate = PruningPredicate::try_new(predicate.clone(), schema.clone())?; - let files_to_prune = pruning_predicate.prune(&self.state)?; - self.get_state() - .files() - .iter() - .zip(files_to_prune) - .filter_map( - |(action, keep)| { - if keep { - Some(action.to_owned()) - } else { - None - } - }, - ) - .collect() - } else { - self.get_state().files().to_owned() - }; + fn statistics(&self) -> Option { + Some(self.state.datafusion_table_statistics()) + } +} - let parquet_scan = parquet_scan_from_actions( - &self.state, - self.object_store(), - &actions, - &schema, - filter_expr, - session, - projection, - limit, - ) - .await?; +/// A Delta table provider that enables additonal metadata columns to be included during the scan +pub struct DeltaTableProvider { + 
snapshot: DeltaTableState, + store: ObjectStoreRef, + config: DeltaScanConfig, + schema: Arc, +} + +impl DeltaTableProvider { + /// Build a DeltaTableProvider + pub fn try_new( + snapshot: DeltaTableState, + store: ObjectStoreRef, + config: DeltaScanConfig, + ) -> DeltaResult { + Ok(DeltaTableProvider { + schema: logical_schema(&snapshot, &config)?, + snapshot, + store, + config, + }) + } +} + +#[async_trait] +impl TableProvider for DeltaTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> Arc { + self.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Base + } + + fn get_table_definition(&self) -> Option<&str> { + None + } + + fn get_logical_plan(&self) -> Option<&LogicalPlan> { + None + } + + async fn scan( + &self, + session: &SessionState, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + ) -> DataFusionResult> { + register_store(self.store.clone(), session.runtime_env().clone()); + let filter_expr = conjunction(filters.iter().cloned()); + + let scan = DeltaScanBuilder::new(&self.snapshot, self.store.clone(), session) + .with_projection(projection) + .with_limit(limit) + .with_filter(filter_expr) + .with_scan_config(self.config.clone()) + .build() + .await?; - Ok(Arc::new(DeltaScan { - table_uri: ensure_table_uri(self.table_uri())?.as_str().into(), - parquet_scan, - })) + Ok(Arc::new(scan)) } fn supports_filter_pushdown( @@ -510,7 +788,7 @@ impl TableProvider for DeltaTable { } fn statistics(&self) -> Option { - Some(self.state.datafusion_table_statistics()) + Some(self.snapshot.datafusion_table_statistics()) } } @@ -520,8 +798,19 @@ impl TableProvider for DeltaTable { pub struct DeltaScan { /// The URL of the ObjectStore root pub table_uri: String, + /// Column that contains an index that maps to the original metadata Add + pub config: DeltaScanConfig, /// The parquet scan to wrap pub parquet_scan: Arc, + /// The schema of the table to be used when evaluating expressions + pub logical_schema: Arc, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DeltaScanWire { + pub table_uri: String, + pub config: DeltaScanConfig, + pub logical_schema: Arc, } impl DisplayAs for DeltaScan { @@ -922,11 +1211,13 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec { inputs: &[Arc], _registry: &dyn FunctionRegistry, ) -> Result, DataFusionError> { - let table_uri: String = serde_json::from_reader(buf) + let wire: DeltaScanWire = serde_json::from_reader(buf) .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?; let delta_scan = DeltaScan { - table_uri, + table_uri: wire.table_uri, parquet_scan: (*inputs)[0].clone(), + config: wire.config, + logical_schema: wire.logical_schema, }; Ok(Arc::new(delta_scan)) } @@ -940,7 +1231,13 @@ impl PhysicalExtensionCodec for DeltaPhysicalCodec { .as_any() .downcast_ref::() .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?; - serde_json::to_writer(buf, delta_scan.table_uri.as_str()) + + let wire = DeltaScanWire { + table_uri: delta_scan.table_uri.to_owned(), + config: delta_scan.config.clone(), + logical_schema: delta_scan.logical_schema.clone(), + }; + serde_json::to_writer(buf, &wire) .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?; Ok(()) } @@ -1101,27 +1398,44 @@ pub struct FindFiles { fn join_batches_with_add_actions( batches: Vec, mut actions: HashMap, + path_column: &str, + dict_array: bool, ) -> DeltaResult> { // Given RecordBatches that contains `__delta_rs_path` perform a hash join // with 
actions to obtain original add actions let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum()); for batch in batches { - let array = batch - .column_by_name(PATH_COLUMN) - .ok_or_else(|| { - DeltaTableError::Generic(format!("Unable to find column {}", PATH_COLUMN)) - })? - .as_any() - .downcast_ref::() - .ok_or(DeltaTableError::Generic(format!( - "Unable to downcast column {}", - PATH_COLUMN - )))?; - for path in array { + let array = batch.column_by_name(path_column).ok_or_else(|| { + DeltaTableError::Generic(format!("Unable to find column {}", path_column)) + })?; + + let iter: Box>> = + if dict_array { + let array = array + .as_any() + .downcast_ref::>() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + path_column + )))? + .downcast_dict::() + .ok_or(DeltaTableError::Generic(format!( + "Unable to downcast column {}", + path_column + )))?; + Box::new(array.into_iter()) + } else { + let array = array.as_any().downcast_ref::().ok_or( + DeltaTableError::Generic(format!("Unable to downcast column {}", path_column)), + )?; + Box::new(array.into_iter()) + }; + + for path in iter { let path = path.ok_or(DeltaTableError::Generic(format!( "{} cannot be null", - PATH_COLUMN + path_column )))?; match actions.remove(path) { @@ -1141,89 +1455,43 @@ fn join_batches_with_add_actions( pub(crate) async fn find_files_scan<'a>( snapshot: &DeltaTableState, store: ObjectStoreRef, - schema: Arc, - file_schema: Arc, - candidates: Vec<&'a Add>, state: &SessionState, - expression: &Expr, + expression: Expr, ) -> DeltaResult> { - let mut candidate_map: HashMap = HashMap::new(); - - let table_partition_cols = snapshot - .current_metadata() - .ok_or(DeltaTableError::NoMetadata)? - .partition_columns - .clone(); - - let mut file_groups: HashMap, Vec> = HashMap::new(); - for action in candidates { - let mut part = partitioned_file_from_action(action, &schema); - part.partition_values - .push(ScalarValue::Utf8(Some(action.path.clone()))); - - file_groups - .entry(part.partition_values.clone()) - .or_default() - .push(part); - - candidate_map.insert(action.path.to_owned(), action.to_owned()); - } - - let mut table_partition_cols = table_partition_cols + let candidate_map: HashMap = snapshot + .files() .iter() - .map(|c| Ok((c.to_owned(), schema.field_with_name(c)?.data_type().clone()))) - .collect::, ArrowError>>()?; - // Append a column called __delta_rs_path to track the file path - table_partition_cols.push((PATH_COLUMN.to_owned(), DataType::Utf8)); + .map(|add| (add.path.clone(), add.to_owned())) + .collect(); - let input_schema = snapshot.input_schema()?; - - let mut fields = Vec::new(); - for field in input_schema.fields.iter() { - fields.push(field.to_owned()); + let scan_config = DeltaScanConfigBuilder { + include_file_column: true, + ..Default::default() } - fields.push(Arc::new(Field::new( - PATH_COLUMN, - arrow_schema::DataType::Boolean, - true, - ))); - let input_schema = Arc::new(ArrowSchema::new(fields)); + .build(snapshot)?; + + let logical_schema = logical_schema(snapshot, &scan_config)?; // Identify which columns we need to project let mut used_columns = expression .to_columns()? 
.into_iter() - .map(|column| input_schema.index_of(&column.name)) - .collect::, ArrowError>>() - .unwrap(); + .map(|column| logical_schema.index_of(&column.name)) + .collect::, ArrowError>>()?; // Add path column - used_columns.push(input_schema.index_of(PATH_COLUMN)?); - - // Project the logical schema so column indicies align between the parquet scan and the expression - let mut fields = vec![]; - for idx in &used_columns { - fields.push(input_schema.field(*idx).to_owned()); - } - let input_schema = Arc::new(ArrowSchema::new(fields)); - let input_dfschema = input_schema.as_ref().clone().try_into()?; + used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - let parquet_scan = ParquetFormat::new() - .create_physical_plan( - state, - FileScanConfig { - object_store_url: store.object_store_url(), - file_schema, - file_groups: file_groups.into_values().collect(), - statistics: snapshot.datafusion_table_statistics(), - projection: Some(used_columns), - limit: None, - table_partition_cols, - infinite_source: false, - output_ordering: vec![], - }, - None, - ) + let scan = DeltaScanBuilder::new(snapshot, store.clone(), state) + .with_filter(Some(expression.clone())) + .with_projection(Some(&used_columns)) + .with_scan_config(scan_config) + .build() .await?; + let scan = Arc::new(scan); + + let config = &scan.config; + let input_schema = scan.logical_schema.as_ref().to_owned(); + let input_dfschema = input_schema.clone().try_into()?; let predicate_expr = create_physical_expr( &Expr::IsTrue(Box::new(expression.clone())), @@ -1233,13 +1501,18 @@ pub(crate) async fn find_files_scan<'a>( )?; let filter: Arc = - Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?); + Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); let limit: Arc = Arc::new(LocalLimitExec::new(filter, 1)); let task_ctx = Arc::new(TaskContext::from(state)); let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?; - join_batches_with_add_actions(path_batches, candidate_map) + join_batches_with_add_actions( + path_batches, + candidate_map, + config.file_column_name.as_ref().unwrap(), + true, + ) } pub(crate) async fn scan_memory_table( @@ -1293,42 +1566,22 @@ pub(crate) async fn scan_memory_table( .map(|action| (action.path.clone(), action)) .collect::>(); - join_batches_with_add_actions(batches, map) + join_batches_with_add_actions(batches, map, PATH_COLUMN, false) } /// Finds files in a snapshot that match the provided predicate. 
pub async fn find_files<'a>( snapshot: &DeltaTableState, object_store: ObjectStoreRef, - schema: Arc, state: &SessionState, predicate: Option, ) -> DeltaResult { let current_metadata = snapshot .current_metadata() .ok_or(DeltaTableError::NoMetadata)?; - let table_partition_cols = current_metadata.partition_columns.clone(); match &predicate { Some(predicate) => { - let file_schema = Arc::new(ArrowSchema::new( - schema - .fields() - .iter() - .filter(|f| !table_partition_cols.contains(f.name())) - .cloned() - .collect::>(), - )); - - let input_schema = snapshot.input_schema()?; - let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; - let expr = create_physical_expr( - predicate, - &input_dfschema, - &input_schema, - state.execution_props(), - )?; - // Validate the Predicate and determine if it only contains partition columns let mut expr_properties = FindFilesExprProperties { partition_only: true, @@ -1346,26 +1599,9 @@ pub async fn find_files<'a>( partition_scan: true, }) } else { - let pruning_predicate = PruningPredicate::try_new(expr, schema.clone())?; - let files_to_prune = pruning_predicate.prune(snapshot)?; - let files: Vec<&Add> = snapshot - .files() - .iter() - .zip(files_to_prune.into_iter()) - .filter_map(|(action, keep)| if keep { Some(action) } else { None }) - .collect(); - - // Create a new delta scan plan with only files that have a record - let candidates = find_files_scan( - snapshot, - object_store.clone(), - schema.clone(), - file_schema.clone(), - files, - state, - predicate, - ) - .await?; + let candidates = + find_files_scan(snapshot, object_store.clone(), state, predicate.to_owned()) + .await?; Ok(FindFiles { candidates, @@ -1385,6 +1621,7 @@ mod tests { use arrow::array::StructArray; use arrow::datatypes::{DataType, Field, Schema}; use chrono::{TimeZone, Utc}; + use datafusion::assert_batches_sorted_eq; use datafusion::physical_plan::empty::EmptyExec; use datafusion_proto::physical_plan::AsExecutionPlan; use datafusion_proto::protobuf; @@ -1650,7 +1887,9 @@ mod tests { ])); let exec_plan = Arc::from(DeltaScan { table_uri: "s3://my_bucket/this/is/some/path".to_string(), - parquet_scan: Arc::from(EmptyExec::new(false, schema)), + parquet_scan: Arc::from(EmptyExec::new(false, schema.clone())), + config: DeltaScanConfig::default(), + logical_schema: schema.clone(), }); let proto: protobuf::PhysicalPlanNode = protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec) @@ -1662,4 +1901,32 @@ mod tests { .expect("from proto"); assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}")); } + + #[tokio::test] + async fn delta_table_provider_with_config() { + let table = crate::open_table("tests/data/delta-2.2.0-partitioned-types") + .await + .unwrap(); + let config = DeltaScanConfigBuilder::new() + .with_file_column_name(&"file_source") + .build(&table.state) + .unwrap(); + + let provider = DeltaTableProvider::try_new(table.state, table.storage, config).unwrap(); + let ctx = SessionContext::new(); + ctx.register_table("test", Arc::new(provider)).unwrap(); + + let df = ctx.sql("select * from test").await.unwrap(); + let actual = df.collect().await.unwrap(); + let expected = vec! 
[ + "+----+----+----+-------------------------------------------------------------------------------+", + "| c3 | c1 | c2 | file_source |", + "+----+----+----+-------------------------------------------------------------------------------+", + "| 4 | 6 | a | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |", + "| 5 | 4 | c | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |", + "| 6 | 5 | b | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |", + "+----+----+----+-------------------------------------------------------------------------------+", + ]; + assert_batches_sorted_eq!(&expected, &actual); + } } diff --git a/rust/src/operations/delete.rs b/rust/src/operations/delete.rs index 550e97e6ba..bb524aa1db 100644 --- a/rust/src/operations/delete.rs +++ b/rust/src/operations/delete.rs @@ -35,8 +35,7 @@ use serde::Serialize; use serde_json::Map; use serde_json::Value; -use crate::delta_datafusion::find_files; -use crate::delta_datafusion::{parquet_scan_from_actions, register_store}; +use crate::delta_datafusion::{find_files, register_store, DeltaScanBuilder}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::operations::transaction::commit; use crate::operations::write::write_execution_plan; @@ -136,7 +135,6 @@ async fn excute_non_empty_expr( // For each identified file perform a parquet scan + filter + limit (1) + count. // If returned count is not zero then append the file to be rewritten and removed from the log. Otherwise do nothing to the file. - let schema = snapshot.arrow_schema()?; let input_schema = snapshot.input_schema()?; let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?; @@ -146,17 +144,11 @@ async fn excute_non_empty_expr( .partition_columns .clone(); - let parquet_scan = parquet_scan_from_actions( - snapshot, - object_store.clone(), - rewrite, - &schema, - None, - state, - None, - None, - ) - .await?; + let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), state) + .with_files(rewrite) + .build() + .await?; + let scan = Arc::new(scan); // Apply the negation of the filter and rewrite files let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone())))); @@ -168,7 +160,7 @@ async fn excute_non_empty_expr( state.execution_props(), )?; let filter: Arc = - Arc::new(FilterExec::try_new(predicate_expr, parquet_scan.clone())?); + Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); let add_actions = write_execution_plan( snapshot, @@ -183,7 +175,7 @@ async fn excute_non_empty_expr( ) .await?; - let read_records = parquet_scan.metrics().and_then(|m| m.output_rows()); + let read_records = scan.parquet_scan.metrics().and_then(|m| m.output_rows()); let filter_records = filter.metrics().and_then(|m| m.output_rows()); metrics.num_copied_rows = filter_records; metrics.num_deleted_rows = read_records @@ -203,17 +195,9 @@ async fn execute( ) -> DeltaResult<((Vec, i64), DeleteMetrics)> { let exec_start = Instant::now(); let mut metrics = DeleteMetrics::default(); - let schema = snapshot.arrow_schema()?; let scan_start = Instant::now(); - let candidates = find_files( - snapshot, - object_store.clone(), - schema.clone(), - &state, - predicate.clone(), - ) - .await?; + let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_micros(); let predicate = predicate.unwrap_or(Expr::Literal(ScalarValue::Boolean(Some(true)))); 
diff --git a/rust/src/operations/merge.rs b/rust/src/operations/merge.rs index 46a2c540bf..dc8e108ab7 100644 --- a/rust/src/operations/merge.rs +++ b/rust/src/operations/merge.rs @@ -66,13 +66,16 @@ use serde_json::{Map, Value}; use super::datafusion_utils::{into_expr, maybe_into_expr, Expression}; use super::transaction::commit; use crate::delta_datafusion::expr::{fmt_expr_to_sql, parse_predicate_expression}; -use crate::delta_datafusion::{parquet_scan_from_actions, register_store}; +use crate::delta_datafusion::{register_store, DeltaScanBuilder}; use crate::operations::datafusion_utils::MetricObserverExec; -use crate::operations::write::write_execution_plan; +use crate::{ + operations::write::write_execution_plan, + storage::{DeltaObjectStore, ObjectStoreRef}, + DeltaResult, DeltaTable, DeltaTableError, +}; + use crate::protocol::{Action, DeltaOperation, MergePredicate, Remove}; -use crate::storage::{DeltaObjectStore, ObjectStoreRef}; use crate::table::state::DeltaTableState; -use crate::{DeltaResult, DeltaTable, DeltaTableError}; const OPERATION_COLUMN: &str = "__delta_rs_operation"; const DELETE_COLUMN: &str = "__delta_rs_delete"; @@ -579,8 +582,6 @@ async fn execute( .current_metadata() .ok_or(DeltaTableError::NoMetadata)?; - let schema = snapshot.input_schema()?; - // TODO: Given the join predicate, remove any expression that involve the // source table and keep expressions that only involve the target table. // This would allow us to perform statistics/partition pruning E.G @@ -590,17 +591,12 @@ async fn execute( // If the user specified any not_source_match operations then those // predicates also need to be considered when pruning - let target = parquet_scan_from_actions( - snapshot, - object_store.clone(), - snapshot.files(), - &schema, - None, - &state, - None, - None, - ) - .await?; + let target = Arc::new( + DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + .with_schema(snapshot.input_schema()?) 
+ .build() + .await?, + ); let source = source.create_physical_plan().await?; diff --git a/rust/src/operations/update.rs b/rust/src/operations/update.rs index a8b2820c33..67cccb3387 100644 --- a/rust/src/operations/update.rs +++ b/rust/src/operations/update.rs @@ -44,9 +44,7 @@ use serde::Serialize; use serde_json::{Map, Value}; use crate::{ - delta_datafusion::{ - expr::fmt_expr_to_sql, find_files, parquet_scan_from_actions, register_store, - }, + delta_datafusion::{expr::fmt_expr_to_sql, find_files, register_store, DeltaScanBuilder}, protocol::{Action, DeltaOperation, Remove}, storage::{DeltaObjectStore, ObjectStoreRef}, table::state::DeltaTableState, @@ -216,17 +214,9 @@ async fn execute( .current_metadata() .ok_or(DeltaTableError::NoMetadata)?; let table_partition_cols = current_metadata.partition_columns.clone(); - let schema = snapshot.arrow_schema()?; let scan_start = Instant::now(); - let candidates = find_files( - snapshot, - object_store.clone(), - schema.clone(), - &state, - predicate.clone(), - ) - .await?; + let candidates = find_files(snapshot, object_store.clone(), &state, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; if candidates.candidates.is_empty() { @@ -238,17 +228,11 @@ async fn execute( let execution_props = state.execution_props(); // For each rewrite evaluate the predicate and then modify each expression // to either compute the new value or obtain the old one then write these batches - let parquet_scan = parquet_scan_from_actions( - snapshot, - object_store.clone(), - &candidates.candidates, - &schema, - None, - &state, - None, - None, - ) - .await?; + let scan = DeltaScanBuilder::new(snapshot, object_store.clone(), &state) + .with_files(&candidates.candidates) + .build() + .await?; + let scan = Arc::new(scan); // Create a projection for a new column with the predicate evaluated let input_schema = snapshot.input_schema()?; @@ -267,7 +251,7 @@ async fn execute( let input_dfschema: DFSchema = input_schema.as_ref().clone().try_into()?; let mut expressions: Vec<(Arc, String)> = Vec::new(); - let scan_schema = parquet_scan.schema(); + let scan_schema = scan.schema(); for (i, field) in scan_schema.fields().into_iter().enumerate() { expressions.push(( Arc::new(expressions::Column::new(field.name(), i)), @@ -291,7 +275,7 @@ async fn execute( expressions.push((predicate_expr, "__delta_rs_update_predicate".to_string())); let projection_predicate: Arc = - Arc::new(ProjectionExec::try_new(expressions, parquet_scan)?); + Arc::new(ProjectionExec::try_new(expressions, scan)?); let count_plan = Arc::new(MetricObserverExec::new( projection_predicate.clone(),
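Taken together, the delete, merge, and update diffs above now share one scan-construction path. The following is a condensed in-crate sketch of that pattern, assuming it lives inside `delta-rs` where the `pub(crate)` `DeltaScanBuilder` is visible; the function name and signature are illustrative, and the calls mirror the diffs above.

```rust
use std::sync::Arc;

use datafusion::execution::context::SessionState;
use datafusion::physical_plan::ExecutionPlan;

use crate::delta_datafusion::DeltaScanBuilder;
use crate::errors::DeltaResult;
use crate::protocol::Add;
use crate::storage::ObjectStoreRef;
use crate::table::state::DeltaTableState;

/// Build a scan over a pre-selected set of files, as delete and update now do
/// for the files they are about to rewrite. When the file set is not known
/// ahead of time, drop `.with_files(..)` and pass `.with_filter(Some(expr))`
/// so the builder prunes files itself.
async fn scan_known_files(
    snapshot: &DeltaTableState,
    object_store: ObjectStoreRef,
    state: &SessionState,
    rewrite: &[Add],
) -> DeltaResult<Arc<dyn ExecutionPlan>> {
    let scan = DeltaScanBuilder::new(snapshot, object_store, state)
        .with_files(rewrite) // skip pruning: these files were already identified
        .build()
        .await?;

    // DeltaScan implements ExecutionPlan, so the result can feed FilterExec,
    // ProjectionExec, and write_execution_plan exactly as in the operations above.
    let plan: Arc<dyn ExecutionPlan> = Arc::new(scan);
    Ok(plan)
}
```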