From 8a3e5ed6477aa06102bed3c8639af41078af1cdc Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Fri, 16 Aug 2024 21:32:15 +0200 Subject: [PATCH] style: more consistent imports --- Cargo.toml | 1 + crates/core/Cargo.toml | 2 + crates/core/src/delta_datafusion/cdf/mod.rs | 13 +- crates/core/src/delta_datafusion/cdf/scan.rs | 2 +- crates/core/src/delta_datafusion/expr.rs | 17 +-- .../src/delta_datafusion/find_files/mod.rs | 8 +- .../delta_datafusion/find_files/physical.rs | 8 +- crates/core/src/delta_datafusion/mod.rs | 113 +++++++++--------- crates/core/src/delta_datafusion/physical.rs | 9 +- crates/core/src/delta_datafusion/planner.rs | 3 +- .../src/delta_datafusion/schema_adapter.rs | 8 +- crates/core/src/kernel/scalars.rs | 4 +- crates/core/src/kernel/snapshot/replay.rs | 1 - crates/core/src/kernel/snapshot/serde.rs | 3 +- crates/core/src/logstore/mod.rs | 36 +++--- .../core/src/operations/cast/merge_schema.rs | 6 +- crates/core/src/operations/constraints.rs | 2 +- .../core/src/operations/convert_to_delta.rs | 2 +- crates/core/src/operations/delete.rs | 13 +- .../core/src/operations/filesystem_check.rs | 4 +- crates/core/src/operations/load_cdf.rs | 14 +-- crates/core/src/operations/merge/barrier.rs | 6 +- crates/core/src/operations/mod.rs | 30 ++--- crates/core/src/operations/optimize.rs | 2 +- crates/core/src/operations/transaction/mod.rs | 2 +- .../core/src/operations/transaction/state.rs | 4 +- crates/core/src/operations/update.rs | 39 +++--- crates/core/src/operations/write.rs | 8 +- crates/core/src/operations/writer.rs | 5 +- crates/core/src/protocol/checkpoints.rs | 1 + crates/core/src/protocol/mod.rs | 17 +-- crates/core/src/schema/partitions.rs | 6 +- crates/core/src/storage/file.rs | 4 +- crates/core/src/storage/mod.rs | 19 ++- crates/core/src/table/state_arrow.rs | 4 +- crates/core/src/writer/record_batch.rs | 7 +- crates/core/src/writer/test_utils.rs | 2 +- crates/core/src/writer/utils.rs | 6 +- 38 files changed, 211 insertions(+), 220 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0363ae30de..e8cb698318 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,6 +51,7 @@ datafusion-common = { version = "41" } datafusion-proto = { version = "41" } datafusion-sql = { version = "41" } datafusion-physical-expr = { version = "41" } +datafusion-physical-plan = { version = "41" } datafusion-functions = { version = "41" } datafusion-functions-aggregate = { version = "41" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 12a1ef24ad..2db10473cc 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -42,6 +42,7 @@ datafusion-common = { workspace = true, optional = true } datafusion-proto = { workspace = true, optional = true } datafusion-sql = { workspace = true, optional = true } datafusion-physical-expr = { workspace = true, optional = true } +datafusion-physical-plan = { workspace = true, optional = true } datafusion-functions = { workspace = true, optional = true } datafusion-functions-aggregate = { workspace = true, optional = true } @@ -127,6 +128,7 @@ datafusion = [ "datafusion-common", "datafusion-proto", "datafusion-physical-expr", + "datafusion-physical-plan", "datafusion-sql", "datafusion-functions", "datafusion-functions-aggregate", diff --git a/crates/core/src/delta_datafusion/cdf/mod.rs b/crates/core/src/delta_datafusion/cdf/mod.rs index 09959a12ad..e561fc2152 100644 --- a/crates/core/src/delta_datafusion/cdf/mod.rs +++ b/crates/core/src/delta_datafusion/cdf/mod.rs @@ -1,16 +1,13 @@ //! Logical operators and physical executions for CDF +use std::collections::HashMap; use arrow_schema::{DataType, Field, TimeUnit}; use lazy_static::lazy_static; -use std::collections::HashMap; - -pub(crate) use scan::*; -pub(crate) use scan_utils::*; -use crate::{ - kernel::{Add, AddCDCFile, Remove}, - DeltaResult, -}; +pub(crate) use self::scan::*; +pub(crate) use self::scan_utils::*; +use crate::kernel::{Add, AddCDCFile, Remove}; +use crate::DeltaResult; mod scan; mod scan_utils; diff --git a/crates/core/src/delta_datafusion/cdf/scan.rs b/crates/core/src/delta_datafusion/cdf/scan.rs index bd7488899f..e5098bca72 100644 --- a/crates/core/src/delta_datafusion/cdf/scan.rs +++ b/crates/core/src/delta_datafusion/cdf/scan.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use arrow_schema::SchemaRef; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; /// Physical execution of a scan #[derive(Debug, Clone)] diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index a6ca14a077..eb542d98dd 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -20,11 +20,8 @@ // Display functions and required macros were pulled from https://github.com/apache/arrow-datafusion/blob/ddb95497e2792015d5a5998eec79aac8d37df1eb/datafusion/expr/src/expr.rs //! Utility functions for Datafusion's Expressions - -use std::{ - fmt::{self, Display, Error, Formatter, Write}, - sync::Arc, -}; +use std::fmt::{self, Display, Error, Formatter, Write}; +use std::sync::Arc; use arrow_schema::DataType; use chrono::{DateTime, NaiveDate}; @@ -33,19 +30,17 @@ use datafusion::execution::session_state::SessionStateBuilder; use datafusion::execution::FunctionRegistry; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; -use datafusion_expr::{ - expr::InList, planner::ExprPlanner, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, - TableSource, -}; +use datafusion_expr::expr::InList; +use datafusion_expr::planner::ExprPlanner; +use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource}; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::sqlparser::ast::escape_quoted_string; use datafusion_sql::sqlparser::dialect::GenericDialect; use datafusion_sql::sqlparser::parser::Parser; use datafusion_sql::sqlparser::tokenizer::Tokenizer; -use crate::{DeltaResult, DeltaTableError}; - use super::DeltaParserOptions; +use crate::{DeltaResult, DeltaTableError}; pub(crate) struct DeltaContextProvider<'a> { state: SessionState, diff --git a/crates/core/src/delta_datafusion/find_files/mod.rs b/crates/core/src/delta_datafusion/find_files/mod.rs index 0725e6c326..0c235242c2 100644 --- a/crates/core/src/delta_datafusion/find_files/mod.rs +++ b/crates/core/src/delta_datafusion/find_files/mod.rs @@ -1,6 +1,6 @@ -use arrow_array::cast::AsArray; use std::sync::Arc; +use arrow_array::cast::AsArray; use arrow_array::types::UInt16Type; use arrow_array::RecordBatch; use arrow_schema::SchemaBuilder; @@ -10,13 +10,13 @@ use async_trait::async_trait; use datafusion::datasource::MemTable; use datafusion::execution::context::{QueryPlanner, SessionState}; use datafusion::execution::TaskContext; -use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::limit::LocalLimitExec; -use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner}; use datafusion::prelude::SessionContext; use datafusion_common::{DFSchemaRef, Result, ToDFSchema}; use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode}; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::limit::LocalLimitExec; +use datafusion_physical_plan::ExecutionPlan; use lazy_static::lazy_static; use crate::delta_datafusion::find_files::logical::FindFilesNode; diff --git a/crates/core/src/delta_datafusion/find_files/physical.rs b/crates/core/src/delta_datafusion/find_files/physical.rs index e23a561e5b..508d1f672e 100644 --- a/crates/core/src/delta_datafusion/find_files/physical.rs +++ b/crates/core/src/delta_datafusion/find_files/physical.rs @@ -8,14 +8,14 @@ use arrow_array::RecordBatch; use arrow_schema::SchemaRef; use datafusion::error::Result; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; -use datafusion::physical_plan::memory::MemoryStream; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, -}; use datafusion::prelude::SessionContext; use datafusion_common::tree_node::TreeNode; use datafusion_expr::Expr; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_plan::memory::MemoryStream; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties, +}; use futures::stream::BoxStream; use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 831f71aa2e..60579f6bfc 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -25,18 +25,15 @@ use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug}; use std::sync::Arc; -use arrow::compute::{cast_with_options, CastOptions}; -use arrow::datatypes::DataType; -use arrow::datatypes::{ - DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef, - TimeUnit, -}; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; use arrow_array::types::UInt16Type; -use arrow_array::{Array, DictionaryArray, StringArray, TypedDictionaryArray}; +use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray}; use arrow_cast::display::array_value_to_string; -use arrow_schema::Field; +use arrow_cast::{cast_with_options, CastOptions}; +use arrow_schema::{ + ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef, + SchemaRef as ArrowSchemaRef, TimeUnit, +}; +use arrow_select::concat::concat_batches; use async_trait::async_trait; use chrono::{DateTime, TimeZone, Utc}; use datafusion::catalog::{Session, TableProviderFactory}; @@ -50,13 +47,6 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::physical_optimizer::pruning::PruningPredicate; -use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::limit::LocalLimitExec; -use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, - Statistics, -}; use datafusion_common::scalar::ScalarValue; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion_common::{ @@ -66,6 +56,13 @@ use datafusion_common::{ use datafusion_expr::logical_plan::CreateExternalTable; use datafusion_expr::utils::conjunction; use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility}; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::limit::LocalLimitExec; +use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, MetricsSet}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, SendableRecordBatchStream, + Statistics, +}; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use datafusion_sql::planner::ParserOptions; @@ -210,10 +207,10 @@ fn _arrow_schema(snapshot: &Snapshot, wrap_partitions: bool) -> DeltaResult { + ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 + | ArrowDataType::Binary + | ArrowDataType::LargeBinary => { wrap_partition_type_in_dict(field.data_type().clone()) } _ => field.data_type().clone(), @@ -330,7 +327,11 @@ pub(crate) fn df_logical_schema( } if let Some(file_column_name) = file_column_name { - fields.push(Arc::new(Field::new(file_column_name, DataType::Utf8, true))); + fields.push(Arc::new(Field::new( + file_column_name, + ArrowDataType::Utf8, + true, + ))); } Ok(Arc::new(ArrowSchema::new(fields))) @@ -630,9 +631,9 @@ impl<'a> DeltaScanBuilder<'a> { if let Some(file_column_name) = &config.file_column_name { let field_name_datatype = if config.wrap_partition_values { - wrap_partition_type_in_dict(DataType::Utf8) + wrap_partition_type_in_dict(ArrowDataType::Utf8) } else { - DataType::Utf8 + ArrowDataType::Utf8 }; table_partition_cols.push(Field::new( file_column_name.clone(), @@ -1142,13 +1143,13 @@ pub(crate) async fn execute_plan_to_batch( let batches = batch_stream.try_collect::>().await?; - DataFusionResult::<_>::Ok(arrow::compute::concat_batches(&schema, batches.iter())?) + DataFusionResult::<_>::Ok(concat_batches(&schema, batches.iter())?) } }), ) .await?; - let batch = arrow::compute::concat_batches(&plan.schema(), data.iter())?; + let batch = concat_batches(&plan.schema(), data.iter())?; Ok(batch) } @@ -1591,7 +1592,7 @@ pub(crate) async fn scan_memory_table( ))? .to_owned(), ); - fields.push(Field::new(PATH_COLUMN, DataType::Utf8, false)); + fields.push(Field::new(PATH_COLUMN, ArrowDataType::Utf8, false)); for field in schema.fields() { if field.name().starts_with("partition.") { @@ -1776,10 +1777,8 @@ impl From for DeltaColumn { #[cfg(test)] mod tests { - use crate::operations::write::SchemaMode; - use crate::writer::test_utils::get_delta_schema; - use arrow::array::StructArray; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow_array::StructArray; + use arrow_schema::Schema; use chrono::{TimeZone, Utc}; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::ParquetExec; @@ -1793,6 +1792,8 @@ mod tests { use std::ops::Deref; use super::*; + use crate::operations::write::SchemaMode; + use crate::writer::test_utils::get_delta_schema; // test deserialization of serialized partition values. // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization @@ -1913,8 +1914,8 @@ mod tests { #[tokio::test] async fn test_enforce_invariants() { let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Int32, false), + Field::new("a", ArrowDataType::Utf8, false), + Field::new("b", ArrowDataType::Int32, false), ])); let batch = RecordBatch::try_new( Arc::clone(&schema), @@ -1966,7 +1967,7 @@ mod tests { let struct_fields = schema.fields().clone(); let schema = Arc::new(Schema::new(vec![Field::new( "x", - DataType::Struct(struct_fields), + ArrowDataType::Struct(struct_fields), false, )])); let inner = Arc::new(StructArray::from(batch)); @@ -1986,8 +1987,8 @@ mod tests { let codec = DeltaPhysicalCodec {}; let schema = Arc::new(Schema::new(vec![ - Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Int32, false), + Field::new("a", ArrowDataType::Utf8, false), + Field::new("b", ArrowDataType::Int32, false), ])); let exec_plan = Arc::from(DeltaScan { table_uri: "s3://my_bucket/this/is/some/path".to_string(), @@ -2043,9 +2044,9 @@ mod tests { // Tests issue (1787) where partition columns were incorrect when they // have a different order in the metadata and table schema let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("modified", DataType::Utf8, true), - Field::new("id", DataType::Utf8, true), - Field::new("value", DataType::Int32, true), + Field::new("modified", ArrowDataType::Utf8, true), + Field::new("id", ArrowDataType::Utf8, true), + Field::new("value", ArrowDataType::Int32, true), ])); let table = crate::DeltaOps::new_in_memory() @@ -2115,9 +2116,9 @@ mod tests { #[tokio::test] async fn delta_scan_case_sensitive() { let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("moDified", DataType::Utf8, true), - Field::new("ID", DataType::Utf8, true), - Field::new("vaLue", DataType::Int32, true), + Field::new("moDified", ArrowDataType::Utf8, true), + Field::new("ID", ArrowDataType::Utf8, true), + Field::new("vaLue", ArrowDataType::Int32, true), ])); let batch = RecordBatch::try_new( @@ -2185,7 +2186,7 @@ mod tests { async fn delta_scan_supports_missing_columns() { let schema1 = Arc::new(ArrowSchema::new(vec![Field::new( "col_1", - DataType::Utf8, + ArrowDataType::Utf8, true, )])); @@ -2199,8 +2200,8 @@ mod tests { .unwrap(); let schema2 = Arc::new(ArrowSchema::new(vec![ - Field::new("col_1", DataType::Utf8, true), - Field::new("col_2", DataType::Utf8, true), + Field::new("col_1", ArrowDataType::Utf8, true), + Field::new("col_2", ArrowDataType::Utf8, true), ])); let batch2 = RecordBatch::try_new( @@ -2262,8 +2263,8 @@ mod tests { #[tokio::test] async fn delta_scan_supports_pushdown() { let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("col_1", DataType::Utf8, false), - Field::new("col_2", DataType::Utf8, false), + Field::new("col_1", ArrowDataType::Utf8, false), + Field::new("col_2", ArrowDataType::Utf8, false), ])); let batch = RecordBatch::try_new( @@ -2320,10 +2321,10 @@ mod tests { #[tokio::test] async fn delta_scan_supports_nested_missing_columns() { let column1_schema1: arrow::datatypes::Fields = - vec![Field::new("col_1a", DataType::Utf8, true)].into(); + vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into(); let schema1 = Arc::new(ArrowSchema::new(vec![Field::new( "col_1", - DataType::Struct(column1_schema1.clone()), + ArrowDataType::Struct(column1_schema1.clone()), true, )])); @@ -2340,14 +2341,14 @@ mod tests { ) .unwrap(); - let column1_schema2: arrow::datatypes::Fields = vec![ - Field::new("col_1a", DataType::Utf8, true), - Field::new("col_1b", DataType::Utf8, true), + let column1_schema2: arrow_schema::Fields = vec![ + Field::new("col_1a", ArrowDataType::Utf8, true), + Field::new("col_1b", ArrowDataType::Utf8, true), ] .into(); let schema2 = Arc::new(ArrowSchema::new(vec![Field::new( "col_1", - DataType::Struct(column1_schema2.clone()), + ArrowDataType::Struct(column1_schema2.clone()), true, )])); @@ -2418,9 +2419,9 @@ mod tests { async fn test_multiple_predicate_pushdown() { use crate::datafusion::prelude::SessionContext; let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("moDified", DataType::Utf8, true), - Field::new("id", DataType::Utf8, true), - Field::new("vaLue", DataType::Int32, true), + Field::new("moDified", ArrowDataType::Utf8, true), + Field::new("id", ArrowDataType::Utf8, true), + Field::new("vaLue", ArrowDataType::Int32, true), ])); let batch = RecordBatch::try_new( diff --git a/crates/core/src/delta_datafusion/physical.rs b/crates/core/src/delta_datafusion/physical.rs index c37b85101e..dd28e0d93b 100644 --- a/crates/core/src/delta_datafusion/physical.rs +++ b/crates/core/src/delta_datafusion/physical.rs @@ -1,13 +1,12 @@ //! Physical Operations for DataFusion use std::sync::Arc; +use arrow_array::RecordBatch; use arrow_schema::SchemaRef; -use datafusion::arrow::record_batch::RecordBatch; use datafusion::error::Result as DataFusionResult; -use datafusion::physical_plan::DisplayAs; -use datafusion::physical_plan::{ - metrics::{ExecutionPlanMetricsSet, MetricsSet}, - ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, +use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use datafusion_physical_plan::{ + DisplayAs, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, }; use futures::{Stream, StreamExt}; diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs index f0af1092ca..6119b78ce6 100644 --- a/crates/core/src/delta_datafusion/planner.rs +++ b/crates/core/src/delta_datafusion/planner.rs @@ -24,7 +24,6 @@ //! let state = state.with_query_planner(Arc::new(merge_planner)); use std::sync::Arc; -use crate::delta_datafusion::DataFusionResult; use async_trait::async_trait; use datafusion::physical_planner::PhysicalPlanner; use datafusion::{ @@ -34,6 +33,8 @@ use datafusion::{ }; use datafusion_expr::LogicalPlan; +use crate::delta_datafusion::DataFusionResult; + /// Deltaplanner pub struct DeltaPlanner { /// custom extension planner diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index 5fb0724f50..99a97e2130 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -1,9 +1,11 @@ -use crate::operations::cast::cast_record_batch; +use std::fmt::Debug; +use std::sync::Arc; + use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef}; use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; -use std::fmt::Debug; -use std::sync::Arc; + +use crate::operations::cast::cast_record_batch; /// A Schema Adapter Factory which provides casting record batches from parquet to meet /// delta lake conventions. diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs index a884933308..bc1bd6eed9 100644 --- a/crates/core/src/kernel/scalars.rs +++ b/crates/core/src/kernel/scalars.rs @@ -1,5 +1,6 @@ //! Auxiliary methods for dealing with kernel scalars -//! +use std::cmp::Ordering; + use arrow_array::Array; use arrow_schema::TimeUnit; use chrono::{DateTime, TimeZone, Utc}; @@ -10,7 +11,6 @@ use delta_kernel::{ use object_store::path::Path; #[cfg(test)] use serde_json::Value; -use std::cmp::Ordering; use urlencoding::encode; use crate::NULL_PARTITION_VALUE_DATA_PATH; diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index e83dbb55f6..1b18b61bc7 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -588,7 +588,6 @@ pub(super) mod tests { use std::sync::Arc; use arrow_select::concat::concat_batches; - use delta_kernel::engine::arrow_expression::ArrowExpressionHandler; use delta_kernel::schema::DataType; use deltalake_test::utils::*; use futures::TryStreamExt; diff --git a/crates/core/src/kernel/snapshot/serde.rs b/crates/core/src/kernel/snapshot/serde.rs index dd7403bc28..45c1206c82 100644 --- a/crates/core/src/kernel/snapshot/serde.rs +++ b/crates/core/src/kernel/snapshot/serde.rs @@ -1,10 +1,11 @@ +use std::fmt; + use arrow_ipc::reader::FileReader; use arrow_ipc::writer::FileWriter; use chrono::{DateTime, TimeZone, Utc}; use object_store::ObjectMeta; use serde::de::{self, Deserializer, SeqAccess, Visitor}; use serde::{ser::SerializeSeq, Deserialize, Serialize}; -use std::fmt; use super::log_segment::LogSegment; use super::EagerSnapshot; diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index b8646cdb65..c808c95176 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -1,31 +1,27 @@ //! Delta log store. -use dashmap::DashMap; -use futures::StreamExt; -use lazy_static::lazy_static; -use regex::Regex; -use serde::{ - de::{Error, SeqAccess, Visitor}, - ser::SerializeSeq, - Deserialize, Serialize, -}; use std::io::{BufRead, BufReader, Cursor}; use std::sync::OnceLock; use std::{cmp::max, collections::HashMap, sync::Arc}; -use url::Url; -use crate::{ - errors::DeltaResult, - kernel::Action, - operations::transaction::TransactionError, - protocol::{get_last_checkpoint, ProtocolError}, - storage::{ - commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions, - }, - DeltaTableError, -}; use bytes::Bytes; +use dashmap::DashMap; +use futures::StreamExt; +use lazy_static::lazy_static; use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; +use regex::Regex; +use serde::de::{Error, SeqAccess, Visitor}; +use serde::ser::SerializeSeq; +use serde::{Deserialize, Serialize}; use tracing::{debug, warn}; +use url::Url; + +use crate::kernel::Action; +use crate::operations::transaction::TransactionError; +use crate::protocol::{get_last_checkpoint, ProtocolError}; +use crate::storage::{ + commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions, +}; +use crate::{DeltaResult, DeltaTableError}; #[cfg(feature = "datafusion")] use datafusion::datasource::object_store::ObjectStoreUrl; diff --git a/crates/core/src/operations/cast/merge_schema.rs b/crates/core/src/operations/cast/merge_schema.rs index 624471dfbd..64fe2b7ed6 100644 --- a/crates/core/src/operations/cast/merge_schema.rs +++ b/crates/core/src/operations/cast/merge_schema.rs @@ -1,12 +1,14 @@ //! Provide schema merging for delta schemas //! -use crate::kernel::{ArrayType, DataType as DeltaDataType, MapType, StructField, StructType}; +use std::collections::HashMap; + use arrow::datatypes::DataType::Dictionary; use arrow_schema::{ ArrowError, DataType, Field as ArrowField, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, }; -use std::collections::HashMap; + +use crate::kernel::{ArrayType, DataType as DeltaDataType, MapType, StructField, StructType}; fn try_merge_metadata( left: &mut HashMap, diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index e5d356f81c..13336a39f4 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -4,9 +4,9 @@ use std::sync::Arc; use datafusion::execution::context::SessionState; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_common::ToDFSchema; +use datafusion_physical_plan::ExecutionPlan; use futures::future::BoxFuture; use futures::StreamExt; diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs index a51d353b20..1a0931c4ac 100644 --- a/crates/core/src/operations/convert_to_delta.rs +++ b/crates/core/src/operations/convert_to_delta.rs @@ -5,7 +5,7 @@ use std::num::TryFromIntError; use std::str::{FromStr, Utf8Error}; use std::sync::Arc; -use arrow::{datatypes::Schema as ArrowSchema, error::ArrowError}; +use arrow_schema::{ArrowError, Schema as ArrowSchema}; use futures::future::{self, BoxFuture}; use futures::TryStreamExt; use indexmap::IndexMap; diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 2a02da7d89..0627e0b633 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -17,22 +17,18 @@ //! .await?; //! ```` -use crate::delta_datafusion::logical::MetricObserver; -use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec}; -use crate::delta_datafusion::planner::DeltaPlanner; -use crate::logstore::LogStoreRef; use async_trait::async_trait; use datafusion::dataframe::DataFrame; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::execution::session_state::SessionStateBuilder; -use datafusion::physical_plan::metrics::MetricBuilder; -use datafusion::physical_plan::ExecutionPlan; use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner}; use datafusion::prelude::Expr; use datafusion_common::ScalarValue; use datafusion_expr::{lit, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode}; +use datafusion_physical_plan::metrics::MetricBuilder; +use datafusion_physical_plan::ExecutionPlan; use futures::future::BoxFuture; use std::sync::Arc; @@ -44,14 +40,17 @@ use serde::Serialize; use super::cdc::should_write_cdc; use super::datafusion_utils::Expression; use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; - use crate::delta_datafusion::expr::fmt_expr_to_sql; +use crate::delta_datafusion::logical::MetricObserver; +use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec}; +use crate::delta_datafusion::planner::DeltaPlanner; use crate::delta_datafusion::{ find_files, register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext, DeltaTableProvider, }; use crate::errors::DeltaResult; use crate::kernel::{Action, Add, Remove}; +use crate::logstore::LogStoreRef; use crate::operations::write::{write_execution_plan, write_execution_plan_cdc, WriterStatsConfig}; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; diff --git a/crates/core/src/operations/filesystem_check.rs b/crates/core/src/operations/filesystem_check.rs index 44fa84d29a..93119b4c50 100644 --- a/crates/core/src/operations/filesystem_check.rs +++ b/crates/core/src/operations/filesystem_check.rs @@ -24,6 +24,7 @@ use object_store::ObjectStore; use serde::Serialize; use url::{ParseError, Url}; +use super::transaction::{CommitBuilder, CommitProperties}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::{Action, Add, Remove}; use crate::logstore::LogStoreRef; @@ -31,9 +32,6 @@ use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; use crate::DeltaTable; -use super::transaction::CommitBuilder; -use super::transaction::CommitProperties; - /// Audit the Delta Table's active files with the underlying file system. /// See this module's documentation for more information #[derive(Debug)] diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 76c94b549b..eb0d5b1a7b 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -1,23 +1,21 @@ //! Module for reading the change datafeed of delta tables -use datafusion_physical_expr::{ - expressions::{self}, - PhysicalExpr, -}; use std::sync::Arc; use std::time::SystemTime; -use arrow::record_batch::RecordBatch; +use arrow_array::RecordBatch; use arrow_schema::{ArrowError, Field}; use chrono::{DateTime, Utc}; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::physical_plan::FileScanConfig; -use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::union::UnionExec; -use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use datafusion_common::{ScalarValue, Statistics}; +use datafusion_physical_expr::expressions; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::union::UnionExec; +use datafusion_physical_plan::ExecutionPlan; use tracing::log; use crate::delta_datafusion::{register_store, DataFusionMixins}; diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index e9b2f8fd00..f11474291d 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -18,12 +18,12 @@ use std::{ use arrow_array::{builder::UInt64Builder, ArrayRef, RecordBatch}; use arrow_schema::SchemaRef; -use datafusion::physical_plan::{ - DisplayAs, DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, -}; use datafusion_common::{DataFusionError, Result as DataFusionResult}; use datafusion_expr::{Expr, LogicalPlan, UserDefinedLogicalNodeCore}; use datafusion_physical_expr::{Distribution, PhysicalExpr}; +use datafusion_physical_plan::{ + DisplayAs, DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, +}; use futures::{Stream, StreamExt}; use crate::{ diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 3e4180763f..515143f088 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -6,15 +6,29 @@ //! the operations' behaviors and will return an updated table potentially in conjunction //! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream], //! if the operation returns data as well. +use std::collections::HashMap; + +#[cfg(feature = "datafusion")] +use arrow_array::RecordBatch; +#[cfg(feature = "datafusion")] +pub use datafusion_physical_plan::common::collect as collect_sendable_stream; use self::add_column::AddColumnBuilder; use self::create::CreateBuilder; use self::filesystem_check::FileSystemCheckBuilder; +use self::optimize::OptimizeBuilder; +use self::restore::RestoreBuilder; +use self::set_tbl_properties::SetTablePropertiesBuilder; use self::vacuum::VacuumBuilder; +#[cfg(feature = "datafusion")] +use self::{ + constraints::ConstraintBuilder, datafusion_utils::Expression, delete::DeleteBuilder, + drop_constraints::DropConstraintBuilder, load::LoadBuilder, load_cdf::CdfLoadBuilder, + merge::MergeBuilder, update::UpdateBuilder, write::WriteBuilder, +}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::table::builder::DeltaTableBuilder; use crate::DeltaTable; -use std::collections::HashMap; pub mod add_column; pub mod cast; @@ -27,20 +41,6 @@ pub mod restore; pub mod transaction; pub mod vacuum; -#[cfg(feature = "datafusion")] -use self::{ - constraints::ConstraintBuilder, datafusion_utils::Expression, delete::DeleteBuilder, - drop_constraints::DropConstraintBuilder, load::LoadBuilder, load_cdf::CdfLoadBuilder, - merge::MergeBuilder, update::UpdateBuilder, write::WriteBuilder, -}; -#[cfg(feature = "datafusion")] -pub use ::datafusion::physical_plan::common::collect as collect_sendable_stream; -#[cfg(feature = "datafusion")] -use arrow::record_batch::RecordBatch; -use optimize::OptimizeBuilder; -use restore::RestoreBuilder; -use set_tbl_properties::SetTablePropertiesBuilder; - #[cfg(all(feature = "cdf", feature = "datafusion"))] mod cdc; #[cfg(feature = "datafusion")] diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 8bd845e281..a989a23ad1 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -25,8 +25,8 @@ use std::fmt; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use arrow::datatypes::SchemaRef as ArrowSchemaRef; use arrow_array::RecordBatch; +use arrow_schema::SchemaRef as ArrowSchemaRef; use delta_kernel::expressions::Scalar; use futures::future::BoxFuture; use futures::stream::BoxStream; diff --git a/crates/core/src/operations/transaction/mod.rs b/crates/core/src/operations/transaction/mod.rs index 28af281fa1..e0902f96e1 100644 --- a/crates/core/src/operations/transaction/mod.rs +++ b/crates/core/src/operations/transaction/mod.rs @@ -73,6 +73,7 @@ //! │ │ //! └───────────────────────────────┘ //! +use std::collections::HashMap; use chrono::Utc; use conflict_checker::ConflictChecker; @@ -80,7 +81,6 @@ use futures::future::BoxFuture; use object_store::path::Path; use object_store::{Error as ObjectStoreError, ObjectStore}; use serde_json::Value; -use std::collections::HashMap; use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary}; use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for}; diff --git a/crates/core/src/operations/transaction/state.rs b/crates/core/src/operations/transaction/state.rs index 6d04f7f64d..6c34ddbb82 100644 --- a/crates/core/src/operations/transaction/state.rs +++ b/crates/core/src/operations/transaction/state.rs @@ -1,7 +1,7 @@ use std::collections::HashSet; -use arrow::array::{ArrayRef, BooleanArray}; -use arrow::datatypes::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}; +use arrow_array::{ArrayRef, BooleanArray}; +use arrow_schema::{DataType as ArrowDataType, SchemaRef as ArrowSchemaRef}; use datafusion::execution::context::SessionContext; use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics}; use datafusion_common::scalar::ScalarValue; diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 67639fef4c..bd4ec4ad1b 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -24,6 +24,26 @@ use std::{ time::{Instant, SystemTime, UNIX_EPOCH}, }; +use async_trait::async_trait; +use datafusion::error::Result as DataFusionResult; +use datafusion::{ + dataframe::DataFrame, + datasource::provider_as_source, + execution::context::SessionState, + execution::session_state::SessionStateBuilder, + physical_plan::{metrics::MetricBuilder, ExecutionPlan}, + physical_planner::{ExtensionPlanner, PhysicalPlanner}, + prelude::SessionContext, +}; +use datafusion_common::{Column, ScalarValue}; +use datafusion_expr::{ + case, col, lit, when, Expr, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, +}; +use futures::future::BoxFuture; +use parquet::file::properties::WriterProperties; +use serde::Serialize; +use tracing::log::*; + use super::write::{write_execution_plan, write_execution_plan_cdc}; use super::{ datafusion_utils::Expression, @@ -47,25 +67,6 @@ use crate::{ DeltaTableError, }; use crate::{DeltaResult, DeltaTable}; -use async_trait::async_trait; -use datafusion::error::Result as DataFusionResult; -use datafusion::{ - dataframe::DataFrame, - datasource::provider_as_source, - execution::context::SessionState, - execution::session_state::SessionStateBuilder, - physical_plan::{metrics::MetricBuilder, ExecutionPlan}, - physical_planner::{ExtensionPlanner, PhysicalPlanner}, - prelude::SessionContext, -}; -use datafusion_common::{Column, ScalarValue}; -use datafusion_expr::{ - case, col, lit, when, Expr, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, -}; -use futures::future::BoxFuture; -use parquet::file::properties::WriterProperties; -use serde::Serialize; -use tracing::log::*; /// Custom column name used for marking internal [RecordBatch] rows as updated pub(crate) const UPDATE_PREDICATE_COLNAME: &str = "__delta_rs_update_predicate"; diff --git a/crates/core/src/operations/write.rs b/crates/core/src/operations/write.rs index 1b3513c984..b3f0f30c1b 100644 --- a/crates/core/src/operations/write.rs +++ b/crates/core/src/operations/write.rs @@ -34,14 +34,14 @@ use arrow_array::RecordBatch; use arrow_cast::can_cast_types; use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef}; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; -use datafusion::physical_plan::filter::FilterExec; -use datafusion::physical_plan::projection::ProjectionExec; -use datafusion::physical_plan::union::UnionExec; -use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan}; use datafusion_common::DFSchema; use datafusion_expr::{lit, Expr}; use datafusion_physical_expr::expressions::{self}; use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::union::UnionExec; +use datafusion_physical_plan::{memory::MemoryExec, ExecutionPlan}; use futures::future::BoxFuture; use futures::StreamExt; use object_store::prefix::PrefixStore; diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index 5128611ffd..8fd4273c9f 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -2,9 +2,8 @@ use std::collections::HashMap; -use arrow::datatypes::SchemaRef as ArrowSchemaRef; -use arrow::error::ArrowError; -use arrow::record_batch::RecordBatch; +use arrow_array::RecordBatch; +use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef}; use bytes::Bytes; use delta_kernel::expressions::Scalar; use indexmap::IndexMap; diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index f2625e49cf..425029d9c3 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -28,6 +28,7 @@ use crate::logstore::LogStore; use crate::table::state::DeltaTableState; use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder}; use crate::{open_table_with_version, DeltaTable}; + type SchemaPath = Vec; /// Error returned when there is an error during creating a checkpoint. diff --git a/crates/core/src/protocol/mod.rs b/crates/core/src/protocol/mod.rs index d1be7a0c1e..8c857f92de 100644 --- a/crates/core/src/protocol/mod.rs +++ b/crates/core/src/protocol/mod.rs @@ -2,9 +2,11 @@ #![allow(non_camel_case_types)] -pub mod checkpoints; -mod parquet_read; -mod time_utils; +use std::borrow::Borrow; +use std::collections::HashMap; +use std::hash::{Hash, Hasher}; +use std::mem::take; +use std::str::FromStr; use arrow_schema::ArrowError; use futures::StreamExt; @@ -13,11 +15,6 @@ use object_store::{path::Path, Error as ObjectStoreError, ObjectStore}; use regex::Regex; use serde::{Deserialize, Serialize}; use serde_json::Value; -use std::borrow::Borrow; -use std::collections::HashMap; -use std::hash::{Hash, Hasher}; -use std::mem::take; -use std::str::FromStr; use tracing::{debug, error}; use crate::errors::{DeltaResult, DeltaTableError}; @@ -25,6 +22,10 @@ use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField}; use crate::logstore::LogStore; use crate::table::CheckPoint; +pub mod checkpoints; +mod parquet_read; +mod time_utils; + /// Error returned when an invalid Delta log action is encountered. #[allow(missing_docs)] #[derive(thiserror::Error, Debug)] diff --git a/crates/core/src/schema/partitions.rs b/crates/core/src/schema/partitions.rs index 581393ec22..23abb3896e 100644 --- a/crates/core/src/schema/partitions.rs +++ b/crates/core/src/schema/partitions.rs @@ -1,11 +1,11 @@ //! Delta Table partition handling logic. - -use delta_kernel::expressions::Scalar; -use serde::{Serialize, Serializer}; use std::cmp::Ordering; use std::collections::HashMap; use std::convert::TryFrom; +use delta_kernel::expressions::Scalar; +use serde::{Serialize, Serializer}; + use crate::errors::DeltaTableError; use crate::kernel::{scalars::ScalarExt, DataType, PrimitiveType}; diff --git a/crates/core/src/storage/file.rs b/crates/core/src/storage/file.rs index 73975d62b3..100faafcc5 100644 --- a/crates/core/src/storage/file.rs +++ b/crates/core/src/storage/file.rs @@ -1,6 +1,8 @@ //! Local file storage backend. This backend read and write objects from local filesystem. //! //! The local file storage backend is multi-writer safe. +use std::ops::Range; +use std::sync::Arc; use bytes::Bytes; use futures::stream::BoxStream; @@ -10,8 +12,6 @@ use object_store::{ Result as ObjectStoreResult, }; use object_store::{MultipartUpload, PutMultipartOpts, PutPayload}; -use std::ops::Range; -use std::sync::Arc; use url::Url; const STORE_NAME: &str = "DeltaLocalObjectStore"; diff --git a/crates/core/src/storage/mod.rs b/crates/core/src/storage/mod.rs index 3c38a337af..ceac5e1436 100644 --- a/crates/core/src/storage/mod.rs +++ b/crates/core/src/storage/mod.rs @@ -1,25 +1,20 @@ //! Object storage backend abstraction layer for Delta Table transaction logs and data - -use dashmap::DashMap; -use object_store::limit::LimitStore; use std::collections::HashMap; use std::sync::{Arc, OnceLock}; +use dashmap::DashMap; use lazy_static::lazy_static; +use object_store::limit::LimitStore; +use object_store::local::LocalFileSystem; +use object_store::memory::InMemory; +use object_store::prefix::PrefixStore; use serde::{Deserialize, Serialize}; use url::Url; -pub mod file; -pub mod retry_ext; -pub mod utils; - use crate::{DeltaResult, DeltaTableError}; pub use object_store; -use object_store::local::LocalFileSystem; -use object_store::memory::InMemory; pub use object_store::path::{Path, DELIMITER}; -use object_store::prefix::PrefixStore; pub use object_store::{ DynObjectStore, Error as ObjectStoreError, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result as ObjectStoreResult, @@ -27,6 +22,10 @@ pub use object_store::{ pub use retry_ext::ObjectStoreRetryExt; pub use utils::*; +pub mod file; +pub mod retry_ext; +pub mod utils; + lazy_static! { static ref DELTA_LOG_PATH: Path = Path::from("_delta_log"); } diff --git a/crates/core/src/table/state_arrow.rs b/crates/core/src/table/state_arrow.rs index 197e8d7fd3..e4a374b763 100644 --- a/crates/core/src/table/state_arrow.rs +++ b/crates/core/src/table/state_arrow.rs @@ -6,13 +6,13 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; -use arrow::compute::cast; -use arrow::compute::kernels::cast_utils::Parser; use arrow_array::types::{Date32Type, TimestampMicrosecondType}; use arrow_array::{ Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Float64Array, Int64Array, NullArray, StringArray, StructArray, TimestampMicrosecondArray, TimestampMillisecondArray, }; +use arrow_cast::cast; +use arrow_cast::parse::Parser; use arrow_schema::{DataType, Field, Fields, TimeUnit}; use delta_kernel::features::ColumnMappingMode; use itertools::Itertools; diff --git a/crates/core/src/writer/record_batch.rs b/crates/core/src/writer/record_batch.rs index 493646d479..10ba52ae62 100644 --- a/crates/core/src/writer/record_batch.rs +++ b/crates/core/src/writer/record_batch.rs @@ -7,12 +7,11 @@ use std::{collections::HashMap, sync::Arc}; -use arrow::array::{new_null_array, Array, UInt32Array}; -use arrow::compute::{partition, take}; -use arrow::record_batch::RecordBatch; -use arrow_array::ArrayRef; +use arrow_array::{new_null_array, Array, ArrayRef, RecordBatch, UInt32Array}; +use arrow_ord::partition::partition; use arrow_row::{RowConverter, SortField}; use arrow_schema::{ArrowError, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use arrow_select::take::take; use bytes::Bytes; use delta_kernel::expressions::Scalar; use indexmap::IndexMap; diff --git a/crates/core/src/writer/test_utils.rs b/crates/core/src/writer/test_utils.rs index ff860ed1cf..a75f17e7cf 100644 --- a/crates/core/src/writer/test_utils.rs +++ b/crates/core/src/writer/test_utils.rs @@ -3,9 +3,9 @@ use std::collections::HashMap; use std::sync::Arc; -use arrow::compute::take; use arrow_array::{Int32Array, Int64Array, RecordBatch, StringArray, StructArray, UInt32Array}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; +use arrow_select::take::take; use crate::kernel::{DataType as DeltaDataType, Metadata, PrimitiveType, StructField, StructType}; use crate::operations::create::CreateBuilder; diff --git a/crates/core/src/writer/utils.rs b/crates/core/src/writer/utils.rs index 3c95942993..864476684a 100644 --- a/crates/core/src/writer/utils.rs +++ b/crates/core/src/writer/utils.rs @@ -4,9 +4,9 @@ use std::io::Write; use std::sync::Arc; -use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; -use arrow::json::ReaderBuilder; -use arrow::record_batch::*; +use arrow_array::RecordBatch; +use arrow_json::ReaderBuilder; +use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; use object_store::path::Path; use parking_lot::RwLock; use parquet::basic::Compression;