diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml index dc5483e091..303013baa7 100644 --- a/.github/workflows/python_build.yml +++ b/.github/workflows/python_build.yml @@ -31,7 +31,7 @@ jobs: run: make check-rust test-minimal: - name: Python Build (Python 3.8 PyArrow 16.0.0) + name: Python Build (Python 3.9 PyArrow 16.0.0) runs-on: ubuntu-latest env: RUSTFLAGS: "-C debuginfo=line-tables-only" @@ -43,7 +43,7 @@ jobs: - name: Setup Environment uses: ./.github/actions/setup-env with: - python-version: 3.8 + python-version: 3.9 - name: Build and install deltalake run: | @@ -135,7 +135,7 @@ jobs: strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/Cargo.toml b/Cargo.toml index ccbb766e0f..2d566152da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,34 +26,34 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "=0.3.0" } +delta_kernel = { version = "0.3.1" } # delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" } # arrow -arrow = { version = "52" } -arrow-arith = { version = "52" } -arrow-array = { version = "52", features = ["chrono-tz"] } -arrow-buffer = { version = "52" } -arrow-cast = { version = "52" } -arrow-ipc = { version = "52" } -arrow-json = { version = "52" } -arrow-ord = { version = "52" } -arrow-row = { version = "52" } -arrow-schema = { version = "52" } -arrow-select = { version = "52" } -object_store = { version = "0.10.1" } -parquet = { version = "52" } +arrow = { version = "53" } +arrow-arith = { version = "53" } +arrow-array = { version = "53", features = ["chrono-tz"] } +arrow-buffer = { version = "53" } +arrow-cast = { version = "53" } +arrow-ipc = { version = "53" } +arrow-json = { version = "53" } +arrow-ord = { version = "53" } +arrow-row = { version = "53" } +arrow-schema = { version = "53" } +arrow-select = { version = "53" } +object_store = { version = "0.11" } +parquet = { version = "53" } # datafusion -datafusion = { version = "41" } -datafusion-expr = { version = "41" } -datafusion-common = { version = "41" } -datafusion-proto = { version = "41" } -datafusion-sql = { version = "41" } -datafusion-physical-expr = { version = "41" } -datafusion-physical-plan = { version = "41" } -datafusion-functions = { version = "41" } -datafusion-functions-aggregate = { version = "41" } +datafusion = { version = "43" } +datafusion-expr = { version = "43" } +datafusion-common = { version = "43" } +datafusion-proto = { version = "43" } +datafusion-sql = { version = "43" } +datafusion-physical-expr = { version = "43" } +datafusion-physical-plan = { version = "43" } +datafusion-functions = { version = "43" } +datafusion-functions-aggregate = { version = "43" } # serde serde = { version = "1.0.194", features = ["derive"] } @@ -64,7 +64,7 @@ bytes = { version = "1" } chrono = { version = ">0.4.34", default-features = false, features = ["clock"] } tracing = { version = "0.1", features = ["log"] } regex = { version = "1" } -thiserror = { version = "1" } +thiserror = { version = "2" } url = { version = "2" } urlencoding = "2.1.3" uuid = { version = "1" } diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 60179c7f82..62fe6b0f42 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.4.1" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } aws-smithy-runtime-api = { version="1.7" } aws-smithy-runtime = { version="1.7", optional = true} aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]} diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs index cfc5559518..ca73b69819 100644 --- a/crates/aws/src/constants.rs +++ b/crates/aws/src/constants.rs @@ -96,6 +96,7 @@ pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD"; /// The list of option keys owned by the S3 module. /// Option keys not contained in this list will be added to the `extra_opts` /// field of [crate::storage::s3::S3StorageOptions]. +#[allow(deprecated)] pub const S3_OPTS: &[&str] = &[ AWS_ENDPOINT_URL, AWS_ENDPOINT_URL_DYNAMODB, diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index 27f4491923..2a10d2825b 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use std::time::{Duration, SystemTime}; use aws_config::default_provider::credentials::DefaultCredentialsChain; use aws_config::meta::credentials::CredentialsProviderChain; @@ -19,6 +20,7 @@ use deltalake_core::storage::object_store::{ }; use deltalake_core::storage::StorageOptions; use deltalake_core::DeltaResult; +use tokio::sync::Mutex; use tracing::log::*; use crate::constants; @@ -27,12 +29,21 @@ use crate::constants; /// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3] #[derive(Clone, Debug)] pub(crate) struct AWSForObjectStore { + /// TODO: replace this with something with a credential cache instead of the sdkConfig sdk_config: SdkConfig, + cache: Arc>>, } impl AWSForObjectStore { pub(crate) fn new(sdk_config: SdkConfig) -> Self { - Self { sdk_config } + let cache = Arc::new(Mutex::new(None)); + Self { sdk_config, cache } + } + + /// Return true if a credential has been cached + async fn has_cached_credentials(&self) -> bool { + let guard = self.cache.lock().await; + (*guard).is_some() } } @@ -43,10 +54,34 @@ impl CredentialProvider for AWSForObjectStore { /// Provide the necessary configured credentials from the AWS SDK for use by /// [object_store::aws::AmazonS3] async fn get_credential(&self) -> ObjectStoreResult> { + debug!("AWSForObjectStore is unlocking.."); + let mut guard = self.cache.lock().await; + + if let Some(cached) = guard.as_ref() { + debug!("Located cached credentials"); + let now = SystemTime::now(); + + // Credentials such as assume role credentials will have an expiry on them, whereas + // environmental provided credentials will *not*. In the latter case, it's still + // useful avoid running through the provider chain again, so in both cases we should + // still treat credentials as useful + if cached.expiry().unwrap_or(now) >= now { + debug!("Cached credentials are still valid, returning"); + return Ok(Arc::new(Self::Credential { + key_id: cached.access_key_id().into(), + secret_key: cached.secret_access_key().into(), + token: cached.session_token().map(|o| o.to_string()), + })); + } else { + debug!("Cached credentials appear to no longer be valid, re-resolving"); + } + } + let provider = self .sdk_config .credentials_provider() .ok_or(ObjectStoreError::NotImplemented)?; + let credentials = provider .provide_credentials() @@ -60,11 +95,15 @@ impl CredentialProvider for AWSForObjectStore { credentials.access_key_id() ); - Ok(Arc::new(Self::Credential { + let result = Ok(Arc::new(Self::Credential { key_id: credentials.access_key_id().into(), secret_key: credentials.secret_access_key().into(), token: credentials.session_token().map(|o| o.to_string()), - })) + })); + + // Update the mutex before exiting with the new Credentials from the AWS provider + *guard = Some(credentials); + return result; } } @@ -324,4 +363,60 @@ mod tests { panic!("Could not retrieve credentials from the SdkConfig: {config:?}"); } } + + #[tokio::test] + async fn test_object_store_credential_provider() -> DeltaResult<()> { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + }); + let sdk_config = resolve_credentials(options) + .await + .expect("Failed to resolve credentijals for the test"); + let provider = AWSForObjectStore::new(sdk_config); + let _credential = provider + .get_credential() + .await + .expect("Failed to produce a credential"); + Ok(()) + } + + /// The [CredentialProvider] is called _repeatedly_ by the [object_store] create, in essence on + /// every get/put/list/etc operation, the `get_credential` function will be invoked. + /// + /// In some cases, such as when assuming roles, this can result in an excessive amount of STS + /// API calls in the scenarios where the delta-rs process is performing a large number of S3 + /// operations. + #[tokio::test] + async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + }); + let sdk_config = resolve_credentials(options) + .await + .expect("Failed to resolve credentijals for the test"); + let provider = AWSForObjectStore::new(sdk_config); + let credential_a = provider + .get_credential() + .await + .expect("Failed to produce a credential"); + + assert!( + provider.has_cached_credentials().await, + "The provider should have cached the credential on the first call!" + ); + + let credential_b = provider + .get_credential() + .await + .expect("Failed to produce a credential"); + + assert_ne!( + Arc::as_ptr(&credential_a), + Arc::as_ptr(&credential_b), + "Repeated calls to get_credential() produced different results!" + ); + Ok(()) + } } diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index bfa44c3eac..604404e79b 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -527,10 +527,15 @@ mod tests { ); std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name"); std::env::set_var( + #[allow(deprecated)] constants::AWS_S3_ASSUME_ROLE_ARN, "arn:aws:iam::123456789012:role/some_role", ); - std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); + std::env::set_var( + #[allow(deprecated)] + constants::AWS_S3_ROLE_SESSION_NAME, + "session_name", + ); std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); let options = S3StorageOptions::try_default().unwrap(); diff --git a/crates/azure/Cargo.toml b/crates/azure/Cargo.toml index 6ed096fa29..d80b2760ad 100644 --- a/crates/azure/Cargo.toml +++ b/crates/azure/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-azure" -version = "0.4.0" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core", features = [ +deltalake-core = { version = "0.22.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/catalog-glue/Cargo.toml b/crates/catalog-glue/Cargo.toml index c80ec9ce0b..17bb82404e 100644 --- a/crates/catalog-glue/Cargo.toml +++ b/crates/catalog-glue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-catalog-glue" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -15,7 +15,7 @@ rust-version.workspace = true async-trait = { workspace = true } aws-config = "1" aws-sdk-glue = "1" -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } thiserror = { workspace = true } [dev-dependencies] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 7d5bfbaf10..9ccbfab739 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.21.0" +version = "0.22.0" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index 4deeb6bfd5..110e4aa075 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -30,6 +30,7 @@ const DELTA_LOG_FOLDER: &str = "_delta_log"; /// /// assuming it contains valid deltalake data, i.e a `_delta_log` folder: /// s3://host.example.com:3000/data/tpch/customer/_delta_log/ +#[derive(Debug)] pub struct ListingSchemaProvider { authority: String, /// Underlying object store diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 44e7c9ca33..3e32a3ad68 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse; use crate::DeltaTableBuilder; /// In-memory list of catalogs populated by unity catalog +#[derive(Debug)] pub struct UnityCatalogList { /// Collection of catalogs containing schemas and ultimately TableProviders pub catalogs: DashMap>, @@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList { } /// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog +#[derive(Debug)] pub struct UnityCatalogProvider { /// Parent catalog for schemas of interest. pub schemas: DashMap>, @@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider { } /// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog +#[derive(Debug)] pub struct UnitySchemaProvider { /// UnityCatalog Api client client: Arc, diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index eb542d98dd..33746bad2b 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -23,25 +23,171 @@ use std::fmt::{self, Display, Error, Formatter, Write}; use std::sync::Arc; -use arrow_schema::DataType; +use arrow_array::{Array, GenericListArray}; +use arrow_schema::{DataType, Field}; use chrono::{DateTime, NaiveDate}; use datafusion::execution::context::SessionState; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::execution::FunctionRegistry; +use datafusion::functions_array::make_array::MakeArray; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; use datafusion_expr::expr::InList; use datafusion_expr::planner::ExprPlanner; use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource}; +// Needed for MakeParquetArray +use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature}; +use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::sqlparser::ast::escape_quoted_string; use datafusion_sql::sqlparser::dialect::GenericDialect; use datafusion_sql::sqlparser::parser::Parser; use datafusion_sql::sqlparser::tokenizer::Tokenizer; +use tracing::log::*; use super::DeltaParserOptions; use crate::{DeltaResult, DeltaTableError}; +/// This struct is like Datafusion's MakeArray but ensures that `element` is used rather than `item +/// as the field name within the list. +#[derive(Debug)] +struct MakeParquetArray { + /// The actual upstream UDF, which we're just totally cheating and using + actual: MakeArray, + /// Aliases for this UDF + aliases: Vec, +} + +impl MakeParquetArray { + pub fn new() -> Self { + let actual = MakeArray::default(); + let aliases = vec!["make_array".into(), "make_list".into()]; + Self { actual, aliases } + } +} + +impl ScalarUDFImpl for MakeParquetArray { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + "make_parquet_array" + } + + fn signature(&self) -> &Signature { + self.actual.signature() + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + let r_type = match arg_types.len() { + 0 => Ok(DataType::List(Arc::new(Field::new( + "element", + DataType::Int32, + true, + )))), + _ => { + // At this point, all the type in array should be coerced to the same one + Ok(DataType::List(Arc::new(Field::new( + "element", + arg_types[0].to_owned(), + true, + )))) + } + }; + debug!("MakeParquetArray return_type -> {r_type:?}"); + r_type + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + let mut data_type = DataType::Null; + for arg in args { + data_type = arg.data_type(); + } + + match self.actual.invoke(args)? { + ColumnarValue::Scalar(ScalarValue::List(df_array)) => { + let field = Arc::new(Field::new("element", data_type, true)); + let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new( + GenericListArray::::try_new( + field, + df_array.offsets().clone(), + arrow_array::make_array(df_array.values().into_data()), + None, + )?, + )))); + debug!("MakeParquetArray;invoke returning: {result:?}"); + result + } + others => { + error!("Unexpected response inside MakeParquetArray! {others:?}"); + Ok(others) + } + } + } + + fn invoke_no_args(&self, number_rows: usize) -> Result { + self.actual.invoke_no_args(number_rows) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + self.actual.coerce_types(arg_types) + } + + fn documentation(&self) -> Option<&Documentation> { + self.actual.documentation() + } +} + +use datafusion::functions_array::planner::{FieldAccessPlanner, NestedFunctionPlanner}; + +/// This exists becxause the NestedFunctionPlanner _not_ the UserDefinedFunctionPlanner handles the +/// insertion of "make_array" which is used to turn [100] into List +/// +/// **screaming intensifies** +#[derive(Debug)] +struct CustomNestedFunctionPlanner { + original: NestedFunctionPlanner, +} + +impl Default for CustomNestedFunctionPlanner { + fn default() -> Self { + Self { + original: NestedFunctionPlanner, + } + } +} + +use datafusion_expr::planner::{PlannerResult, RawBinaryExpr}; +impl ExprPlanner for CustomNestedFunctionPlanner { + fn plan_array_literal( + &self, + exprs: Vec, + _schema: &DFSchema, + ) -> Result>> { + let udf = Arc::new(ScalarUDF::from(MakeParquetArray::new())); + + Ok(PlannerResult::Planned(udf.call(exprs))) + } + fn plan_binary_op( + &self, + expr: RawBinaryExpr, + schema: &DFSchema, + ) -> Result> { + self.original.plan_binary_op(expr, schema) + } + fn plan_make_map(&self, args: Vec) -> Result>> { + self.original.plan_make_map(args) + } + fn plan_any(&self, expr: RawBinaryExpr) -> Result> { + self.original.plan_any(expr) + } +} + pub(crate) struct DeltaContextProvider<'a> { state: SessionState, /// Keeping this around just to make use of the 'a lifetime @@ -51,22 +197,22 @@ pub(crate) struct DeltaContextProvider<'a> { impl<'a> DeltaContextProvider<'a> { fn new(state: &'a SessionState) -> Self { - let planners = state.expr_planners(); + // default planners are [CoreFunctionPlanner, NestedFunctionPlanner, FieldAccessPlanner, + // UserDefinedFunctionPlanner] + let planners: Vec> = vec![ + Arc::new(CoreFunctionPlanner::default()), + Arc::new(CustomNestedFunctionPlanner::default()), + Arc::new(FieldAccessPlanner), + Arc::new(datafusion::functions::planner::UserDefinedFunctionPlanner), + ]; + // Disable the above for testing + //let planners = state.expr_planners(); + let new_state = SessionStateBuilder::new_from_existing(state.clone()) + .with_expr_planners(planners.clone()) + .build(); DeltaContextProvider { planners, - // Creating a new session state with overridden scalar_functions since - // the get_field() UDF was dropped from the default scalar functions upstream in - // `36660fe10d9c0cdff62e0da0b94bee28422d3419` - state: SessionStateBuilder::new_from_existing(state.clone()) - .with_scalar_functions( - state - .scalar_functions() - .values() - .cloned() - .chain(std::iter::once(datafusion::functions::core::get_field())) - .collect(), - ) - .build(), + state: new_state, _original: state, } } diff --git a/crates/core/src/delta_datafusion/logical.rs b/crates/core/src/delta_datafusion/logical.rs index 2ce435b5b6..4aaf30242f 100644 --- a/crates/core/src/delta_datafusion/logical.rs +++ b/crates/core/src/delta_datafusion/logical.rs @@ -7,7 +7,7 @@ use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore}; // Metric Observer is used to update DataFusion metrics from a record batch. // See MetricObserverExec for the physical implementation -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)] pub(crate) struct MetricObserver { // id is preserved during conversion to physical node pub id: String, diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 5fba1bd608..4425b0ff6f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -21,6 +21,7 @@ //! ``` use std::any::Any; +use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug}; use std::sync::Arc; @@ -707,7 +708,7 @@ impl TableProvider for DeltaTable { None } - fn get_logical_plan(&self) -> Option<&LogicalPlan> { + fn get_logical_plan(&self) -> Option> { None } @@ -747,6 +748,7 @@ impl TableProvider for DeltaTable { } /// A Delta table provider that enables additional metadata columns to be included during the scan +#[derive(Debug)] pub struct DeltaTableProvider { snapshot: DeltaTableState, log_store: LogStoreRef, @@ -796,7 +798,7 @@ impl TableProvider for DeltaTableProvider { None } - fn get_logical_plan(&self) -> Option<&LogicalPlan> { + fn get_logical_plan(&self) -> Option> { None } @@ -1365,6 +1367,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec { } /// Responsible for creating deltatables +#[derive(Debug)] pub struct DeltaTableFactory {} #[async_trait] diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs index 6119b78ce6..c167b4bb7c 100644 --- a/crates/core/src/delta_datafusion/planner.rs +++ b/crates/core/src/delta_datafusion/planner.rs @@ -36,13 +36,16 @@ use datafusion_expr::LogicalPlan; use crate::delta_datafusion::DataFusionResult; /// Deltaplanner +#[derive(Debug)] pub struct DeltaPlanner { /// custom extension planner pub extension_planner: T, } #[async_trait] -impl QueryPlanner for DeltaPlanner { +impl QueryPlanner + for DeltaPlanner +{ async fn create_physical_plan( &self, logical_plan: &LogicalPlan, diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index 99a97e2130..5b85af9a60 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -13,14 +13,22 @@ use crate::operations::cast::cast_record_batch; pub(crate) struct DeltaSchemaAdapterFactory {} impl SchemaAdapterFactory for DeltaSchemaAdapterFactory { - fn create(&self, schema: SchemaRef) -> Box { + fn create( + &self, + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box { Box::new(DeltaSchemaAdapter { - table_schema: schema, + projected_table_schema, + table_schema, }) } } pub(crate) struct DeltaSchemaAdapter { + /// The schema for the table, projected to include only the fields being output (projected) by + /// the mapping. + projected_table_schema: SchemaRef, /// Schema for the table table_schema: SchemaRef, } @@ -45,6 +53,7 @@ impl SchemaAdapter for DeltaSchemaAdapter { Ok(( Arc::new(SchemaMapping { + projected_schema: self.projected_table_schema.clone(), table_schema: self.table_schema.clone(), }), projection, @@ -54,12 +63,13 @@ impl SchemaAdapter for DeltaSchemaAdapter { #[derive(Debug)] pub(crate) struct SchemaMapping { + projected_schema: SchemaRef, table_schema: SchemaRef, } impl SchemaMapper for SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { - let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?; + let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?; Ok(record_batch) } diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs index bc1bd6eed9..a587fdc7cc 100644 --- a/crates/core/src/kernel/scalars.rs +++ b/crates/core/src/kernel/scalars.rs @@ -73,6 +73,7 @@ impl ScalarExt for Scalar { Self::Binary(val) => create_escaped_binary_string(val.as_slice()), Self::Null(_) => "null".to_string(), Self::Struct(_) => unimplemented!(), + Self::Array(_) => unimplemented!(), } } @@ -269,6 +270,7 @@ impl ScalarExt for Scalar { Self::Binary(val) => Value::String(create_escaped_binary_string(val.as_slice())), Self::Null(_) => Value::Null, Self::Struct(_) => unimplemented!(), + Self::Array(_) => unimplemented!(), } } } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fef4fce183..cc9bcd71b4 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -65,7 +65,7 @@ //! }; //! ``` -#![deny(missing_docs)] +// #![deny(missing_docs)] #![allow(rustdoc::invalid_html_tags)] #![allow(clippy::nonminimal_bool)] diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index a81811faeb..69174d97ee 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -243,7 +243,9 @@ pub trait LogStore: Sync + Send { let mut stream = object_store.list(Some(self.log_path())); if let Some(res) = stream.next().await { match res { - Ok(meta) => Ok(meta.location.is_commit_file()), + Ok(meta) => { + Ok(meta.location.is_commit_file() || meta.location.is_checkpoint_file()) + } Err(ObjectStoreError::NotFound { .. }) => Ok(false), Err(err) => Err(err)?, } @@ -590,6 +592,66 @@ mod tests { .await .expect("Failed to look at table")); } + + #[tokio::test] + async fn test_is_location_a_table_commit() { + use object_store::path::Path; + use object_store::{PutOptions, PutPayload}; + let location = Url::parse("memory://table").unwrap(); + let store = + logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + assert!(!store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + + // Save a commit to the transaction log + let payload = PutPayload::from_static(b"test"); + let _put = store + .object_store() + .put_opts( + &Path::from("_delta_log/0.json"), + payload, + PutOptions::default(), + ) + .await + .expect("Failed to put"); + // The table should be considered a delta table + assert!(store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + } + + #[tokio::test] + async fn test_is_location_a_table_checkpoint() { + use object_store::path::Path; + use object_store::{PutOptions, PutPayload}; + let location = Url::parse("memory://table").unwrap(); + let store = + logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + assert!(!store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + + // Save a "checkpoint" file to the transaction log directory + let payload = PutPayload::from_static(b"test"); + let _put = store + .object_store() + .put_opts( + &Path::from("_delta_log/0.checkpoint.parquet"), + payload, + PutOptions::default(), + ) + .await + .expect("Failed to put"); + // The table should be considered a delta table + assert!(store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + } } #[cfg(feature = "datafusion")] diff --git a/crates/core/src/operations/cast/mod.rs b/crates/core/src/operations/cast/mod.rs index 278cb2bbfa..a358515194 100644 --- a/crates/core/src/operations/cast/mod.rs +++ b/crates/core/src/operations/cast/mod.rs @@ -275,12 +275,12 @@ mod tests { fn test_merge_arrow_schema_with_nested() { let left_schema = Arc::new(Schema::new(vec![Field::new( "f", - DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, false))), + DataType::LargeList(Arc::new(Field::new("element", DataType::Utf8, false))), false, )])); let right_schema = Arc::new(Schema::new(vec![Field::new( "f", - DataType::List(Arc::new(Field::new("item", DataType::LargeUtf8, false))), + DataType::List(Arc::new(Field::new("element", DataType::LargeUtf8, false))), true, )])); @@ -306,7 +306,7 @@ mod tests { let fields = Fields::from(vec![Field::new_list( "list_column", - Field::new("item", DataType::Int8, false), + Field::new("element", DataType::Int8, false), false, )]); let target_schema = Arc::new(Schema::new(fields)) as SchemaRef; @@ -316,7 +316,7 @@ mod tests { let schema = result.unwrap().schema(); let field = schema.column_with_name("list_column").unwrap().1; if let DataType::List(list_item) = field.data_type() { - assert_eq!(list_item.name(), "item"); + assert_eq!(list_item.name(), "element"); } else { panic!("Not a list"); } @@ -343,12 +343,34 @@ mod tests { #[test] fn test_is_cast_required_with_list() { - let field1 = DataType::List(FieldRef::from(Field::new("item", DataType::Int32, false))); - let field2 = DataType::List(FieldRef::from(Field::new("item", DataType::Int32, false))); + let field1 = DataType::List(FieldRef::from(Field::new( + "element", + DataType::Int32, + false, + ))); + let field2 = DataType::List(FieldRef::from(Field::new( + "element", + DataType::Int32, + false, + ))); assert!(!is_cast_required(&field1, &field2)); } + /// Delta has adopted "element" as the default list field name rather than the previously used + /// "item". This lines up more with Apache Parquet but should be handled in casting + #[test] + fn test_is_cast_required_with_old_and_new_list() { + let field1 = DataType::List(FieldRef::from(Field::new( + "element", + DataType::Int32, + false, + ))); + let field2 = DataType::List(FieldRef::from(Field::new("item", DataType::Int32, false))); + + assert!(is_cast_required(&field1, &field2)); + } + #[test] fn test_is_cast_required_with_smol_int() { assert!(is_cast_required(&DataType::Int8, &DataType::Int32)); diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 7dc58b5929..d9c4760835 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -135,7 +135,7 @@ impl DeleteBuilder { } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct DeleteMetricExtensionPlanner {} #[async_trait] diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index 9084d721b7..09f58a6979 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -393,7 +393,7 @@ impl RecordBatchStream for MergeBarrierStream { } } -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)] pub(crate) struct MergeBarrier { pub input: LogicalPlan, pub expr: Expr, diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index fe8d030464..6be8c264ba 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -581,7 +581,7 @@ pub struct MergeMetrics { /// Time taken to rewrite the matched files pub rewrite_time_ms: u64, } -#[derive(Clone)] +#[derive(Clone, Debug)] struct MergeMetricExtensionPlanner {} #[async_trait] diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 61dc4b2f46..f6752dd268 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -180,7 +180,7 @@ impl UpdateBuilder { } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct UpdateMetricExtensionPlanner {} #[async_trait] @@ -242,6 +242,21 @@ async fn execute( return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into())); } + // NOTE: The optimize_projections rule is being temporarily disabled because it errors with + // our schemas for Lists due to issues discussed + // [here](https://github.com/delta-io/delta-rs/pull/2886#issuecomment-2481550560> + let rules: Vec> = state + .optimizers() + .into_iter() + .filter(|rule| { + rule.name() != "optimize_projections" && rule.name() != "simplify_expressions" + }) + .cloned() + .collect(); + let state = SessionStateBuilder::from(state) + .with_optimizer_rules(rules) + .build(); + let update_planner = DeltaPlanner:: { extension_planner: UpdateMetricExtensionPlanner {}, }; @@ -323,7 +338,6 @@ async fn execute( enable_pushdown: false, }), }); - let df_with_predicate_and_metrics = DataFrame::new(state.clone(), plan_with_metrics); let expressions: Vec = df_with_predicate_and_metrics @@ -343,6 +357,8 @@ async fn execute( }) .collect::>>()?; + //let updated_df = df_with_predicate_and_metrics.clone(); + // Disabling the select allows the coerce test to pass, still not sure why let updated_df = df_with_predicate_and_metrics.select(expressions.clone())?; let physical_plan = updated_df.clone().create_physical_plan().await?; let writer_stats_config = WriterStatsConfig::new( @@ -501,7 +517,8 @@ mod tests { get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration, }; use crate::{DeltaTable, TableProperty}; - use arrow::array::{Int32Array, StringArray}; + use arrow::array::types::Int32Type; + use arrow::array::{Int32Array, ListArray, StringArray}; use arrow::datatypes::Schema as ArrowSchema; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; @@ -988,6 +1005,137 @@ mod tests { assert!(res.is_err()); } + #[tokio::test] + async fn test_update_with_array() { + let schema = StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "temp".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "items".to_string(), + DeltaDataType::Array(Box::new(crate::kernel::ArrayType::new( + DeltaDataType::INTEGER, + false, + ))), + true, + ), + ]); + let arrow_schema: ArrowSchema = (&schema).try_into().unwrap(); + + // Create the first batch + let arrow_field = Field::new("element", DataType::Int32, false); + let list_array = ListArray::new_null(arrow_field.clone().into(), 2); + let batch = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![Some(0), Some(1)])), + Arc::new(Int32Array::from(vec![Some(30), Some(31)])), + Arc::new(list_array), + ], + ) + .expect("Failed to create record batch"); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(schema.fields().cloned()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table = DeltaOps(table) + .write(vec![batch]) + .await + .expect("Failed to write first batch"); + assert_eq!(table.version(), 1); + // Completed the first creation/write + + use arrow::array::{Int32Builder, ListBuilder}; + let mut new_items_builder = + ListBuilder::new(Int32Builder::new()).with_field(arrow_field.clone()); + new_items_builder.append_value([Some(100)]); + let new_items = ScalarValue::List(Arc::new(new_items_builder.finish())); + + let (table, _metrics) = DeltaOps(table) + .update() + .with_predicate(col("id").eq(lit(1))) + .with_update("items", lit(new_items)) + .await + .unwrap(); + assert_eq!(table.version(), 2); + } + + /// Lists coming in from the Python bindings need to be parsed as SQL expressions by the update + /// and therefore this test emulates their behavior to ensure that the lists are being turned + /// into expressions for the update operation correctly + #[tokio::test] + async fn test_update_with_array_that_must_be_coerced() { + let _ = pretty_env_logger::try_init(); + let schema = StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "temp".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "items".to_string(), + DeltaDataType::Array(Box::new(crate::kernel::ArrayType::new( + DeltaDataType::LONG, + true, + ))), + true, + ), + ]); + let arrow_schema: ArrowSchema = (&schema).try_into().unwrap(); + + // Create the first batch + let arrow_field = Field::new("element", DataType::Int64, true); + let list_array = ListArray::new_null(arrow_field.clone().into(), 2); + let batch = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![Some(0), Some(1)])), + Arc::new(Int32Array::from(vec![Some(30), Some(31)])), + Arc::new(list_array), + ], + ) + .expect("Failed to create record batch"); + let _ = arrow::util::pretty::print_batches(&[batch.clone()]); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(schema.fields().cloned()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table = DeltaOps(table) + .write(vec![batch]) + .await + .expect("Failed to write first batch"); + assert_eq!(table.version(), 1); + // Completed the first creation/write + + let (table, _metrics) = DeltaOps(table) + .update() + .with_predicate(col("id").eq(lit(1))) + .with_update("items", "[100]".to_string()) + .await + .unwrap(); + assert_eq!(table.version(), 2); + } + #[tokio::test] async fn test_no_cdc_on_older_tables() { let table = prepare_values_table().await; diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index 3c9d3bda97..94d8923807 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -6,6 +6,7 @@ use arrow_array::RecordBatch; use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef}; use bytes::Bytes; use delta_kernel::expressions::Scalar; +use futures::{StreamExt, TryStreamExt}; use indexmap::IndexMap; use object_store::{path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -217,11 +218,18 @@ impl DeltaWriter { /// This will flush all remaining data. pub async fn close(mut self) -> DeltaResult> { let writers = std::mem::take(&mut self.partition_writers); - let mut actions = Vec::new(); - for (_, writer) in writers { - let writer_actions = writer.close().await?; - actions.extend(writer_actions); - } + let actions = futures::stream::iter(writers) + .map(|(_, writer)| async move { + let writer_actions = writer.close().await?; + Ok::<_, DeltaTableError>(writer_actions) + }) + .buffered(num_cpus::get()) + .try_fold(Vec::new(), |mut acc, actions| { + acc.extend(actions); + futures::future::ready(Ok(acc)) + }) + .await?; + Ok(actions) } } diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 4d39c90275..606642a3e5 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -353,18 +353,22 @@ fn parquet_bytes_from_state( // Count of actions let mut total_actions = 0; - for j in jsons { - let buf = serde_json::to_string(&j?).unwrap(); - let _ = decoder.decode(buf.as_bytes())?; - - total_actions += 1; + let span = tracing::debug_span!("serialize_checkpoint").entered(); + for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) { + let mut buf = Vec::new(); + for j in chunk { + serde_json::to_writer(&mut buf, &j?)?; + total_actions += 1; + } + let _ = decoder.decode(&buf)?; while let Some(batch) = decoder.flush()? { writer.write(&batch)?; } } + drop(span); let _ = writer.close()?; - debug!("Finished writing checkpoint parquet buffer."); + debug!(total_actions, "Finished writing checkpoint parquet buffer."); let checkpoint = CheckPointBuilder::new(state.version(), total_actions) .with_size_in_bytes(bytes.len() as i64) diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index 2cf7f6a950..a04dceb3bd 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -659,4 +659,53 @@ mod tests { assert_eq!(table.version(), 1); } } + + #[tokio::test] + async fn test_json_write_checkpoint() { + use crate::operations::create::CreateBuilder; + use std::fs; + + let table_dir = tempfile::tempdir().unwrap(); + let schema = get_delta_schema(); + let path = table_dir.path().to_str().unwrap().to_string(); + let config: HashMap> = vec![ + ( + "delta.checkpointInterval".to_string(), + Some("5".to_string()), + ), + ("delta.checkpointPolicy".to_string(), Some("v2".to_string())), + ] + .into_iter() + .collect(); + let mut table = CreateBuilder::new() + .with_location(&path) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().cloned()) + .with_configuration(config) + .await + .unwrap(); + assert_eq!(table.version(), 0); + let mut writer = JsonWriter::for_table(&table).unwrap(); + let data = serde_json::json!( + { + "id" : "A", + "value": 42, + "modified": "2021-02-01" + } + ); + for _ in 1..6 { + writer.write(vec![data.clone()]).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + } + let dir_path = path + "/_delta_log"; + + let target_file = "00000000000000000004.checkpoint.parquet"; + let entries: Vec<_> = fs::read_dir(dir_path) + .unwrap() + .filter_map(|entry| entry.ok()) + .filter(|entry| entry.file_name().into_string().unwrap() == target_file) + .collect(); + assert_eq!(entries.len(), 1); + } } diff --git a/crates/core/src/writer/mod.rs b/crates/core/src/writer/mod.rs index d3fe529a89..cd87459c2f 100644 --- a/crates/core/src/writer/mod.rs +++ b/crates/core/src/writer/mod.rs @@ -8,7 +8,7 @@ use serde_json::Value; use crate::errors::DeltaTableError; use crate::kernel::{Action, Add}; -use crate::operations::transaction::CommitBuilder; +use crate::operations::transaction::{CommitBuilder, CommitProperties}; use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode}; use crate::DeltaTable; @@ -174,7 +174,7 @@ pub(crate) async fn flush_and_commit( predicate: None, }; - let version = CommitBuilder::default() + let version = CommitBuilder::from(CommitProperties::default()) .with_actions(adds) .build(Some(snapshot), table.log_store.clone(), operation) .await? diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index e4b93a54f5..ae763f7b72 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -343,7 +343,15 @@ impl StatsScalar { }); }; - let val = val / 10.0_f64.powi(*scale); + let mut val = val / 10.0_f64.powi(*scale); + + if val.is_normal() { + if (val.trunc() as i128).to_string().len() > (precision - scale) as usize { + // For normal values with integer parts that get rounded to a number beyond + // the precision - scale range take the next smaller (by magnitude) value + val = f64::from_bits(val.to_bits() - 1); + } + } Ok(Self::Decimal(val)) } (Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => { @@ -474,6 +482,10 @@ impl AddAssign for AggregatedStats { /// the list and items fields from the path, but also need to handle the /// peculiar case where the user named the list field "list" or "item". /// +/// NOTE: As of delta_kernel 0.3.1 the name switched from `item` to `element` to line up with the +/// parquet spec, see +/// [here](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists) +/// /// For example: /// /// * ["some_nested_list", "list", "item", "list", "item"] -> "some_nested_list" @@ -495,9 +507,9 @@ fn get_list_field_name(column_descr: &Arc) -> Option { while let Some(part) = column_path_parts.pop() { match (part.as_str(), lists_seen, items_seen) { ("list", seen, _) if seen == max_rep_levels => return Some("list".to_string()), - ("item", _, seen) if seen == max_rep_levels => return Some("item".to_string()), + ("element", _, seen) if seen == max_rep_levels => return Some("element".to_string()), ("list", _, _) => lists_seen += 1, - ("item", _, _) => items_seen += 1, + ("element", _, _) => items_seen += 1, (other, _, _) => return Some(other.to_string()), } } @@ -613,7 +625,7 @@ mod tests { Some($value), Some($value), None, - 0, + Some(0), false, )) }; @@ -736,6 +748,32 @@ mod tests { }), Value::from(10.0), ), + ( + simple_parquet_stat!( + Statistics::FixedLenByteArray, + FixedLenByteArray::from(vec![ + 75, 59, 76, 168, 90, 134, 196, 122, 9, 138, 34, 63, 255, 255, 255, 255 + ]) + ), + Some(LogicalType::Decimal { + scale: 6, + precision: 38, + }), + Value::from(9.999999999999999e31), + ), + ( + simple_parquet_stat!( + Statistics::FixedLenByteArray, + FixedLenByteArray::from(vec![ + 180, 196, 179, 87, 165, 121, 59, 133, 246, 117, 221, 192, 0, 0, 0, 1 + ]) + ), + Some(LogicalType::Decimal { + scale: 6, + precision: 38, + }), + Value::from(-9.999999999999999e31), + ), ( simple_parquet_stat!( Statistics::FixedLenByteArray, @@ -789,9 +827,21 @@ mod tests { let mut null_count_keys = vec!["some_list", "some_nested_list"]; null_count_keys.extend_from_slice(min_max_keys.as_slice()); - assert_eq!(min_max_keys.len(), stats.min_values.len()); - assert_eq!(min_max_keys.len(), stats.max_values.len()); - assert_eq!(null_count_keys.len(), stats.null_count.len()); + assert_eq!( + min_max_keys.len(), + stats.min_values.len(), + "min values don't match" + ); + assert_eq!( + min_max_keys.len(), + stats.max_values.len(), + "max values don't match" + ); + assert_eq!( + null_count_keys.len(), + stats.null_count.len(), + "null counts don't match" + ); // assert on min values for (k, v) in stats.min_values.iter() { @@ -820,7 +870,7 @@ mod tests { ("uuid", ColumnValueStat::Value(v)) => { assert_eq!("176c770d-92af-4a21-bf76-5d8c5261d659", v.as_str().unwrap()) } - _ => panic!("Key should not be present"), + k => panic!("Key {k:?} should not be present in min_values"), } } @@ -851,7 +901,7 @@ mod tests { ("uuid", ColumnValueStat::Value(v)) => { assert_eq!("a98bea04-d119-4f21-8edc-eb218b5849af", v.as_str().unwrap()) } - _ => panic!("Key should not be present"), + k => panic!("Key {k:?} should not be present in max_values"), } } @@ -878,7 +928,7 @@ mod tests { ("some_nested_list", ColumnCountStat::Value(v)) => assert_eq!(100, *v), ("date", ColumnCountStat::Value(v)) => assert_eq!(0, *v), ("uuid", ColumnCountStat::Value(v)) => assert_eq!(0, *v), - _ => panic!("Key should not be present"), + k => panic!("Key {k:?} should not be present in null_count"), } } } diff --git a/crates/core/tests/command_merge.rs b/crates/core/tests/command_merge.rs index 783c858750..7b4c3aad01 100644 --- a/crates/core/tests/command_merge.rs +++ b/crates/core/tests/command_merge.rs @@ -173,6 +173,7 @@ async fn test_merge_different_range() { let (_table_ref1, _metrics) = merge(table_ref1, df1, expr.clone()).await.unwrap(); let result = merge(table_ref2, df2, expr).await; + println!("{result:#?}"); assert!(result.is_ok()); } diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 1477d90c29..9647de92bb 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.21.0" +version = "0.22.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -16,12 +16,12 @@ rust-version.workspace = true features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } -deltalake-aws = { version = "0.4.0", path = "../aws", default-features = false, optional = true } -deltalake-azure = { version = "0.4.0", path = "../azure", optional = true } -deltalake-gcp = { version = "0.5.0", path = "../gcp", optional = true } -deltalake-hdfs = { version = "0.5.0", path = "../hdfs", optional = true } -deltalake-catalog-glue = { version = "0.5.0", path = "../catalog-glue", optional = true } +deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-aws = { version = "0.5.0", path = "../aws", default-features = false, optional = true } +deltalake-azure = { version = "0.5.0", path = "../azure", optional = true } +deltalake-gcp = { version = "0.6.0", path = "../gcp", optional = true } +deltalake-hdfs = { version = "0.6.0", path = "../hdfs", optional = true } +deltalake-catalog-glue = { version = "0.6.0", path = "../catalog-glue", optional = true } [features] # All of these features are just reflected into the core crate until that diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index 51020fb630..e292138e9e 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-gcp" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } lazy_static = "1" # workspace depenndecies diff --git a/crates/hdfs/Cargo.toml b/crates/hdfs/Cargo.toml index 729ab90cf1..4790fbf5ce 100644 --- a/crates/hdfs/Cargo.toml +++ b/crates/hdfs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-hdfs" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,8 +12,8 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } -hdfs-native-object-store = "0.11" +deltalake-core = { version = "0.22.0", path = "../core" } +hdfs-native-object-store = "0.12" # workspace dependecies object_store = { workspace = true } diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml index 97372895ab..6a0e36c3cf 100644 --- a/crates/mount/Cargo.toml +++ b/crates/mount/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-mount" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core", features = [ +deltalake-core = { version = "0.22.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/sql/src/logical_plan.rs b/crates/sql/src/logical_plan.rs index 8ff7b90b9e..9f154c0204 100644 --- a/crates/sql/src/logical_plan.rs +++ b/crates/sql/src/logical_plan.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering; use std::fmt::{self, Debug, Display}; use std::sync::Arc; @@ -6,7 +7,7 @@ use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{Expr, UserDefinedLogicalNodeCore}; /// Delta Lake specific operations -#[derive(Clone, PartialEq, Eq, Hash)] +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd)] pub enum DeltaStatement { /// Get provenance information, including the operation, /// user, and so on, for each write to a table. @@ -70,6 +71,10 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { } } + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + fn schema(&self) -> &DFSchemaRef { match self { Self::Vacuum(Vacuum { schema, .. }) => schema, @@ -77,10 +82,6 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { } } - fn inputs(&self) -> Vec<&LogicalPlan> { - vec![] - } - fn expressions(&self) -> Vec { vec![] } @@ -134,6 +135,12 @@ pub struct Vacuum { pub schema: DFSchemaRef, } +impl PartialOrd for Vacuum { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl Vacuum { pub fn new(table: TableReference, retention_hours: Option, dry_run: bool) -> Self { Self { @@ -152,10 +159,16 @@ impl Vacuum { pub struct DescribeHistory { /// A reference to the table pub table: TableReference, - /// Schema for commit provenence information + /// Schema for commit provenance information pub schema: DFSchemaRef, } +impl PartialOrd for DescribeHistory { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeHistory { pub fn new(table: TableReference) -> Self { Self { @@ -176,6 +189,12 @@ pub struct DescribeDetails { pub schema: DFSchemaRef, } +impl PartialOrd for DescribeDetails { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeDetails { pub fn new(table: TableReference) -> Self { Self { @@ -191,10 +210,16 @@ impl DescribeDetails { pub struct DescribeFiles { /// A reference to the table pub table: TableReference, - /// Schema for commit provenence information + /// Schema for commit provenance information pub schema: DFSchemaRef, } +impl PartialOrd for DescribeFiles { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeFiles { pub fn new(table: TableReference) -> Self { Self { diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs index 88596b0d5b..5c820ffba9 100644 --- a/crates/sql/src/planner.rs +++ b/crates/sql/src/planner.rs @@ -186,7 +186,7 @@ mod tests { fn test_planner() { test_statement( "SELECT * FROM table1", - &["Projection: table1.column1", " TableScan: table1"], + &["Projection: *", " TableScan: table1"], ); test_statement("VACUUM table1", &["Vacuum: table1 dry_run=false"]); diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index 9087755fb1..6b01e87539 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "deltalake-test" -version = "0.4.0" +version = "0.5.0" edition = "2021" publish = false [dependencies] bytes = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } dotenvy = "0" fs_extra = "1.3.0" futures = { version = "0.3" } diff --git a/docs/usage/loading-table.md b/docs/usage/loading-table.md index f000adb6ee..6c99bf7f74 100644 --- a/docs/usage/loading-table.md +++ b/docs/usage/loading-table.md @@ -81,7 +81,7 @@ storage_options = { "AWS_SECRET_ACCESS_KEY": "THE_AWS_SECRET_ACCESS_KEY", ... } -DeltaTable.is_deltatable(bucket_table_path) +DeltaTable.is_deltatable(bucket_table_path, storage_options) # True ``` @@ -131,7 +131,7 @@ wish to load: >>> dt = DeltaTable("../rust/tests/data/simple_table", version=2) ``` -Once you\'ve loaded a table, you can also change versions using either a +Once you've loaded a table, you can also change versions using either a version number or datetime string: ```python diff --git a/docs/usage/optimize/delta-lake-z-order.md b/docs/usage/optimize/delta-lake-z-order.md index 54be212c47..1eb03a871e 100644 --- a/docs/usage/optimize/delta-lake-z-order.md +++ b/docs/usage/optimize/delta-lake-z-order.md @@ -12,5 +12,5 @@ Here's how to Z Order a Delta table: ```python dt = DeltaTable("tmp") -dt.optimize.z_order([country]) +dt.optimize.z_order(["country"]) ``` diff --git a/python/Cargo.toml b/python/Cargo.toml index c285c1be25..8f18b8fb2e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -43,8 +43,8 @@ reqwest = { version = "*", features = ["native-tls-vendored"] } deltalake-mount = { path = "../crates/mount" } [dependencies.pyo3] -version = "0.21.1" -features = ["extension-module", "abi3", "abi3-py38"] +version = "0.22.2" +features = ["extension-module", "abi3", "abi3-py39"] [dependencies.deltalake] path = "../crates/deltalake" diff --git a/python/pyproject.toml b/python/pyproject.toml index a13886209b..4cbdc67cc9 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -7,11 +7,10 @@ name = "deltalake" description = "Native Delta Lake Python binding based on delta-rs with Pandas integration" readme = "README.md" license = {file = "licenses/deltalake_license.txt"} -requires-python = ">=3.8" +requires-python = ">=3.9" keywords = ["deltalake", "delta", "datalake", "pandas", "arrow"] classifiers = [ "License :: OSI Approved :: Apache Software License", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", diff --git a/python/src/lib.rs b/python/src/lib.rs index 005076c719..361f094f38 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1439,6 +1439,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult todo!("how should this be converted!"), }; Ok(val.into_bound(py))