Skip to content

Commit

Permalink
Wrap DeltaTransactionError with DeltaTableError. (#374)
Browse files Browse the repository at this point in the history
  • Loading branch information
zijie0 authored Aug 12, 2021
1 parent e457ed1 commit baf23ec
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 30 deletions.
48 changes: 25 additions & 23 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ pub enum DeltaTableError {
/// Generic Delta Table error
#[error("Generic DeltaTable error: {0}")]
Generic(String),

/// Error that wraps an underlying DeltaTransaction error.
/// The wrapped error describes the specific cause.
#[error("Delta transaction failed: {source}")]
TransactionError {
/// The wrapped DeltaTransaction error.
#[from]
source: DeltaTransactionError,
},
}

/// Delta table metadata
Expand Down Expand Up @@ -1036,12 +1045,13 @@ impl DeltaTable {
&mut self,
commit: &PreparedCommit,
version: DeltaDataTypeVersion,
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
) -> Result<DeltaDataTypeVersion, DeltaTableError> {
// move temporary commit file to delta log directory
// rely on storage to fail if the file already exists -
self.storage
.rename_obj(&commit.uri, &self.commit_uri_from_version(version))
.await?;
.await
.map_err(|e| DeltaTableError::from(DeltaTransactionError::from(e)))?;

// NOTE: since we have the log entry in memory already,
// we could optimize this further by merging the log entry instead of updating from storage.
Expand Down Expand Up @@ -1076,7 +1086,7 @@ impl DeltaTable {
&mut self,
metadata: DeltaTableMetaData,
protocol: action::Protocol,
) -> Result<(), DeltaTransactionError> {
) -> Result<(), DeltaTableError> {
let meta = action::MetaData::try_from(metadata)?;

// TODO add commit info action
Expand Down Expand Up @@ -1188,15 +1198,6 @@ pub enum DeltaTransactionError {
source: StorageError,
},

/// Error that wraps an underlying DeltaTable error.
/// The wrapped error describes the specific cause.
#[error("DeltaTable interaction failed: {source}")]
DeltaTable {
/// The wrapped DeltaTable error.
#[from]
source: DeltaTableError,
},

/// Error caused by a problem while using serde_json to serialize an action.
#[error("Action serialization failed: {source}")]
ActionSerializationFailed {
Expand Down Expand Up @@ -1364,7 +1365,7 @@ impl<'a> DeltaTransaction<'a> {
pub async fn commit(
&mut self,
_operation: Option<DeltaOperation>,
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
) -> Result<DeltaDataTypeVersion, DeltaTableError> {
// TODO: stubbing `operation` parameter (which will be necessary for writing the CommitInfo action), but leaving it unused for now.
// `CommitInfo` is a fairly dynamic data structure so we should work out the data structure approach separately.

Expand Down Expand Up @@ -1421,7 +1422,7 @@ impl<'a> DeltaTransaction<'a> {
async fn try_commit_loop(
&mut self,
commit: &PreparedCommit,
) -> Result<DeltaDataTypeVersion, DeltaTransactionError> {
) -> Result<DeltaDataTypeVersion, DeltaTableError> {
let mut attempt_number: u32 = 0;
loop {
self.delta_table.update_incremental().await?;
Expand All @@ -1438,15 +1439,16 @@ impl<'a> DeltaTransaction<'a> {
}
Err(e) => {
match e {
DeltaTransactionError::VersionAlreadyExists { .. }
if attempt_number > self.options.max_retry_commit_attempts + 1 =>
{
debug!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing.", self.options.max_retry_commit_attempts);
return Err(e);
}
DeltaTransactionError::VersionAlreadyExists { .. } => {
attempt_number += 1;
debug!("Transaction attempt failed. Incrementing attempt number to {} and retrying.", attempt_number);
DeltaTableError::TransactionError {
source: DeltaTransactionError::VersionAlreadyExists { .. },
} => {
if attempt_number > self.options.max_retry_commit_attempts + 1 {
debug!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing.", self.options.max_retry_commit_attempts);
return Err(e);
} else {
attempt_number += 1;
debug!("Transaction attempt failed. Incrementing attempt number to {} and retrying.", attempt_number);
}
}
// NOTE: Add other retryable errors as needed here
_ => {
Expand Down
4 changes: 2 additions & 2 deletions rust/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! parquet files
use crate::action::Txn;
use crate::{DeltaTableError, DeltaTransactionError};
use crate::DeltaTableError;
use arrow::record_batch::RecordBatch;
use log::*;
use parquet::arrow::ArrowWriter;
Expand Down Expand Up @@ -91,7 +91,7 @@ impl BufferedJsonWriter {
/// as well as any buffered txn actions
///
/// This will create a single transaction in the delta transaction log
pub async fn flush(&mut self) -> Result<(), DeltaTransactionError> {
pub async fn flush(&mut self) -> Result<(), DeltaTableError> {
use arrow::json::reader::Decoder;

let mut parquet_bufs = vec![];
Expand Down
17 changes: 12 additions & 5 deletions rust/tests/simple_commit_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod s3_common;
#[allow(dead_code)]
mod fs_common;

use deltalake::{action, DeltaTransactionError};
use deltalake::{action, DeltaTableError, DeltaTransactionError};
use std::collections::HashMap;

use serial_test::serial;
Expand Down Expand Up @@ -38,7 +38,10 @@ mod simple_commit_s3 {
std::env::set_var("AWS_S3_LOCKING_PROVIDER", "none ");

let result = test_two_commits(path).await;
if let Err(DeltaTransactionError::Storage { ref source }) = result {
if let Err(DeltaTableError::TransactionError {
source: DeltaTransactionError::Storage { ref source },
}) = result
{
if let StorageError::S3Generic(err) = source {
assert_eq!(err, "dynamodb locking is not enabled");
return;
Expand Down Expand Up @@ -114,7 +117,9 @@ mod simple_commit_fs {
let result = table.try_commit_transaction(&commit, 1).await;

match result {
Err(deltalake::DeltaTransactionError::VersionAlreadyExists { .. }) => {
Err(deltalake::DeltaTableError::TransactionError {
source: DeltaTransactionError::VersionAlreadyExists { .. },
}) => {
assert!(true, "Delta version already exists.");
}
_ => {
Expand Down Expand Up @@ -158,7 +163,9 @@ mod simple_commit_fs {
Ok(_) => {
break;
}
Err(DeltaTransactionError::VersionAlreadyExists { .. }) => {
Err(DeltaTableError::TransactionError {
source: DeltaTransactionError::VersionAlreadyExists { .. },
}) => {
attempt += 1;
}
Err(e) => {
Expand All @@ -180,7 +187,7 @@ mod simple_commit_fs {
}
}

async fn test_two_commits(table_path: &str) -> Result<(), DeltaTransactionError> {
async fn test_two_commits(table_path: &str) -> Result<(), DeltaTableError> {
let mut table = deltalake::open_table(table_path).await?;

assert_eq!(0, table.version);
Expand Down

0 comments on commit baf23ec

Please sign in to comment.