Skip to content

Commit

Permalink
feat(python): expose FSCK (repair) operation (#1730)
Browse files Browse the repository at this point in the history
# Description
This PR exposes the FSCK operation as a `repair` method under the
`DeltaTable `class.

# Related Issue(s)
<!---
For example:

- closes #106
--->
- closes #1727

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
ion-elgreco and wjones127 authored Oct 30, 2023
1 parent 0ad02d4 commit b53b686
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 1 deletion.
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class RawDeltaTable:
def create_checkpoint(self) -> None: ...
def get_add_actions(self, flatten: bool) -> pyarrow.RecordBatch: ...
def delete(self, predicate: Optional[str]) -> str: ...
def repair(self, dry_run: bool) -> str: ...
def update(
self,
updates: Dict[str, str],
Expand Down
25 changes: 25 additions & 0 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,31 @@ def delete(self, predicate: Optional[str] = None) -> Dict[str, Any]:
metrics = self._table.delete(predicate)
return json.loads(metrics)

def repair(self, dry_run: bool = False) -> Dict[str, Any]:
"""Repair the Delta Table by auditing active files that do not exist in the underlying
filesystem and removes them. This can be useful when there are accidental deletions or corrupted files.
Active files are ones that have an add action in the log, but no corresponding remove action.
This operation creates a new FSCK transaction containing a remove action for each of the missing
or corrupted files.
Args:
dry_run(bool): when activated, list only the files, otherwise add remove actions to transaction log. Defaults to False.
Returns:
The metrics from repair (FSCK) action.
Examples:
>>> from deltalake import DeltaTable
>>> dt = DeltaTable('TEST')
>>> dt.repair(dry_run=False)
{'dry_run': False,
'files_removed': ['6-0d084325-6885-4847-b008-82c1cf30674c-0.parquet',
'5-4fba1d3e-3e20-4de1-933d-a8e13ac59f53-0.parquet']}
"""
metrics = self._table.repair(dry_run)
return json.loads(metrics)


class TableMerger:
"""API for various table MERGE commands."""
Expand Down
16 changes: 16 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake::operations::merge::MergeBuilder;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
Expand Down Expand Up @@ -853,6 +854,21 @@ impl RawDeltaTable {
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Execute the File System Check command (FSCK) on the delta table: removes old reference to files that
/// have been deleted or are malformed
#[pyo3(signature = (dry_run = true))]
pub fn repair(&mut self, dry_run: bool) -> PyResult<String> {
let cmd =
FileSystemCheckBuilder::new(self._table.object_store(), self._table.state.clone())
.with_dry_run(dry_run);

let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}
}

fn convert_partition_filters<'a>(
Expand Down
31 changes: 31 additions & 0 deletions python/tests/test_repair.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import os

from deltalake import DeltaTable, write_deltalake


def test_repair_with_dry_run(tmp_path, sample_data):
write_deltalake(tmp_path, sample_data, mode="append")
write_deltalake(tmp_path, sample_data, mode="append")
dt = DeltaTable(tmp_path)
os.remove(dt.file_uris()[0])

metrics = dt.repair(dry_run=True)
last_action = dt.history(1)[0]

assert len(metrics["files_removed"]) == 1
assert metrics["dry_run"] is True
assert last_action["operation"] == "WRITE"


def test_repair_wo_dry_run(tmp_path, sample_data):
write_deltalake(tmp_path, sample_data, mode="append")
write_deltalake(tmp_path, sample_data, mode="append")
dt = DeltaTable(tmp_path)
os.remove(dt.file_uris()[0])

metrics = dt.repair(dry_run=False)
last_action = dt.history(1)[0]

assert len(metrics["files_removed"]) == 1
assert metrics["dry_run"] is False
assert last_action["operation"] == "FSCK"
3 changes: 2 additions & 1 deletion rust/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures::future::BoxFuture;
use futures::StreamExt;
pub use object_store::path::Path;
use object_store::ObjectStore;
use serde::Serialize;
use url::{ParseError, Url};

use crate::errors::{DeltaResult, DeltaTableError};
Expand All @@ -44,7 +45,7 @@ pub struct FileSystemCheckBuilder {
}

/// Details of the FSCK operation including which files were removed from the log
#[derive(Debug)]
#[derive(Debug, Serialize)]
pub struct FileSystemCheckMetrics {
/// Was this a dry run
pub dry_run: bool,
Expand Down

0 comments on commit b53b686

Please sign in to comment.