Skip to content

Commit

Permalink
Merge branch 'main' into python-v-bump-0-14
Browse files Browse the repository at this point in the history
  • Loading branch information
rtyler authored Oct 30, 2023
2 parents 2d2d449 + eec0349 commit adac82c
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 4 deletions.
29 changes: 29 additions & 0 deletions python/tests/test_vacuum.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,32 @@ def test_vacuum_zero_duration(

parquet_files = {f for f in os.listdir(table_path) if f.endswith("parquet")}
assert parquet_files == new_files


def test_vacuum_transaction_log(tmp_path: pathlib.Path, sample_data: pa.Table):
for i in range(5):
write_deltalake(tmp_path, sample_data, mode="overwrite")

dt = DeltaTable(tmp_path)

dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)

dt = DeltaTable(tmp_path)

history = dt.history(2)

expected_start_parameters = {
"retentionCheckEnabled": "false",
"specifiedRetentionMillis": "0",
"defaultRetentionMillis": "604800000",
}

assert history[0]["operation"] == "VACUUM END"
assert history[1]["operation"] == "VACUUM START"

assert history[0]["operationParameters"]["status"] == "COMPLETED"
assert history[1]["operationParameters"] == expected_start_parameters

assert history[0]["operationMetrics"]["numDeletedFiles"] == 4
assert history[1]["operationMetrics"]["numFilesToDelete"] == 4
assert history[1]["operationMetrics"]["sizeOfDataToDelete"] > 0
110 changes: 107 additions & 3 deletions rust/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ use futures::future::BoxFuture;
use futures::{StreamExt, TryStreamExt};
use object_store::Error;
use object_store::{path::Path, ObjectStore};
use serde::Serialize;
use serde_json::{Map, Value};

use super::transaction::commit;
use crate::crate_version;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::protocol::{Action, DeltaOperation}; // Txn CommitInfo
use crate::storage::DeltaObjectStore;
use crate::table::state::DeltaTableState;
use crate::DeltaTable;
Expand Down Expand Up @@ -96,6 +101,26 @@ pub struct VacuumMetrics {
pub files_deleted: Vec<String>,
}

/// Details for the Vacuum start operation for the transaction log
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumStartOperationMetrics {
/// The number of files that will be deleted
pub num_files_to_delete: i64,
/// Size of the data to be deleted in bytes
pub size_of_data_to_delete: i64,
}

/// Details for the Vacuum End operation for the transaction log
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct VacuumEndOperationMetrics {
/// The number of actually deleted files
pub num_deleted_files: i64,
/// The number of actually vacuumed directories
pub num_vacuumed_directories: i64,
}

/// Methods to specify various vacuum options and to execute the operation
impl VacuumBuilder {
/// Create a new [`VacuumBuilder`]
Expand Down Expand Up @@ -157,6 +182,7 @@ impl VacuumBuilder {
let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>();

let mut files_to_delete = vec![];
let mut file_sizes = vec![];
let mut all_files = self.store.list(None).await.map_err(DeltaTableError::from)?;

let partition_columns = &self
Expand All @@ -176,9 +202,16 @@ impl VacuumBuilder {
}

files_to_delete.push(obj_meta.location);
file_sizes.push(obj_meta.size as i64);
}

Ok(VacuumPlan { files_to_delete })
Ok(VacuumPlan {
files_to_delete,
file_sizes,
retention_check_enabled: enforce_retention_duration,
default_retention_millis: min_retention.num_milliseconds(),
specified_retention_millis: Some(retention_period.num_milliseconds()),
})
}
}

Expand All @@ -201,7 +234,7 @@ impl std::future::IntoFuture for VacuumBuilder {
));
}

