feat: add restore command in python binding #1529

Merged · 17 commits · Aug 1, 2023
30 changes: 30 additions & 0 deletions python/deltalake/table.py
@@ -2,6 +2,7 @@
import operator
import warnings
from dataclasses import dataclass
from datetime import datetime
from functools import reduce
from pathlib import Path
from typing import (
@@ -454,6 +455,35 @@ def pyarrow_schema(self) -> pyarrow.Schema:
)
return self.schema().to_pyarrow()

def restore(
self,
target: Union[int, datetime, str],
*,
ignore_missing_files: bool = False,
protocol_downgrade_allowed: bool = False,
) -> Dict[str, Any]:
"""
Run the Restore command on the Delta Table: restore the table to a given version or datetime.

:param target: the version to restore to, given as an int version number, an RFC 3339 datetime string, or a datetime object.
:param ignore_missing_files: whether to carry on when some data files are missing.
:param protocol_downgrade_allowed: whether to allow downgrading the protocol version.
:return: the metrics from restore.
"""
if isinstance(target, datetime):
metrics = self._table.restore(
target.isoformat(),
ignore_missing_files=ignore_missing_files,
protocol_downgrade_allowed=protocol_downgrade_allowed,
)
else:
metrics = self._table.restore(
target,
ignore_missing_files=ignore_missing_files,
protocol_downgrade_allowed=protocol_downgrade_allowed,
)
return json.loads(metrics)

def to_pyarrow_dataset(
self,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
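Before moving on to the docs and Rust changes, here is a minimal, hypothetical usage sketch of the method added above. The `/tmp/restore_demo` path and the toy `id` column are placeholders, not part of this PR; any Delta table with a few committed versions behaves the same way.

```python
# Hypothetical usage sketch of the DeltaTable.restore method added above.
# The /tmp/restore_demo path and the toy "id" column are placeholders.
import pyarrow as pa
from deltalake import DeltaTable, write_deltalake

table_path = "/tmp/restore_demo"
data = pa.table({"id": [1, 2, 3]})

# Commit a few versions so there is something to restore between.
write_deltalake(table_path, data, mode="append")  # version 0
write_deltalake(table_path, data, mode="append")  # version 1
write_deltalake(table_path, data, mode="append")  # version 2

dt = DeltaTable(table_path)

# Restore by version number; the returned dict holds the restore metrics.
metrics = dt.restore(1)
print(metrics)  # e.g. {'numRemovedFile': ..., 'numRestoredFile': ...}

# The target may also be a datetime object or an RFC 3339 string, and the
# keyword-only flags default to False:
# dt.restore("2023-04-26T21:23:32+08:00", ignore_missing_files=False)
```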
1 change: 1 addition & 0 deletions python/docs/source/conf.py
@@ -65,6 +65,7 @@ def get_release_version() -> str:
("py:class", "pandas.DataFrame"),
("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"),
("py:class", "pathlib.Path"),
("py:class", "datetime.datetime"),
]

# Add any paths that contain templates here, relative to this directory.
21 changes: 21 additions & 0 deletions python/docs/source/usage.rst
@@ -508,3 +508,24 @@ the method will raise an error.

This method could also be used to insert a new partition if one doesn't already
exist, making this operation idempotent.


Restoring tables
~~~~~~~~~~~~~~~~

.. py:currentmodule:: deltalake.table

Restoring a table rolls a Delta table back to a specified version or datetime. The
operation compares the current state of the table with the state to be restored,
re-adds any missing data files as ``AddFile`` actions, marks redundant files with
``RemoveFile`` actions, and then commits the result as a new version.


Use :meth:`DeltaTable.restore` to perform the restore operation. Note that the restore
will fail if any other operation modifies the table concurrently.

.. code-block:: python

>>> dt = DeltaTable("../rust/tests/data/simple_table")
>>> dt.restore(1)
{'numRemovedFile': 5, 'numRestoredFile': 22}
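For reference, the new tests further down assert the same behaviour; condensed into a short sketch (reusing the ``simple_table`` path from the snippet above):

```python
# Condensed from python/tests/test_restore.py: a restore is itself a new commit,
# so the version advances by one and the latest history entry records RESTORE.
from deltalake import DeltaTable

dt = DeltaTable("../rust/tests/data/simple_table")  # path reused from the docs example
old_version = dt.version()
dt.restore(1)
assert dt.history(1)[0]["operation"] == "RESTORE"
assert dt.version() == old_version + 1
```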
32 changes: 32 additions & 0 deletions python/src/lib.rs
@@ -25,6 +25,7 @@ use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::errors::DeltaTableError;
use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
@@ -316,6 +317,37 @@ impl RawDeltaTable {
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Run the restore command on the Delta Table: restore the table to a given version or datetime
#[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false))]
pub fn restore(
&mut self,
target: Option<&PyAny>,
ignore_missing_files: bool,
protocol_downgrade_allowed: bool,
) -> PyResult<String> {
let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone());
if let Some(val) = target {
if let Ok(version) = val.extract::<i64>() {
cmd = cmd.with_version_to_restore(version)
}
if let Ok(ds) = val.extract::<&str>() {
let datetime = DateTime::<Utc>::from(
DateTime::<FixedOffset>::parse_from_rfc3339(ds).map_err(|err| {
PyValueError::new_err(format!("Failed to parse datetime string: {err}"))
})?,
);
cmd = cmd.with_datetime_to_restore(datetime)
}
}
cmd = cmd.with_ignore_missing_files(ignore_missing_files);
cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed);
let (table, metrics) = rt()?
.block_on(cmd.into_future())
.map_err(PythonError::from)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}

/// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
let history = rt()?
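The binding above dispatches on the type of `target`: an `i64` selects a version, a string must parse as RFC 3339, and a malformed string should surface in Python as a `ValueError` (from the `PyValueError` above). A small hypothetical sketch, assuming a table already exists at a placeholder path:

```python
# Hypothetical sketch of the dispatch in RawDeltaTable::restore above; the
# table path is a placeholder for any existing Delta table with history.
from deltalake import DeltaTable

dt = DeltaTable("/tmp/restore_demo")

dt.restore(1)                             # i64  -> with_version_to_restore
dt.restore("2023-04-26T21:23:32+08:00")   # &str -> parse_from_rfc3339

try:
    dt.restore("not-a-timestamp")
except ValueError as err:
    print(err)  # "Failed to parse datetime string: ..."
```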
79 changes: 79 additions & 0 deletions python/tests/test_restore.py
@@ -0,0 +1,79 @@
import datetime
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable, write_deltalake


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_version(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
old_version = dt.version()
dt.restore(1)
last_action = dt.history(1)[0]
assert last_action["operation"] == "RESTORE"
assert dt.version() == old_version + 1


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_datetime_str(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
old_version = dt.version()
dt.restore("2020-05-01T00:47:31-07:00")
last_action = dt.history(1)[0]
assert last_action["operation"] == "RESTORE"
assert dt.version() == old_version + 1


@pytest.mark.parametrize("use_relative", [True, False])
def test_restore_with_datetime(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
old_version = dt.version()
date = datetime.datetime.strptime(
"2023-04-26T21:23:32+08:00", "%Y-%m-%dT%H:%M:%S%z"
)
dt.restore(date)
last_action = dt.history(1)[0]
assert last_action["operation"] == "RESTORE"
assert dt.version() == old_version + 1
8 changes: 5 additions & 3 deletions rust/src/operations/restore.rs
@@ -28,6 +28,7 @@ use chrono::{DateTime, Utc};
use futures::future::BoxFuture;
use object_store::path::Path;
use object_store::ObjectStore;
use serde::Serialize;

use crate::action::{Action, Add, DeltaOperation, Remove};
use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError};
@@ -57,7 +58,8 @@ impl From<RestoreError> for DeltaTableError {
}

/// Metrics from Restore
#[derive(Default, Debug)]
#[derive(Default, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RestoreMetrics {
/// Number of files removed
pub num_removed_file: usize,
@@ -109,13 +111,13 @@ impl RestoreBuilder {

/// Set whether to ignore missing files which were deleted manually or by vacuum.
/// If true, continue to run when encountering missing files.
pub fn ignore_missing_files(mut self, ignore_missing_files: bool) -> Self {
pub fn with_ignore_missing_files(mut self, ignore_missing_files: bool) -> Self {
self.ignore_missing_files = ignore_missing_files;
self
}

/// Set whether to allow downgrading the protocol
pub fn protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self {
pub fn with_protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self {
self.protocol_downgrade_allowed = protocol_downgrade_allowed;
self
}
2 changes: 1 addition & 1 deletion rust/tests/command_restore.rs
@@ -194,7 +194,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box<dyn Error>> {

let result = DeltaOps(context.table)
.restore()
.ignore_missing_files(true)
.with_ignore_missing_files(true)
.with_version_to_restore(1)
.await;
assert!(result.is_ok());