From 423aab8b2420996c3056188a9e39b208f06194d4 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 26 Jul 2024 08:34:02 +0000 Subject: [PATCH] chore: upgrade to datafusion 41 --- Cargo.toml | 16 ++--- crates/core/Cargo.toml | 4 +- crates/core/src/data_catalog/storage/mod.rs | 5 +- .../core/src/data_catalog/unity/datafusion.rs | 2 +- crates/core/src/data_catalog/unity/models.rs | 2 +- crates/core/src/delta_datafusion/expr.rs | 56 +++++++++++----- .../src/delta_datafusion/find_files/mod.rs | 9 ++- crates/core/src/delta_datafusion/mod.rs | 17 +++-- crates/core/src/kernel/arrow/mod.rs | 2 +- crates/core/src/kernel/error.rs | 10 --- crates/core/src/kernel/snapshot/log_data.rs | 64 ++++++++++--------- crates/core/src/kernel/snapshot/mod.rs | 2 +- .../core/src/operations/cast/merge_schema.rs | 2 +- crates/core/src/operations/delete.rs | 5 +- crates/core/src/operations/load_cdf.rs | 3 +- crates/core/src/operations/merge/mod.rs | 19 ++++-- crates/core/src/operations/mod.rs | 1 + crates/core/src/operations/restore.rs | 10 +-- crates/core/src/operations/update.rs | 7 +- crates/core/src/operations/write.rs | 2 +- crates/core/src/protocol/mod.rs | 15 +---- crates/core/src/storage/file.rs | 16 ++--- crates/core/tests/integration_datafusion.rs | 4 +- crates/sql/src/planner.rs | 1 + crates/test/Cargo.toml | 2 + crates/test/src/datafusion.rs | 8 ++- python/src/lib.rs | 2 +- 27 files changed, 156 insertions(+), 130 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0892b0f12b..188393beaa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,14 +45,14 @@ object_store = { version = "0.10.1" } parquet = { version = "52" } # datafusion -datafusion = { version = "40" } -datafusion-expr = { version = "40" } -datafusion-common = { version = "40" } -datafusion-proto = { version = "40" } -datafusion-sql = { version = "40" } -datafusion-physical-expr = { version = "40" } -datafusion-functions = { version = "40" } -datafusion-functions-array = { version = "40" } +datafusion = { version = "41" } +datafusion-expr = { version = "41" } +datafusion-common = { version = "41" } +datafusion-proto = { version = "41" } +datafusion-sql = { version = "41" } +datafusion-physical-expr = { version = "41" } +datafusion-functions = { version = "41" } +datafusion-functions-aggregate = { version = "41" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 07584917a3..541b91a59c 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -43,7 +43,7 @@ datafusion-proto = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } datafusion-functions = { workspace = true, optional = true } -datafusion-functions-array = { workspace = true, optional = true } +datafusion-functions-aggregate = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } @@ -129,7 +129,7 @@ datafusion = [ "datafusion-physical-expr", "datafusion-sql", "datafusion-functions", - "datafusion-functions-array", + "datafusion-functions-aggregate", "sqlparser", ] datafusion-ext = ["datafusion"] diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index fc30f32144..7b0b779069 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; -use 
datafusion::catalog::schema::SchemaProvider; +use datafusion::catalog::SchemaProvider; use datafusion::datasource::TableProvider; use datafusion_common::DataFusionError; use futures::TryStreamExt; @@ -147,7 +147,8 @@ impl SchemaProvider for ListingSchemaProvider { mod tests { use super::*; use datafusion::assert_batches_sorted_eq; - use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider}; + use datafusion::catalog::CatalogProvider; + use datafusion::catalog_common::MemoryCatalogProvider; use datafusion::execution::context::SessionContext; #[test] diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 6b6a4b4a63..44e7c9ca33 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::sync::Arc; use dashmap::DashMap; -use datafusion::catalog::schema::SchemaProvider; +use datafusion::catalog::SchemaProvider; use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; use datafusion_common::DataFusionError; diff --git a/crates/core/src/data_catalog/unity/models.rs b/crates/core/src/data_catalog/unity/models.rs index 265149b969..2066a4ee86 100644 --- a/crates/core/src/data_catalog/unity/models.rs +++ b/crates/core/src/data_catalog/unity/models.rs @@ -252,8 +252,8 @@ pub enum TableType { StreamingTable, } -/// #[derive(Deserialize)] +/// Summary of the table pub struct TableSummary { /// The full name of the table. pub full_name: String, diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 2577d1a1db..a6ca14a077 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -29,11 +29,13 @@ use std::{ use arrow_schema::DataType; use chrono::{DateTime, NaiveDate}; use datafusion::execution::context::SessionState; +use datafusion::execution::session_state::SessionStateBuilder; use datafusion::execution::FunctionRegistry; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; use datafusion_expr::{ - expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource, + expr::InList, planner::ExprPlanner, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, + TableSource, }; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::sqlparser::ast::escape_quoted_string; @@ -46,7 +48,33 @@ use crate::{DeltaResult, DeltaTableError}; use super::DeltaParserOptions; pub(crate) struct DeltaContextProvider<'a> { - state: &'a SessionState, + state: SessionState, + /// Keeping this around just to make use of the 'a lifetime + _original: &'a SessionState, + planners: Vec<Arc<dyn ExprPlanner>>, +} + +impl<'a> DeltaContextProvider<'a> { + fn new(state: &'a SessionState) -> Self { + let planners = state.expr_planners(); + DeltaContextProvider { + planners, + // Creating a new session state with overridden scalar_functions since + // the get_field() UDF was dropped from the default scalar functions upstream in + // `36660fe10d9c0cdff62e0da0b94bee28422d3419` + state: SessionStateBuilder::new_from_existing(state.clone()) + .with_scalar_functions( + state + .scalar_functions() + .values() + .cloned() + .chain(std::iter::once(datafusion::functions::core::get_field())) + .collect(), + ) + .build(), + _original: state, + } + } } impl<'a> ContextProvider for DeltaContextProvider<'a> { @@ -54,6 +82,10 @@
impl<'a> ContextProvider for DeltaContextProvider<'a> { unimplemented!() } + fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] { + self.planners.as_slice() + } + fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> { self.state.scalar_functions().get(name).cloned() } @@ -75,15 +107,15 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> { } fn udf_names(&self) -> Vec<String> { - unimplemented!() + self.state.scalar_functions().keys().cloned().collect() } fn udaf_names(&self) -> Vec<String> { - unimplemented!() + self.state.aggregate_functions().keys().cloned().collect() } fn udwf_names(&self) -> Vec<String> { - unimplemented!() + self.state.window_functions().keys().cloned().collect() } } @@ -107,16 +139,10 @@ pub(crate) fn parse_predicate_expression( source: Box::new(err), })?; - let context_provider = DeltaContextProvider { state: df_state }; - let mut sql_to_rel = + let context_provider = DeltaContextProvider::new(df_state); + let sql_to_rel = SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into()); - // NOTE: This can be probably removed with Datafusion 41 once - // is released - for planner in context_provider.state.expr_planners() { - sql_to_rel = sql_to_rel.with_user_defined_planner(planner.clone()); - } - Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?) } @@ -401,6 +427,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { #[cfg(test)] mod test { use arrow_schema::DataType as ArrowDataType; + use datafusion::functions_array::expr_fn::cardinality; + use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor}; use datafusion::prelude::SessionContext; use datafusion_common::{Column, ScalarValue, ToDFSchema}; use datafusion_expr::expr::ScalarFunction; @@ -409,8 +437,6 @@ mod test { use datafusion_functions::core::expr_ext::FieldAccessor; use datafusion_functions::encoding::expr_fn::decode; use datafusion_functions::expr_fn::substring; - use datafusion_functions_array::expr_ext::{IndexAccessor, SliceAccessor}; - use datafusion_functions_array::expr_fn::cardinality; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType}; diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs index 956966d3e7..0725e6c326 100644 --- a/crates/core/src/delta_datafusion/find_files/mod.rs +++ b/crates/core/src/delta_datafusion/find_files/mod.rs @@ -41,8 +41,10 @@ lazy_static!
{ ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap(); } +#[derive(Default)] struct FindFilesPlannerExtension {} +#[derive(Default)] struct FindFilesPlanner {} #[async_trait] @@ -188,6 +190,7 @@ async fn scan_table_by_files( pub mod tests { use std::sync::Arc; + use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::{DataFrame, SessionContext}; use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq}; use datafusion_expr::{col, lit, Expr, Extension, LogicalPlan}; @@ -202,9 +205,9 @@ pub mod tests { expr: Expr, ) -> Result<Vec<RecordBatch>, DeltaTableError> { let ctx = SessionContext::new(); - let state = ctx - .state() - .with_query_planner(Arc::new(FindFilesPlanner {})); + let state = SessionStateBuilder::new_from_existing(ctx.state()) + .with_query_planner(Arc::new(FindFilesPlanner::default())) + .build(); let find_files_node = LogicalPlan::Extension(Extension { node: Arc::new(FindFilesNode::new( "my_cool_plan".into(), diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index b08273164a..831f71aa2e 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -39,12 +39,12 @@ use arrow_cast::display::array_value_to_string; use arrow_schema::Field; use async_trait::async_trait; use chrono::{DateTime, TimeZone, Utc}; +use datafusion::catalog::{Session, TableProviderFactory}; use datafusion::config::TableParquetOptions; use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder; use datafusion::datasource::physical_plan::{ wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig, }; -use datafusion::datasource::provider::TableProviderFactory; use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType}; use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext}; use datafusion::execution::runtime_env::RuntimeEnv; @@ -465,12 +465,11 @@ pub struct DeltaScanConfig { pub schema: Option<SchemaRef>, } -#[derive(Debug)] pub(crate) struct DeltaScanBuilder<'a> { snapshot: &'a DeltaTableState, log_store: LogStoreRef, filter: Option<Expr>, - state: &'a SessionState, + session: &'a dyn Session, projection: Option<&'a Vec<usize>>, limit: Option<usize>, files: Option<&'a [Add]>, @@ -481,13 +480,13 @@ impl<'a> DeltaScanBuilder<'a> { pub fn new( snapshot: &'a DeltaTableState, log_store: LogStoreRef, - state: &'a SessionState, + session: &'a dyn Session, ) -> Self { DeltaScanBuilder { snapshot, log_store, filter: None, - state, + session, projection: None, limit: None, files: None, @@ -648,7 +647,7 @@ impl<'a> DeltaScanBuilder<'a> { .unwrap_or(Statistics::new_unknown(&schema)); let parquet_options = TableParquetOptions { - global: self.state.config().options().execution.parquet.clone(), + global: self.session.config().options().execution.parquet.clone(), ..Default::default() }; @@ -717,7 +716,7 @@ impl TableProvider for DeltaTable { async fn scan( &self, - session: &SessionState, + session: &dyn Session, projection: Option<&Vec<usize>>, filters: &[Expr], limit: Option<usize>, @@ -806,7 +805,7 @@ impl TableProvider for DeltaTableProvider { async fn scan( &self, - session: &SessionState, + session: &dyn Session, projection: Option<&Vec<usize>>, filters: &[Expr], limit: Option<usize>, @@ -1377,7 +1376,7 @@ pub struct DeltaTableFactory {} impl TableProviderFactory for DeltaTableFactory { async fn create( &self, - _ctx: &SessionState, + _ctx: &dyn Session, cmd: &CreateExternalTable, ) -> datafusion::error::Result<Arc<dyn TableProvider>> { let provider = if cmd.options.is_empty() { diff
--git a/crates/core/src/kernel/arrow/mod.rs b/crates/core/src/kernel/arrow/mod.rs index 0fb41379dd..0561f5bc98 100644 --- a/crates/core/src/kernel/arrow/mod.rs +++ b/crates/core/src/kernel/arrow/mod.rs @@ -250,7 +250,7 @@ pub(crate) fn delta_log_schema_for_table( .iter() .for_each(|f| max_min_schema_for_fields(&mut max_min_vec, f)); - if max_min_vec.len() > 0 { + if !max_min_vec.is_empty() { stats_parsed_fields.extend(["minValues", "maxValues"].into_iter().map(|name| { ArrowField::new( name, diff --git a/crates/core/src/kernel/error.rs b/crates/core/src/kernel/error.rs index 853b10e411..cefe81bf9d 100644 --- a/crates/core/src/kernel/error.rs +++ b/crates/core/src/kernel/error.rs @@ -71,13 +71,3 @@ pub enum Error { #[error("Failed to parse value '{0}' as '{1}'")] Parse(String, DataType), } - -#[cfg(feature = "object_store")] -impl From<object_store::Error> for Error { - fn from(value: object_store::Error) -> Self { - match value { - object_store::Error::NotFound { path, .. } => Self::FileNotFound(path), - err => Self::ObjectStore(err), - } - } -} diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 254616691c..6e39873cf2 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -472,18 +472,24 @@ impl<'a> IntoIterator for LogDataHandler<'a> { mod datafusion { use std::sync::Arc; + use ::datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator}; + use ::datafusion::physical_plan::Accumulator; use arrow_arith::aggregate::sum; use arrow_array::Int64Array; use arrow_schema::DataType as ArrowDataType; use datafusion_common::scalar::ScalarValue; use datafusion_common::stats::{ColumnStatistics, Precision, Statistics}; - use datafusion_expr::AggregateFunction; - use datafusion_physical_expr::aggregate::AggregateExpr; - use datafusion_physical_expr::expressions::{Column, Max, Min}; use super::*; use crate::kernel::arrow::extract::{extract_and_cast_opt, extract_column}; + #[derive(Debug, Default, Clone)] + enum AccumulatorType { + Min, + Max, + #[default] + Unused, + } // TODO validate this works with "wide and narrow" builds / stats impl FileStatsAccessor<'_> { @@ -512,7 +518,7 @@ mod datafusion { &self, path_step: &str, name: &str, - fun: &AggregateFunction, + fun_type: AccumulatorType, ) -> Precision<ScalarValue> { let mut path = name.split('.'); let array = if let Ok(array) = extract_column(self.stats, path_step, &mut path) { @@ -522,28 +528,24 @@ mod datafusion { }; if array.data_type().is_primitive() { - let agg: Box<dyn AggregateExpr> = match fun { - AggregateFunction::Min => Box::new(Min::new( - // NOTE: this is just a placeholder, we never evalutae this expression - Arc::new(Column::new(name, 0)), - name, - array.data_type().clone(), - )), - AggregateFunction::Max => Box::new(Max::new( - // NOTE: this is just a placeholder, we never evalutae this expression - Arc::new(Column::new(name, 0)), - name, - array.data_type().clone(), - )), - _ => return Precision::Absent, + let accumulator: Option<Box<dyn Accumulator>> = match fun_type { + AccumulatorType::Min => MinAccumulator::try_new(array.data_type()) + .map_or(None, |a| Some(Box::new(a))), + AccumulatorType::Max => MaxAccumulator::try_new(array.data_type()) + .map_or(None, |a| Some(Box::new(a))), + _ => None, }; - let mut accum = agg.create_accumulator().ok().unwrap(); - return accum - .update_batch(&[array.clone()]) - .ok() - .and_then(|_| accum.evaluate().ok()) - .map(Precision::Exact) - .unwrap_or(Precision::Absent); + + if let Some(mut accumulator) = accumulator { + return accumulator +
.update_batch(&[array.clone()]) + .ok() + .and_then(|_| accumulator.evaluate().ok()) + .map(Precision::Exact) + .unwrap_or(Precision::Absent); + } + + return Precision::Absent; } match array.data_type() { @@ -551,7 +553,11 @@ mod datafusion { return fields .iter() .map(|f| { - self.column_bounds(path_step, &format!("{name}.{}", f.name()), fun) + self.column_bounds( + path_step, + &format!("{name}.{}", f.name()), + fun_type.clone(), + ) }) .map(|s| match s { Precision::Exact(s) => Some(s), @@ -590,8 +596,7 @@ mod datafusion { let null_count_col = format!("{COL_NULL_COUNT}.{}", name.as_ref()); let null_count = self.collect_count(&null_count_col); - let min_value = - self.column_bounds(COL_MIN_VALUES, name.as_ref(), &AggregateFunction::Min); + let min_value = self.column_bounds(COL_MIN_VALUES, name.as_ref(), AccumulatorType::Min); let min_value = match &min_value { Precision::Exact(value) if value.is_null() => Precision::Absent, // TODO this is a hack, we should not be casting here but rather when we read the checkpoint data. @@ -602,8 +607,7 @@ mod datafusion { _ => min_value, }; - let max_value = - self.column_bounds(COL_MAX_VALUES, name.as_ref(), &AggregateFunction::Max); + let max_value = self.column_bounds(COL_MAX_VALUES, name.as_ref(), AccumulatorType::Max); let max_value = match &max_value { Precision::Exact(value) if value.is_null() => Precision::Absent, Precision::Exact(ScalarValue::TimestampNanosecond(a, b)) => Precision::Exact( diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index c0825a5c0b..d4a8a671a7 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -371,7 +371,7 @@ impl Snapshot { .partition_columns .iter() .map(|col| { - schema.field(col).map(|field| field.clone()).ok_or_else(|| { + schema.field(col).cloned().ok_or_else(|| { DeltaTableError::Generic(format!( "Partition column {} not found in schema", col diff --git a/crates/core/src/operations/cast/merge_schema.rs b/crates/core/src/operations/cast/merge_schema.rs index 597700ad3f..624471dfbd 100644 --- a/crates/core/src/operations/cast/merge_schema.rs +++ b/crates/core/src/operations/cast/merge_schema.rs @@ -248,7 +248,7 @@ pub(crate) fn merge_arrow_field( let mut new_field = left.clone(); match new_field.try_merge(right) { Ok(()) => (), - Err(err) => { + Err(_err) => { // We cannot keep the table field here, there is some weird behavior where // Decimal(5,1) can be safely casted into Decimal(4,1) with out loss of data // Then our stats parser fails to parse this decimal(1000.1) into Decimal(4,1) diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index a2bbaca0c9..2a02da7d89 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -26,6 +26,7 @@ use datafusion::dataframe::DataFrame; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::{SessionContext, SessionState}; +use datafusion::execution::session_state::SessionStateBuilder; use datafusion::physical_plan::metrics::MetricBuilder; use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; @@ -185,7 +186,9 @@ async fn excute_non_empty_expr( extension_planner: DeleteMetricExtensionPlanner {}, }; - let state = state.clone().with_query_planner(Arc::new(delete_planner)); + let state = SessionStateBuilder::new_from_existing(state.clone()) + 
.with_query_planner(Arc::new(delete_planner)) + .build(); let scan_config = DeltaScanConfigBuilder::default() .with_file_column(false) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 77a06170ce..290a174cee 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -380,6 +380,7 @@ impl CdfLoadBuilder { } } +#[allow(unused)] /// Helper function to collect batches associated with reading CDF data pub(crate) async fn collect_batches( num_partitions: usize, @@ -408,8 +409,6 @@ pub(crate) mod tests { use datafusion_common::assert_batches_sorted_eq; use itertools::Itertools; - use crate::delta_datafusion::cdf::DeltaCdfScan; - use crate::operations::collect_sendable_stream; use crate::test_utils::TestSchemas; use crate::writer::test_utils::TestResult; use crate::{DeltaConfigKey, DeltaOps, DeltaTable}; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index e7b5e43ebc..c29c14bdbe 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -36,6 +36,8 @@ use async_trait::async_trait; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::SessionConfig; +use datafusion::execution::session_state::SessionStateBuilder; +use datafusion::functions_aggregate::expr_fn::{max, min}; use datafusion::logical_expr::build_join_schema; use datafusion::physical_plan::metrics::MetricBuilder; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; @@ -48,7 +50,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode}; use datafusion_common::{Column, DFSchema, ScalarValue, TableReference}; use datafusion_expr::expr::Placeholder; use datafusion_expr::{ - col, conditional_expressions::CaseBuilder, lit, max, min, when, Between, Expr, JoinType, + col, conditional_expressions::CaseBuilder, lit, when, Between, Expr, JoinType, }; use datafusion_expr::{ Aggregate, BinaryExpr, Extension, LogicalPlan, LogicalPlanBuilder, Operator, @@ -679,11 +681,11 @@ struct PredicatePlaceholder { /// Takes the predicate provided and does three things: /// /// 1. for any relations between a source column and a partition target column, -/// replace source with a placeholder matching the name of the partition -/// columns +/// replace source with a placeholder matching the name of the partition +/// columns /// /// 2. for any is equal relations between a source column and a non-partition target column, -/// replace source with is between expression with min(source_column) and max(source_column) placeholders +/// replace source with is between expression with min(source_column) and max(source_column) placeholders /// /// 3. for any other relation with a source column, remove them. 
/// @@ -1004,10 +1006,10 @@ async fn execute( source: DataFrame, log_store: LogStoreRef, snapshot: DeltaTableState, - state: SessionState, + _state: SessionState, writer_properties: Option<WriterProperties>, mut commit_properties: CommitProperties, - safe_cast: bool, + _safe_cast: bool, source_alias: Option<String>, target_alias: Option<String>, match_operations: Vec<MergeOperationConfig>, @@ -1031,7 +1033,10 @@ async fn execute( extension_planner: MergeMetricExtensionPlanner {}, }; - let state = state.with_query_planner(Arc::new(merge_planner)); + let state = SessionStateBuilder::new() + .with_default_features() + .with_query_planner(Arc::new(merge_planner)) + .build(); // TODO: Given the join predicate, remove any expression that involve the // source table and keep expressions that only involve the target table. diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 608bdb1549..3e4180763f 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -60,6 +60,7 @@ pub mod update; pub mod write; pub mod writer; +#[allow(unused)] /// The [Operation] trait defines common behaviors that all operations builders /// should have consistent pub(crate) trait Operation: std::future::IntoFuture {} diff --git a/crates/core/src/operations/restore.rs b/crates/core/src/operations/restore.rs index e2ab9741bc..71d478ca68 100644 --- a/crates/core/src/operations/restore.rs +++ b/crates/core/src/operations/restore.rs @@ -4,14 +4,14 @@ //! 1) Read the latest state snapshot of the table. //! 2) Read table state for version or datetime to restore //! 3) Compute files available in state for restoring (files were removed by some commit) -//! but missed in the latest. Add these files into commit as AddFile action. +//! but missed in the latest. Add these files into commit as AddFile action. //! 4) Compute files available in the latest state snapshot (files were added after version to restore) -//! but missed in the state to restore. Add these files into commit as RemoveFile action. +//! but missed in the state to restore. Add these files into commit as RemoveFile action. //! 5) If ignore_missing_files option is false (default value) check availability of AddFile -//! in file system. +//! in file system. //! 6) Commit Protocol, all RemoveFile and AddFile actions -//! into delta log using `LogStore::write_commit_entry` (commit will be failed in case of parallel transaction) -//! TODO: comment is outdated +//! into delta log using `LogStore::write_commit_entry` (commit will be failed in case of parallel transaction) +//! TODO: comment is outdated //! 7) If table was modified in parallel then ignore restore and raise exception. //! //! # Example diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 60b9bfd160..67639fef4c 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -53,6 +53,7 @@ use datafusion::{ dataframe::DataFrame, datasource::provider_as_source, execution::context::SessionState, + execution::session_state::SessionStateBuilder, physical_plan::{metrics::MetricBuilder, ExecutionPlan}, physical_planner::{ExtensionPlanner, PhysicalPlanner}, prelude::SessionContext, @@ -225,7 +226,7 @@ async fn execute( state: SessionState, writer_properties: Option<WriterProperties>, mut commit_properties: CommitProperties, - safe_cast: bool, + _safe_cast: bool, ) -> DeltaResult<(DeltaTableState, UpdateMetrics)> { // Validate the predicate and update expressions.
// @@ -241,7 +242,9 @@ async fn execute( extension_planner: UpdateMetricExtensionPlanner {}, }; - let state = state.clone().with_query_planner(Arc::new(update_planner)); + let state = SessionStateBuilder::new_from_existing(state) + .with_query_planner(Arc::new(update_planner)) + .build(); let exec_start = Instant::now(); let mut metrics = UpdateMetrics::default(); diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 3d618c41fe..cf1adf6e09 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -28,7 +28,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use std::{iter, vec}; +use std::vec; use arrow_array::RecordBatch; use arrow_cast::can_cast_types; diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index ce6ef0e8b0..87ed42939a 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -196,18 +196,9 @@ impl PartialStats { let null_count = take(&mut self.null_count); Stats { num_records: self.num_records, - min_values: match min_values { - Some(minv) => minv, - None => HashMap::default(), - }, - max_values: match max_values { - Some(maxv) => maxv, - None => HashMap::default(), - }, - null_count: match null_count { - Some(nc) => nc, - None => HashMap::default(), - }, + min_values: min_values.unwrap_or_default(), + max_values: max_values.unwrap_or_default(), + null_count: null_count.unwrap_or_default(), } } } diff --git a/crates/core/src/storage/file.rs b/crates/core/src/storage/file.rs index f7fa168127..73975d62b3 100644 --- a/crates/core/src/storage/file.rs +++ b/crates/core/src/storage/file.rs @@ -106,14 +106,14 @@ impl From for ObjectStoreError { /// Multi-writer support for different platforms: /// /// * Modern Linux kernels are well supported. However because Linux implementation leverages -/// `RENAME_NOREPLACE`, older versions of the kernel might not work depending on what filesystem is -/// being used: +/// `RENAME_NOREPLACE`, older versions of the kernel might not work depending on what filesystem is +/// being used: /// * ext4 requires >= Linux 3.15 /// * btrfs, shmem, and cif requires >= Linux 3.17 /// * xfs requires >= Linux 4.0 /// * ext2, minix, reiserfs, jfs, vfat, and bpf requires >= Linux 4.9 /// * Darwin is supported but not fully tested. -/// Patches welcome. +/// Patches welcome. /// * Support for other platforms are not implemented at the moment. 
#[derive(Debug)] pub struct FileStorageBackend { @@ -279,10 +279,7 @@ async fn rename_noreplace(from: &str, to: &str) -> Result<(), LocalFileSystemErr } // Generic implementation (Requires 2 system calls) -#[cfg(not(any( - all(target_os = "linux", target_env = "gnu", glibc_renameat2), - target_os = "macos" -)))] +#[cfg(not(any(all(target_os = "linux", target_env = "gnu"), target_os = "macos")))] mod imp { use super::*; @@ -323,10 +320,7 @@ mod imp { } // Optimized implementations (Only 1 system call) -#[cfg(any( - all(target_os = "linux", target_env = "gnu", glibc_renameat2), - target_os = "macos" -))] +#[cfg(any(all(target_os = "linux", target_env = "gnu"), target_os = "macos"))] mod imp { use super::*; use std::ffi::CString; diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index ea83bce29e..bd7d53612d 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -328,7 +328,7 @@ mod local { let expected = vec![ "+-----------------------+-----------------------+", - "| MAX(test_table.value) | MIN(test_table.value) |", + "| max(test_table.value) | min(test_table.value) |", "+-----------------------+-----------------------+", "| 4 | 0 |", "+-----------------------+-----------------------+", @@ -359,7 +359,7 @@ mod local { let expected = vec![ "+------------------------+------------------------+", - "| MAX(test_table2.value) | MIN(test_table2.value) |", + "| max(test_table2.value) | min(test_table2.value) |", "+------------------------+------------------------+", "| 3 | 1 |", "+------------------------+------------------------+", diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs index 90b7827575..88596b0d5b 100644 --- a/crates/sql/src/planner.rs +++ b/crates/sql/src/planner.rs @@ -43,6 +43,7 @@ impl<'a, S: ContextProvider> DeltaSqlToRel<'a, S> { parse_float_as_decimal: self.options.parse_float_as_decimal, enable_ident_normalization: self.options.enable_ident_normalization, support_varchar_with_length: false, + enable_options_value_normalization: false, }, ); planner.statement_to_plan(s) diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index b4fa816176..314c6fefa3 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -21,3 +21,5 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [features] default = [] datafusion = ["deltalake-core/datafusion"] +hdfs = [] +azure = [] diff --git a/crates/test/src/datafusion.rs b/crates/test/src/datafusion.rs index 8207233ef9..f6357ab3b7 100644 --- a/crates/test/src/datafusion.rs +++ b/crates/test/src/datafusion.rs @@ -1,5 +1,6 @@ -use deltalake_core::datafusion::execution::context::{SessionContext, SessionState}; +use deltalake_core::datafusion::execution::context::SessionContext; use deltalake_core::datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; +use deltalake_core::datafusion::execution::session_state::SessionStateBuilder; use deltalake_core::datafusion::prelude::SessionConfig; use deltalake_core::delta_datafusion::DeltaTableFactory; use std::sync::Arc; @@ -8,7 +9,10 @@ pub fn context_with_delta_table_factory() -> SessionContext { let cfg = RuntimeConfig::new(); let env = RuntimeEnv::new(cfg).unwrap(); let ses = SessionConfig::new(); - let mut state = SessionState::new_with_config_rt(ses, Arc::new(env)); + let mut state = SessionStateBuilder::new() + .with_config(ses) + .with_runtime_env(Arc::new(env)) + .build(); state .table_factories_mut() .insert("DELTATABLE".to_string(), 
Arc::new(DeltaTableFactory {})); diff --git a/python/src/lib.rs b/python/src/lib.rs index e87668b3c8..3947216285 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -20,8 +20,8 @@ use deltalake::arrow::record_batch::RecordBatchReader; use deltalake::arrow::record_batch::{RecordBatch, RecordBatchIterator}; use deltalake::arrow::{self, datatypes::Schema as ArrowSchema}; use deltalake::checkpoints::{cleanup_metadata, create_checkpoint}; +use deltalake::datafusion::catalog::TableProvider; use deltalake::datafusion::datasource::memory::MemTable; -use deltalake::datafusion::datasource::provider::TableProvider; use deltalake::datafusion::physical_plan::ExecutionPlan; use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::cdf::FileAction;