From baf23ecce08165fac433367fe50842c4ffda6f3b Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Thu, 12 Aug 2021 13:27:24 +0800 Subject: [PATCH] Wrap DeltaTransactionError with DeltaTableError. (#374) --- rust/src/delta.rs | 48 +++++++++++++++++--------------- rust/src/writer.rs | 4 +-- rust/tests/simple_commit_test.rs | 17 +++++++---- 3 files changed, 39 insertions(+), 30 deletions(-) diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 0fa22c7937..c99e75713f 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -184,6 +184,15 @@ pub enum DeltaTableError { /// Generic Delta Table error #[error("Generic DeltaTable error: {0}")] Generic(String), + + /// Error that wraps an underlying DeltaTransaction error. + /// The wrapped error describes the specific cause. + #[error("Delta transaction failed: {source}")] + TransactionError { + /// The wrapped DeltaTransaction error. + #[from] + source: DeltaTransactionError, + }, } /// Delta table metadata @@ -1036,12 +1045,13 @@ impl DeltaTable { &mut self, commit: &PreparedCommit, version: DeltaDataTypeVersion, - ) -> Result { + ) -> Result { // move temporary commit file to delta log directory // rely on storage to fail if the file already exists - self.storage .rename_obj(&commit.uri, &self.commit_uri_from_version(version)) - .await?; + .await + .map_err(|e| DeltaTableError::from(DeltaTransactionError::from(e)))?; // NOTE: since we have the log entry in memory already, // we could optimize this further by merging the log entry instead of updating from storage. @@ -1076,7 +1086,7 @@ impl DeltaTable { &mut self, metadata: DeltaTableMetaData, protocol: action::Protocol, - ) -> Result<(), DeltaTransactionError> { + ) -> Result<(), DeltaTableError> { let meta = action::MetaData::try_from(metadata)?; // TODO add commit info action @@ -1188,15 +1198,6 @@ pub enum DeltaTransactionError { source: StorageError, }, - /// Error that wraps an underlying DeltaTable error. - /// The wrapped error describes the specific cause. - #[error("DeltaTable interaction failed: {source}")] - DeltaTable { - /// The wrapped DeltaTable error. - #[from] - source: DeltaTableError, - }, - /// Error caused by a problem while using serde_json to serialize an action. #[error("Action serialization failed: {source}")] ActionSerializationFailed { @@ -1364,7 +1365,7 @@ impl<'a> DeltaTransaction<'a> { pub async fn commit( &mut self, _operation: Option, - ) -> Result { + ) -> Result { // TODO: stubbing `operation` parameter (which will be necessary for writing the CommitInfo action), but leaving it unused for now. // `CommitInfo` is a fairly dynamic data structure so we should work out the data structure approach separately. @@ -1421,7 +1422,7 @@ impl<'a> DeltaTransaction<'a> { async fn try_commit_loop( &mut self, commit: &PreparedCommit, - ) -> Result { + ) -> Result { let mut attempt_number: u32 = 0; loop { self.delta_table.update_incremental().await?; @@ -1438,15 +1439,16 @@ impl<'a> DeltaTransaction<'a> { } Err(e) => { match e { - DeltaTransactionError::VersionAlreadyExists { .. } - if attempt_number > self.options.max_retry_commit_attempts + 1 => - { - debug!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing.", self.options.max_retry_commit_attempts); - return Err(e); - } - DeltaTransactionError::VersionAlreadyExists { .. } => { - attempt_number += 1; - debug!("Transaction attempt failed. Incrementing attempt number to {} and retrying.", attempt_number); + DeltaTableError::TransactionError { + source: DeltaTransactionError::VersionAlreadyExists { .. }, + } => { + if attempt_number > self.options.max_retry_commit_attempts + 1 { + debug!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing.", self.options.max_retry_commit_attempts); + return Err(e); + } else { + attempt_number += 1; + debug!("Transaction attempt failed. Incrementing attempt number to {} and retrying.", attempt_number); + } } // NOTE: Add other retryable errors as needed here _ => { diff --git a/rust/src/writer.rs b/rust/src/writer.rs index 98fc8486a5..9310e617cb 100644 --- a/rust/src/writer.rs +++ b/rust/src/writer.rs @@ -5,7 +5,7 @@ //! parquet files use crate::action::Txn; -use crate::{DeltaTableError, DeltaTransactionError}; +use crate::DeltaTableError; use arrow::record_batch::RecordBatch; use log::*; use parquet::arrow::ArrowWriter; @@ -91,7 +91,7 @@ impl BufferedJsonWriter { /// as well as any buffered txn actions /// /// This will create a single transaction in the delta transaction log - pub async fn flush(&mut self) -> Result<(), DeltaTransactionError> { + pub async fn flush(&mut self) -> Result<(), DeltaTableError> { use arrow::json::reader::Decoder; let mut parquet_bufs = vec![]; diff --git a/rust/tests/simple_commit_test.rs b/rust/tests/simple_commit_test.rs index dcdd20e662..fec2c53cee 100644 --- a/rust/tests/simple_commit_test.rs +++ b/rust/tests/simple_commit_test.rs @@ -9,7 +9,7 @@ mod s3_common; #[allow(dead_code)] mod fs_common; -use deltalake::{action, DeltaTransactionError}; +use deltalake::{action, DeltaTableError, DeltaTransactionError}; use std::collections::HashMap; use serial_test::serial; @@ -38,7 +38,10 @@ mod simple_commit_s3 { std::env::set_var("AWS_S3_LOCKING_PROVIDER", "none "); let result = test_two_commits(path).await; - if let Err(DeltaTransactionError::Storage { ref source }) = result { + if let Err(DeltaTableError::TransactionError { + source: DeltaTransactionError::Storage { ref source }, + }) = result + { if let StorageError::S3Generic(err) = source { assert_eq!(err, "dynamodb locking is not enabled"); return; @@ -114,7 +117,9 @@ mod simple_commit_fs { let result = table.try_commit_transaction(&commit, 1).await; match result { - Err(deltalake::DeltaTransactionError::VersionAlreadyExists { .. }) => { + Err(deltalake::DeltaTableError::TransactionError { + source: DeltaTransactionError::VersionAlreadyExists { .. }, + }) => { assert!(true, "Delta version already exists."); } _ => { @@ -158,7 +163,9 @@ mod simple_commit_fs { Ok(_) => { break; } - Err(DeltaTransactionError::VersionAlreadyExists { .. }) => { + Err(DeltaTableError::TransactionError { + source: DeltaTransactionError::VersionAlreadyExists { .. }, + }) => { attempt += 1; } Err(e) => { @@ -180,7 +187,7 @@ mod simple_commit_fs { } } -async fn test_two_commits(table_path: &str) -> Result<(), DeltaTransactionError> { +async fn test_two_commits(table_path: &str) -> Result<(), DeltaTableError> { let mut table = deltalake::open_table(table_path).await?; assert_eq!(0, table.version);