From 296369208cadc5d7a3a8858a60c6dc5da49c49b3 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Mon, 1 Apr 2024 10:56:18 -0500 Subject: [PATCH] chore(rust): bump datafusion to 36 (#2249) # Description The description of the main changes of your pull request # Related Issue(s) # Documentation --- Cargo.toml | 13 +++---- crates/core/Cargo.toml | 4 ++- .../core/src/data_catalog/unity/datafusion.rs | 4 +-- crates/core/src/delta_datafusion/expr.rs | 34 +++++++++++-------- .../delta_datafusion/find_files/physical.rs | 8 ++++- crates/core/src/delta_datafusion/mod.rs | 13 ++++++- crates/core/src/kernel/snapshot/log_data.rs | 10 +++++- crates/core/src/operations/merge/barrier.rs | 5 +++ crates/sql/src/parser.rs | 2 +- 9 files changed, 66 insertions(+), 27 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5f7080ec7c..92f9a8a36d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,12 +46,13 @@ object_store = { version = "0.9" } parquet = { version = "50" } # datafusion -datafusion = { version = "35" } -datafusion-expr = { version = "35" } -datafusion-common = { version = "35" } -datafusion-proto = { version = "35" } -datafusion-sql = { version = "35" } -datafusion-physical-expr = { version = "35" } +datafusion = { version = "36" } +datafusion-expr = { version = "36" } +datafusion-common = { version = "36" } +datafusion-proto = { version = "36" } +datafusion-sql = { version = "36" } +datafusion-physical-expr = { version = "36" } +datafusion-functions = { version = "36" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 04b4a52275..7b5cd2adbc 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -40,6 +40,7 @@ datafusion-common = { workspace = true, optional = true } datafusion-proto = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } +datafusion-functions = { workspace = true, optional = true } # serde serde = { workspace = true, features = ["derive"] } @@ -95,7 +96,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [ "rustls-tls", "json", ], optional = true } -sqlparser = { version = "0.41", optional = true } +sqlparser = { version = "0.44", optional = true } [dev-dependencies] criterion = "0.5" @@ -121,6 +122,7 @@ datafusion = [ "datafusion-proto", "datafusion-physical-expr", "datafusion-sql", + "datafusion-functions", "sqlparser", ] datafusion-ext = ["datafusion"] diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 21246c865a..0ed539e708 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use dashmap::DashMap; use datafusion::catalog::schema::SchemaProvider; -use datafusion::catalog::{CatalogList, CatalogProvider}; +use datafusion::catalog::{CatalogProvider, CatalogProviderList}; use datafusion::datasource::TableProvider; use tracing::error; @@ -49,7 +49,7 @@ impl UnityCatalogList { } } -impl CatalogList for UnityCatalogList { +impl CatalogProviderList for UnityCatalogList { fn as_any(&self) -> &dyn Any { self } diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index dfe234ad46..4317f7f214 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -34,10 +34,10 @@ use datafusion_expr::{ expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, GetIndexedField, Like, TableSource, }; use datafusion_sql::planner::{ContextProvider, SqlToRel}; -use sqlparser::ast::escape_quoted_string; -use sqlparser::dialect::GenericDialect; -use sqlparser::parser::Parser; -use sqlparser::tokenizer::Tokenizer; +use datafusion_sql::sqlparser::ast::escape_quoted_string; +use datafusion_sql::sqlparser::dialect::GenericDialect; +use datafusion_sql::sqlparser::parser::Parser; +use datafusion_sql::sqlparser::tokenizer::Tokenizer; use crate::{DeltaResult, DeltaTableError}; @@ -275,13 +275,18 @@ impl<'a> Display for SqlFormat<'a> { datafusion_expr::GetFieldAccess::ListIndex { key } => { write!(f, "{}[{}]", SqlFormat { expr }, SqlFormat { expr: key }) } - datafusion_expr::GetFieldAccess::ListRange { start, stop } => { + datafusion_expr::GetFieldAccess::ListRange { + start, + stop, + stride, + } => { write!( f, - "{}[{}:{}]", - SqlFormat { expr }, - SqlFormat { expr: start }, - SqlFormat { expr: stop } + "{expr}[{start}:{stop}:{stride}]", + expr = SqlFormat { expr }, + start = SqlFormat { expr: start }, + stop = SqlFormat { expr: stop }, + stride = SqlFormat { expr: stride } ) } }, @@ -367,8 +372,9 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> { mod test { use arrow_schema::DataType as ArrowDataType; use datafusion::prelude::SessionContext; - use datafusion_common::{Column, DFSchema, ScalarValue}; - use datafusion_expr::{cardinality, col, decode, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_common::{Column, ScalarValue, ToDFSchema}; + use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable}; + use datafusion_functions::encoding::expr_fn::decode; use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext}; use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType}; @@ -572,7 +578,7 @@ mod test { ), simple!( col("value") - .cast_to::( + .cast_to( &arrow_schema::DataType::Utf8, &table .snapshot() @@ -581,7 +587,7 @@ mod test { .unwrap() .as_ref() .to_owned() - .try_into() + .to_dfschema() .unwrap() ) .unwrap() @@ -602,7 +608,7 @@ mod test { ), simple!( cardinality(col("_list").range(col("value"), lit(10_i64))), - "cardinality(_list[value:10])".to_string() + "cardinality(_list[value:10:1])".to_string() ), ]; diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs index 9b9238dd86..56c7ca9989 100644 --- a/crates/core/src/delta_datafusion/find_files/physical.rs +++ b/crates/core/src/delta_datafusion/find_files/physical.rs @@ -99,8 +99,14 @@ impl ExecutionPlan for FindFilesExec { fn with_new_children( self: Arc, - _children: Vec>, + children: Vec>, ) -> Result> { + if !children.is_empty() { + return Err(datafusion::error::DataFusionError::Plan( + "Children cannot be replaced in FindFilesExec".to_string(), + )); + } + Ok(self) } diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 2c11335edd..7676a60997 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -835,7 +835,18 @@ impl ExecutionPlan for DeltaScan { self: Arc, children: Vec>, ) -> DataFusionResult> { - ExecutionPlan::with_new_children(self.parquet_scan.clone(), children) + if children.len() != 1 { + return Err(DataFusionError::Plan(format!( + "DeltaScan wrong number of children {}", + children.len() + ))); + } + Ok(Arc::new(DeltaScan { + table_uri: self.table_uri.clone(), + config: self.config.clone(), + parquet_scan: children[0].clone(), + logical_schema: self.logical_schema.clone(), + })) } fn execute( diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 972aeb6f9a..b2c06210e9 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -549,7 +549,15 @@ mod datafusion { _ => None, }) .collect::>>() - .map(|o| Precision::Exact(ScalarValue::Struct(Some(o), fields.clone()))) + .map(|o| { + let arrays = o + .into_iter() + .map(|sv| sv.to_array()) + .collect::, datafusion_common::DataFusionError>>() + .unwrap(); + let sa = StructArray::new(fields.clone(), arrays, None); + Precision::Exact(ScalarValue::Struct(Arc::new(sa))) + }) .unwrap_or(Precision::Absent); } _ => Precision::Absent, diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index f1df28c4a4..8cc0c2d804 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -95,6 +95,11 @@ impl ExecutionPlan for MergeBarrierExec { self: std::sync::Arc, children: Vec>, ) -> datafusion_common::Result> { + if children.len() != 1 { + return Err(DataFusionError::Plan( + "MergeBarrierExec wrong number of children".to_string(), + )); + } Ok(Arc::new(MergeBarrierExec::new( children[0].clone(), self.file_column.clone(), diff --git a/crates/sql/src/parser.rs b/crates/sql/src/parser.rs index 4f66896389..19bf3f00b0 100644 --- a/crates/sql/src/parser.rs +++ b/crates/sql/src/parser.rs @@ -164,7 +164,7 @@ impl<'a> DeltaParser<'a> { } pub fn parse_vacuum(&mut self) -> Result { - let table_name = self.parser.parse_object_name()?; + let table_name = self.parser.parse_object_name(false)?; match self.parser.peek_token().token { Token::Word(w) => match w.keyword { Keyword::RETAIN => {