From 842d526d7c9ad506429a20ee0c0df862b150800f Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Sat, 21 Aug 2021 05:18:36 +0800 Subject: [PATCH] Implement delete_objs in fs and s3 storage backends. (#395) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Implement delete_objs in fs and s3 storage backends. * Trigger s3 test in workflow. * Use delete_objs in vacuum. * Add bucket consistency check. * Batch delete in chunk. * Add default implementation for `delete_objs`. Co-authored-by: Daniël Heres --- .github/workflows/build.yml | 1 + rust/src/delta.rs | 19 +++++------- rust/src/storage/file/mod.rs | 24 ++++++++++++++++ rust/src/storage/mod.rs | 22 +++++++++++++- rust/src/storage/s3/mod.rs | 56 +++++++++++++++++++++++++++++++++--- rust/tests/s3_test.rs | 22 ++++++++++++++ 6 files changed, 127 insertions(+), 17 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5fe3b5081e..db46352737 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -122,3 +122,4 @@ jobs: cargo test s3 --test concurrent_writes_test --features s3 cargo test --test dynamodb_lock_test --features s3 cargo test --test repair_s3_rename_test --features s3 + cargo test test_s3_delete_objs --test s3_test --features s3 diff --git a/rust/src/delta.rs b/rust/src/delta.rs index f19160f74d..086a7ecd38 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -989,19 +989,14 @@ impl DeltaTable { return Ok(files_to_delete); } - for rel_path in &files_to_delete { - match self - .storage - .delete_obj(&self.storage.join_path(&self.table_uri, rel_path)) - .await - { - Ok(_) => continue, - Err(StorageError::NotFound) => continue, - Err(err) => return Err(DeltaTableError::StorageError { source: err }), - } + let paths = &files_to_delete + .iter() + .map(|rel_path| self.storage.join_path(&self.table_uri, rel_path)) + .collect::<Vec<_>>(); + match self.storage.delete_objs(paths).await { + Ok(_) => Ok(files_to_delete), 
Err(err) => Err(DeltaTableError::StorageError { source: err }), } - - Ok(files_to_delete) } /// Return table schema parsed from transaction log. Return None if table hasn't been loaded or diff --git a/rust/src/storage/file/mod.rs b/rust/src/storage/file/mod.rs index fb03125218..13669d08cd 100644 --- a/rust/src/storage/file/mod.rs +++ b/rust/src/storage/file/mod.rs @@ -182,6 +182,30 @@ mod tests { assert_eq!(fs::metadata(path).await.is_ok(), false) } + #[tokio::test] + async fn delete_objs() { + let tmp_dir = tempdir::TempDir::new("delete_test").unwrap(); + let tmp_file_path1 = tmp_dir.path().join("tmp_file1"); + let tmp_file_path2 = tmp_dir.path().join("tmp_file2"); + let backend = FileStorageBackend::new(tmp_dir.path().to_str().unwrap()); + + // put object + let path1 = tmp_file_path1.to_str().unwrap(); + let path2 = tmp_file_path2.to_str().unwrap(); + backend.put_obj(path1, &[]).await.unwrap(); + backend.put_obj(path2, &[]).await.unwrap(); + assert_eq!(fs::metadata(path1).await.is_ok(), true); + assert_eq!(fs::metadata(path2).await.is_ok(), true); + + // delete object + backend + .delete_objs(&[path1.to_string(), path2.to_string()]) + .await + .unwrap(); + assert_eq!(fs::metadata(path1).await.is_ok(), false); + assert_eq!(fs::metadata(path2).await.is_ok(), false) + } + #[test] fn join_multiple_paths() { let backend = FileStorageBackend::new("./"); diff --git a/rust/src/storage/mod.rs b/rust/src/storage/mod.rs index ed99d492d7..bf43912efb 100644 --- a/rust/src/storage/mod.rs +++ b/rust/src/storage/mod.rs @@ -303,7 +303,7 @@ pub enum StorageError { /// The underlying Rusoto S3 error. source: rusoto_core::RusotoError, }, - /// Error returned when an S3 response for a requested URI does not include body bytes. + /// Error representing a failure when executing an S3 DeleteObject request. 
#[cfg(any(feature = "s3", feature = "s3-rustls"))] #[error("Failed to delete S3 object: {source}")] S3Delete { @@ -311,6 +311,14 @@ pub enum StorageError { #[from] source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectError>, }, + /// Error representing a failure when executing an S3 DeleteObjects request. + #[cfg(any(feature = "s3", feature = "s3-rustls"))] + #[error("Failed to delete S3 object: {source}")] + S3BatchDelete { + /// The underlying Rusoto S3 error. + #[from] + source: rusoto_core::RusotoError<rusoto_s3::DeleteObjectsError>, + }, /// Error representing a failure when copying a S3 object #[cfg(any(feature = "s3", feature = "s3-rustls"))] #[error("Failed to copy S3 object: {source}")] @@ -503,6 +511,18 @@ pub trait StorageBackend: Send + Sync + Debug { /// Deletes object by `path`. async fn delete_obj(&self, path: &str) -> Result<(), StorageError>; + + /// Deletes object by `paths`. + async fn delete_objs(&self, paths: &[String]) -> Result<(), StorageError> { + for path in paths { + match self.delete_obj(path).await { + Ok(_) => continue, + Err(StorageError::NotFound) => continue, + Err(e) => return Err(e), + } + } + Ok(()) + } } /// Dynamically construct a Storage backend trait object based on scheme for provided URI diff --git a/rust/src/storage/s3/mod.rs b/rust/src/storage/s3/mod.rs index 0da723bf9f..1994bdf48f 100644 --- a/rust/src/storage/s3/mod.rs +++ b/rust/src/storage/s3/mod.rs @@ -10,8 +10,8 @@ use log::debug; use rusoto_core::{HttpClient, Region, RusotoError}; use rusoto_credential::AutoRefreshingProvider; use rusoto_s3::{ - CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, HeadObjectRequest, - ListObjectsV2Request, PutObjectRequest, S3Client, S3, + CopyObjectRequest, Delete, DeleteObjectRequest, DeleteObjectsRequest, GetObjectRequest, + HeadObjectRequest, ListObjectsV2Request, ObjectIdentifier, PutObjectRequest, S3Client, S3, }; use rusoto_sts::{StsAssumeRoleSessionCredentialsProvider, StsClient, WebIdentityProvider}; use serde::{Deserialize, Serialize}; @@ -416,13 +416,61 @@ impl 
StorageBackend for S3StorageBackend { debug!("delete s3 object: {}...", path); let uri = parse_uri(path)?.into_s3object()?; - let put_req = DeleteObjectRequest { + let delete_req = DeleteObjectRequest { bucket: uri.bucket.to_string(), key: uri.key.to_string(), ..Default::default() }; - self.client.delete_object(put_req).await?; + self.client.delete_object(delete_req).await?; + + Ok(()) + } + + async fn delete_objs(&self, paths: &[String]) -> Result<(), StorageError> { + debug!("delete s3 objects: {:?}...", paths); + if paths.is_empty() { + return Ok(()); + } + + let s3_objects = paths + .iter() + .map(|path| Ok(parse_uri(path)?.into_s3object()?)) + .collect::<Result<Vec<_>, StorageError>>()?; + + // Check whether all buckets are equal + let bucket = s3_objects[0].bucket; + s3_objects.iter().skip(1).try_for_each(|object| { + let other_bucket = object.bucket; + if other_bucket != bucket { + Err(StorageError::S3Generic( + format!("All buckets of the paths in `S3StorageBackend::delete_objs` should be the same. 
Expected '{}', got '{}'", bucket, other_bucket) + )) + } else { + Ok(()) + } + })?; + + // S3 has a maximum of 1000 files to delete + let chunks = s3_objects.chunks(1000); + for chunk in chunks { + let delete = Delete { + objects: chunk + .iter() + .map(|obj| ObjectIdentifier { + key: obj.key.to_string(), + ..Default::default() + }) + .collect(), + ..Default::default() + }; + let delete_req = DeleteObjectsRequest { + bucket: bucket.to_string(), + delete, + ..Default::default() + }; + self.client.delete_objects(delete_req).await?; + } Ok(()) } diff --git a/rust/tests/s3_test.rs b/rust/tests/s3_test.rs index bebff23010..b4557491f3 100644 --- a/rust/tests/s3_test.rs +++ b/rust/tests/s3_test.rs @@ -134,4 +134,26 @@ mod s3 { assert!(matches!(err, StorageError::NotFound)); } + + #[tokio::test] + #[serial] + async fn test_s3_delete_objs() { + setup(); + + let path1 = "s3://deltars/delete1.snappy.parquet"; + let path2 = "s3://deltars/delete2.snappy.parquet"; + let backend = deltalake::get_backend_for_uri(path1).unwrap(); + backend.put_obj(path1, &[]).await.unwrap(); + backend.put_obj(path2, &[]).await.unwrap(); + + backend + .delete_objs(&[path1.to_string(), path2.to_string()]) + .await + .unwrap(); + let err1 = backend.head_obj(path1).await.err().unwrap(); + let err2 = backend.head_obj(path2).await.err().unwrap(); + + assert!(matches!(err1, StorageError::NotFound)); + assert!(matches!(err2, StorageError::NotFound)); + } }