Skip to content

Commit

Permalink
chore(rust): bump datafusion to 36 (#2249)
Browse files Browse the repository at this point in the history
# Description
The description of the main changes of your pull request

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
universalmind303 authored Apr 1, 2024
1 parent e34767d commit 2963692
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 27 deletions.
13 changes: 7 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ object_store = { version = "0.9" }
parquet = { version = "50" }

# datafusion
datafusion = { version = "35" }
datafusion-expr = { version = "35" }
datafusion-common = { version = "35" }
datafusion-proto = { version = "35" }
datafusion-sql = { version = "35" }
datafusion-physical-expr = { version = "35" }
datafusion = { version = "36" }
datafusion-expr = { version = "36" }
datafusion-common = { version = "36" }
datafusion-proto = { version = "36" }
datafusion-sql = { version = "36" }
datafusion-physical-expr = { version = "36" }
datafusion-functions = { version = "36" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
4 changes: 3 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ datafusion-common = { workspace = true, optional = true }
datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
datafusion-functions = { workspace = true, optional = true }

# serde
serde = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -95,7 +96,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
sqlparser = { version = "0.41", optional = true }
sqlparser = { version = "0.44", optional = true }

[dev-dependencies]
criterion = "0.5"
Expand All @@ -121,6 +122,7 @@ datafusion = [
"datafusion-proto",
"datafusion-physical-expr",
"datafusion-sql",
"datafusion-functions",
"sqlparser",
]
datafusion-ext = ["datafusion"]
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/data_catalog/unity/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::sync::Arc;

use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogList, CatalogProvider};
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
use tracing::error;

Expand Down Expand Up @@ -49,7 +49,7 @@ impl UnityCatalogList {
}
}

impl CatalogList for UnityCatalogList {
impl CatalogProviderList for UnityCatalogList {
fn as_any(&self) -> &dyn Any {
self
}
Expand Down
34 changes: 20 additions & 14 deletions crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ use datafusion_expr::{
expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, GetIndexedField, Like, TableSource,
};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use sqlparser::ast::escape_quoted_string;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use sqlparser::tokenizer::Tokenizer;
use datafusion_sql::sqlparser::ast::escape_quoted_string;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::sqlparser::tokenizer::Tokenizer;

use crate::{DeltaResult, DeltaTableError};

Expand Down Expand Up @@ -275,13 +275,18 @@ impl<'a> Display for SqlFormat<'a> {
datafusion_expr::GetFieldAccess::ListIndex { key } => {
write!(f, "{}[{}]", SqlFormat { expr }, SqlFormat { expr: key })
}
datafusion_expr::GetFieldAccess::ListRange { start, stop } => {
datafusion_expr::GetFieldAccess::ListRange {
start,
stop,
stride,
} => {
write!(
f,
"{}[{}:{}]",
SqlFormat { expr },
SqlFormat { expr: start },
SqlFormat { expr: stop }
"{expr}[{start}:{stop}:{stride}]",
expr = SqlFormat { expr },
start = SqlFormat { expr: start },
stop = SqlFormat { expr: stop },
stride = SqlFormat { expr: stride }
)
}
},
Expand Down Expand Up @@ -367,8 +372,9 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
mod test {
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{Column, DFSchema, ScalarValue};
use datafusion_expr::{cardinality, col, decode, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_common::{Column, ScalarValue, ToDFSchema};
use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_functions::encoding::expr_fn::decode;

use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
Expand Down Expand Up @@ -572,7 +578,7 @@ mod test {
),
simple!(
col("value")
.cast_to::<DFSchema>(
.cast_to(
&arrow_schema::DataType::Utf8,
&table
.snapshot()
Expand All @@ -581,7 +587,7 @@ mod test {
.unwrap()
.as_ref()
.to_owned()
.try_into()
.to_dfschema()
.unwrap()
)
.unwrap()
Expand All @@ -602,7 +608,7 @@ mod test {
),
simple!(
cardinality(col("_list").range(col("value"), lit(10_i64))),
"cardinality(_list[value:10])".to_string()
"cardinality(_list[value:10:1])".to_string()
),
];

Expand Down
8 changes: 7 additions & 1 deletion crates/core/src/delta_datafusion/find_files/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,14 @@ impl ExecutionPlan for FindFilesExec {

fn with_new_children(
self: Arc<Self>,
_children: Vec<Arc<dyn ExecutionPlan>>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
if !children.is_empty() {
return Err(datafusion::error::DataFusionError::Plan(
"Children cannot be replaced in FindFilesExec".to_string(),
));
}

Ok(self)
}

Expand Down
13 changes: 12 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,18 @@ impl ExecutionPlan for DeltaScan {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
ExecutionPlan::with_new_children(self.parquet_scan.clone(), children)
if children.len() != 1 {
return Err(DataFusionError::Plan(format!(
"DeltaScan wrong number of children {}",
children.len()
)));
}
Ok(Arc::new(DeltaScan {
table_uri: self.table_uri.clone(),
config: self.config.clone(),
parquet_scan: children[0].clone(),
logical_schema: self.logical_schema.clone(),
}))
}

fn execute(
Expand Down
10 changes: 9 additions & 1 deletion crates/core/src/kernel/snapshot/log_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,15 @@ mod datafusion {
_ => None,
})
.collect::<Option<Vec<_>>>()
.map(|o| Precision::Exact(ScalarValue::Struct(Some(o), fields.clone())))
.map(|o| {
let arrays = o
.into_iter()
.map(|sv| sv.to_array())
.collect::<Result<Vec<_>, datafusion_common::DataFusionError>>()
.unwrap();
let sa = StructArray::new(fields.clone(), arrays, None);
Precision::Exact(ScalarValue::Struct(Arc::new(sa)))
})
.unwrap_or(Precision::Absent);
}
_ => Precision::Absent,
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/operations/merge/barrier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ impl ExecutionPlan for MergeBarrierExec {
self: std::sync::Arc<Self>,
children: Vec<std::sync::Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<std::sync::Arc<dyn ExecutionPlan>> {
if children.len() != 1 {
return Err(DataFusionError::Plan(
"MergeBarrierExec wrong number of children".to_string(),
));
}
Ok(Arc::new(MergeBarrierExec::new(
children[0].clone(),
self.file_column.clone(),
Expand Down
2 changes: 1 addition & 1 deletion crates/sql/src/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<'a> DeltaParser<'a> {
}

pub fn parse_vacuum(&mut self) -> Result<Statement, ParserError> {
let table_name = self.parser.parse_object_name()?;
let table_name = self.parser.parse_object_name(false)?;
match self.parser.peek_token().token {
Token::Word(w) => match w.keyword {
Keyword::RETAIN => {
Expand Down

0 comments on commit 2963692

Please sign in to comment.