let metrics = plan.execute(&this.store).await?;
let metrics = plan.execute(&this.store, &this.snapshot).await?;
Ok((
DeltaTable::new_with_state(this.store, this.snapshot),
metrics,
Expand All @@ -214,18 +247,64 @@ impl std::future::IntoFuture for VacuumBuilder {
struct VacuumPlan {
/// What files are to be deleted
pub files_to_delete: Vec<Path>,
/// Size of each file which to delete
pub file_sizes: Vec<i64>,
/// If retention check is enabled
pub retention_check_enabled: bool,
/// Default retention in milliseconds
pub default_retention_millis: i64,
/// Overrided retention in milliseconds
pub specified_retention_millis: Option<i64>,
}

impl VacuumPlan {
/// Execute the vacuum plan and delete files from underlying storage
pub async fn execute(self, store: &DeltaObjectStore) -> Result<VacuumMetrics, DeltaTableError> {
pub async fn execute(
self,
store: &DeltaObjectStore,
snapshot: &DeltaTableState,
) -> Result<VacuumMetrics, DeltaTableError> {
if self.files_to_delete.is_empty() {
return Ok(VacuumMetrics {
dry_run: false,
files_deleted: Vec::new(),
});
}

let start_operation = DeltaOperation::VacuumStart {
retention_check_enabled: self.retention_check_enabled,
specified_retention_millis: self.specified_retention_millis,
default_retention_millis: self.default_retention_millis,
};

let end_operation = DeltaOperation::VacuumEnd {
status: String::from("COMPLETED"), // Maybe this should be FAILED when vacuum has error during the files, not sure how to check for this
};

let start_metrics = serde_json::to_value(VacuumStartOperationMetrics {
num_files_to_delete: self.files_to_delete.len() as i64,
size_of_data_to_delete: self.file_sizes.iter().sum(),
});

// Begin VACUUM START COMMIT
let mut commit_info = start_operation.get_commit_info();
let mut extra_info = Map::<String, Value>::new();

commit_info.timestamp = Some(Utc::now().timestamp_millis());
extra_info.insert(
"clientVersion".to_string(),
Value::String(format!("delta-rs.{}", crate_version())),
);
if let Ok(map) = start_metrics {
extra_info.insert("operationMetrics".to_owned(), map);
}
commit_info.info = extra_info;

let start_actions = vec![Action::commitInfo(commit_info)];

commit(store, &start_actions, start_operation, snapshot, None).await?;
// Finish VACUUM START COMMIT

let locations = futures::stream::iter(self.files_to_delete)
.map(Result::Ok)
.boxed();
Expand All @@ -240,6 +319,31 @@ impl VacuumPlan {
.try_collect::<Vec<_>>()
.await?;

// Create end metadata
let end_metrics = serde_json::to_value(VacuumEndOperationMetrics {
num_deleted_files: files_deleted.len() as i64,
num_vacuumed_directories: 0, // Set to zero since we only remove files not dirs
});

// Begin VACUUM END COMMIT
let mut commit_info = end_operation.get_commit_info();
let mut extra_info = Map::<String, Value>::new();

commit_info.timestamp = Some(Utc::now().timestamp_millis());
extra_info.insert(
"clientVersion".to_string(),
Value::String(format!("delta-rs.{}", crate_version())),
);
if let Ok(map) = end_metrics {
extra_info.insert("operationMetrics".to_owned(), map);
}
commit_info.info = extra_info;

let end_actions = vec![Action::commitInfo(commit_info)];

commit(store, &end_actions, end_operation, snapshot, None).await?;
// Finish VACUUM END COMMIT

Ok(VacuumMetrics {
files_deleted,
dry_run: false,
Expand Down
21 changes: 20 additions & 1 deletion rust/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,23 @@ pub enum DeltaOperation {
///Datetime to restore
datetime: Option<i64>,
}, // TODO: Add more operations

#[serde(rename_all = "camelCase")]
/// Represents the start of `Vacuum` operation
VacuumStart {
/// Whether the retention check is enforced
retention_check_enabled: bool,
/// The specified retetion period in milliseconds
specified_retention_millis: Option<i64>,
/// The default delta table retention milliseconds policy
default_retention_millis: i64,
},

/// Represents the end of `Vacuum` operation
VacuumEnd {
/// The status of the operation
status: String,
},
}

impl DeltaOperation {
Expand All @@ -849,6 +866,8 @@ impl DeltaOperation {
DeltaOperation::Optimize { .. } => "OPTIMIZE",
DeltaOperation::FileSystemCheck { .. } => "FSCK",
DeltaOperation::Restore { .. } => "RESTORE",
DeltaOperation::VacuumStart { .. } => "VACUUM START",
DeltaOperation::VacuumEnd { .. } => "VACUUM END",
}
}

Expand Down Expand Up @@ -884,7 +903,7 @@ impl DeltaOperation {
/// Denotes if the operation changes the data contained in the table
pub fn changes_data(&self) -> bool {
match self {
Self::Optimize { .. } => false,
Self::Optimize { .. } | Self::VacuumStart { .. } | Self::VacuumEnd { .. } => false,
Self::Create { .. }
| Self::FileSystemCheck {}
| Self::StreamingUpdate { .. }
Expand Down

0 comments on commit adac82c

Please sign in to comment.