Skip to content

Commit

Permalink
feat: allow configurable number of commit attempts
Browse files Browse the repository at this point in the history
This adds `transaction::commit_with_retries` function where
the number of attempts can be specified. The default behavior for
`transaction::commit` remains 15.

Closes #1595.
  • Loading branch information
cmackenzie1 authored and rtyler committed Aug 28, 2023
1 parent 8d5a61c commit 42e7123
Showing 1 changed file with 19 additions and 5 deletions.
24 changes: 19 additions & 5 deletions rust/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub(crate) async fn try_commit_transaction(
Ok(version)
}

/// Commit a transaction, with up to 5 retries. This is low-level transaction API.
/// Commit a transaction, with up to 15 retries. This is higher-level transaction API.
///
/// Will error early if the a concurrent transaction has already been committed
/// and conflicts with this transaction.
Expand All @@ -161,14 +161,28 @@ pub async fn commit(
operation: DeltaOperation,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> DeltaResult<i64> {
commit_with_retries(storage, actions, operation, read_snapshot, app_metadata, 15).await
}

/// Commit a transaction, with up configurable number of retries. This is higher-level transaction API.
///
/// The function will error early if the a concurrent transaction has already been committed
/// and conflicts with this transaction.
pub async fn commit_with_retries(
storage: &dyn ObjectStore,
actions: &Vec<Action>,
operation: DeltaOperation,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
max_retries: usize,
) -> DeltaResult<i64> {
let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?;

let max_attempts = 15;
let mut attempt_number = 1;

while attempt_number <= max_attempts {
let version = read_snapshot.version() + attempt_number;
while attempt_number <= max_retries {
let version = read_snapshot.version() + attempt_number as i64;
match try_commit_transaction(storage, &tmp_commit, version).await {
Ok(version) => return Ok(version),
Err(TransactionError::VersionAlreadyExists(version)) => {
Expand Down Expand Up @@ -199,7 +213,7 @@ pub async fn commit(
}
}

Err(TransactionError::MaxCommitAttempts(max_attempts as i32).into())
Err(TransactionError::MaxCommitAttempts(max_retries as i32).into())
}

#[cfg(all(test, feature = "parquet"))]
Expand Down

0 comments on commit 42e7123

Please sign in to comment.