From 25ce38956e25722ba7a6cbc0f5a7dba6b3361554 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 21 Nov 2024 19:35:41 +0000 Subject: [PATCH] feat: upgrade to delta_kernel 0.4.1 :tada: This change includes some minor warnings cleanup which were pestering me too. Closes #3015 Signed-off-by: R. Tyler Croy --- Cargo.toml | 2 +- crates/core/src/delta_datafusion/expr.rs | 1 - crates/core/src/kernel/snapshot/log_data.rs | 6 +-- crates/core/src/kernel/snapshot/mod.rs | 43 ++++--------------- crates/core/src/operations/optimize.rs | 5 ++- crates/core/src/operations/update.rs | 1 - .../core/src/test_utils/factories/actions.rs | 3 +- 7 files changed, 16 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2d566152da..cc8eead83c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "0.3.1" } +delta_kernel = { version = "0.4.1", features = ["sync-engine"] } # delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" } # arrow diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index 33746bad2b..54cebcfbd2 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -28,7 +28,6 @@ use arrow_schema::{DataType, Field}; use chrono::{DateTime, NaiveDate}; use datafusion::execution::context::SessionState; use datafusion::execution::session_state::SessionStateBuilder; -use datafusion::execution::FunctionRegistry; use datafusion::functions_array::make_array::MakeArray; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 1a30ec7c46..8dbf0ff7ee 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -2,9 +2,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::sync::Arc; -use arrow_array::{ - Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray, UInt64Array, -}; +use arrow_array::{Array, Int32Array, Int64Array, MapArray, RecordBatch, StringArray, StructArray}; use chrono::{DateTime, Utc}; use delta_kernel::expressions::Scalar; use indexmap::IndexMap; @@ -484,7 +482,7 @@ mod datafusion { use ::datafusion::physical_plan::Accumulator; use arrow::compute::concat_batches; use arrow_arith::aggregate::sum; - use arrow_array::{ArrayRef, BooleanArray, Int64Array}; + use arrow_array::{ArrayRef, BooleanArray, Int64Array, UInt64Array}; use arrow_schema::DataType as ArrowDataType; use datafusion_common::scalar::ScalarValue; use datafusion_common::stats::{ColumnStatistics, Precision, Statistics}; diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index dbd6662f2f..d5763b5006 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -220,24 +220,14 @@ impl Snapshot { schema_actions.insert(ActionType::Add); let checkpoint_stream = self.log_segment.checkpoint_stream( store.clone(), - &StructType::new( - schema_actions - .iter() - .map(|a| a.schema_field().clone()) - .collect(), - ), + &StructType::new(schema_actions.iter().map(|a| a.schema_field().clone())), &self.config, ); schema_actions.insert(ActionType::Remove); let log_stream = self.log_segment.commit_stream( store.clone(), - &StructType::new( - schema_actions - .iter() - .map(|a| a.schema_field().clone()) - .collect(), - ), + &StructType::new(schema_actions.iter().map(|a| a.schema_field().clone())), &self.config, )?; @@ -460,12 +450,8 @@ impl EagerSnapshot { // NOTE: we don't need to add the visitor relevant data here, as it is repüresented in teh state already futures::stream::iter(files.into_iter().map(Ok)).boxed() } else { - let read_schema = StructType::new( - schema_actions - .iter() - .map(|a| a.schema_field().clone()) - .collect(), - ); + let read_schema = + StructType::new(schema_actions.iter().map(|a| a.schema_field().clone())); new_slice .checkpoint_stream( log_store.object_store(), @@ -476,12 +462,7 @@ impl EagerSnapshot { }; schema_actions.insert(ActionType::Remove); - let read_schema = StructType::new( - schema_actions - .iter() - .map(|a| a.schema_field().clone()) - .collect(), - ); + let read_schema = StructType::new(schema_actions.iter().map(|a| a.schema_field().clone())); let log_stream = new_slice.commit_stream( log_store.object_store().clone(), &read_schema, @@ -618,12 +599,7 @@ impl EagerSnapshot { let mut schema_actions: HashSet<_> = visitors.iter().flat_map(|v| v.required_actions()).collect(); schema_actions.extend([ActionType::Add, ActionType::Remove]); - let read_schema = StructType::new( - schema_actions - .iter() - .map(|a| a.schema_field().clone()) - .collect(), - ); + let read_schema = StructType::new(schema_actions.iter().map(|a| a.schema_field().clone())); let actions = self.snapshot.log_segment.advance( send, &self.table_root(), @@ -712,7 +688,7 @@ fn stats_schema(schema: &StructType, config: TableConfig<'_>) -> DeltaResult Option StructType::new( dt_struct .fields() - .flat_map(|f| stats_field(idx, num_indexed_cols, f)) - .collect(), + .flat_map(|f| stats_field(idx, num_indexed_cols, f)), ), true, )), @@ -769,7 +744,7 @@ fn to_count_field(field: &StructField) -> Option { DataType::Map(_) | DataType::Array(_) | &DataType::BINARY => None, DataType::Struct(s) => Some(StructField::new( field.name(), - StructType::new(s.fields().filter_map(to_count_field).collect::>()), + StructType::new(s.fields().filter_map(to_count_field)), true, )), _ => Some(StructField::new(field.name(), DataType::LONG, true)), diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 8ae6448341..758aaa47bf 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -40,7 +40,6 @@ use parquet::errors::ParquetError; use parquet::file::properties::WriterProperties; use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer}; use tracing::*; -use url::Url; use super::transaction::PROTOCOL; use super::writer::{PartitionWriter, PartitionWriterConfig}; @@ -1213,6 +1212,8 @@ pub(super) mod zorder { #[cfg(feature = "datafusion")] pub(super) mod datafusion { use super::*; + use url::Url; + use ::datafusion::{ execution::{ memory_pool::FairSpillPool, @@ -1245,7 +1246,7 @@ pub(super) mod zorder { let memory_pool = FairSpillPool::new(max_spill_size); let config = RuntimeConfig::new().with_memory_pool(Arc::new(memory_pool)); - let runtime = Arc::new(RuntimeEnv::new(config)?); + let runtime = Arc::new(RuntimeEnv::try_new(config)?); runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store); let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime); diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index f6752dd268..01dcb962b6 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -517,7 +517,6 @@ mod tests { get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration, }; use crate::{DeltaTable, TableProperty}; - use arrow::array::types::Int32Type; use arrow::array::{Int32Array, ListArray, StringArray}; use arrow::datatypes::Schema as ArrowSchema; use arrow::datatypes::{Field, Schema}; diff --git a/crates/core/src/test_utils/factories/actions.rs b/crates/core/src/test_utils/factories/actions.rs index 1f1e13a793..1ae264f624 100644 --- a/crates/core/src/test_utils/factories/actions.rs +++ b/crates/core/src/test_utils/factories/actions.rs @@ -76,8 +76,7 @@ impl ActionFactory { schema .fields() .filter(|f| !partition_columns.contains(f.name())) - .cloned() - .collect(), + .cloned(), ); let batch = DataFactory::record_batch(&data_schema, 10, &bounds).unwrap();