feat: add enforce_retention_duration param to vacuum method (#648)
This maps to the reference implementation's
spark.databricks.delta.retentionDurationCheck.enabled config.
houqp authored Jun 19, 2022
1 parent 1fa8d6d commit 2ac3c3e
Showing 5 changed files with 65 additions and 16 deletions.
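
For context, the new flag plays the role of Spark's spark.databricks.delta.retentionDurationCheck.enabled switch in the Python API. A minimal usage sketch of the API after this change (the local table path is illustrative):

    from deltalake import DeltaTable

    dt = DeltaTable("./tests/data/delta-0.8.0")  # illustrative local table path

    # Default: dry run with enforcement on; returns the files that would be deleted.
    files = dt.vacuum(retention_hours=169)

    # Equivalent of setting spark.databricks.delta.retentionDurationCheck.enabled
    # to false: permit a retention below configuration.deletedFileRetentionDuration.
    purged = dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)

With enforcement left on (the default), a retention below the configured minimum raises an error instead of deleting recently tombstoned files.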
8 changes: 6 additions & 2 deletions python/deltalake/table.py
@@ -241,20 +241,24 @@ def history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
         ]
 
     def vacuum(
-        self, retention_hours: Optional[int] = None, dry_run: bool = True
+        self,
+        retention_hours: Optional[int] = None,
+        dry_run: bool = True,
+        enforce_retention_duration: bool = True,
     ) -> List[str]:
         """
         Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
         :param retention_hours: the retention threshold in hours; if None, the value from `configuration.deletedFileRetentionDuration` is used, or a default of 1 week otherwise.
         :param dry_run: when enabled, only list the files; otherwise, delete them.
+        :param enforce_retention_duration: when disabled, accepts retention hours smaller than the value from `configuration.deletedFileRetentionDuration`.
         :return: the list of files that are no longer referenced by the Delta Table and are older than the retention threshold.
         """
         if retention_hours:
             if retention_hours < 0:
                 raise ValueError("The retention period should be positive.")
 
-        return self._table.vacuum(dry_run, retention_hours)
+        return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)
 
     def pyarrow_schema(self) -> pyarrow.Schema:
         """
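
To make the wrapper's two failure modes above concrete, a short sketch (the table path is illustrative, and the broad except is used because the exact exception class surfaced from the Rust core is an assumption here):

    from deltalake import DeltaTable

    dt = DeltaTable("./tests/data/delta-0.8.0")  # illustrative path

    try:
        dt.vacuum(retention_hours=-1)  # rejected in Python before reaching Rust
    except ValueError as err:
        print(err)

    try:
        dt.vacuum(retention_hours=167)  # below the 168-hour minimum, enforcement on
    except Exception as err:  # raised from the Rust core via the binding
        print(err)

    # Disabling enforcement skips the minimum-retention check (still a dry run here).
    dt.vacuum(retention_hours=167, enforce_retention_duration=False)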
13 changes: 11 additions & 2 deletions python/src/lib.rs
@@ -225,9 +225,18 @@ impl RawDeltaTable {
     }
 
     /// Run the Vacuum command on the Delta Table: list and delete files that are no longer referenced by the Delta table and are older than the retention threshold.
-    pub fn vacuum(&mut self, dry_run: bool, retention_hours: Option<u64>) -> PyResult<Vec<String>> {
+    pub fn vacuum(
+        &mut self,
+        dry_run: bool,
+        retention_hours: Option<u64>,
+        enforce_retention_duration: bool,
+    ) -> PyResult<Vec<String>> {
         rt()?
-            .block_on(self._table.vacuum(retention_hours, dry_run))
+            .block_on(self._table.vacuum(
+                retention_hours,
+                dry_run,
+                Some(enforce_retention_duration),
+            ))
             .map_err(PyDeltaTableError::from_raw)
     }
2 changes: 1 addition & 1 deletion python/tests/test_table_read.py
@@ -216,7 +216,7 @@ def test_vacuum_dry_run_simple_table():
         dt.vacuum(retention_periods)
     assert (
         str(exception.value)
-        == "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
+        == "Invalid retention period, minimum retention for vacuum is configured to be greater than 168 hours, got 167 hours"
     )
29 changes: 22 additions & 7 deletions rust/src/delta.rs
@@ -30,6 +30,8 @@ use super::storage::{StorageBackend, StorageError, UriError};
 use super::table_state::DeltaTableState;
 use crate::delta_config::DeltaConfigError;
 
+const MILLIS_IN_HOUR: i64 = 3600000;
+
 /// Metadata for a checkpoint file
 #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
 pub struct CheckPoint {
@@ -175,9 +177,14 @@ pub enum DeltaTableError {
     },
     /// Error returned when Vacuum retention period is below the safe threshold
     #[error(
-        "Invalid retention period, retention for Vacuum must be greater than 1 week (168 hours)"
+        "Invalid retention period, minimum retention for vacuum is configured to be greater than {} hours, got {} hours", .min, .provided
     )]
-    InvalidVacuumRetentionPeriod,
+    InvalidVacuumRetentionPeriod {
+        /// User-provided retention on the vacuum call
+        provided: DeltaDataTypeLong,
+        /// Minimum retention configured in the delta table config
+        min: DeltaDataTypeLong,
+    },
     /// Error returned when a line from log record is invalid.
     #[error("Failed to read line from log record")]
     Io {
@@ -1156,13 +1163,20 @@ impl DeltaTable {
     fn get_stale_files(
         &self,
         retention_hours: Option<u64>,
+        enforce_retention_duration: Option<bool>,
     ) -> Result<HashSet<&str>, DeltaTableError> {
         let retention_millis = retention_hours
-            .map(|hours| 3600000 * hours as i64)
+            .map(|hours| MILLIS_IN_HOUR * hours as i64)
             .unwrap_or_else(|| self.state.tombstone_retention_millis());
 
-        if retention_millis < self.state.tombstone_retention_millis() {
-            return Err(DeltaTableError::InvalidVacuumRetentionPeriod);
+        if enforce_retention_duration.unwrap_or(true) {
+            let min_retention_millis = self.state.tombstone_retention_millis();
+            if retention_millis < min_retention_millis {
+                return Err(DeltaTableError::InvalidVacuumRetentionPeriod {
+                    provided: retention_millis / MILLIS_IN_HOUR,
+                    min: min_retention_millis / MILLIS_IN_HOUR,
+                });
+            }
         }
 
         let tombstone_retention_timestamp = Utc::now().timestamp_millis() - retention_millis;
@@ -1208,8 +1222,10 @@ impl DeltaTable {
         &self,
         retention_hours: Option<u64>,
         dry_run: bool,
+        enforce_retention_duration: Option<bool>,
     ) -> Result<Vec<String>, DeltaTableError> {
-        let expired_tombstones = self.get_stale_files(retention_hours)?;
+        let expired_tombstones =
+            self.get_stale_files(retention_hours, enforce_retention_duration)?;
         let valid_files = self.get_file_set();
 
         let mut files_to_delete = vec![];
@@ -1776,7 +1792,6 @@ mod tests {
             tmp_dir.path().to_str().unwrap(),
         ));
         let mut dt = DeltaTable::new(path, backend, DeltaTableConfig::default()).unwrap();
-        // let mut dt = DeltaTable::new(path, backend, DeltaTableLoadOptions::default()).unwrap();
 
         let mut commit_info = Map::<String, Value>::new();
         commit_info.insert(
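
The check added to get_stale_files is compact enough to restate outside the diff. A Python sketch of the same logic, assuming the default 168-hour (1 week) minimum; the function and parameter names are hypothetical:

    MILLIS_IN_HOUR = 3_600_000

    def check_retention(retention_millis: int,
                        min_retention_millis: int = 168 * MILLIS_IN_HOUR,
                        enforce: bool = True) -> None:
        # Mirrors the Rust check: only enforce the configured minimum when asked,
        # and report both values in whole hours, as the new error variant does.
        if enforce and retention_millis < min_retention_millis:
            raise ValueError(
                f"Invalid retention period, minimum retention for vacuum is configured "
                f"to be greater than {min_retention_millis // MILLIS_IN_HOUR} hours, "
                f"got {retention_millis // MILLIS_IN_HOUR} hours"
            )

    check_retention(167 * MILLIS_IN_HOUR)  # raises: below the configured minimum
    check_retention(0, enforce=False)      # passes: check disabled

Note that the error reports both bounds in hours, matching the new InvalidVacuumRetentionPeriod variant above.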
29 changes: 25 additions & 4 deletions rust/tests/read_delta_test.rs
@@ -420,16 +420,34 @@ async fn vacuum_delta_8_0_table() {
 
     assert!(matches!(
         table
-            .vacuum(Some(retention_hours), dry_run)
+            .vacuum(Some(retention_hours), dry_run, None)
             .await
             .unwrap_err(),
-        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod,
+        deltalake::DeltaTableError::InvalidVacuumRetentionPeriod {
+            provided,
+            min,
+        } if provided == retention_hours as i64
+            && min == table.get_state().tombstone_retention_millis() / 3600000,
     ));
 
+    // Disabling the retention duration check with a 0-hour retention purges all files.
+    assert_eq!(
+        table.vacuum(Some(0), dry_run, Some(false)).await.unwrap(),
+        vec![backend.join_paths(&[
+            "tests",
+            "data",
+            "delta-0.8.0",
+            "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet",
+        ])]
+    );
+
     let retention_hours = 169;
 
     assert_eq!(
-        table.vacuum(Some(retention_hours), dry_run).await.unwrap(),
+        table
+            .vacuum(Some(retention_hours), dry_run, None)
+            .await
+            .unwrap(),
         vec![backend.join_paths(&[
             "tests",
             "data",
@@ -446,7 +464,10 @@ async fn vacuum_delta_8_0_table() {
     let empty: Vec<String> = Vec::new();
 
     assert_eq!(
-        table.vacuum(Some(retention_hours), dry_run).await.unwrap(),
+        table
+            .vacuum(Some(retention_hours), dry_run, None)
+            .await
+            .unwrap(),
         empty
     );
 }
