Skip to content

Commit

Permalink
Merge branch 'main' into minor-mount-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored May 5, 2024
2 parents e00f5a0 + e7af965 commit e39463a
Show file tree
Hide file tree
Showing 11 changed files with 211 additions and 15 deletions.
2 changes: 1 addition & 1 deletion crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.1.1"
version = "0.1.2"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down
31 changes: 29 additions & 2 deletions crates/aws/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use aws_credential_types::provider::error::CredentialsError;
use aws_sdk_dynamodb::{
error::SdkError,
operation::{
create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError,
query::QueryError, update_item::UpdateItemError,
create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
put_item::PutItemError, query::QueryError, update_item::UpdateItemError,
},
};
use aws_smithy_runtime_api::client::result::ServiceError;
Expand Down Expand Up @@ -89,6 +89,9 @@ pub enum LockClientError {
to opt out of support for concurrent writers."
)]
LockClientRequired,

#[error("Log entry for table '{table_path}' and version '{version}' is already complete")]
VersionAlreadyCompleted { table_path: String, version: i64 },
}

impl From<GetItemError> for LockClientError {
Expand Down Expand Up @@ -164,7 +167,31 @@ impl From<UpdateItemError> for LockClientError {
}
}

/// Converts a DynamoDB `DeleteItem` service error into a [`LockClientError`].
///
/// Throttling errors map to `ProvisionedThroughputExceeded`, a missing lock
/// table maps to `LockTableNotFound`, and every other service error is wrapped
/// in `GenericDynamoDb`.
///
/// # Panics
///
/// Panics on `ConditionalCheckFailedException`: callers must match on that
/// variant themselves before falling back to this conversion.
impl From<DeleteItemError> for LockClientError {
    fn from(err: DeleteItemError) -> Self {
        match err {
            DeleteItemError::ConditionalCheckFailedException(_) => {
                unreachable!("error must be handled explicitly")
            }
            DeleteItemError::ProvisionedThroughputExceededException(_)
            | DeleteItemError::RequestLimitExceeded(_) => {
                LockClientError::ProvisionedThroughputExceeded
            }
            DeleteItemError::ResourceNotFoundException(_) => LockClientError::LockTableNotFound,
            // InternalServerError, ItemCollectionSizeLimitExceededException,
            // TransactionConflictException and any future variants have no dedicated
            // `LockClientError` counterpart. They must NOT be converted with
            // `err.into()` here: that call resolves to this very impl and would
            // recurse until the stack overflows.
            _ => LockClientError::GenericDynamoDb {
                source: Box::new(err),
            },
        }
    }
}

