diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 91c30fda13..16d07e144e 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -88,6 +88,7 @@ class RawDeltaTable: def create_checkpoint(self) -> None: ... def get_add_actions(self, flatten: bool) -> pyarrow.RecordBatch: ... def delete(self, predicate: Optional[str]) -> str: ... + def repair(self, dry_run: bool) -> str: ... def update( self, updates: Dict[str, str], diff --git a/python/deltalake/table.py b/python/deltalake/table.py index e8f622847b..03082b3307 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -826,6 +826,31 @@ def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]: metrics = self._table.delete(predicate) return json.loads(metrics) + def repair(self, dry_run: bool = False) -> Dict[str, Any]: + """Repair the Delta Table by auditing active files that do not exist in the underlying + filesystem and removes them. This can be useful when there are accidental deletions or corrupted files. + + Active files are ones that have an add action in the log, but no corresponding remove action. + This operation creates a new FSCK transaction containing a remove action for each of the missing + or corrupted files. + + Args: + dry_run(bool): when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False. + Returns: + The metrics from repair (FSCK) action. + + Examples: + + >>> from deltalake import DeltaTable + >>> dt = DeltaTable('TEST') + >>> dt.repair(dry_run=False) + {'dry_run': False, + 'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet', + '5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']} + """ + metrics = self._table.repair(dry_run) + return json.loads(metrics) + class TableMerger: """API for various table MERGE commands.""" diff --git a/python/src/lib.rs b/python/src/lib.rs index 2f46436984..5f2fd8a11f 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -26,6 +26,7 @@ use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; use deltalake::operations::delete::DeleteBuilder; +use deltalake::operations::filesystem_check::FileSystemCheckBuilder; use deltalake::operations::merge::MergeBuilder; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; use deltalake::operations::restore::RestoreBuilder; @@ -853,6 +854,21 @@ impl RawDeltaTable { self._table.state = table.state; Ok(serde_json::to_string(&metrics).unwrap()) } + + /// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that + /// have been deleted or are malformed + #[pyo3(signature = (dry_run = true))] + pub fn repair(&mut self, dry_run: bool) -> PyResult { + let cmd = + FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone()) + .with_dry_run(dry_run); + + let (table, metrics) = rt()? + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(serde_json::to_string(&metrics).unwrap()) + } } fn convert_partition_filters<'a>( diff --git a/python/tests/test_repair.py b/python/tests/test_repair.py new file mode 100644 index 0000000000..d1c631dcf1 --- /dev/null +++ b/python/tests/test_repair.py @@ -0,0 +1,31 @@ +import os + +from deltalake import DeltaTable, write_deltalake + + +def test_repair_with_dry_run(tmp_path, sample_data): + write_deltalake(tmp_path, sample_data, mode="append") + write_deltalake(tmp_path, sample_data, mode="append") + dt = DeltaTable(tmp_path) + os.remove(dt.file_uris()[0]) + + metrics = dt.repair(dry_run=True) + last_action = dt.history(1)[0] + + assert len(metrics["files_removed"]) == 1 + assert metrics["dry_run"] is True + assert last_action["operation"] == "WRITE" + + +def test_repair_wo_dry_run(tmp_path, sample_data): + write_deltalake(tmp_path, sample_data, mode="append") + write_deltalake(tmp_path, sample_data, mode="append") + dt = DeltaTable(tmp_path) + os.remove(dt.file_uris()[0]) + + metrics = dt.repair(dry_run=False) + last_action = dt.history(1)[0] + + assert len(metrics["files_removed"]) == 1 + assert metrics["dry_run"] is False + assert last_action["operation"] == "FSCK" diff --git a/rust/src/operations/filesystem_check.rs b/rust/src/operations/filesystem_check.rs index bf047c45c4..83af12b57c 100644 --- a/rust/src/operations/filesystem_check.rs +++ b/rust/src/operations/filesystem_check.rs @@ -22,6 +22,7 @@ use futures::future::BoxFuture; use futures::StreamExt; pub use object_store::path::Path; use object_store::ObjectStore; +use serde::Serialize; use url::{ParseError, Url}; use crate::errors::{DeltaResult, DeltaTableError}; @@ -44,7 +45,7 @@ pub struct FileSystemCheckBuilder { } /// Details of the FSCK operation including which files were removed from the log -#[derive(Debug)] +#[derive(Debug, Serialize)] pub struct FileSystemCheckMetrics { /// Was this a dry run pub dry_run: bool,