From 9b733e3337139f58aa1fed117a7bf3b05237d76c Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Thu, 7 Nov 2024 20:32:59 -0600 Subject: [PATCH 01/26] perf: close partition writers concurrently to improve writes with many partitions (cherry picked from commit af17bb21f54ec1f52b27fca94b3427070ec91443) Signed-off-by: Alex Wilcoxson chore: fmt --- crates/core/src/operations/writer.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/crates/core/src/operations/writer.rs b/crates/core/src/operations/writer.rs index 3c9d3bda97..94d8923807 100644 --- a/crates/core/src/operations/writer.rs +++ b/crates/core/src/operations/writer.rs @@ -6,6 +6,7 @@ use arrow_array::RecordBatch; use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef}; use bytes::Bytes; use delta_kernel::expressions::Scalar; +use futures::{StreamExt, TryStreamExt}; use indexmap::IndexMap; use object_store::{path::Path, ObjectStore}; use parquet::arrow::ArrowWriter; @@ -217,11 +218,18 @@ impl DeltaWriter { /// This will flush all remaining data. pub async fn close(mut self) -> DeltaResult> { let writers = std::mem::take(&mut self.partition_writers); - let mut actions = Vec::new(); - for (_, writer) in writers { - let writer_actions = writer.close().await?; - actions.extend(writer_actions); - } + let actions = futures::stream::iter(writers) + .map(|(_, writer)| async move { + let writer_actions = writer.close().await?; + Ok::<_, DeltaTableError>(writer_actions) + }) + .buffered(num_cpus::get()) + .try_fold(Vec::new(), |mut acc, actions| { + acc.extend(actions); + futures::future::ready(Ok(acc)) + }) + .await?; + Ok(actions) } } From 8c7b019897f8bd6d5b2497789aea39c44221ce78 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 11 Nov 2024 01:16:28 +0000 Subject: [PATCH 02/26] chore(deps): update thiserror requirement from 1 to 2 Updates the requirements on [thiserror](https://github.com/dtolnay/thiserror) to permit the latest version. - [Release notes](https://github.com/dtolnay/thiserror/releases) - [Commits](https://github.com/dtolnay/thiserror/compare/1.0.0...1.0.69) --- updated-dependencies: - dependency-name: thiserror dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ccbb766e0f..bc569e3dd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,7 @@ bytes = { version = "1" } chrono = { version = ">0.4.34", default-features = false, features = ["clock"] } tracing = { version = "0.1", features = ["log"] } regex = { version = "1" } -thiserror = { version = "1" } +thiserror = { version = "2" } url = { version = "2" } urlencoding = "2.1.3" uuid = { version = "1" } From 95395cb0d12964ad12df44dce4af048870a2a938 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Thu, 7 Nov 2024 20:24:30 -0600 Subject: [PATCH 03/26] perf: batch json decode checkpoint actions when writing to parquet (cherry picked from commit 12abf0067413d10e59acadeba6c32e67e54eb7d0) Signed-off-by: Alex Wilcoxson --- crates/core/src/protocol/checkpoints.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 4d39c90275..606642a3e5 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -353,18 +353,22 @@ fn parquet_bytes_from_state( // Count of actions let mut total_actions = 0; - for j in jsons { - let buf = serde_json::to_string(&j?).unwrap(); - let _ = decoder.decode(buf.as_bytes())?; - - total_actions += 1; + let span = tracing::debug_span!("serialize_checkpoint").entered(); + for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) { + let mut buf = Vec::new(); + for j in chunk { + serde_json::to_writer(&mut buf, &j?)?; + total_actions += 1; + } + let _ = decoder.decode(&buf)?; while let Some(batch) = decoder.flush()? { writer.write(&batch)?; } } + drop(span); let _ = writer.close()?; - debug!("Finished writing checkpoint parquet buffer."); + debug!(total_actions, "Finished writing checkpoint parquet buffer."); let checkpoint = CheckPointBuilder::new(state.version(), total_actions) .with_size_in_bytes(bytes.len() as i64) From 935f7eb997f8539c08454a4af90cd618658897c2 Mon Sep 17 00:00:00 2001 From: RyRyRyNguyen <154534196+RyRyRyNguyen@users.noreply.github.com> Date: Tue, 5 Nov 2024 09:23:12 -0600 Subject: [PATCH 04/26] Update delta-lake-z-order.md small correction to z_order columns argument. 
--- docs/usage/optimize/delta-lake-z-order.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/usage/optimize/delta-lake-z-order.md b/docs/usage/optimize/delta-lake-z-order.md index 54be212c47..1eb03a871e 100644 --- a/docs/usage/optimize/delta-lake-z-order.md +++ b/docs/usage/optimize/delta-lake-z-order.md @@ -12,5 +12,5 @@ Here's how to Z Order a Delta table: ```python dt = DeltaTable("tmp") -dt.optimize.z_order([country]) +dt.optimize.z_order(["country"]) ``` From b4eff700572a32cb2cd3172aacb697e7092489d4 Mon Sep 17 00:00:00 2001 From: Thomas Frederik Hoeck <44194839+thomasfrederikhoeck@users.noreply.github.com> Date: Thu, 14 Nov 2024 10:44:54 +0100 Subject: [PATCH 05/26] Update object_store to 0.10.2 Signed-off-by: Thomas Frederik Hoeck --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index bc569e3dd1..390265fca5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,7 +41,7 @@ arrow-ord = { version = "52" } arrow-row = { version = "52" } arrow-schema = { version = "52" } arrow-select = { version = "52" } -object_store = { version = "0.10.1" } +object_store = { version = "0.10.2" } parquet = { version = "52" } # datafusion From 9982e9c80997b54f7030c7f814a84060877f224a Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 12 Nov 2024 03:52:26 +0000 Subject: [PATCH 06/26] fix: cache credential resolution with the AWS credential provider `object_store` invokes `get_credential` on _every_ invocation of a get/list/put/etc. The provider invocation for environment based credentials is practically zero-cost, so this has no/low overhead. In the case of the AssumeRoleProvider or any provider which has _some_ cost, such as an invocation of the AWS STS APIs, this can result in rate-limiting or service quota exhaustion. In order to prevent this, the credentials are attempted to be cached only so long as they have no expired, which is defined in the `aws_credential_types::Credential` struct Signed-off-by: R. 
Tyler Croy Sponsored-by: Scribd Inc --- crates/aws/Cargo.toml | 2 +- crates/aws/src/credentials.rs | 101 +++++++++++++++++++++++++++++++++- 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 60179c7f82..cd5448c195 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.4.1" +version = "0.4.2" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index 27f4491923..2a10d2825b 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -4,6 +4,7 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; +use std::time::{Duration, SystemTime}; use aws_config::default_provider::credentials::DefaultCredentialsChain; use aws_config::meta::credentials::CredentialsProviderChain; @@ -19,6 +20,7 @@ use deltalake_core::storage::object_store::{ }; use deltalake_core::storage::StorageOptions; use deltalake_core::DeltaResult; +use tokio::sync::Mutex; use tracing::log::*; use crate::constants; @@ -27,12 +29,21 @@ use crate::constants; /// into a necessary [AwsCredential] type for configuring [object_store::aws::AmazonS3] #[derive(Clone, Debug)] pub(crate) struct AWSForObjectStore { + /// TODO: replace this with something with a credential cache instead of the sdkConfig sdk_config: SdkConfig, + cache: Arc>>, } impl AWSForObjectStore { pub(crate) fn new(sdk_config: SdkConfig) -> Self { - Self { sdk_config } + let cache = Arc::new(Mutex::new(None)); + Self { sdk_config, cache } + } + + /// Return true if a credential has been cached + async fn has_cached_credentials(&self) -> bool { + let guard = self.cache.lock().await; + (*guard).is_some() } } @@ -43,10 +54,34 @@ impl CredentialProvider for AWSForObjectStore { /// Provide the necessary configured credentials from the AWS SDK for use by /// [object_store::aws::AmazonS3] async fn get_credential(&self) -> ObjectStoreResult> { + debug!("AWSForObjectStore is unlocking.."); + let mut guard = self.cache.lock().await; + + if let Some(cached) = guard.as_ref() { + debug!("Located cached credentials"); + let now = SystemTime::now(); + + // Credentials such as assume role credentials will have an expiry on them, whereas + // environmental provided credentials will *not*. 
In the latter case, it's still + // useful avoid running through the provider chain again, so in both cases we should + // still treat credentials as useful + if cached.expiry().unwrap_or(now) >= now { + debug!("Cached credentials are still valid, returning"); + return Ok(Arc::new(Self::Credential { + key_id: cached.access_key_id().into(), + secret_key: cached.secret_access_key().into(), + token: cached.session_token().map(|o| o.to_string()), + })); + } else { + debug!("Cached credentials appear to no longer be valid, re-resolving"); + } + } + let provider = self .sdk_config .credentials_provider() .ok_or(ObjectStoreError::NotImplemented)?; + let credentials = provider .provide_credentials() @@ -60,11 +95,15 @@ impl CredentialProvider for AWSForObjectStore { credentials.access_key_id() ); - Ok(Arc::new(Self::Credential { + let result = Ok(Arc::new(Self::Credential { key_id: credentials.access_key_id().into(), secret_key: credentials.secret_access_key().into(), token: credentials.session_token().map(|o| o.to_string()), - })) + })); + + // Update the mutex before exiting with the new Credentials from the AWS provider + *guard = Some(credentials); + return result; } } @@ -324,4 +363,60 @@ mod tests { panic!("Could not retrieve credentials from the SdkConfig: {config:?}"); } } + + #[tokio::test] + async fn test_object_store_credential_provider() -> DeltaResult<()> { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + }); + let sdk_config = resolve_credentials(options) + .await + .expect("Failed to resolve credentijals for the test"); + let provider = AWSForObjectStore::new(sdk_config); + let _credential = provider + .get_credential() + .await + .expect("Failed to produce a credential"); + Ok(()) + } + + /// The [CredentialProvider] is called _repeatedly_ by the [object_store] create, in essence on + /// every get/put/list/etc operation, the `get_credential` function will be invoked. + /// + /// In some cases, such as when assuming roles, this can result in an excessive amount of STS + /// API calls in the scenarios where the delta-rs process is performing a large number of S3 + /// operations. + #[tokio::test] + async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> { + let options = StorageOptions(hashmap! { + constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), + constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), + }); + let sdk_config = resolve_credentials(options) + .await + .expect("Failed to resolve credentijals for the test"); + let provider = AWSForObjectStore::new(sdk_config); + let credential_a = provider + .get_credential() + .await + .expect("Failed to produce a credential"); + + assert!( + provider.has_cached_credentials().await, + "The provider should have cached the credential on the first call!" + ); + + let credential_b = provider + .get_credential() + .await + .expect("Failed to produce a credential"); + + assert_ne!( + Arc::as_ptr(&credential_a), + Arc::as_ptr(&credential_b), + "Repeated calls to get_credential() produced different results!" + ); + Ok(()) + } } From d4f18b3ae9d616e771b5d0e0fa498d0086fd91eb Mon Sep 17 00:00:00 2001 From: Justin Jossick Date: Wed, 13 Nov 2024 12:50:12 -0800 Subject: [PATCH 07/26] fix: jsonwriter should checkpoint by default This is a fix aimed to enable jsonwriter to checkpoint in accordance with delta.checkpointInterval. 
It changes the default commitbuilder to set a post_commit_hook so that checkpointing will be done by default. Potentially we could also expose CommitProperties as an argument to flush_and_commit, but that would require a change to the function signature and would be a breaking change. Signed-off-by: Justin Jossick --- crates/core/src/writer/json.rs | 49 ++++++++++++++++++++++++++++++++++ crates/core/src/writer/mod.rs | 4 +-- 2 files changed, 51 insertions(+), 2 deletions(-) diff --git a/crates/core/src/writer/json.rs b/crates/core/src/writer/json.rs index 2cf7f6a950..a04dceb3bd 100644 --- a/crates/core/src/writer/json.rs +++ b/crates/core/src/writer/json.rs @@ -659,4 +659,53 @@ mod tests { assert_eq!(table.version(), 1); } } + + #[tokio::test] + async fn test_json_write_checkpoint() { + use crate::operations::create::CreateBuilder; + use std::fs; + + let table_dir = tempfile::tempdir().unwrap(); + let schema = get_delta_schema(); + let path = table_dir.path().to_str().unwrap().to_string(); + let config: HashMap> = vec![ + ( + "delta.checkpointInterval".to_string(), + Some("5".to_string()), + ), + ("delta.checkpointPolicy".to_string(), Some("v2".to_string())), + ] + .into_iter() + .collect(); + let mut table = CreateBuilder::new() + .with_location(&path) + .with_table_name("test-table") + .with_comment("A table for running tests") + .with_columns(schema.fields().cloned()) + .with_configuration(config) + .await + .unwrap(); + assert_eq!(table.version(), 0); + let mut writer = JsonWriter::for_table(&table).unwrap(); + let data = serde_json::json!( + { + "id" : "A", + "value": 42, + "modified": "2021-02-01" + } + ); + for _ in 1..6 { + writer.write(vec![data.clone()]).await.unwrap(); + writer.flush_and_commit(&mut table).await.unwrap(); + } + let dir_path = path + "/_delta_log"; + + let target_file = "00000000000000000004.checkpoint.parquet"; + let entries: Vec<_> = fs::read_dir(dir_path) + .unwrap() + .filter_map(|entry| entry.ok()) + .filter(|entry| entry.file_name().into_string().unwrap() == target_file) + .collect(); + assert_eq!(entries.len(), 1); + } } diff --git a/crates/core/src/writer/mod.rs b/crates/core/src/writer/mod.rs index d3fe529a89..cd87459c2f 100644 --- a/crates/core/src/writer/mod.rs +++ b/crates/core/src/writer/mod.rs @@ -8,7 +8,7 @@ use serde_json::Value; use crate::errors::DeltaTableError; use crate::kernel::{Action, Add}; -use crate::operations::transaction::CommitBuilder; +use crate::operations::transaction::{CommitBuilder, CommitProperties}; use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode}; use crate::DeltaTable; @@ -174,7 +174,7 @@ pub(crate) async fn flush_and_commit( predicate: None, }; - let version = CommitBuilder::default() + let version = CommitBuilder::from(CommitProperties::default()) .with_actions(adds) .build(Some(snapshot), table.log_store.clone(), operation) .await? 
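Editor's note on PATCH 06 above: the credential caching it introduces reuses a cached credential only while its expiry has not passed, and treats a credential with no expiry (for example, environment-based credentials) as still valid. The following is a minimal standalone sketch of just that expiry-check pattern. The Credential struct and fetch_credential function here are hypothetical placeholders for illustration, not the aws_credential_types or object_store types used in the patch, and the real code uses an async tokio::sync::Mutex plus the SDK provider chain rather than this synchronous stand-in.

    use std::sync::Mutex;
    use std::time::{Duration, SystemTime};

    // Hypothetical credential type; stands in for the SDK's credential struct.
    #[derive(Clone, Debug)]
    struct Credential {
        key_id: String,
        // Assume-role credentials carry an expiry; environment-based ones do not.
        expiry: Option<SystemTime>,
    }

    // Hypothetical stand-in for the expensive provider-chain / STS resolution.
    fn fetch_credential() -> Credential {
        Credential {
            key_id: "AKIA-EXAMPLE".into(),
            expiry: Some(SystemTime::now() + Duration::from_secs(3600)),
        }
    }

    struct CachingProvider {
        cache: Mutex<Option<Credential>>,
    }

    impl CachingProvider {
        fn get_credential(&self) -> Credential {
            let mut guard = self.cache.lock().unwrap();
            if let Some(cached) = guard.as_ref() {
                let now = SystemTime::now();
                // A credential without an expiry is treated as still valid, so the
                // provider chain is not re-run needlessly for env-based credentials.
                if cached.expiry.unwrap_or(now) >= now {
                    return cached.clone();
                }
            }
            // Expired or never fetched: resolve again and refresh the cache.
            let fresh = fetch_credential();
            *guard = Some(fresh.clone());
            fresh
        }
    }

    fn main() {
        let provider = CachingProvider { cache: Mutex::new(None) };
        let a = provider.get_credential();
        let b = provider.get_credential(); // served from the cache, no second fetch
        assert_eq!(a.key_id, b.key_id);
    }

The consistency test added in PATCH 06 exercises the same idea: two back-to-back get_credential calls should resolve the provider chain only once, with the second call answered from the cache.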
From c1c42b650de945b1c011337cc04c057e4b739992 Mon Sep 17 00:00:00 2001 From: stretchadito Date: Mon, 18 Nov 2024 21:30:38 -0800 Subject: [PATCH 08/26] valid table can have checkpoint as first log Signed-off-by: stretchadito --- crates/core/src/logstore/mod.rs | 64 ++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index a81811faeb..69174d97ee 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -243,7 +243,9 @@ pub trait LogStore: Sync + Send { let mut stream = object_store.list(Some(self.log_path())); if let Some(res) = stream.next().await { match res { - Ok(meta) => Ok(meta.location.is_commit_file()), + Ok(meta) => { + Ok(meta.location.is_commit_file() || meta.location.is_checkpoint_file()) + } Err(ObjectStoreError::NotFound { .. }) => Ok(false), Err(err) => Err(err)?, } @@ -590,6 +592,66 @@ mod tests { .await .expect("Failed to look at table")); } + + #[tokio::test] + async fn test_is_location_a_table_commit() { + use object_store::path::Path; + use object_store::{PutOptions, PutPayload}; + let location = Url::parse("memory://table").unwrap(); + let store = + logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + assert!(!store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + + // Save a commit to the transaction log + let payload = PutPayload::from_static(b"test"); + let _put = store + .object_store() + .put_opts( + &Path::from("_delta_log/0.json"), + payload, + PutOptions::default(), + ) + .await + .expect("Failed to put"); + // The table should be considered a delta table + assert!(store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + } + + #[tokio::test] + async fn test_is_location_a_table_checkpoint() { + use object_store::path::Path; + use object_store::{PutOptions, PutPayload}; + let location = Url::parse("memory://table").unwrap(); + let store = + logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + assert!(!store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + + // Save a "checkpoint" file to the transaction log directory + let payload = PutPayload::from_static(b"test"); + let _put = store + .object_store() + .put_opts( + &Path::from("_delta_log/0.checkpoint.parquet"), + payload, + PutOptions::default(), + ) + .await + .expect("Failed to put"); + // The table should be considered a delta table + assert!(store + .is_delta_table_location() + .await + .expect("Failed to identify table")); + } } #[cfg(feature = "datafusion")] From 0cb091c770337e53e0869fe4cd76bfdbd949237c Mon Sep 17 00:00:00 2001 From: Vikas Sharma Date: Tue, 19 Nov 2024 22:39:54 +0530 Subject: [PATCH 09/26] Fixed the deprecation warnings in spot check step of the build. Signed-off-by: Vikas Sharma --- crates/aws/src/constants.rs | 1 + crates/aws/src/storage.rs | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs index cfc5559518..ca73b69819 100644 --- a/crates/aws/src/constants.rs +++ b/crates/aws/src/constants.rs @@ -96,6 +96,7 @@ pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD"; /// The list of option keys owned by the S3 module. /// Option keys not contained in this list will be added to the `extra_opts` /// field of [crate::storage::s3::S3StorageOptions]. 
+#[allow(deprecated)] pub const S3_OPTS: &[&str] = &[ AWS_ENDPOINT_URL, AWS_ENDPOINT_URL_DYNAMODB, diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index bfa44c3eac..604404e79b 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -527,10 +527,15 @@ mod tests { ); std::env::set_var(constants::AWS_IAM_ROLE_SESSION_NAME, "session_name"); std::env::set_var( + #[allow(deprecated)] constants::AWS_S3_ASSUME_ROLE_ARN, "arn:aws:iam::123456789012:role/some_role", ); - std::env::set_var(constants::AWS_S3_ROLE_SESSION_NAME, "session_name"); + std::env::set_var( + #[allow(deprecated)] + constants::AWS_S3_ROLE_SESSION_NAME, + "session_name", + ); std::env::set_var(constants::AWS_WEB_IDENTITY_TOKEN_FILE, "token_file"); let options = S3StorageOptions::try_default().unwrap(); From c7e1049a31fd2063c0e1942a79facddc722a24bd Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 14 Sep 2024 11:41:31 +0200 Subject: [PATCH 10/26] chore: bump kernel --- Cargo.toml | 28 ++++++++++++++-------------- crates/core/src/kernel/scalars.rs | 3 ++- python/Cargo.toml | 2 +- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 390265fca5..325f9e075a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,23 +26,23 @@ debug = true debug = "line-tables-only" [workspace.dependencies] -delta_kernel = { version = "=0.3.0" } +delta_kernel = { version = "0.3.1" } # delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" } # arrow -arrow = { version = "52" } -arrow-arith = { version = "52" } -arrow-array = { version = "52", features = ["chrono-tz"] } -arrow-buffer = { version = "52" } -arrow-cast = { version = "52" } -arrow-ipc = { version = "52" } -arrow-json = { version = "52" } -arrow-ord = { version = "52" } -arrow-row = { version = "52" } -arrow-schema = { version = "52" } -arrow-select = { version = "52" } -object_store = { version = "0.10.2" } -parquet = { version = "52" } +arrow = { version = "53" } +arrow-arith = { version = "53" } +arrow-array = { version = "53", features = ["chrono-tz"] } +arrow-buffer = { version = "53" } +arrow-cast = { version = "53" } +arrow-ipc = { version = "53" } +arrow-json = { version = "53" } +arrow-ord = { version = "53" } +arrow-row = { version = "53" } +arrow-schema = { version = "53" } +arrow-select = { version = "53" } +object_store = { version = "0.11" } +parquet = { version = "53" } # datafusion datafusion = { version = "41" } diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs index bc1bd6eed9..735fe54be9 100644 --- a/crates/core/src/kernel/scalars.rs +++ b/crates/core/src/kernel/scalars.rs @@ -1,5 +1,5 @@ //! 
Auxiliary methods for dealing with kernel scalars -use std::cmp::Ordering; +use std::{cmp::Ordering, fmt::Debug}; use arrow_array::Array; use arrow_schema::TimeUnit; @@ -73,6 +73,7 @@ impl ScalarExt for Scalar { Self::Binary(val) => create_escaped_binary_string(val.as_slice()), Self::Null(_) => "null".to_string(), Self::Struct(_) => unimplemented!(), + Self::Array(_) => unimplemented!(), } } diff --git a/python/Cargo.toml b/python/Cargo.toml index c285c1be25..6fc4f46ae8 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -43,7 +43,7 @@ reqwest = { version = "*", features = ["native-tls-vendored"] } deltalake-mount = { path = "../crates/mount" } [dependencies.pyo3] -version = "0.21.1" +version = "0.22.2" features = ["extension-module", "abi3", "abi3-py38"] [dependencies.deltalake] From 2089ad434076d171f23d5a98080b08d71545d80c Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 3 Sep 2024 14:36:45 +0000 Subject: [PATCH 11/26] chore: adopt new datafusion crate Signed-off-by: R. Tyler Croy --- Cargo.toml | 18 +++++++++--------- crates/core/src/delta_datafusion/mod.rs | 5 +++-- crates/core/src/kernel/scalars.rs | 3 ++- crates/core/src/writer/stats.rs | 2 +- crates/core/tests/command_merge.rs | 1 + crates/hdfs/Cargo.toml | 2 +- 6 files changed, 17 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 325f9e075a..221c87c953 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,15 +45,15 @@ object_store = { version = "0.11" } parquet = { version = "53" } # datafusion -datafusion = { version = "41" } -datafusion-expr = { version = "41" } -datafusion-common = { version = "41" } -datafusion-proto = { version = "41" } -datafusion-sql = { version = "41" } -datafusion-physical-expr = { version = "41" } -datafusion-physical-plan = { version = "41" } -datafusion-functions = { version = "41" } -datafusion-functions-aggregate = { version = "41" } +datafusion = { version = "42" } +datafusion-expr = { version = "42" } +datafusion-common = { version = "42" } +datafusion-proto = { version = "42" } +datafusion-sql = { version = "42" } +datafusion-physical-expr = { version = "42" } +datafusion-physical-plan = { version = "42" } +datafusion-functions = { version = "42" } +datafusion-functions-aggregate = { version = "42" } # serde serde = { version = "1.0.194", features = ["derive"] } diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 5fba1bd608..ebc75cdac3 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -21,6 +21,7 @@ //! ``` use std::any::Any; +use std::borrow::Cow; use std::collections::{HashMap, HashSet}; use std::fmt::{self, Debug}; use std::sync::Arc; @@ -707,7 +708,7 @@ impl TableProvider for DeltaTable { None } - fn get_logical_plan(&self) -> Option<&LogicalPlan> { + fn get_logical_plan(&self) -> Option> { None } @@ -796,7 +797,7 @@ impl TableProvider for DeltaTableProvider { None } - fn get_logical_plan(&self) -> Option<&LogicalPlan> { + fn get_logical_plan(&self) -> Option> { None } diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs index 735fe54be9..a587fdc7cc 100644 --- a/crates/core/src/kernel/scalars.rs +++ b/crates/core/src/kernel/scalars.rs @@ -1,5 +1,5 @@ //! 
Auxiliary methods for dealing with kernel scalars -use std::{cmp::Ordering, fmt::Debug}; +use std::cmp::Ordering; use arrow_array::Array; use arrow_schema::TimeUnit; @@ -270,6 +270,7 @@ impl ScalarExt for Scalar { Self::Binary(val) => Value::String(create_escaped_binary_string(val.as_slice())), Self::Null(_) => Value::Null, Self::Struct(_) => unimplemented!(), + Self::Array(_) => unimplemented!(), } } } diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index e4b93a54f5..eee28b8be8 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -613,7 +613,7 @@ mod tests { Some($value), Some($value), None, - 0, + Some(0), false, )) }; diff --git a/crates/core/tests/command_merge.rs b/crates/core/tests/command_merge.rs index 783c858750..7b4c3aad01 100644 --- a/crates/core/tests/command_merge.rs +++ b/crates/core/tests/command_merge.rs @@ -173,6 +173,7 @@ async fn test_merge_different_range() { let (_table_ref1, _metrics) = merge(table_ref1, df1, expr.clone()).await.unwrap(); let result = merge(table_ref2, df2, expr).await; + println!("{result:#?}"); assert!(result.is_ok()); } diff --git a/crates/hdfs/Cargo.toml b/crates/hdfs/Cargo.toml index 729ab90cf1..720bd16fc1 100644 --- a/crates/hdfs/Cargo.toml +++ b/crates/hdfs/Cargo.toml @@ -13,7 +13,7 @@ rust-version.workspace = true [dependencies] deltalake-core = { version = "0.21.0", path = "../core" } -hdfs-native-object-store = "0.11" +hdfs-native-object-store = "0.12" # workspace dependecies object_store = { workspace = true } From 3b8a7b3f8a31670a8e3d33cd487214958438aa61 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Thu, 19 Sep 2024 13:53:27 +0000 Subject: [PATCH 12/26] fix: upgrade python ABI to 3.9 from 3.8 The release of pyo3 0.22.3 compells this since we cannot otherwise compile. The choice is between pinning 0.22.2 and upgrading our ABI, and I think it's better to upgrade the ABI Signed-off-by: R. Tyler Croy --- python/Cargo.toml | 2 +- python/src/lib.rs | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/Cargo.toml b/python/Cargo.toml index 6fc4f46ae8..8f18b8fb2e 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -44,7 +44,7 @@ deltalake-mount = { path = "../crates/mount" } [dependencies.pyo3] version = "0.22.2" -features = ["extension-module", "abi3", "abi3-py38"] +features = ["extension-module", "abi3", "abi3-py39"] [dependencies.deltalake] path = "../crates/deltalake" diff --git a/python/src/lib.rs b/python/src/lib.rs index 005076c719..361f094f38 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -1439,6 +1439,7 @@ fn scalar_to_py<'py>(value: &Scalar, py_date: &Bound<'py, PyAny>) -> PyResult todo!("how should this be converted!"), }; Ok(val.into_bound(py)) From 6b3adbf90aa150d540cc290b1f298ba43bee45a0 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Fri, 20 Sep 2024 12:43:22 +0000 Subject: [PATCH 13/26] fix: adopt the right array item name which changed in kernel 0.3.1 see delta-incubator/delta-kernel-rs#301 Signed-off-by: R. 
Tyler Croy --- crates/core/src/writer/stats.rs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index eee28b8be8..c1f0363083 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -474,6 +474,10 @@ impl AddAssign for AggregatedStats { /// the list and items fields from the path, but also need to handle the /// peculiar case where the user named the list field "list" or "item". /// +/// NOTE: As of delta_kernel 0.3.1 the name switched from `item` to `element` to line up with the +/// parquet spec, see +/// [here](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists) +/// /// For example: /// /// * ["some_nested_list", "list", "item", "list", "item"] -> "some_nested_list" @@ -495,9 +499,9 @@ fn get_list_field_name(column_descr: &Arc) -> Option { while let Some(part) = column_path_parts.pop() { match (part.as_str(), lists_seen, items_seen) { ("list", seen, _) if seen == max_rep_levels => return Some("list".to_string()), - ("item", _, seen) if seen == max_rep_levels => return Some("item".to_string()), + ("element", _, seen) if seen == max_rep_levels => return Some("element".to_string()), ("list", _, _) => lists_seen += 1, - ("item", _, _) => items_seen += 1, + ("element", _, _) => items_seen += 1, (other, _, _) => return Some(other.to_string()), } } @@ -789,9 +793,21 @@ mod tests { let mut null_count_keys = vec!["some_list", "some_nested_list"]; null_count_keys.extend_from_slice(min_max_keys.as_slice()); - assert_eq!(min_max_keys.len(), stats.min_values.len()); - assert_eq!(min_max_keys.len(), stats.max_values.len()); - assert_eq!(null_count_keys.len(), stats.null_count.len()); + assert_eq!( + min_max_keys.len(), + stats.min_values.len(), + "min values don't match" + ); + assert_eq!( + min_max_keys.len(), + stats.max_values.len(), + "max values don't match" + ); + assert_eq!( + null_count_keys.len(), + stats.null_count.len(), + "null counts don't match" + ); // assert on min values for (k, v) in stats.min_values.iter() { @@ -820,7 +836,7 @@ mod tests { ("uuid", ColumnValueStat::Value(v)) => { assert_eq!("176c770d-92af-4a21-bf76-5d8c5261d659", v.as_str().unwrap()) } - _ => panic!("Key should not be present"), + k => panic!("Key {k:?} should not be present in min_values"), } } @@ -851,7 +867,7 @@ mod tests { ("uuid", ColumnValueStat::Value(v)) => { assert_eq!("a98bea04-d119-4f21-8edc-eb218b5849af", v.as_str().unwrap()) } - _ => panic!("Key should not be present"), + k => panic!("Key {k:?} should not be present in max_values"), } } @@ -878,7 +894,7 @@ mod tests { ("some_nested_list", ColumnCountStat::Value(v)) => assert_eq!(100, *v), ("date", ColumnCountStat::Value(v)) => assert_eq!(0, *v), ("uuid", ColumnCountStat::Value(v)) => assert_eq!(0, *v), - _ => panic!("Key should not be present"), + k => panic!("Key {k:?} should not be present in null_count"), } } } From e6afc3304c984fb5f6ec7a03616381bfb0c123ad Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sun, 22 Sep 2024 10:50:41 +0200 Subject: [PATCH 14/26] chore: drop python 3.8 support --- .github/workflows/python_build.yml | 6 +++--- crates/sql/src/planner.rs | 2 +- python/pyproject.toml | 3 +-- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/.github/workflows/python_build.yml b/.github/workflows/python_build.yml index dc5483e091..303013baa7 100644 --- a/.github/workflows/python_build.yml +++ 
b/.github/workflows/python_build.yml @@ -31,7 +31,7 @@ jobs: run: make check-rust test-minimal: - name: Python Build (Python 3.8 PyArrow 16.0.0) + name: Python Build (Python 3.9 PyArrow 16.0.0) runs-on: ubuntu-latest env: RUSTFLAGS: "-C debuginfo=line-tables-only" @@ -43,7 +43,7 @@ jobs: - name: Setup Environment uses: ./.github/actions/setup-env with: - python-version: 3.8 + python-version: 3.9 - name: Build and install deltalake run: | @@ -135,7 +135,7 @@ jobs: strategy: matrix: - python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.9", "3.10", "3.11", "3.12"] steps: - uses: actions/checkout@v3 diff --git a/crates/sql/src/planner.rs b/crates/sql/src/planner.rs index 88596b0d5b..5c820ffba9 100644 --- a/crates/sql/src/planner.rs +++ b/crates/sql/src/planner.rs @@ -186,7 +186,7 @@ mod tests { fn test_planner() { test_statement( "SELECT * FROM table1", - &["Projection: table1.column1", " TableScan: table1"], + &["Projection: *", " TableScan: table1"], ); test_statement("VACUUM table1", &["Vacuum: table1 dry_run=false"]); diff --git a/python/pyproject.toml b/python/pyproject.toml index a13886209b..4cbdc67cc9 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -7,11 +7,10 @@ name = "deltalake" description = "Native Delta Lake Python binding based on delta-rs with Pandas integration" readme = "README.md" license = {file = "licenses/deltalake_license.txt"} -requires-python = ">=3.8" +requires-python = ">=3.9" keywords = ["deltalake", "delta", "datalake", "pandas", "arrow"] classifiers = [ "License :: OSI Approved :: Apache Software License", - "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", From a74e59661c321d2a9eaf471e83c048b32b17171f Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Tue, 22 Oct 2024 17:32:55 +0000 Subject: [PATCH 15/26] chore: add a rust test demonstrating challenges with updates and lists Signed-off-by: R. 
Tyler Croy --- crates/core/src/operations/update.rs | 64 +++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 61dc4b2f46..179f0db6d6 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -501,7 +501,8 @@ mod tests { get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration, }; use crate::{DeltaTable, TableProperty}; - use arrow::array::{Int32Array, StringArray}; + use arrow::array::types::Int32Type; + use arrow::array::{Int32Array, ListArray, StringArray}; use arrow::datatypes::Schema as ArrowSchema; use arrow::datatypes::{Field, Schema}; use arrow::record_batch::RecordBatch; @@ -988,6 +989,67 @@ mod tests { assert!(res.is_err()); } + #[tokio::test] + async fn test_update_with_array() { + let schema = StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "temp".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "items".to_string(), + DeltaDataType::Array(Box::new(crate::kernel::ArrayType::new( + DeltaDataType::INTEGER, + false, + ))), + true, + ), + ]); + let arrow_schema: ArrowSchema = (&schema).try_into().unwrap(); + + // Create the first batch + let arrow_field = Field::new("element", DataType::Int32, false); + let list_array = ListArray::new_null(arrow_field.clone().into(), 2); + let batch = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![Some(0), Some(1)])), + Arc::new(Int32Array::from(vec![Some(30), Some(31)])), + Arc::new(list_array), + ], + ) + .expect("Failed to create record batch"); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(schema.fields().cloned()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table = DeltaOps(table) + .write(vec![batch]) + .await + .expect("Failed to write first batch"); + assert_eq!(table.version(), 1); + // Completed the first creation/write + + // Update + let (table, _metrics) = DeltaOps(table) + .update() + .with_predicate(col("id").eq(lit(1))) + .with_update("items", make_array(vec![lit(100)])) + .await + .unwrap(); + assert_eq!(table.version(), 2); + } + #[tokio::test] async fn test_no_cdc_on_older_tables() { let table = prepare_values_table().await; From 01a6a03ef2206f6b2be6122e1ed93506785b1899 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sun, 10 Nov 2024 15:38:13 +0000 Subject: [PATCH 16/26] chore: upgrade to the latest datafusion 43 Signed-off-by: R. 
Tyler Croy --- Cargo.toml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 221c87c953..2d566152da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,15 +45,15 @@ object_store = { version = "0.11" } parquet = { version = "53" } # datafusion -datafusion = { version = "42" } -datafusion-expr = { version = "42" } -datafusion-common = { version = "42" } -datafusion-proto = { version = "42" } -datafusion-sql = { version = "42" } -datafusion-physical-expr = { version = "42" } -datafusion-physical-plan = { version = "42" } -datafusion-functions = { version = "42" } -datafusion-functions-aggregate = { version = "42" } +datafusion = { version = "43" } +datafusion-expr = { version = "43" } +datafusion-common = { version = "43" } +datafusion-proto = { version = "43" } +datafusion-sql = { version = "43" } +datafusion-physical-expr = { version = "43" } +datafusion-physical-plan = { version = "43" } +datafusion-functions = { version = "43" } +datafusion-functions-aggregate = { version = "43" } # serde serde = { version = "1.0.194", features = ["derive"] } From d7a124db380777f3232a9cc34dc86f50ceea7d4e Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 11 Nov 2024 14:46:01 +0000 Subject: [PATCH 17/26] chore: upgrade all version ranges for the datafusion 43 ABI change Signed-off-by: R. Tyler Croy --- crates/aws/Cargo.toml | 4 ++-- crates/azure/Cargo.toml | 4 ++-- crates/catalog-glue/Cargo.toml | 4 ++-- crates/core/Cargo.toml | 2 +- crates/deltalake/Cargo.toml | 14 +++++++------- crates/gcp/Cargo.toml | 4 ++-- crates/hdfs/Cargo.toml | 4 ++-- crates/mount/Cargo.toml | 4 ++-- crates/test/Cargo.toml | 4 ++-- 9 files changed, 22 insertions(+), 22 deletions(-) diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index cd5448c195..62fe6b0f42 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-aws" -version = "0.4.2" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } aws-smithy-runtime-api = { version="1.7" } aws-smithy-runtime = { version="1.7", optional = true} aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]} diff --git a/crates/azure/Cargo.toml b/crates/azure/Cargo.toml index 6ed096fa29..d80b2760ad 100644 --- a/crates/azure/Cargo.toml +++ b/crates/azure/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-azure" -version = "0.4.0" +version = "0.5.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core", features = [ +deltalake-core = { version = "0.22.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/catalog-glue/Cargo.toml b/crates/catalog-glue/Cargo.toml index c80ec9ce0b..17bb82404e 100644 --- a/crates/catalog-glue/Cargo.toml +++ b/crates/catalog-glue/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-catalog-glue" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -15,7 +15,7 @@ rust-version.workspace = true async-trait = { workspace = true } aws-config = "1" aws-sdk-glue = "1" -deltalake-core = { version = "0.21.0", path = 
"../core" } +deltalake-core = { version = "0.22.0", path = "../core" } thiserror = { workspace = true } [dev-dependencies] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 7d5bfbaf10..9ccbfab739 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-core" -version = "0.21.0" +version = "0.22.0" authors.workspace = true keywords.workspace = true readme.workspace = true diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index 1477d90c29..9647de92bb 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake" -version = "0.21.0" +version = "0.22.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -16,12 +16,12 @@ rust-version.workspace = true features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } -deltalake-aws = { version = "0.4.0", path = "../aws", default-features = false, optional = true } -deltalake-azure = { version = "0.4.0", path = "../azure", optional = true } -deltalake-gcp = { version = "0.5.0", path = "../gcp", optional = true } -deltalake-hdfs = { version = "0.5.0", path = "../hdfs", optional = true } -deltalake-catalog-glue = { version = "0.5.0", path = "../catalog-glue", optional = true } +deltalake-core = { version = "0.22.0", path = "../core" } +deltalake-aws = { version = "0.5.0", path = "../aws", default-features = false, optional = true } +deltalake-azure = { version = "0.5.0", path = "../azure", optional = true } +deltalake-gcp = { version = "0.6.0", path = "../gcp", optional = true } +deltalake-hdfs = { version = "0.6.0", path = "../hdfs", optional = true } +deltalake-catalog-glue = { version = "0.6.0", path = "../catalog-glue", optional = true } [features] # All of these features are just reflected into the core crate until that diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index 51020fb630..e292138e9e 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-gcp" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } lazy_static = "1" # workspace depenndecies diff --git a/crates/hdfs/Cargo.toml b/crates/hdfs/Cargo.toml index 720bd16fc1..4790fbf5ce 100644 --- a/crates/hdfs/Cargo.toml +++ b/crates/hdfs/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-hdfs" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } hdfs-native-object-store = "0.12" # workspace dependecies diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml index 97372895ab..6a0e36c3cf 100644 --- a/crates/mount/Cargo.toml +++ b/crates/mount/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "deltalake-mount" -version = "0.5.0" +version = "0.6.0" authors.workspace = true keywords.workspace = true readme.workspace = true @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [dependencies] -deltalake-core = 
{ version = "0.21.0", path = "../core", features = [ +deltalake-core = { version = "0.22.0", path = "../core", features = [ "datafusion", ] } lazy_static = "1" diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index 9087755fb1..6b01e87539 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -1,13 +1,13 @@ [package] name = "deltalake-test" -version = "0.4.0" +version = "0.5.0" edition = "2021" publish = false [dependencies] bytes = { workspace = true } chrono = { workspace = true, default-features = false, features = ["clock"] } -deltalake-core = { version = "0.21.0", path = "../core" } +deltalake-core = { version = "0.22.0", path = "../core" } dotenvy = "0" fs_extra = "1.3.0" futures = { version = "0.3" } From 8f84d7db95d50ffbca3553a7f502fd3755bce1e2 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 11 Nov 2024 14:53:50 +0000 Subject: [PATCH 18/26] chore: add trait annotations for compilation with datafusion 43 Signed-off-by: R. Tyler Croy --- crates/core/src/data_catalog/storage/mod.rs | 1 + crates/core/src/delta_datafusion/logical.rs | 2 +- crates/core/src/delta_datafusion/mod.rs | 2 ++ crates/core/src/delta_datafusion/planner.rs | 5 ++++- crates/core/src/delta_datafusion/schema_adapter.rs | 12 ++++++++++-- crates/core/src/operations/delete.rs | 2 +- crates/core/src/operations/merge/barrier.rs | 2 +- crates/core/src/operations/merge/mod.rs | 2 +- crates/core/src/operations/update.rs | 2 +- 9 files changed, 22 insertions(+), 8 deletions(-) diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index 4deeb6bfd5..110e4aa075 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -30,6 +30,7 @@ const DELTA_LOG_FOLDER: &str = "_delta_log"; /// /// assuming it contains valid deltalake data, i.e a `_delta_log` folder: /// s3://host.example.com:3000/data/tpch/customer/_delta_log/ +#[derive(Debug)] pub struct ListingSchemaProvider { authority: String, /// Underlying object store diff --git a/crates/core/src/delta_datafusion/logical.rs b/crates/core/src/delta_datafusion/logical.rs index 2ce435b5b6..4aaf30242f 100644 --- a/crates/core/src/delta_datafusion/logical.rs +++ b/crates/core/src/delta_datafusion/logical.rs @@ -7,7 +7,7 @@ use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore}; // Metric Observer is used to update DataFusion metrics from a record batch. 
// See MetricObserverExec for the physical implementation -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)] pub(crate) struct MetricObserver { // id is preserved during conversion to physical node pub id: String, diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index ebc75cdac3..4425b0ff6f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -748,6 +748,7 @@ impl TableProvider for DeltaTable { } /// A Delta table provider that enables additional metadata columns to be included during the scan +#[derive(Debug)] pub struct DeltaTableProvider { snapshot: DeltaTableState, log_store: LogStoreRef, @@ -1366,6 +1367,7 @@ impl LogicalExtensionCodec for DeltaLogicalCodec { } /// Responsible for creating deltatables +#[derive(Debug)] pub struct DeltaTableFactory {} #[async_trait] diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs index 6119b78ce6..c167b4bb7c 100644 --- a/crates/core/src/delta_datafusion/planner.rs +++ b/crates/core/src/delta_datafusion/planner.rs @@ -36,13 +36,16 @@ use datafusion_expr::LogicalPlan; use crate::delta_datafusion::DataFusionResult; /// Deltaplanner +#[derive(Debug)] pub struct DeltaPlanner { /// custom extension planner pub extension_planner: T, } #[async_trait] -impl QueryPlanner for DeltaPlanner { +impl QueryPlanner + for DeltaPlanner +{ async fn create_physical_plan( &self, logical_plan: &LogicalPlan, diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index 99a97e2130..7f84a3b0df 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -13,14 +13,22 @@ use crate::operations::cast::cast_record_batch; pub(crate) struct DeltaSchemaAdapterFactory {} impl SchemaAdapterFactory for DeltaSchemaAdapterFactory { - fn create(&self, schema: SchemaRef) -> Box { + fn create( + &self, + projected_table_schema: SchemaRef, + table_schema: SchemaRef, + ) -> Box { Box::new(DeltaSchemaAdapter { - table_schema: schema, + projected_table_schema, + table_schema, }) } } pub(crate) struct DeltaSchemaAdapter { + /// The schema for the table, projected to include only the fields being output (projected) by + /// the this mapping. 
+ projected_table_schema: SchemaRef, /// Schema for the table table_schema: SchemaRef, } diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 7dc58b5929..d9c4760835 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -135,7 +135,7 @@ impl DeleteBuilder { } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct DeleteMetricExtensionPlanner {} #[async_trait] diff --git a/crates/core/src/operations/merge/barrier.rs b/crates/core/src/operations/merge/barrier.rs index 9084d721b7..09f58a6979 100644 --- a/crates/core/src/operations/merge/barrier.rs +++ b/crates/core/src/operations/merge/barrier.rs @@ -393,7 +393,7 @@ impl RecordBatchStream for MergeBarrierStream { } } -#[derive(Debug, Hash, Eq, PartialEq)] +#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)] pub(crate) struct MergeBarrier { pub input: LogicalPlan, pub expr: Expr, diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index fe8d030464..6be8c264ba 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -581,7 +581,7 @@ pub struct MergeMetrics { /// Time taken to rewrite the matched files pub rewrite_time_ms: u64, } -#[derive(Clone)] +#[derive(Clone, Debug)] struct MergeMetricExtensionPlanner {} #[async_trait] diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 179f0db6d6..3cd9e8b80c 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -180,7 +180,7 @@ impl UpdateBuilder { } } -#[derive(Clone)] +#[derive(Clone, Debug)] struct UpdateMetricExtensionPlanner {} #[async_trait] From 2835a38ba83d6292d7fc73f7909617015027186a Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Sat, 14 Sep 2024 11:41:31 +0200 Subject: [PATCH 19/26] chore: bump kernel --- crates/core/src/kernel/scalars.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs index a587fdc7cc..4218ba0190 100644 --- a/crates/core/src/kernel/scalars.rs +++ b/crates/core/src/kernel/scalars.rs @@ -1,5 +1,5 @@ //! Auxiliary methods for dealing with kernel scalars -use std::cmp::Ordering; +use std::{cmp::Ordering, fmt::Debug}; use arrow_array::Array; use arrow_schema::TimeUnit; From 7565e9dbc715389258fa1dadc39dbecbc615b31a Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Mon, 19 Aug 2024 20:31:27 +0000 Subject: [PATCH 20/26] fix: import DataType for the datafusion tests --- crates/core/src/delta_datafusion/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 4425b0ff6f..7e5e2f70ae 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1777,7 +1777,7 @@ mod tests { use crate::operations::write::SchemaMode; use crate::writer::test_utils::get_delta_schema; use arrow::array::StructArray; - use arrow::datatypes::{Field, Schema}; + use arrow::datatypes::{DataType, Field, Schema}; use chrono::{TimeZone, Utc}; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::ParquetExec; From b26670357080d1cf062c30226117b92bcb1a684e Mon Sep 17 00:00:00 2001 From: "R. 
Tyler Croy" Date: Thu, 19 Sep 2024 13:11:33 +0000 Subject: [PATCH 21/26] fix: address build failures with the newer kernel --- crates/core/src/delta_datafusion/mod.rs | 2 +- crates/core/src/kernel/scalars.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 7e5e2f70ae..4425b0ff6f 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -1777,7 +1777,7 @@ mod tests { use crate::operations::write::SchemaMode; use crate::writer::test_utils::get_delta_schema; use arrow::array::StructArray; - use arrow::datatypes::{DataType, Field, Schema}; + use arrow::datatypes::{Field, Schema}; use chrono::{TimeZone, Utc}; use datafusion::assert_batches_sorted_eq; use datafusion::datasource::physical_plan::ParquetExec; diff --git a/crates/core/src/kernel/scalars.rs b/crates/core/src/kernel/scalars.rs index 4218ba0190..a587fdc7cc 100644 --- a/crates/core/src/kernel/scalars.rs +++ b/crates/core/src/kernel/scalars.rs @@ -1,5 +1,5 @@ //! Auxiliary methods for dealing with kernel scalars -use std::{cmp::Ordering, fmt::Debug}; +use std::cmp::Ordering; use arrow_array::Array; use arrow_schema::TimeUnit; From 5b2f46b06e0eb508f932a8b39feb11b568a78a32 Mon Sep 17 00:00:00 2001 From: Stephen Carman Date: Fri, 15 Nov 2024 12:02:57 -0500 Subject: [PATCH 22/26] chore: datafusion 43 updates, use projected schema in some places Signed-off-by: Stephen Carman --- .../core/src/data_catalog/unity/datafusion.rs | 3 ++ .../src/delta_datafusion/schema_adapter.rs | 6 ++- crates/core/src/lib.rs | 2 +- crates/sql/src/logical_plan.rs | 39 +++++++++++++++---- 4 files changed, 40 insertions(+), 10 deletions(-) diff --git a/crates/core/src/data_catalog/unity/datafusion.rs b/crates/core/src/data_catalog/unity/datafusion.rs index 44e7c9ca33..3e32a3ad68 100644 --- a/crates/core/src/data_catalog/unity/datafusion.rs +++ b/crates/core/src/data_catalog/unity/datafusion.rs @@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse; use crate::DeltaTableBuilder; /// In-memory list of catalogs populated by unity catalog +#[derive(Debug)] pub struct UnityCatalogList { /// Collection of catalogs containing schemas and ultimately TableProviders pub catalogs: DashMap>, @@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList { } /// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog +#[derive(Debug)] pub struct UnityCatalogProvider { /// Parent catalog for schemas of interest. pub schemas: DashMap>, @@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider { } /// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog +#[derive(Debug)] pub struct UnitySchemaProvider { /// UnityCatalog Api client client: Arc, diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index 7f84a3b0df..5b85af9a60 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -27,7 +27,7 @@ impl SchemaAdapterFactory for DeltaSchemaAdapterFactory { pub(crate) struct DeltaSchemaAdapter { /// The schema for the table, projected to include only the fields being output (projected) by - /// the this mapping. + /// the mapping. 
projected_table_schema: SchemaRef, /// Schema for the table table_schema: SchemaRef, @@ -53,6 +53,7 @@ impl SchemaAdapter for DeltaSchemaAdapter { Ok(( Arc::new(SchemaMapping { + projected_schema: self.projected_table_schema.clone(), table_schema: self.table_schema.clone(), }), projection, @@ -62,12 +63,13 @@ impl SchemaAdapter for DeltaSchemaAdapter { #[derive(Debug)] pub(crate) struct SchemaMapping { + projected_schema: SchemaRef, table_schema: SchemaRef, } impl SchemaMapper for SchemaMapping { fn map_batch(&self, batch: RecordBatch) -> datafusion_common::Result { - let record_batch = cast_record_batch(&batch, self.table_schema.clone(), false, true)?; + let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?; Ok(record_batch) } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index fef4fce183..cc9bcd71b4 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -65,7 +65,7 @@ //! }; //! ``` -#![deny(missing_docs)] +// #![deny(missing_docs)] #![allow(rustdoc::invalid_html_tags)] #![allow(clippy::nonminimal_bool)] diff --git a/crates/sql/src/logical_plan.rs b/crates/sql/src/logical_plan.rs index 8ff7b90b9e..9f154c0204 100644 --- a/crates/sql/src/logical_plan.rs +++ b/crates/sql/src/logical_plan.rs @@ -1,3 +1,4 @@ +use std::cmp::Ordering; use std::fmt::{self, Debug, Display}; use std::sync::Arc; @@ -6,7 +7,7 @@ use datafusion_expr::logical_plan::LogicalPlan; use datafusion_expr::{Expr, UserDefinedLogicalNodeCore}; /// Delta Lake specific operations -#[derive(Clone, PartialEq, Eq, Hash)] +#[derive(Clone, PartialEq, Eq, Hash, PartialOrd)] pub enum DeltaStatement { /// Get provenance information, including the operation, /// user, and so on, for each write to a table. @@ -70,6 +71,10 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { } } + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + fn schema(&self) -> &DFSchemaRef { match self { Self::Vacuum(Vacuum { schema, .. 
}) => schema, @@ -77,10 +82,6 @@ impl UserDefinedLogicalNodeCore for DeltaStatement { } } - fn inputs(&self) -> Vec<&LogicalPlan> { - vec![] - } - fn expressions(&self) -> Vec { vec![] } @@ -134,6 +135,12 @@ pub struct Vacuum { pub schema: DFSchemaRef, } +impl PartialOrd for Vacuum { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl Vacuum { pub fn new(table: TableReference, retention_hours: Option, dry_run: bool) -> Self { Self { @@ -152,10 +159,16 @@ impl Vacuum { pub struct DescribeHistory { /// A reference to the table pub table: TableReference, - /// Schema for commit provenence information + /// Schema for commit provenance information pub schema: DFSchemaRef, } +impl PartialOrd for DescribeHistory { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeHistory { pub fn new(table: TableReference) -> Self { Self { @@ -176,6 +189,12 @@ pub struct DescribeDetails { pub schema: DFSchemaRef, } +impl PartialOrd for DescribeDetails { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeDetails { pub fn new(table: TableReference) -> Self { Self { @@ -191,10 +210,16 @@ impl DescribeDetails { pub struct DescribeFiles { /// A reference to the table pub table: TableReference, - /// Schema for commit provenence information + /// Schema for commit provenance information pub schema: DFSchemaRef, } +impl PartialOrd for DescribeFiles { + fn partial_cmp(&self, _other: &Self) -> Option { + None + } +} + impl DescribeFiles { pub fn new(table: TableReference) -> Self { Self { From 25c6c6fb1728e1dcad3c848755be5da8b6964bd0 Mon Sep 17 00:00:00 2001 From: "R. Tyler Croy" Date: Sat, 16 Nov 2024 18:54:10 +0000 Subject: [PATCH 23/26] fix: disable datafusion optimizers which perform list field name erasure Today the make_array function from Datafusion uses "item" as the list element's field name. With recent changes in delta-kernel-rs we have switched to calling it "element" which is more conventional related to how Apache Parquet handles things This change introduces a test which helps isolate the behavior seen in Python tests within the core crate for easier regression testing Signed-off-by: R. 
Tyler Croy --- crates/core/src/delta_datafusion/expr.rs | 176 +++++++++++++++++++++-- crates/core/src/operations/cast/mod.rs | 34 ++++- crates/core/src/operations/update.rs | 92 +++++++++++- 3 files changed, 278 insertions(+), 24 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index eb542d98dd..33746bad2b 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -23,25 +23,171 @@ use std::fmt::{self, Display, Error, Formatter, Write}; use std::sync::Arc; -use arrow_schema::DataType; +use arrow_array::{Array, GenericListArray}; +use arrow_schema::{DataType, Field}; use chrono::{DateTime, NaiveDate}; use datafusion::execution::context::SessionState; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::execution::FunctionRegistry; +use datafusion::functions_array::make_array::MakeArray; use datafusion_common::Result as DFResult; use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; use datafusion_expr::expr::InList; use datafusion_expr::planner::ExprPlanner; use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource}; +// Needed for MakeParquetArray +use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature}; +use datafusion_functions::core::planner::CoreFunctionPlanner; use datafusion_sql::planner::{ContextProvider, SqlToRel}; use datafusion_sql::sqlparser::ast::escape_quoted_string; use datafusion_sql::sqlparser::dialect::GenericDialect; use datafusion_sql::sqlparser::parser::Parser; use datafusion_sql::sqlparser::tokenizer::Tokenizer; +use tracing::log::*; use super::DeltaParserOptions; use crate::{DeltaResult, DeltaTableError}; +/// This struct is like Datafusion's MakeArray but ensures that `element` is used rather than `item +/// as the field name within the list. +#[derive(Debug)] +struct MakeParquetArray { + /// The actual upstream UDF, which we're just totally cheating and using + actual: MakeArray, + /// Aliases for this UDF + aliases: Vec, +} + +impl MakeParquetArray { + pub fn new() -> Self { + let actual = MakeArray::default(); + let aliases = vec!["make_array".into(), "make_list".into()]; + Self { actual, aliases } + } +} + +impl ScalarUDFImpl for MakeParquetArray { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + "make_parquet_array" + } + + fn signature(&self) -> &Signature { + self.actual.signature() + } + + fn return_type(&self, arg_types: &[DataType]) -> Result { + let r_type = match arg_types.len() { + 0 => Ok(DataType::List(Arc::new(Field::new( + "element", + DataType::Int32, + true, + )))), + _ => { + // At this point, all the type in array should be coerced to the same one + Ok(DataType::List(Arc::new(Field::new( + "element", + arg_types[0].to_owned(), + true, + )))) + } + }; + debug!("MakeParquetArray return_type -> {r_type:?}"); + r_type + } + + fn invoke(&self, args: &[ColumnarValue]) -> Result { + let mut data_type = DataType::Null; + for arg in args { + data_type = arg.data_type(); + } + + match self.actual.invoke(args)? 
{ + ColumnarValue::Scalar(ScalarValue::List(df_array)) => { + let field = Arc::new(Field::new("element", data_type, true)); + let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new( + GenericListArray::::try_new( + field, + df_array.offsets().clone(), + arrow_array::make_array(df_array.values().into_data()), + None, + )?, + )))); + debug!("MakeParquetArray;invoke returning: {result:?}"); + result + } + others => { + error!("Unexpected response inside MakeParquetArray! {others:?}"); + Ok(others) + } + } + } + + fn invoke_no_args(&self, number_rows: usize) -> Result { + self.actual.invoke_no_args(number_rows) + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + self.actual.coerce_types(arg_types) + } + + fn documentation(&self) -> Option<&Documentation> { + self.actual.documentation() + } +} + +use datafusion::functions_array::planner::{FieldAccessPlanner, NestedFunctionPlanner}; + +/// This exists becxause the NestedFunctionPlanner _not_ the UserDefinedFunctionPlanner handles the +/// insertion of "make_array" which is used to turn [100] into List +/// +/// **screaming intensifies** +#[derive(Debug)] +struct CustomNestedFunctionPlanner { + original: NestedFunctionPlanner, +} + +impl Default for CustomNestedFunctionPlanner { + fn default() -> Self { + Self { + original: NestedFunctionPlanner, + } + } +} + +use datafusion_expr::planner::{PlannerResult, RawBinaryExpr}; +impl ExprPlanner for CustomNestedFunctionPlanner { + fn plan_array_literal( + &self, + exprs: Vec, + _schema: &DFSchema, + ) -> Result>> { + let udf = Arc::new(ScalarUDF::from(MakeParquetArray::new())); + + Ok(PlannerResult::Planned(udf.call(exprs))) + } + fn plan_binary_op( + &self, + expr: RawBinaryExpr, + schema: &DFSchema, + ) -> Result> { + self.original.plan_binary_op(expr, schema) + } + fn plan_make_map(&self, args: Vec) -> Result>> { + self.original.plan_make_map(args) + } + fn plan_any(&self, expr: RawBinaryExpr) -> Result> { + self.original.plan_any(expr) + } +} + pub(crate) struct DeltaContextProvider<'a> { state: SessionState, /// Keeping this around just to make use of the 'a lifetime @@ -51,22 +197,22 @@ pub(crate) struct DeltaContextProvider<'a> { impl<'a> DeltaContextProvider<'a> { fn new(state: &'a SessionState) -> Self { - let planners = state.expr_planners(); + // default planners are [CoreFunctionPlanner, NestedFunctionPlanner, FieldAccessPlanner, + // UserDefinedFunctionPlanner] + let planners: Vec> = vec![ + Arc::new(CoreFunctionPlanner::default()), + Arc::new(CustomNestedFunctionPlanner::default()), + Arc::new(FieldAccessPlanner), + Arc::new(datafusion::functions::planner::UserDefinedFunctionPlanner), + ]; + // Disable the above for testing + //let planners = state.expr_planners(); + let new_state = SessionStateBuilder::new_from_existing(state.clone()) + .with_expr_planners(planners.clone()) + .build(); DeltaContextProvider { planners, - // Creating a new session state with overridden scalar_functions since - // the get_field() UDF was dropped from the default scalar functions upstream in - // `36660fe10d9c0cdff62e0da0b94bee28422d3419` - state: SessionStateBuilder::new_from_existing(state.clone()) - .with_scalar_functions( - state - .scalar_functions() - .values() - .cloned() - .chain(std::iter::once(datafusion::functions::core::get_field())) - .collect(), - ) - .build(), + state: new_state, _original: state, } } diff --git a/crates/core/src/operations/cast/mod.rs b/crates/core/src/operations/cast/mod.rs index 
278cb2bbfa..a358515194 100644 --- a/crates/core/src/operations/cast/mod.rs +++ b/crates/core/src/operations/cast/mod.rs @@ -275,12 +275,12 @@ mod tests { fn test_merge_arrow_schema_with_nested() { let left_schema = Arc::new(Schema::new(vec![Field::new( "f", - DataType::LargeList(Arc::new(Field::new("item", DataType::Utf8, false))), + DataType::LargeList(Arc::new(Field::new("element", DataType::Utf8, false))), false, )])); let right_schema = Arc::new(Schema::new(vec![Field::new( "f", - DataType::List(Arc::new(Field::new("item", DataType::LargeUtf8, false))), + DataType::List(Arc::new(Field::new("element", DataType::LargeUtf8, false))), true, )])); @@ -306,7 +306,7 @@ mod tests { let fields = Fields::from(vec![Field::new_list( "list_column", - Field::new("item", DataType::Int8, false), + Field::new("element", DataType::Int8, false), false, )]); let target_schema = Arc::new(Schema::new(fields)) as SchemaRef; @@ -316,7 +316,7 @@ mod tests { let schema = result.unwrap().schema(); let field = schema.column_with_name("list_column").unwrap().1; if let DataType::List(list_item) = field.data_type() { - assert_eq!(list_item.name(), "item"); + assert_eq!(list_item.name(), "element"); } else { panic!("Not a list"); } @@ -343,12 +343,34 @@ mod tests { #[test] fn test_is_cast_required_with_list() { - let field1 = DataType::List(FieldRef::from(Field::new("item", DataType::Int32, false))); - let field2 = DataType::List(FieldRef::from(Field::new("item", DataType::Int32, false))); + let field1 = DataType::List(FieldRef::from(Field::new( + "element", + DataType::Int32, + false, + ))); + let field2 = DataType::List(FieldRef::from(Field::new( + "element", + DataType::Int32, + false, + ))); assert!(!is_cast_required(&field1, &field2)); } + /// Delta has adopted "element" as the default list field name rather than the previously used + /// "item". 
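The rename only matters because arrow's `DataType` equality is structural and includes the list element's field name, so an `"item"`-named list and an `"element"`-named list of the same element type compare as different types and must go through a cast. A minimal standalone sketch of that mismatch (plain `arrow_schema`, independent of the delta-rs test harness, illustrative rather than part of the patch):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field};

fn main() {
    // Same logical list type, differing only in the element field's name.
    let legacy = DataType::List(Arc::new(Field::new("item", DataType::Int32, false)));
    let parquet_style = DataType::List(Arc::new(Field::new("element", DataType::Int32, false)));

    // Field equality includes the name, so these two DataTypes are not equal,
    // which is why a cast is required when the old and new conventions meet.
    assert_ne!(legacy, parquet_style);
}
```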
This lines up more with Apache Parquet but should be handled in casting + #[test] + fn test_is_cast_required_with_old_and_new_list() { + let field1 = DataType::List(FieldRef::from(Field::new( + "element", + DataType::Int32, + false, + ))); + let field2 = DataType::List(FieldRef::from(Field::new("item", DataType::Int32, false))); + + assert!(is_cast_required(&field1, &field2)); + } + #[test] fn test_is_cast_required_with_smol_int() { assert!(is_cast_required(&DataType::Int8, &DataType::Int32)); diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 3cd9e8b80c..f6752dd268 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -242,6 +242,21 @@ async fn execute( return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into())); } + // NOTE: The optimize_projections rule is being temporarily disabled because it errors with + // our schemas for Lists due to issues discussed + // [here](https://github.com/delta-io/delta-rs/pull/2886#issuecomment-2481550560> + let rules: Vec> = state + .optimizers() + .into_iter() + .filter(|rule| { + rule.name() != "optimize_projections" && rule.name() != "simplify_expressions" + }) + .cloned() + .collect(); + let state = SessionStateBuilder::from(state) + .with_optimizer_rules(rules) + .build(); + let update_planner = DeltaPlanner:: { extension_planner: UpdateMetricExtensionPlanner {}, }; @@ -323,7 +338,6 @@ async fn execute( enable_pushdown: false, }), }); - let df_with_predicate_and_metrics = DataFrame::new(state.clone(), plan_with_metrics); let expressions: Vec = df_with_predicate_and_metrics @@ -343,6 +357,8 @@ async fn execute( }) .collect::>>()?; + //let updated_df = df_with_predicate_and_metrics.clone(); + // Disabling the select allows the coerce test to pass, still not sure why let updated_df = df_with_predicate_and_metrics.select(expressions.clone())?; let physical_plan = updated_df.clone().create_physical_plan().await?; let writer_stats_config = WriterStatsConfig::new( @@ -1040,11 +1056,81 @@ mod tests { assert_eq!(table.version(), 1); // Completed the first creation/write - // Update + use arrow::array::{Int32Builder, ListBuilder}; + let mut new_items_builder = + ListBuilder::new(Int32Builder::new()).with_field(arrow_field.clone()); + new_items_builder.append_value([Some(100)]); + let new_items = ScalarValue::List(Arc::new(new_items_builder.finish())); + + let (table, _metrics) = DeltaOps(table) + .update() + .with_predicate(col("id").eq(lit(1))) + .with_update("items", lit(new_items)) + .await + .unwrap(); + assert_eq!(table.version(), 2); + } + + /// Lists coming in from the Python bindings need to be parsed as SQL expressions by the update + /// and therefore this test emulates their behavior to ensure that the lists are being turned + /// into expressions for the update operation correctly + #[tokio::test] + async fn test_update_with_array_that_must_be_coerced() { + let _ = pretty_env_logger::try_init(); + let schema = StructType::new(vec![ + StructField::new( + "id".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "temp".to_string(), + DeltaDataType::Primitive(PrimitiveType::Integer), + true, + ), + StructField::new( + "items".to_string(), + DeltaDataType::Array(Box::new(crate::kernel::ArrayType::new( + DeltaDataType::LONG, + true, + ))), + true, + ), + ]); + let arrow_schema: ArrowSchema = (&schema).try_into().unwrap(); + + // Create the first batch + let arrow_field = Field::new("element", DataType::Int64, 
true); + let list_array = ListArray::new_null(arrow_field.clone().into(), 2); + let batch = RecordBatch::try_new( + Arc::new(arrow_schema.clone()), + vec![ + Arc::new(Int32Array::from(vec![Some(0), Some(1)])), + Arc::new(Int32Array::from(vec![Some(30), Some(31)])), + Arc::new(list_array), + ], + ) + .expect("Failed to create record batch"); + let _ = arrow::util::pretty::print_batches(&[batch.clone()]); + + let table = DeltaOps::new_in_memory() + .create() + .with_columns(schema.fields().cloned()) + .await + .unwrap(); + assert_eq!(table.version(), 0); + + let table = DeltaOps(table) + .write(vec![batch]) + .await + .expect("Failed to write first batch"); + assert_eq!(table.version(), 1); + // Completed the first creation/write + let (table, _metrics) = DeltaOps(table) .update() .with_predicate(col("id").eq(lit(1))) - .with_update("items", make_array(vec![lit(100)])) + .with_update("items", "[100]".to_string()) .await .unwrap(); assert_eq!(table.version(), 2); From dfe340ae72e5a0a1924be4a1eb425bec05f235a1 Mon Sep 17 00:00:00 2001 From: Vikas Sharma Date: Mon, 18 Nov 2024 22:29:57 +0530 Subject: [PATCH 24/26] chore(docs): fix the verify table existence example in usage docs Signed-off-by: Vikas Sharma --- docs/usage/loading-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/usage/loading-table.md b/docs/usage/loading-table.md index f000adb6ee..b4109363e2 100644 --- a/docs/usage/loading-table.md +++ b/docs/usage/loading-table.md @@ -81,7 +81,7 @@ storage_options = { "AWS_SECRET_ACCESS_KEY": "THE_AWS_SECRET_ACCESS_KEY", ... } -DeltaTable.is_deltatable(bucket_table_path) +DeltaTable.is_deltatable(bucket_table_path, storage_options) # True ``` From 043443518cc51276a2d7363cb3fceadf8b3b52fa Mon Sep 17 00:00:00 2001 From: Vikas Sharma Date: Tue, 19 Nov 2024 02:14:02 +0530 Subject: [PATCH 25/26] remove unnecessary backslash Signed-off-by: Vikas Sharma --- docs/usage/loading-table.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/usage/loading-table.md b/docs/usage/loading-table.md index b4109363e2..6c99bf7f74 100644 --- a/docs/usage/loading-table.md +++ b/docs/usage/loading-table.md @@ -131,7 +131,7 @@ wish to load: >>> dt = DeltaTable("../rust/tests/data/simple_table", version=2) ``` -Once you\'ve loaded a table, you can also change versions using either a +Once you've loaded a table, you can also change versions using either a version number or datetime string: ```python From eff5735698279c12ae4a3aac2afa268d168242b2 Mon Sep 17 00:00:00 2001 From: Marko Grujic Date: Mon, 4 Nov 2024 13:31:34 +0100 Subject: [PATCH 26/26] : decimal stat rounding overflow Signed-off-by: Marko Grujic --- crates/core/src/writer/stats.rs | 36 ++++++++++++++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/crates/core/src/writer/stats.rs b/crates/core/src/writer/stats.rs index c1f0363083..ae763f7b72 100644 --- a/crates/core/src/writer/stats.rs +++ b/crates/core/src/writer/stats.rs @@ -343,7 +343,15 @@ impl StatsScalar { }); }; - let val = val / 10.0_f64.powi(*scale); + let mut val = val / 10.0_f64.powi(*scale); + + if val.is_normal() { + if (val.trunc() as i128).to_string().len() > (precision - scale) as usize { + // For normal values with integer parts that get rounded to a number beyond + // the precision - scale range take the next smaller (by magnitude) value + val = f64::from_bits(val.to_bits() - 1); + } + } Ok(Self::Decimal(val)) } (Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => { @@ -740,6 +748,32 @@ mod tests { }), 
             Value::from(10.0),
         ),
+        (
+            simple_parquet_stat!(
+                Statistics::FixedLenByteArray,
+                FixedLenByteArray::from(vec![
+                    75, 59, 76, 168, 90, 134, 196, 122, 9, 138, 34, 63, 255, 255, 255, 255
+                ])
+            ),
+            Some(LogicalType::Decimal {
+                scale: 6,
+                precision: 38,
+            }),
+            Value::from(9.999999999999999e31),
+        ),
+        (
+            simple_parquet_stat!(
+                Statistics::FixedLenByteArray,
+                FixedLenByteArray::from(vec![
+                    180, 196, 179, 87, 165, 121, 59, 133, 246, 117, 221, 192, 0, 0, 0, 1
+                ])
+            ),
+            Some(LogicalType::Decimal {
+                scale: 6,
+                precision: 38,
+            }),
+            Value::from(-9.999999999999999e31),
+        ),
         (
             simple_parquet_stat!(
                 Statistics::FixedLenByteArray,
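A standalone sketch of the overflow guard behind the decimal bounds above; it assumes the statistics path goes raw decimal -> `i128` -> `f64` -> divide by `10^scale`, which is an approximation of the real code rather than a copy of it:

```rust
fn main() {
    let precision: i32 = 38;
    let scale: i32 = 6;

    // Largest value a DECIMAL(38, 6) can hold as stored: 38 nines, scaled by 10^-6.
    let raw: i128 = 99999999999999999999999999999999999999;
    let val = raw as f64 / 10.0_f64.powi(scale);

    // The nearest f64 rounds up past 10^32, so the integer part gains a 33rd digit,
    // one more than precision - scale = 32 allows.
    assert!((val.trunc() as i128).to_string().len() > (precision - scale) as usize);

    // Stepping down to the next smaller representable f64 brings the stat back in range.
    let nudged = f64::from_bits(val.to_bits() - 1);
    assert!((nudged.trunc() as i128).to_string().len() <= (precision - scale) as usize);
    assert_eq!(nudged, 9.999999999999999e31);
}
```

The nudge to the next smaller representable `f64` is why the expected value in the test vector is `9.999999999999999e31` rather than `1e32`.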