impl_from_service_error!(GetItemError);
impl_from_service_error!(PutItemError);
impl_from_service_error!(QueryError);
impl_from_service_error!(UpdateItemError);
impl_from_service_error!(DeleteItemError);
45 changes: 43 additions & 2 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub mod storage;
use aws_config::SdkConfig;
use aws_sdk_dynamodb::{
operation::{
create_table::CreateTableError, get_item::GetItemError, put_item::PutItemError,
query::QueryError, update_item::UpdateItemError,
create_table::CreateTableError, delete_item::DeleteItemError, get_item::GetItemError,
put_item::PutItemError, query::QueryError, update_item::UpdateItemError,
},
types::{
AttributeDefinition, AttributeValue, BillingMode, KeySchemaElement, KeyType,
Expand Down Expand Up @@ -416,6 +416,43 @@ impl DynamoDbLockClient {
.await
}

/// Delete existing log entry if it is not already complete
pub async fn delete_commit_entry(
&self,
version: i64,
table_path: &str,
) -> Result<(), LockClientError> {
self.retry(|| async {
match self
.dynamodb_client
.delete_item()
.table_name(self.get_lock_table_name())
.set_key(Some(self.get_primary_key(version, table_path)))
.set_expression_attribute_values(Some(maplit::hashmap! {
":f".into() => string_attr("false"),
}))
.condition_expression(constants::CONDITION_DELETE_INCOMPLETE.as_str())
.send()
.await
{
Ok(_) => Ok(()),
Err(err) => match err.as_service_error() {
Some(DeleteItemError::ProvisionedThroughputExceededException(_)) => Err(
backoff::Error::transient(LockClientError::ProvisionedThroughputExceeded),
),
Some(DeleteItemError::ConditionalCheckFailedException(_)) => Err(
backoff::Error::permanent(LockClientError::VersionAlreadyCompleted {
table_path: table_path.to_owned(),
version,
}),
),
_ => Err(backoff::Error::permanent(err.into())),
},
}
})
.await
}

async fn retry<I, E, Fn, Fut>(&self, operation: Fn) -> Result<I, E>
where
Fn: FnMut() -> Fut,
Expand Down Expand Up @@ -553,6 +590,10 @@ pub mod constants {
pub static ref CONDITION_EXPR_CREATE: String = format!(
"attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME})"
);

pub static ref CONDITION_DELETE_INCOMPLETE: String = format!(
"(complete = :f) or (attribute_not_exists({ATTR_TABLE_PATH}) and attribute_not_exists({ATTR_FILE_NAME}))"
);
}

pub const CONDITION_UPDATE_INCOMPLETE: &str = "complete = :f";
Expand Down
30 changes: 30 additions & 0 deletions crates/aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,36 @@ impl LogStore for S3DynamoDbLogStore {
Ok(())
}

/// Tries to abort an entry by first deleting the commit log entry, then deleting the temp commit file.
///
/// The temporary commit file is only removed after the DynamoDB entry has been
/// deleted successfully; if deleting the entry fails, this returns early and
/// `tmp_commit` is left in place.
///
/// # Errors
///
/// Returns `TransactionError::LogStoreError` when the lock client cannot delete
/// the entry (including when the entry is already complete or the request was
/// throttled), and propagates any error from removing the temporary commit file.
async fn abort_commit_entry(
    &self,
    version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    self.lock_client
        .delete_commit_entry(version, &self.table_path)
        .await
        .map_err(|err| match err {
            // Surface throttling as a regular error instead of panicking: an abort
            // already runs on a failure path and must not take the process down.
            LockClientError::ProvisionedThroughputExceeded => TransactionError::LogStoreError {
                msg: "dynamodb provisioned throughput exceeded while deleting log entry"
                    .to_owned(),
                source: Box::new(err),
            },
            LockClientError::VersionAlreadyCompleted { version, .. } => {
                error!("Trying to abort a completed commit");
                TransactionError::LogStoreError {
                    msg: format!("trying to abort a completed log entry: {}", version),
                    source: Box::new(err),
                }
            }
            err => TransactionError::LogStoreError {
                msg: "dynamodb client failed to delete log entry".to_owned(),
                source: Box::new(err),
            },
        })?;

    abort_commit_entry(&self.storage, version, tmp_commit).await?;
    Ok(())
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
debug!("Retrieving latest version of {self:?} at v{current_version}");
let entry = self
Expand Down
2 changes: 1 addition & 1 deletion crates/aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
.0
.contains_key(AmazonS3ConfigKey::CopyIfNotExists.as_ref())
{
Ok((Arc::from(store), prefix))
Ok((store, prefix))
} else {
let s3_options = S3StorageOptions::from_map(&options.0)?;

Expand Down
76 changes: 75 additions & 1 deletion crates/aws/tests/integration_s3_dynamodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use deltalake_core::protocol::{DeltaOperation, SaveMode};
use deltalake_core::storage::commit_uri_from_version;
use deltalake_core::storage::StorageOptions;
use deltalake_core::table::builder::ensure_table_uri;
use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder};
use deltalake_core::{DeltaOps, DeltaTable, DeltaTableBuilder, ObjectStoreError};
use deltalake_test::utils::*;
use lazy_static::lazy_static;
use object_store::path::Path;
Expand Down Expand Up @@ -182,6 +182,80 @@ async fn test_repair_on_load() -> TestResult<()> {
Ok(())
}

/// Aborting an incomplete commit removes its log entry and its temp commit
/// file, and a second abort of the same entry still succeeds.
#[tokio::test]
#[serial]
async fn test_abort_commit_entry() -> TestResult<()> {
    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
    let client = make_client()?;
    let table = prepare_table(&context, "abort_entry").await?;
    let storage_options: StorageOptions = OPTIONS.clone().into();
    let table_url = ensure_table_uri(table.table_uri())?;
    let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
        table_url,
        storage_options.clone(),
        &S3_OPTIONS,
        std::sync::Arc::new(table.object_store()),
    )?;

    let incomplete = create_incomplete_commit_entry(&table, 1, "unfinished_commit").await?;

    log_store
        .abort_commit_entry(incomplete.version, &incomplete.temp_path)
        .await?;

    // The entry should have been aborted - the latest entry should be one version lower
    let table_uri = table.table_uri();
    if let Some(new_entry) = client.get_latest_entry(&table_uri).await? {
        assert_eq!(incomplete.version - 1, new_entry.version);
    }

    // Temp commit file should have been deleted
    let fetched = log_store.object_store().get(&incomplete.temp_path).await;
    assert!(matches!(fetched, Err(ObjectStoreError::NotFound { .. })));

    // Test abort commit is idempotent - still works if already aborted
    log_store
        .abort_commit_entry(incomplete.version, &incomplete.temp_path)
        .await?;

    Ok(())
}

/// Aborting a commit whose DynamoDB entry is already complete must fail and
/// must leave the temp commit file untouched.
#[tokio::test]
#[serial]
async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
    let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
    let client = make_client()?;
    let table = prepare_table(&context, "abort_entry_fail").await?;
    let storage_options: StorageOptions = OPTIONS.clone().into();
    let table_url = ensure_table_uri(table.table_uri())?;
    let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
        table_url,
        storage_options.clone(),
        &S3_OPTIONS,
        std::sync::Arc::new(table.object_store()),
    )?;

    let finished = create_incomplete_commit_entry(&table, 1, "finished_commit").await?;

    // Mark entry as complete
    client
        .update_commit_entry(finished.version, &table.table_uri())
        .await?;

    // Abort will fail since we marked the entry as complete
    let abort_result = log_store
        .abort_commit_entry(finished.version, &finished.temp_path)
        .await;
    assert!(abort_result.is_err());

    // Check temp commit file still exists
    let fetched = log_store.object_store().get(&finished.temp_path).await;
    assert!(fetched.is_ok());

    Ok(())
}

