chore: upgrade to datafusion 41

rtyler committed Aug 13, 2024
1 parent a0f9c18 commit 423aab8
Showing 27 changed files with 156 additions and 130 deletions.
16 changes: 8 additions & 8 deletions Cargo.toml
@@ -45,14 +45,14 @@ object_store = { version = "0.10.1" }
 parquet = { version = "52" }
 
 # datafusion
-datafusion = { version = "40" }
-datafusion-expr = { version = "40" }
-datafusion-common = { version = "40" }
-datafusion-proto = { version = "40" }
-datafusion-sql = { version = "40" }
-datafusion-physical-expr = { version = "40" }
-datafusion-functions = { version = "40" }
-datafusion-functions-array = { version = "40" }
+datafusion = { version = "41" }
+datafusion-expr = { version = "41" }
+datafusion-common = { version = "41" }
+datafusion-proto = { version = "41" }
+datafusion-sql = { version = "41" }
+datafusion-physical-expr = { version = "41" }
+datafusion-functions = { version = "41" }
+datafusion-functions-aggregate = { version = "41" }
 
 # serde
 serde = { version = "1.0.194", features = ["derive"] }
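
Note: DataFusion 41 drops the datafusion-functions-array crate name (the array functions now ship as datafusion-functions-nested, re-exported through the datafusion facade), and delta-rs picks up datafusion-functions-aggregate in its place; the test-import changes in expr.rs further down use exactly those facade paths. A minimal sketch of the new import surface, assuming datafusion 41 with default features and that the expr_ext traits still expose index()/range():

// Sketch only: array helpers under the DataFusion 41 facade.
// `functions_array` survives as a facade alias; `functions_nested` is the new name.
use datafusion::functions_array::expr_fn::cardinality;
use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
use datafusion_expr::{col, lit};

fn array_exprs() {
    let n = cardinality(col("arr")); // number of elements
    let first = col("arr").index(lit(1)); // arr[1]
    let window = col("arr").range(lit(1), lit(2)); // arr[1:2]
    let _ = (n, first, window);
}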
4 changes: 2 additions & 2 deletions crates/core/Cargo.toml
@@ -43,7 +43,7 @@ datafusion-proto = { workspace = true, optional = true }
 datafusion-sql = { workspace = true, optional = true }
 datafusion-physical-expr = { workspace = true, optional = true }
 datafusion-functions = { workspace = true, optional = true }
-datafusion-functions-array = { workspace = true, optional = true }
+datafusion-functions-aggregate = { workspace = true, optional = true }
 
 # serde
 serde = { workspace = true, features = ["derive"] }
@@ -129,7 +129,7 @@ datafusion = [
   "datafusion-physical-expr",
   "datafusion-sql",
   "datafusion-functions",
-  "datafusion-functions-array",
+  "datafusion-functions-aggregate",
   "sqlparser",
 ]
 datafusion-ext = ["datafusion"]
5 changes: 3 additions & 2 deletions crates/core/src/data_catalog/storage/mod.rs
@@ -6,7 +6,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use dashmap::DashMap;
-use datafusion::catalog::schema::SchemaProvider;
+use datafusion::catalog::SchemaProvider;
 use datafusion::datasource::TableProvider;
 use datafusion_common::DataFusionError;
 use futures::TryStreamExt;
@@ -147,7 +147,8 @@ impl SchemaProvider for ListingSchemaProvider {
 mod tests {
     use super::*;
     use datafusion::assert_batches_sorted_eq;
-    use datafusion::catalog::{CatalogProvider, MemoryCatalogProvider};
+    use datafusion::catalog::CatalogProvider;
+    use datafusion::catalog_common::MemoryCatalogProvider;
     use datafusion::execution::context::SessionContext;
 
     #[test]
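
Note: DataFusion 41 moves the SchemaProvider trait up from datafusion::catalog::schema to datafusion::catalog, and the in-memory implementations now live under datafusion::catalog_common, as the test imports above show. A quick sketch of registering a memory catalog under the new paths; it assumes MemorySchemaProvider is re-exported next to MemoryCatalogProvider, which this diff does not itself show:

use std::sync::Arc;

use datafusion::catalog::CatalogProvider;
use datafusion::catalog_common::{MemoryCatalogProvider, MemorySchemaProvider};
use datafusion::execution::context::SessionContext;

fn register_memory_catalog(ctx: &SessionContext) {
    let catalog = MemoryCatalogProvider::new();
    // Attach an empty schema, then expose the catalog on the session.
    catalog
        .register_schema("my_schema", Arc::new(MemorySchemaProvider::new()))
        .unwrap();
    ctx.register_catalog("my_catalog", Arc::new(catalog));
}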
2 changes: 1 addition & 1 deletion crates/core/src/data_catalog/unity/datafusion.rs
@@ -5,7 +5,7 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use dashmap::DashMap;
-use datafusion::catalog::schema::SchemaProvider;
+use datafusion::catalog::SchemaProvider;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList};
 use datafusion::datasource::TableProvider;
 use datafusion_common::DataFusionError;
2 changes: 1 addition & 1 deletion crates/core/src/data_catalog/unity/models.rs
@@ -252,8 +252,8 @@ pub enum TableType {
     StreamingTable,
 }
 
-///
 #[derive(Deserialize)]
+/// Summary of the table
 pub struct TableSummary {
     /// The full name of the table.
     pub full_name: String,
56 changes: 41 additions & 15 deletions crates/core/src/delta_datafusion/expr.rs
@@ -29,11 +29,13 @@ use std::{
 use arrow_schema::DataType;
 use chrono::{DateTime, NaiveDate};
 use datafusion::execution::context::SessionState;
+use datafusion::execution::session_state::SessionStateBuilder;
 use datafusion::execution::FunctionRegistry;
 use datafusion_common::Result as DFResult;
 use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion_expr::{
-    expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource,
+    expr::InList, planner::ExprPlanner, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like,
+    TableSource,
 };
 use datafusion_sql::planner::{ContextProvider, SqlToRel};
 use datafusion_sql::sqlparser::ast::escape_quoted_string;
@@ -46,14 +48,44 @@ use crate::{DeltaResult, DeltaTableError};
 
 use super::DeltaParserOptions;
 
 pub(crate) struct DeltaContextProvider<'a> {
-    state: &'a SessionState,
+    state: SessionState,
+    /// Keeping this around just to make use of the 'a lifetime
+    _original: &'a SessionState,
+    planners: Vec<Arc<dyn ExprPlanner>>,
+}
+
+impl<'a> DeltaContextProvider<'a> {
+    fn new(state: &'a SessionState) -> Self {
+        let planners = state.expr_planners();
+        DeltaContextProvider {
+            planners,
+            // Creating a new session state with overridden scalar_functions since
+            // the get_field() UDF was dropped from the default scalar functions upstream in
+            // `36660fe10d9c0cdff62e0da0b94bee28422d3419`
+            state: SessionStateBuilder::new_from_existing(state.clone())
+                .with_scalar_functions(
+                    state
+                        .scalar_functions()
+                        .values()
+                        .cloned()
+                        .chain(std::iter::once(datafusion::functions::core::get_field()))
+                        .collect(),
+                )
+                .build(),
+            _original: state,
+        }
+    }
 }
 
 impl<'a> ContextProvider for DeltaContextProvider<'a> {
     fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
         unimplemented!()
     }
 
+    fn get_expr_planners(&self) -> &[Arc<dyn ExprPlanner>] {
+        self.planners.as_slice()
+    }
+
     fn get_function_meta(&self, name: &str) -> Option<Arc<datafusion_expr::ScalarUDF>> {
         self.state.scalar_functions().get(name).cloned()
     }
@@ -75,15 +107,15 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
     }
 
     fn udf_names(&self) -> Vec<String> {
-        unimplemented!()
+        self.state.scalar_functions().keys().cloned().collect()
     }
 
     fn udaf_names(&self) -> Vec<String> {
-        unimplemented!()
+        self.state.aggregate_functions().keys().cloned().collect()
     }
 
     fn udwf_names(&self) -> Vec<String> {
-        unimplemented!()
+        self.state.window_functions().keys().cloned().collect()
     }
 }
@@ -107,16 +139,10 @@ pub(crate) fn parse_predicate_expression(
         source: Box::new(err),
     })?;
 
-    let context_provider = DeltaContextProvider { state: df_state };
-    let mut sql_to_rel =
+    let context_provider = DeltaContextProvider::new(df_state);
+    let sql_to_rel =
         SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into());
 
-    // NOTE: This can be probably removed with Datafusion 41 once
-    // <https://github.com/apache/datafusion/pull/11485> is released
-    for planner in context_provider.state.expr_planners() {
-        sql_to_rel = sql_to_rel.with_user_defined_planner(planner.clone());
-    }
-
     Ok(sql_to_rel.sql_to_expr(sql, schema, &mut Default::default())?)
 }
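
Note: the loop deleted above was the DataFusion 40 workaround, feeding each planner to SqlToRel via with_user_defined_planner(). In 41, SqlToRel consults the ContextProvider directly through get_expr_planners() (apache/datafusion#11485), which is why DeltaContextProvider::new caches state.expr_planners(). A tiny sketch of where those planners come from, assuming expr_planners() returns an owned Vec as the code above implies:

use datafusion::prelude::SessionContext;

fn inspect_planners() {
    let state = SessionContext::new().state();
    // Planners registered by default (field access, nested access, ...).
    let planners = state.expr_planners();
    println!("{} expression planners available", planners.len());
}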

@@ -401,6 +427,8 @@ impl<'a> fmt::Display for ScalarValueFormat<'a> {
 #[cfg(test)]
 mod test {
     use arrow_schema::DataType as ArrowDataType;
+    use datafusion::functions_array::expr_fn::cardinality;
+    use datafusion::functions_nested::expr_ext::{IndexAccessor, SliceAccessor};
     use datafusion::prelude::SessionContext;
     use datafusion_common::{Column, ScalarValue, ToDFSchema};
     use datafusion_expr::expr::ScalarFunction;
@@ -409,8 +437,6 @@
     use datafusion_functions::core::expr_ext::FieldAccessor;
     use datafusion_functions::encoding::expr_fn::decode;
     use datafusion_functions::expr_fn::substring;
-    use datafusion_functions_array::expr_ext::{IndexAccessor, SliceAccessor};
-    use datafusion_functions_array::expr_fn::cardinality;
 
     use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
     use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
9 changes: 6 additions & 3 deletions crates/core/src/delta_datafusion/find_files/mod.rs
@@ -41,8 +41,10 @@ lazy_static! {
         ONLY_FILES_SCHEMA.clone().to_dfschema_ref().unwrap();
 }
 
+#[derive(Default)]
 struct FindFilesPlannerExtension {}
 
+#[derive(Default)]
 struct FindFilesPlanner {}
 
 #[async_trait]
@@ -188,6 +190,7 @@ async fn scan_table_by_files(
 pub mod tests {
     use std::sync::Arc;
 
+    use datafusion::execution::session_state::SessionStateBuilder;
     use datafusion::prelude::{DataFrame, SessionContext};
     use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
     use datafusion_expr::{col, lit, Expr, Extension, LogicalPlan};
@@ -202,9 +205,9 @@ pub mod tests {
         expr: Expr,
     ) -> Result<Vec<arrow_array::RecordBatch>, DeltaTableError> {
         let ctx = SessionContext::new();
-        let state = ctx
-            .state()
-            .with_query_planner(Arc::new(FindFilesPlanner {}));
+        let state = SessionStateBuilder::new_from_existing(ctx.state())
+            .with_query_planner(Arc::new(FindFilesPlanner::default()))
+            .build();
         let find_files_node = LogicalPlan::Extension(Extension {
             node: Arc::new(FindFilesNode::new(
                 "my_cool_plan".into(),
17 changes: 8 additions & 9 deletions crates/core/src/delta_datafusion/mod.rs
@@ -39,12 +39,12 @@ use arrow_cast::display::array_value_to_string;
 use arrow_schema::Field;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
+use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::config::TableParquetOptions;
 use datafusion::datasource::physical_plan::parquet::ParquetExecBuilder;
 use datafusion::datasource::physical_plan::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
 };
-use datafusion::datasource::provider::TableProviderFactory;
 use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider, TableType};
 use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
@@ -465,12 +465,11 @@ pub struct DeltaScanConfig {
     pub schema: Option<SchemaRef>,
 }
 
-#[derive(Debug)]
 pub(crate) struct DeltaScanBuilder<'a> {
     snapshot: &'a DeltaTableState,
     log_store: LogStoreRef,
     filter: Option<Expr>,
-    state: &'a SessionState,
+    session: &'a dyn Session,
     projection: Option<&'a Vec<usize>>,
     limit: Option<usize>,
     files: Option<&'a [Add]>,
@@ -481,13 +480,13 @@ impl<'a> DeltaScanBuilder<'a> {
     pub fn new(
         snapshot: &'a DeltaTableState,
         log_store: LogStoreRef,
-        state: &'a SessionState,
+        session: &'a dyn Session,
     ) -> Self {
         DeltaScanBuilder {
             snapshot,
             log_store,
             filter: None,
-            state,
+            session,
             projection: None,
             limit: None,
             files: None,
@@ -648,7 +647,7 @@ impl<'a> DeltaScanBuilder<'a> {
             .unwrap_or(Statistics::new_unknown(&schema));
 
         let parquet_options = TableParquetOptions {
-            global: self.state.config().options().execution.parquet.clone(),
+            global: self.session.config().options().execution.parquet.clone(),
             ..Default::default()
         };
 
@@ -717,7 +716,7 @@ impl TableProvider for DeltaTable {
 
     async fn scan(
         &self,
-        session: &SessionState,
+        session: &dyn Session,
         projection: Option<&Vec<usize>>,
         filters: &[Expr],
         limit: Option<usize>,
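
Note: this is the headline API change of the upgrade. TableProvider::scan (and TableProviderFactory::create further down) now receive the object-safe &dyn Session instead of a concrete &SessionState. Session configuration is reachable on the trait itself, as DeltaScanBuilder shows above; the as_any() downcast in this sketch is an assumption, not something this diff demonstrates:

use datafusion::catalog::Session;
use datafusion::execution::context::SessionState;

fn peek_session(session: &dyn Session) {
    // Config options are available directly on the trait.
    let batch_size = session.config().options().execution.batch_size;
    println!("batch_size = {batch_size}");
    // Recover the concrete state when an API still demands one (assumed as_any()).
    if let Some(state) = session.as_any().downcast_ref::<SessionState>() {
        println!("{} scalar UDFs registered", state.scalar_functions().len());
    }
}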
Expand Down Expand Up @@ -806,7 +805,7 @@ impl TableProvider for DeltaTableProvider {

async fn scan(
&self,
session: &SessionState,
session: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
Expand Down Expand Up @@ -1377,7 +1376,7 @@ pub struct DeltaTableFactory {}
impl TableProviderFactory for DeltaTableFactory {
async fn create(
&self,
_ctx: &SessionState,
_ctx: &dyn Session,
cmd: &CreateExternalTable,
) -> datafusion::error::Result<Arc<dyn TableProvider>> {
let provider = if cmd.options.is_empty() {
2 changes: 1 addition & 1 deletion crates/core/src/kernel/arrow/mod.rs
@@ -250,7 +250,7 @@ pub(crate) fn delta_log_schema_for_table(
         .iter()
         .for_each(|f| max_min_schema_for_fields(&mut max_min_vec, f));
 
-    if max_min_vec.len() > 0 {
+    if !max_min_vec.is_empty() {
         stats_parsed_fields.extend(["minValues", "maxValues"].into_iter().map(|name| {
             ArrowField::new(
                 name,
10 changes: 0 additions & 10 deletions crates/core/src/kernel/error.rs
@@ -71,13 +71,3 @@ pub enum Error {
     #[error("Failed to parse value '{0}' as '{1}'")]
     Parse(String, DataType),
 }
-
-#[cfg(feature = "object_store")]
-impl From<object_store::Error> for Error {
-    fn from(value: object_store::Error) -> Self {
-        match value {
-            object_store::Error::NotFound { path, .. } => Self::FileNotFound(path),
-            err => Self::ObjectStore(err),
-        }
-    }
-}