Implement delete_objs in fs and s3 storage backends. (#395)
* Implement delete_objs in fs and s3 storage backends.
* Trigger s3 test in workflow.
* Use delete_objs in vacuum.
* Add bucket consistency check.
* Batch delete in chunks.
* Add default implementation for `delete_objs`.

Co-authored-by: Daniël Heres <danielheres@gmail.com>
zijie0 and Dandandan authored Aug 20, 2021
1 parent 0c6109a commit 842d526
Showing 6 changed files with 127 additions and 17 deletions.
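Taken together, the changes below add a batched `delete_objs` to the `StorageBackend` trait (with a default implementation that falls back to per-object deletes), switch vacuum over to it, and give the S3 backend a chunked DeleteObjects implementation. A rough usage sketch against the local filesystem backend (the scratch path and `main` harness are illustrative, not part of this commit):

    #[tokio::main]
    async fn main() {
        // Illustrative scratch directory; a plain local path selects the fs backend.
        std::fs::create_dir_all("/tmp/delta_demo").unwrap();
        let backend = deltalake::get_backend_for_uri("/tmp/delta_demo").unwrap();

        backend.put_obj("/tmp/delta_demo/a.parquet", &[]).await.unwrap();
        backend.put_obj("/tmp/delta_demo/b.parquet", &[]).await.unwrap();

        // One batched call replaces the old per-file delete_obj loop.
        backend
            .delete_objs(&[
                "/tmp/delta_demo/a.parquet".to_string(),
                "/tmp/delta_demo/b.parquet".to_string(),
            ])
            .await
            .unwrap();
    }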
1 change: 1 addition & 0 deletions .github/workflows/build.yml
@@ -122,3 +122,4 @@ jobs:
cargo test s3 --test concurrent_writes_test --features s3
cargo test --test dynamodb_lock_test --features s3
cargo test --test repair_s3_rename_test --features s3
cargo test test_s3_delete_objs --test s3_test --features s3
19 changes: 7 additions & 12 deletions rust/src/delta.rs
@@ -989,19 +989,14 @@ impl DeltaTable {
return Ok(files_to_delete);
}

-        for rel_path in &files_to_delete {
-            match self
-                .storage
-                .delete_obj(&self.storage.join_path(&self.table_uri, rel_path))
-                .await
-            {
-                Ok(_) => continue,
-                Err(StorageError::NotFound) => continue,
-                Err(err) => return Err(DeltaTableError::StorageError { source: err }),
-            }
-        }
-
-        Ok(files_to_delete)
+        let paths = &files_to_delete
+            .iter()
+            .map(|rel_path| self.storage.join_path(&self.table_uri, rel_path))
+            .collect::<Vec<_>>();
+        match self.storage.delete_objs(paths).await {
+            Ok(_) => Ok(files_to_delete),
+            Err(err) => Err(DeltaTableError::StorageError { source: err }),
+        }
}

/// Return table schema parsed from transaction log. Return None if table hasn't been loaded or
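With this change, vacuum collects the candidate files, joins them into full paths, and removes them with a single `delete_objs` call. A hedged sketch of driving it (the table path is illustrative, and the `vacuum(retention_hours, dry_run)` signature is assumed from this era of the crate):

    async fn run_vacuum() -> Result<(), deltalake::DeltaTableError> {
        let mut table = deltalake::open_table("/tmp/my_delta_table").await?;
        // Some(0): no retention window; false: actually delete, not a dry run.
        let deleted: Vec<String> = table.vacuum(Some(0), false).await?;
        println!("vacuum removed {} files", deleted.len());
        Ok(())
    }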
24 changes: 24 additions & 0 deletions rust/src/storage/file/mod.rs
@@ -182,6 +182,30 @@ mod tests {
assert_eq!(fs::metadata(path).await.is_ok(), false)
}

#[tokio::test]
async fn delete_objs() {
let tmp_dir = tempdir::TempDir::new("delete_test").unwrap();
let tmp_file_path1 = tmp_dir.path().join("tmp_file1");
let tmp_file_path2 = tmp_dir.path().join("tmp_file2");
let backend = FileStorageBackend::new(tmp_dir.path().to_str().unwrap());

// put object
let path1 = tmp_file_path1.to_str().unwrap();
let path2 = tmp_file_path2.to_str().unwrap();
backend.put_obj(path1, &[]).await.unwrap();
backend.put_obj(path2, &[]).await.unwrap();
assert_eq!(fs::metadata(path1).await.is_ok(), true);
assert_eq!(fs::metadata(path2).await.is_ok(), true);

// delete object
backend
.delete_objs(&[path1.to_string(), path2.to_string()])
.await
.unwrap();
assert_eq!(fs::metadata(path1).await.is_ok(), false);
assert_eq!(fs::metadata(path2).await.is_ok(), false)
}

#[test]
fn join_multiple_paths() {
let backend = FileStorageBackend::new("./");
22 changes: 21 additions & 1 deletion rust/src/storage/mod.rs
@@ -303,14 +303,22 @@ pub enum StorageError {
/// The underlying Rusoto S3 error.
source: rusoto_core::RusotoError<rusoto_s3::PutObjectError>,
},
-    /// Error returned when an S3 response for a requested URI does not include body bytes.
+    /// Error representing a failure when executing an S3 DeleteObject request.
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
#[error("Failed to delete S3 object: {source}")]
S3Delete {
/// The underlying Rusoto S3 error.
#[from]
source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>,
},
/// Error representing a failure when executing an S3 DeleteObjects request.
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
#[error("Failed to delete S3 object: {source}")]
S3BatchDelete {
/// The underlying Rusoto S3 error.
#[from]
source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectsError>,
},
/// Error representing a failure when copying a S3 object
#[cfg(any(feature = "s3", feature = "s3-rustls"))]
#[error("Failed to copy S3 object: {source}")]
@@ -503,6 +511,18 @@ pub trait StorageBackend: Send + Sync + Debug {

/// Deletes object by `path`.
async fn delete_obj(&self, path: &str) -> Result<(), StorageError>;

/// Deletes objects by `paths`.
async fn delete_objs(&self, paths: &[String]) -> Result<(), StorageError> {
for path in paths {
match self.delete_obj(path).await {
Ok(_) => continue,
Err(StorageError::NotFound) => continue,
Err(e) => return Err(e),
}
}
Ok(())
}
}

/// Dynamically construct a Storage backend trait object based on scheme for provided URI
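The default implementation gives every backend `delete_objs` for free: it degrades to one `delete_obj` per path and treats `NotFound` as success, so a batch delete is idempotent. A hedged sketch of that behavior on the fs backend, which relies on the default (paths illustrative):

    async fn idempotent_delete() {
        std::fs::create_dir_all("/tmp/delta_demo").unwrap();
        let backend = deltalake::get_backend_for_uri("/tmp/delta_demo").unwrap();
        backend.put_obj("/tmp/delta_demo/real.parquet", &[]).await.unwrap();

        // One path exists, one never did; NotFound is swallowed, so this is Ok(()).
        backend
            .delete_objs(&[
                "/tmp/delta_demo/real.parquet".to_string(),
                "/tmp/delta_demo/never_existed.parquet".to_string(),
            ])
            .await
            .unwrap();
    }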
56 changes: 52 additions & 4 deletions rust/src/storage/s3/mod.rs
@@ -10,8 +10,8 @@ use log::debug;
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_s3::{
-    CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, HeadObjectRequest,
-    ListObjectsV2Request, PutObjectRequest, S3Client, S3,
+    CopyObjectRequest, Delete, DeleteObjectRequest, DeleteObjectsRequest, GetObjectRequest,
+    HeadObjectRequest, ListObjectsV2Request, ObjectIdentifier, PutObjectRequest, S3Client, S3,
};
use rusoto_sts::{StsAssumeRoleSessionCredentialsProvider, StsClient, WebIdentityProvider};
use serde::{Deserialize, Serialize};
@@ -416,13 +416,61 @@ impl StorageBackend for S3StorageBackend {
debug!("delete s3 object: {}...", path);

let uri = parse_uri(path)?.into_s3object()?;
-        let put_req = DeleteObjectRequest {
+        let delete_req = DeleteObjectRequest {
             bucket: uri.bucket.to_string(),
             key: uri.key.to_string(),
             ..Default::default()
         };

-        self.client.delete_object(put_req).await?;
+        self.client.delete_object(delete_req).await?;

Ok(())
}

async fn delete_objs(&self, paths: &[String]) -> Result<(), StorageError> {
debug!("delete s3 objects: {:?}...", paths);
if paths.is_empty() {
return Ok(());
}

let s3_objects = paths
.iter()
.map(|path| Ok(parse_uri(path)?.into_s3object()?))
.collect::<Result<Vec<_>, StorageError>>()?;

// Check whether all buckets are equal
let bucket = s3_objects[0].bucket;
s3_objects.iter().skip(1).try_for_each(|object| {
let other_bucket = object.bucket;
if other_bucket != bucket {
Err(StorageError::S3Generic(
format!("All buckets of the paths in `S3StorageBackend::delete_objs` should be the same. Expected '{}', got '{}'", bucket, other_bucket)
))
} else {
Ok(())
}
})?;

// S3 has a maximum of 1000 files to delete
let chunks = s3_objects.chunks(1000);
for chunk in chunks {
let delete = Delete {
objects: chunk
.iter()
.map(|obj| ObjectIdentifier {
key: obj.key.to_string(),
..Default::default()
})
.collect(),
..Default::default()
};
let delete_req = DeleteObjectsRequest {
bucket: bucket.to_string(),
delete,
..Default::default()
};
self.client.delete_objects(delete_req).await?;
}

Ok(())
}
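Two details in the S3 implementation are worth calling out: every path must resolve to the same bucket, since a single DeleteObjects request addresses one bucket, and the key list is split with `chunks(1000)` because S3 caps each DeleteObjects request at 1000 keys. A quick sketch of the resulting batching (key names illustrative):

    // 2500 keys become three DeleteObjects requests: 1000, 1000, 500.
    let keys: Vec<String> = (0..2500).map(|i| format!("part-{}.parquet", i)).collect();
    let batch_sizes: Vec<usize> = keys.chunks(1000).map(|c| c.len()).collect();
    assert_eq!(batch_sizes, vec![1000, 1000, 500]);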
22 changes: 22 additions & 0 deletions rust/tests/s3_test.rs
@@ -134,4 +134,26 @@ mod s3 {

assert!(matches!(err, StorageError::NotFound));
}

#[tokio::test]
#[serial]
async fn test_s3_delete_objs() {
setup();

let path1 = "s3://deltars/delete1.snappy.parquet";
let path2 = "s3://deltars/delete2.snappy.parquet";
let backend = deltalake::get_backend_for_uri(path1).unwrap();
backend.put_obj(path1, &[]).await.unwrap();
backend.put_obj(path2, &[]).await.unwrap();

backend
.delete_objs(&[path1.to_string(), path2.to_string()])
.await
.unwrap();
let err1 = backend.head_obj(path1).await.err().unwrap();
let err2 = backend.head_obj(path2).await.err().unwrap();

assert!(matches!(err1, StorageError::NotFound));
assert!(matches!(err2, StorageError::NotFound));
}
}