const WORKERS: i64 = 3;
const COMMITS: i64 = 15;

Expand Down
8 changes: 8 additions & 0 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ impl LogStore for DefaultLogStore {
super::write_commit_entry(self.storage.as_ref(), version, tmp_commit).await
}

/// Aborts the commit for `version` by delegating to the module-level
/// `abort_commit_entry` with this log store's storage backend.
async fn abort_commit_entry(
    &self,
    version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    let storage = self.storage.as_ref();
    super::abort_commit_entry(storage, version, tmp_commit).await
}

async fn get_latest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_latest_version(self, current_version).await
}
Expand Down
21 changes: 20 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use crate::{
kernel::Action,
operations::transaction::TransactionError,
protocol::{get_last_checkpoint, ProtocolError},
storage::{commit_uri_from_version, ObjectStoreRef, StorageOptions},
storage::{
commit_uri_from_version, retry_ext::ObjectStoreRetryExt, ObjectStoreRef, StorageOptions,
},
DeltaTableError,
};
use bytes::Bytes;
Expand Down Expand Up @@ -183,6 +185,13 @@ pub trait LogStore: Sync + Send {
tmp_commit: &Path,
) -> Result<(), TransactionError>;

/// Abort the commit entry for the given version.
///
/// `version` is the version the commit was attempted for and `tmp_commit` is
/// the path of the temporary commit file belonging to that attempt.
// NOTE(review): implementations are presumably expected to remove `tmp_commit`
// and any backend-specific bookkeeping for `version` — confirm against each implementor.
async fn abort_commit_entry(
&self,
version: i64,
tmp_commit: &Path,
) -> Result<(), TransactionError>;

/// Find latest version currently stored in the delta log.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

Expand Down Expand Up @@ -449,6 +458,16 @@ pub async fn write_commit_entry(
Ok(())
}

/// Default way to abort a commit entry: delete the temporary commit file.
///
/// `_version` is accepted for signature parity with log stores that track
/// versions elsewhere; it is not used here. Any storage error from the delete
/// is converted into a `TransactionError`.
pub async fn abort_commit_entry(
    storage: &dyn ObjectStore,
    _version: i64,
    tmp_commit: &Path,
) -> Result<(), TransactionError> {
    // Retry budget handed to the storage layer for the delete.
    let max_retries = 15;
    match storage.delete_with_retries(tmp_commit, max_retries).await {
        Ok(_) => Ok(()),
        Err(err) => Err(err.into()),
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ async fn execute(
return Err(err.into());
}
Err(err) => {
log_store.object_store().delete(commit).await?;
log_store.abort_commit_entry(commit_version, commit).await?;
return Err(err.into());
}
}
Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ use crate::kernel::{
};
use crate::logstore::LogStoreRef;
use crate::protocol::DeltaOperation;
use crate::storage::ObjectStoreRetryExt;
use crate::table::config::TableConfig;
use crate::table::state::DeltaTableState;
use crate::{crate_version, DeltaResult};
Expand Down Expand Up @@ -554,17 +553,15 @@ impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
}
Err(err) => {
this.log_store
.object_store()
.delete_with_retries(tmp_commit, 15)
.abort_commit_entry(version, tmp_commit)
.await?;
return Err(TransactionError::CommitConflict(err).into());
}
};
}
Err(err) => {
this.log_store
.object_store()
.delete_with_retries(tmp_commit, 15)
.abort_commit_entry(version, tmp_commit)
.await?;
return Err(err.into());
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/storage/retry_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,4 @@ pub trait ObjectStoreRetryExt: ObjectStore {
}
}

impl<T: ObjectStore> ObjectStoreRetryExt for T {}
impl<T: ObjectStore + ?Sized> ObjectStoreRetryExt for T {}

0 comments on commit e39463a

Please sign in to comment.