From d3909548a163f9dece0aded861adbb45e82469a0 Mon Sep 17 00:00:00 2001 From: loleek Date: Thu, 29 Jun 2023 18:40:33 +0800 Subject: [PATCH 1/8] feat: implement resotre operation (#837) --- rust/src/action/mod.rs | 27 ++- rust/src/operations/mod.rs | 8 + rust/src/operations/restore.rs | 285 +++++++++++++++++++++++++ rust/src/operations/transaction/mod.rs | 2 +- rust/tests/command_restore.rs | 210 ++++++++++++++++++ 5 files changed, 529 insertions(+), 3 deletions(-) create mode 100644 rust/src/operations/restore.rs create mode 100644 rust/tests/command_restore.rs diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs index ad114a49d5..5ed387e8dd 100644 --- a/rust/src/action/mod.rs +++ b/rust/src/action/mod.rs @@ -284,6 +284,20 @@ impl Hash for Add { } } +impl PartialEq for Add { + fn eq(&self, other: &Self) -> bool { + self.path == other.path + && self.size == other.size + && self.partition_values == other.partition_values + && self.modification_time == other.modification_time + && self.data_change == other.data_change + && self.stats == other.stats + && self.tags == other.tags + } +} + +impl Eq for Add {} + impl Add { /// Returns the Add action with path decoded. pub fn path_decoded(self) -> Result { @@ -623,7 +637,14 @@ pub enum DeltaOperation { #[serde(rename_all = "camelCase")] /// Represents a `FileSystemCheck` operation FileSystemCheck {}, - // TODO: Add more operations + + /// Represents a `Restore` operation + Restore { + /// Version to restore + version: Option, + ///Datetime to restore + datetime: Option, + }, // TODO: Add more operations } impl DeltaOperation { @@ -641,6 +662,7 @@ impl DeltaOperation { DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE", DeltaOperation::Optimize { .. } => "OPTIMIZE", DeltaOperation::FileSystemCheck { .. } => "FSCK", + DeltaOperation::Restore { .. } => "RESTORE", } } @@ -682,7 +704,8 @@ impl DeltaOperation { | Self::StreamingUpdate { .. } | Self::Write { .. } | Self::Delete { .. } - | Self::Update { .. 
} => true, + | Self::Update { .. } + | Self::Restore { .. } => true, } } diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index 6bf1584708..2307a197e8 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -18,6 +18,7 @@ pub mod create; pub mod filesystem_check; #[cfg(all(feature = "arrow", feature = "parquet"))] pub mod optimize; +pub mod restore; pub mod transaction; pub mod vacuum; @@ -29,6 +30,7 @@ use arrow::record_batch::RecordBatch; pub use datafusion::physical_plan::common::collect as collect_sendable_stream; #[cfg(all(feature = "arrow", feature = "parquet"))] use optimize::OptimizeBuilder; +use restore::RestoreBuilder; #[cfg(feature = "datafusion")] pub mod delete; @@ -149,6 +151,12 @@ impl DeltaOps { pub fn update(self) -> UpdateBuilder { UpdateBuilder::new(self.0.object_store(), self.0.state) } + + /// Restore delta table to a specified version or datetime + #[must_use] + pub fn restore(self) -> RestoreBuilder { + RestoreBuilder::new(self.0.object_store(), self.0.state) + } } impl From for DeltaOps { diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs new file mode 100644 index 0000000000..d1e1a8b4de --- /dev/null +++ b/rust/src/operations/restore.rs @@ -0,0 +1,285 @@ +//! Perform restore of delta table to a specified version or datetime +//! +//! Algorithm: +//! 1) Read the latest state snapshot of the table. +//! 2) Read table state for version or datetime to restore +//! 3) Compute files available in state for restoring (files were removed by some commit) +//! but missed in the latest. Add these files into commit as AddFile action. +//! 4) Compute files available in the latest state snapshot (files were added after version to restore) +//! but missed in the state to restore. Add these files into commit as RemoveFile action. +//! 5) If ignore_missing_files option is false (default value) check availability of AddFile +//! in file system. +//! 
6) Commit Protocol, all RemoveFile and AddFile actions +//! into delta log using `try_commit_transaction` (commit will be failed in case of parallel transaction) +//! 7) If table was modified in parallel then ignore restore and raise exception. +//! +//! # Example +//! ```rust ignore +//! let table = open_table("../path/to/table")?; +//! let (table, metrics) = RestoreBuilder::new(table.object_store(), table.state).restore_to_version(1).await?; +//! ```` + +use std::cmp::max; +use std::collections::HashSet; +use std::ops::BitXor; +use std::time::{SystemTime, UNIX_EPOCH}; + +use chrono::{DateTime, Utc}; +use futures::future::BoxFuture; +use object_store::path::Path; +use object_store::ObjectStore; + +use crate::action::{Action, Add, DeltaOperation, Remove}; +use crate::operations::transaction::{prepare_commit, try_commit_transaction}; +use crate::storage::ObjectStoreRef; +use crate::table_state::DeltaTableState; +use crate::{action, DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; + +/// Errors that can occur during restore +#[derive(thiserror::Error, Debug)] +enum RestoreError { + #[error("Either the version or datetime should be provided for restore")] + InvalidRestoreParameter, + + #[error("Version to restore {0} should be less then last available version {1}.")] + TooLargeRestoreVersion(i64, i64), + + #[error("Find missing file {0} when restore.")] + MissingDataFile(String), +} + +impl From for DeltaTableError { + fn from(err: RestoreError) -> Self { + DeltaTableError::GenericError { + source: Box::new(err), + } + } +} + +/// Metrics from Restore +#[derive(Default, Debug)] +pub struct RestoreMetrics { + /// Number of files removed + pub num_removed_file: usize, + /// Number of files restored + pub num_restored_file: usize, +} + +/// Restore a Delta table with given version +/// See this module's documentation for more information +pub struct RestoreBuilder { + /// A snapshot of the to-be-restored table's state + snapshot: 
DeltaTableState, + /// Delta object store for handling data files + store: ObjectStoreRef, + /// Version to restore + version_to_restore: Option, + /// Datetime to restore + datetime_to_restore: Option>, + /// Ignore missing files + ignore_missing_files: bool, + /// Protocol downgrade allowed + protocol_downgrade_allowed: bool, +} + +impl RestoreBuilder { + /// Create a new [`RestoreBuilder`] + pub fn new(store: ObjectStoreRef, snapshot: DeltaTableState) -> Self { + Self { + snapshot, + store, + version_to_restore: None, + datetime_to_restore: None, + ignore_missing_files: false, + protocol_downgrade_allowed: false, + } + } + + /// Set the version to restore + pub fn restore_to_version(mut self, version: i64) -> Self { + self.version_to_restore = Some(version); + self + } + + /// Set the datetime to restore + pub fn restore_to_datetime(mut self, datetime: DateTime) -> Self { + self.datetime_to_restore = Some(datetime); + self + } + + /// Set whether to ignore missing files which delete manually or by vacuum. + /// If true, continue to run when encountering missing files. 
+ pub fn ignore_missing_files(mut self, ignore_missing_files: bool) -> Self { + self.ignore_missing_files = ignore_missing_files; + self + } + + /// Set whether allow to downgrade protocol + pub fn protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self { + self.protocol_downgrade_allowed = protocol_downgrade_allowed; + self + } +} + +async fn execute( + object_store: ObjectStoreRef, + snapshot: DeltaTableState, + version_to_restore: Option, + datetime_to_restore: Option>, + ignore_missing_files: bool, + protocol_downgrade_allowed: bool, +) -> DeltaResult { + if !(version_to_restore + .is_none() + .bitxor(datetime_to_restore.is_none())) + { + return Err(DeltaTableError::from(RestoreError::InvalidRestoreParameter)); + } + let mut table = DeltaTable::new(object_store.clone(), DeltaTableConfig::default()); + let version = match datetime_to_restore { + Some(datetime) => { + table.load_with_datetime(datetime).await?; + table.version() + } + None => { + table.load_version(version_to_restore.unwrap()).await?; + table.version() + } + }; + + if version >= snapshot.version() { + return Err(DeltaTableError::from(RestoreError::TooLargeRestoreVersion( + version, + snapshot.version(), + ))); + } + let state_to_restore_files = table.get_state().files().clone(); + let latest_state_files = snapshot.files().clone(); + let state_to_restore_files_set = + HashSet::::from_iter(state_to_restore_files.iter().cloned()); + let latest_state_files_set = HashSet::::from_iter(latest_state_files.iter().cloned()); + + let files_to_add: Vec = state_to_restore_files + .into_iter() + .filter(|a: &Add| !latest_state_files_set.contains(a)) + .map(|mut a: Add| -> Add { + a.data_change = true; + a + }) + .collect(); + + let deletion_timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64; + let files_to_remove: Vec = latest_state_files + .into_iter() + .filter(|a: &Add| !state_to_restore_files_set.contains(a)) + .map(|a: Add| -> Remove { + 
Remove { + path: a.path.clone(), + deletion_timestamp: Some(deletion_timestamp), + data_change: true, + extended_file_metadata: Some(false), + partition_values: Some(a.partition_values.clone()), + size: Some(a.size), + tags: a.tags.clone(), + } + }) + .collect(); + + if !ignore_missing_files { + check_files_available(object_store.as_ref(), &files_to_add).await?; + } + + let metrics = RestoreMetrics { + num_removed_file: files_to_remove.len(), + num_restored_file: files_to_add.len(), + }; + + let mut actions = vec![]; + let protocol = if protocol_downgrade_allowed { + action::Protocol { + min_reader_version: table.get_min_reader_version(), + min_writer_version: table.get_min_writer_version(), + } + } else { + action::Protocol { + min_reader_version: max( + table.get_min_reader_version(), + snapshot.min_reader_version(), + ), + min_writer_version: max( + table.get_min_writer_version(), + snapshot.min_writer_version(), + ), + } + }; + actions.push(Action::protocol(protocol)); + actions.extend(files_to_add.into_iter().map(Action::add)); + actions.extend(files_to_remove.into_iter().map(Action::remove)); + + let commit = prepare_commit( + object_store.as_ref(), + &DeltaOperation::Restore { + version: version_to_restore, + datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }), + }, + &actions, + None, + ) + .await?; + let commit_version = snapshot.version() + 1; + match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { + Ok(_) => {} + Err(err) => { + object_store.delete(&commit).await?; + return Err(err.into()); + } + } + + Ok(metrics) +} + +async fn check_files_available( + object_store: &dyn ObjectStore, + files: &Vec, +) -> DeltaResult<()> { + for file in files { + let file_path = Path::from(file.path.clone()); + match object_store.head(&file_path).await { + Ok(_) => {} + Err(ObjectStoreError::NotFound { .. 
}) => { + return Err(DeltaTableError::from(RestoreError::MissingDataFile( + file.path.clone(), + ))) + } + Err(e) => return Err(DeltaTableError::from(e)), + } + } + Ok(()) +} + +impl std::future::IntoFuture for RestoreBuilder { + type Output = DeltaResult<(DeltaTable, RestoreMetrics)>; + type IntoFuture = BoxFuture<'static, Self::Output>; + + fn into_future(self) -> Self::IntoFuture { + let this = self; + + Box::pin(async move { + let metrics = execute( + this.store.clone(), + this.snapshot.clone(), + this.version_to_restore, + this.datetime_to_restore, + this.ignore_missing_files, + this.protocol_downgrade_allowed, + ) + .await?; + let mut table = DeltaTable::new_with_state(this.store, this.snapshot); + table.update().await?; + Ok((table, metrics)) + }) + } +} diff --git a/rust/src/operations/transaction/mod.rs b/rust/src/operations/transaction/mod.rs index ba285bd046..ff45c123c0 100644 --- a/rust/src/operations/transaction/mod.rs +++ b/rust/src/operations/transaction/mod.rs @@ -132,7 +132,7 @@ pub(crate) async fn prepare_commit<'a>( /// This is low-level transaction API. If user does not want to maintain the commit loop then /// the `DeltaTransaction.commit` is desired to be used as it handles `try_commit_transaction` /// with retry logic. 
-async fn try_commit_transaction( +pub(crate) async fn try_commit_transaction( storage: &dyn ObjectStore, tmp_commit: &Path, version: i64, diff --git a/rust/tests/command_restore.rs b/rust/tests/command_restore.rs new file mode 100644 index 0000000000..57f3f4c116 --- /dev/null +++ b/rust/tests/command_restore.rs @@ -0,0 +1,210 @@ +#![cfg(all(feature = "arrow", feature = "parquet", feature = "datafusion"))] + +use arrow::datatypes::Schema as ArrowSchema; +use arrow_array::{Int32Array, RecordBatch}; +use arrow_schema::{DataType, Field}; +use chrono::{DateTime, NaiveDateTime, Utc}; +use deltalake::action::SaveMode; +use deltalake::{DeltaOps, DeltaTable, SchemaDataType, SchemaField}; +use rand::Rng; +use std::collections::HashMap; +use std::error::Error; +use std::fs; +use std::sync::Arc; +use tempdir::TempDir; + +#[derive(Debug)] +struct Context { + pub tmp_dir: TempDir, + pub table: DeltaTable, +} + +async fn setup_test() -> Result> { + let columns = vec![ + SchemaField::new( + "id".to_string(), + SchemaDataType::primitive("integer".to_string()), + true, + HashMap::new(), + ), + SchemaField::new( + "value".to_string(), + SchemaDataType::primitive("integer".to_string()), + true, + HashMap::new(), + ), + ]; + + let tmp_dir = tempdir::TempDir::new("restore_table").unwrap(); + let table_uri = tmp_dir.path().to_str().to_owned().unwrap(); + let table = DeltaOps::try_from_uri(table_uri) + .await? 
+ .create() + .with_columns(columns) + .await?; + + let batch = get_record_batch(); + + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Overwrite) + .await + .unwrap(); + + let table = DeltaOps(table) + .write(vec![batch.clone()]) + .with_save_mode(SaveMode::Append) + .await + .unwrap(); + + Ok(Context { tmp_dir, table }) +} + +fn get_record_batch() -> RecordBatch { + let mut id_vec: Vec = Vec::with_capacity(10); + let mut value_vec: Vec = Vec::with_capacity(10); + let mut rng = rand::thread_rng(); + + for _ in 0..10 { + id_vec.push(rng.gen()); + value_vec.push(rng.gen()); + } + + let schema = ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, true), + Field::new("value", DataType::Int32, true), + ]); + + let id_array = Int32Array::from(id_vec); + let value_array = Int32Array::from(value_vec); + + RecordBatch::try_new( + Arc::new(schema), + vec![Arc::new(id_array), Arc::new(value_array)], + ) + .unwrap() +} + +#[tokio::test] +async fn test_restore_by_version() -> Result<(), Box> { + let context = setup_test().await?; + let table = context.table; + let result = DeltaOps(table).restore().restore_to_version(1).await?; + assert_eq!(result.1.num_restored_file, 1); + assert_eq!(result.1.num_removed_file, 2); + assert_eq!(result.0.state.version(), 4); + let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap(); + let mut table = DeltaOps::try_from_uri(table_uri).await?; + table.0.load_version(1).await?; + assert_eq!(table.0.state.files(), result.0.state.files()); + + let result = DeltaOps(result.0).restore().restore_to_version(0).await?; + assert_eq!(result.0.state.files().len(), 0); + Ok(()) +} + +#[tokio::test] +async fn test_restore_by_datetime() -> Result<(), Box> { + let context = setup_test().await?; + let mut table = context.table; + let history = table.history(Some(10)).await?; + let 
timestamp = history.get(1).unwrap().timestamp.unwrap(); + let naive = NaiveDateTime::from_timestamp_millis(timestamp).unwrap(); + let datetime: DateTime = DateTime::from_utc(naive, Utc); + + let result = DeltaOps(table) + .restore() + .restore_to_datetime(datetime) + .await?; + assert_eq!(result.1.num_restored_file, 1); + assert_eq!(result.1.num_removed_file, 2); + assert_eq!(result.0.state.version(), 4); + Ok(()) +} + +#[tokio::test] +async fn test_restore_with_error_params() -> Result<(), Box> { + let context = setup_test().await?; + let mut table = context.table; + let history = table.history(Some(10)).await?; + let timestamp = history.get(1).unwrap().timestamp.unwrap(); + let naive = NaiveDateTime::from_timestamp_millis(timestamp).unwrap(); + let datetime: DateTime = DateTime::from_utc(naive, Utc); + + // datetime and version both set + let result = DeltaOps(table) + .restore() + .restore_to_version(1) + .restore_to_datetime(datetime) + .await; + assert_eq!(result.is_err(), true); + + // version too large + let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap(); + let ops = DeltaOps::try_from_uri(table_uri).await?; + let result = ops.restore().restore_to_version(5).await; + assert_eq!(result.is_err(), true); + Ok(()) +} + +#[tokio::test] +async fn test_restore_file_missing() -> Result<(), Box> { + let context = setup_test().await?; + + for file in context.table.state.files().iter() { + let p = context.tmp_dir.path().join(file.clone().path); + fs::remove_file(p).unwrap(); + } + + for file in context.table.state.all_tombstones().iter() { + let p = context.tmp_dir.path().join(file.clone().path); + fs::remove_file(p).unwrap(); + } + + let result = DeltaOps(context.table) + .restore() + .restore_to_version(1) + .await; + assert_eq!(result.is_err(), true); + Ok(()) +} + +#[tokio::test] +async fn test_restore_allow_file_missing() -> Result<(), Box> { + let context = setup_test().await?; + + for file in context.table.state.files().iter() { + let p = 
context.tmp_dir.path().join(file.clone().path); + fs::remove_file(p).unwrap(); + } + + for file in context.table.state.all_tombstones().iter() { + let p = context.tmp_dir.path().join(file.clone().path); + fs::remove_file(p).unwrap(); + } + + let result = DeltaOps(context.table) + .restore() + .ignore_missing_files(true) + .restore_to_version(1) + .await; + assert_eq!(result.is_err(), false); + Ok(()) +} + +#[tokio::test] +async fn test_restore_transaction_conflict() -> Result<(), Box> { + let context = setup_test().await?; + let mut table = context.table; + table.load_version(2).await?; + + let result = DeltaOps(table).restore().restore_to_version(1).await; + assert_eq!(result.is_err(), true); + Ok(()) +} From fcb565076d045f4d485ff4da96260df739d01de7 Mon Sep 17 00:00:00 2001 From: loleek Date: Thu, 29 Jun 2023 19:24:11 +0800 Subject: [PATCH 2/8] feat: implement restore operation (#837) --- rust/src/operations/restore.rs | 2 +- rust/tests/command_restore.rs | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index d1e1a8b4de..a37d6d67af 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -183,7 +183,7 @@ async fn execute( extended_file_metadata: Some(false), partition_values: Some(a.partition_values.clone()), size: Some(a.size), - tags: a.tags.clone(), + tags: a.tags, } }) .collect(); diff --git a/rust/tests/command_restore.rs b/rust/tests/command_restore.rs index 57f3f4c116..c6aae90b1d 100644 --- a/rust/tests/command_restore.rs +++ b/rust/tests/command_restore.rs @@ -143,13 +143,13 @@ async fn test_restore_with_error_params() -> Result<(), Box> { .restore_to_version(1) .restore_to_datetime(datetime) .await; - assert_eq!(result.is_err(), true); + assert!(result.is_err()); // version too large let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap(); let ops = DeltaOps::try_from_uri(table_uri).await?; let result = 
ops.restore().restore_to_version(5).await; - assert_eq!(result.is_err(), true); + assert!(result.is_err()); Ok(()) } @@ -171,7 +171,7 @@ async fn test_restore_file_missing() -> Result<(), Box> { .restore() .restore_to_version(1) .await; - assert_eq!(result.is_err(), true); + assert!(result.is_err()); Ok(()) } @@ -194,7 +194,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box> { .ignore_missing_files(true) .restore_to_version(1) .await; - assert_eq!(result.is_err(), false); + assert!(result.is_ok()); Ok(()) } @@ -205,6 +205,6 @@ async fn test_restore_transaction_conflict() -> Result<(), Box> { table.load_version(2).await?; let result = DeltaOps(table).restore().restore_to_version(1).await; - assert_eq!(result.is_err(), true); + assert!(result.is_err()); Ok(()) } From 3a412a092a58254275cf84cff4b2e7b533106ce4 Mon Sep 17 00:00:00 2001 From: loleek Date: Thu, 29 Jun 2023 19:24:11 +0800 Subject: [PATCH 3/8] feat: implement restore operation (#837) --- rust/src/operations/restore.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index a37d6d67af..5d24cfd3b2 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -30,7 +30,7 @@ use object_store::path::Path; use object_store::ObjectStore; use crate::action::{Action, Add, DeltaOperation, Remove}; -use crate::operations::transaction::{prepare_commit, try_commit_transaction}; +use crate::operations::transaction::{prepare_commit, TransactionError, try_commit_transaction}; use crate::storage::ObjectStoreRef; use crate::table_state::DeltaTableState; use crate::{action, DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -231,7 +231,10 @@ async fn execute( .await?; let commit_version = snapshot.version() + 1; match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { - Ok(_) => {} + Ok(_) => {}, + Err(err @ 
TransactionError::VersionAlreadyExists(_)) => { + return Err(err.into()); + }, Err(err) => { object_store.delete(&commit).await?; return Err(err.into()); From 210c9b6105dcdb0c144be518e82edf3ba73ef5ef Mon Sep 17 00:00:00 2001 From: loleek Date: Thu, 29 Jun 2023 19:24:11 +0800 Subject: [PATCH 4/8] feat: implement restore operation (#837) --- rust/src/operations/restore.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index 5d24cfd3b2..2ae84c8633 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -30,7 +30,7 @@ use object_store::path::Path; use object_store::ObjectStore; use crate::action::{Action, Add, DeltaOperation, Remove}; -use crate::operations::transaction::{prepare_commit, TransactionError, try_commit_transaction}; +use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; use crate::storage::ObjectStoreRef; use crate::table_state::DeltaTableState; use crate::{action, DeltaResult, DeltaTable, DeltaTableConfig, DeltaTableError, ObjectStoreError}; @@ -231,10 +231,10 @@ async fn execute( .await?; let commit_version = snapshot.version() + 1; match try_commit_transaction(object_store.as_ref(), &commit, commit_version).await { - Ok(_) => {}, + Ok(_) => {} Err(err @ TransactionError::VersionAlreadyExists(_)) => { return Err(err.into()); - }, + } Err(err) => { object_store.delete(&commit).await?; return Err(err.into()); From 310524a822e282160399323d810a71fc47b4ab49 Mon Sep 17 00:00:00 2001 From: loleek Date: Thu, 29 Jun 2023 19:24:11 +0800 Subject: [PATCH 5/8] feat: implement restore operation (#837) --- rust/src/operations/restore.rs | 6 +++--- rust/tests/command_restore.rs | 21 ++++++++++++--------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index 2ae84c8633..d1e8a378aa 100644 --- a/rust/src/operations/restore.rs +++ 
b/rust/src/operations/restore.rs @@ -16,7 +16,7 @@ //! # Example //! ```rust ignore //! let table = open_table("../path/to/table")?; -//! let (table, metrics) = RestoreBuilder::new(table.object_store(), table.state).restore_to_version(1).await?; +//! let (table, metrics) = RestoreBuilder::new(table.object_store(), table.state).with_version_to_restore(1).await?; //! ```` use std::cmp::max; @@ -96,13 +96,13 @@ impl RestoreBuilder { } /// Set the version to restore - pub fn restore_to_version(mut self, version: i64) -> Self { + pub fn with_version_to_restore(mut self, version: i64) -> Self { self.version_to_restore = Some(version); self } /// Set the datetime to restore - pub fn restore_to_datetime(mut self, datetime: DateTime) -> Self { + pub fn with_datetime_to_restore(mut self, datetime: DateTime) -> Self { self.datetime_to_restore = Some(datetime); self } diff --git a/rust/tests/command_restore.rs b/rust/tests/command_restore.rs index c6aae90b1d..4902d85a28 100644 --- a/rust/tests/command_restore.rs +++ b/rust/tests/command_restore.rs @@ -95,7 +95,7 @@ fn get_record_batch() -> RecordBatch { async fn test_restore_by_version() -> Result<(), Box> { let context = setup_test().await?; let table = context.table; - let result = DeltaOps(table).restore().restore_to_version(1).await?; + let result = DeltaOps(table).restore().with_version_to_restore(1).await?; assert_eq!(result.1.num_restored_file, 1); assert_eq!(result.1.num_removed_file, 2); assert_eq!(result.0.state.version(), 4); @@ -104,7 +104,10 @@ async fn test_restore_by_version() -> Result<(), Box> { table.0.load_version(1).await?; assert_eq!(table.0.state.files(), result.0.state.files()); - let result = DeltaOps(result.0).restore().restore_to_version(0).await?; + let result = DeltaOps(result.0) + .restore() + .with_version_to_restore(0) + .await?; assert_eq!(result.0.state.files().len(), 0); Ok(()) } @@ -120,7 +123,7 @@ async fn test_restore_by_datetime() -> Result<(), Box> { let result = DeltaOps(table) 
.restore() - .restore_to_datetime(datetime) + .with_datetime_to_restore(datetime) .await?; assert_eq!(result.1.num_restored_file, 1); assert_eq!(result.1.num_removed_file, 2); @@ -140,15 +143,15 @@ async fn test_restore_with_error_params() -> Result<(), Box> { // datetime and version both set let result = DeltaOps(table) .restore() - .restore_to_version(1) - .restore_to_datetime(datetime) + .with_version_to_restore(1) + .with_datetime_to_restore(datetime) .await; assert!(result.is_err()); // version too large let table_uri = context.tmp_dir.path().to_str().to_owned().unwrap(); let ops = DeltaOps::try_from_uri(table_uri).await?; - let result = ops.restore().restore_to_version(5).await; + let result = ops.restore().with_version_to_restore(5).await; assert!(result.is_err()); Ok(()) } @@ -169,7 +172,7 @@ async fn test_restore_file_missing() -> Result<(), Box> { let result = DeltaOps(context.table) .restore() - .restore_to_version(1) + .with_version_to_restore(1) .await; assert!(result.is_err()); Ok(()) @@ -192,7 +195,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box> { let result = DeltaOps(context.table) .restore() .ignore_missing_files(true) - .restore_to_version(1) + .with_version_to_restore(1) .await; assert!(result.is_ok()); Ok(()) @@ -204,7 +207,7 @@ async fn test_restore_transaction_conflict() -> Result<(), Box> { let mut table = context.table; table.load_version(2).await?; - let result = DeltaOps(table).restore().restore_to_version(1).await; + let result = DeltaOps(table).restore().with_version_to_restore(1).await; assert!(result.is_err()); Ok(()) } From b8389a79c1a7cca961c2f55d93f5a01af0690c6d Mon Sep 17 00:00:00 2001 From: loleek Date: Tue, 11 Jul 2023 21:37:34 +0800 Subject: [PATCH 6/8] feat: add restore command in python binding --- python/deltalake/table.py | 24 ++++++++++++++++ python/docs/source/usage.rst | 19 +++++++++++++ python/src/lib.rs | 30 ++++++++++++++++++++ python/tests/test_restore.py | 52 ++++++++++++++++++++++++++++++++++ 
rust/src/operations/restore.rs | 8 ++++-- rust/tests/command_restore.rs | 2 +- 6 files changed, 131 insertions(+), 4 deletions(-) create mode 100644 python/tests/test_restore.py diff --git a/python/deltalake/table.py b/python/deltalake/table.py index b57cad0213..896485b2ef 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -459,6 +459,30 @@ def pyarrow_schema(self) -> pyarrow.Schema: ) return self.schema().to_pyarrow() + def restore( + self, + version_to_restore: Optional[int] = None, + datetime_to_restore: Optional[str] = None, + ignore_missing_files: bool = False, + protocol_downgrade_allowed: bool = False, + ) -> Dict[str, Any]: + """ + Run the Restore command on the Delta Table: restore table to a given version or datetime. + + :param version_to_restore: the expected version will restore. + :param datetime_to_restore: the expected datetime will restore. + :param ignore_missing_files: whether the operation carry on when some data files missing. + :param protocol_downgrade_allowed: whether the operation when protocol version upgraded. + :return: the metrics from restore. + """ + metrics = self._table.restore( + version_to_restore, + datetime_to_restore, + ignore_missing_files, + protocol_downgrade_allowed, + ) + return json.loads(metrics) + def to_pyarrow_dataset( self, partitions: Optional[List[Tuple[str, str, Any]]] = None, diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index fdecfdc1a1..6762c1e787 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -508,3 +508,22 @@ the method will raise an error. This method could also be used to insert a new partition if one doesn't already exist, making this operation idempotent. + + +Restoring tables +~~~~~~~~~~~~~~~~ + +Restoring a table will restore delta table to a specified version or datetime. This +operation compares the current state of the delta table with the state to be restored. 
+And add those missing files into the AddFile actions and add redundant files into +RemoveFile actions.Then commit into a new version. + + +Use :meth:`DeltaTable.restore` to perform the restore operation. Note that if any other +concurrent operation was performed on the table, restore will fail. + +.. code-block:: python + + >>> dt = DeltaTable("../rust/tests/data/simple_table") + >>> dt.restore(1) + {'numRemovedFile': 5, 'numRestoredFile': 22} diff --git a/python/src/lib.rs b/python/src/lib.rs index ca7247c365..de9813dba8 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -25,6 +25,7 @@ use deltalake::datafusion::prelude::SessionContext; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; +use deltalake::operations::restore::RestoreBuilder; use deltalake::operations::transaction::commit; use deltalake::operations::vacuum::VacuumBuilder; use deltalake::partitions::PartitionFilter; @@ -318,6 +319,35 @@ impl RawDeltaTable { Ok(serde_json::to_string(&metrics).unwrap()) } + // Run the restore command on the Delta Table: restore table to a given version or datetime + #[pyo3(signature = (version_to_restore = None, datetime_to_restore = None, ignore_missing_files = false, protocol_downgrade_allowed = false))] + pub fn restore( + &mut self, + version_to_restore: Option, + datetime_to_restore: Option<&str>, + ignore_missing_files: bool, + protocol_downgrade_allowed: bool, + ) -> PyResult { + let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); + if let Some(version) = version_to_restore { + cmd = cmd.with_version_to_restore(version) + } + if let Some(ds) = datetime_to_restore { + let datetime = + DateTime::::from(DateTime::::parse_from_rfc3339(ds).map_err( + |err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")), + )?); + cmd = cmd.with_datetime_to_restore(datetime) + } + cmd = 
cmd.with_ignore_missing_files(ignore_missing_files); + cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed); + let (table, metrics) = rt()? + .block_on(cmd.into_future()) + .map_err(PythonError::from)?; + self._table.state = table.state; + Ok(serde_json::to_string(&metrics).unwrap()) + } + /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table. pub fn history(&mut self, limit: Option) -> PyResult> { let history = rt()? diff --git a/python/tests/test_restore.py b/python/tests/test_restore.py new file mode 100644 index 0000000000..a4be733cab --- /dev/null +++ b/python/tests/test_restore.py @@ -0,0 +1,52 @@ +import pathlib + +import pyarrow as pa +import pytest + +from deltalake import DeltaTable, write_deltalake + + +@pytest.mark.parametrize("use_relative", [True, False]) +def test_restore_with_version( + tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool +): + if use_relative: + monkeypatch.chdir(tmp_path) # Make tmp_path the working directory + (tmp_path / "path/to/table").mkdir(parents=True) + table_path = "./path/to/table" + else: + table_path = str(tmp_path) + + write_deltalake(table_path, sample_data, mode="append") + write_deltalake(table_path, sample_data, mode="append") + write_deltalake(table_path, sample_data, mode="append") + + dt = DeltaTable(table_path) + old_version = dt.version() + dt.restore(version_to_restore=1) + last_action = dt.history(1)[0] + assert last_action["operation"] == "RESTORE" + assert dt.version() == old_version + 1 + + +@pytest.mark.parametrize("use_relative", [True, False]) +def test_restore_with_datetime( + tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool +): + if use_relative: + monkeypatch.chdir(tmp_path) # Make tmp_path the working directory + (tmp_path / "path/to/table").mkdir(parents=True) + table_path = "./path/to/table" + else: + table_path = str(tmp_path) + + 
write_deltalake(table_path, sample_data, mode="append") + write_deltalake(table_path, sample_data, mode="append") + write_deltalake(table_path, sample_data, mode="append") + + dt = DeltaTable(table_path) + old_version = dt.version() + dt.restore(datetime_to_restore="2020-05-01T00:47:31-07:00") + last_action = dt.history(1)[0] + assert last_action["operation"] == "RESTORE" + assert dt.version() == old_version + 1 diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs index d1e8a378aa..77bcaa492a 100644 --- a/rust/src/operations/restore.rs +++ b/rust/src/operations/restore.rs @@ -28,6 +28,7 @@ use chrono::{DateTime, Utc}; use futures::future::BoxFuture; use object_store::path::Path; use object_store::ObjectStore; +use serde::Serialize; use crate::action::{Action, Add, DeltaOperation, Remove}; use crate::operations::transaction::{prepare_commit, try_commit_transaction, TransactionError}; @@ -57,7 +58,8 @@ impl From for DeltaTableError { } /// Metrics from Restore -#[derive(Default, Debug)] +#[derive(Default, Debug, Serialize)] +#[serde(rename_all = "camelCase")] pub struct RestoreMetrics { /// Number of files removed pub num_removed_file: usize, @@ -109,13 +111,13 @@ impl RestoreBuilder { /// Set whether to ignore missing files which delete manually or by vacuum. /// If true, continue to run when encountering missing files. 
- pub fn ignore_missing_files(mut self, ignore_missing_files: bool) -> Self { + pub fn with_ignore_missing_files(mut self, ignore_missing_files: bool) -> Self { self.ignore_missing_files = ignore_missing_files; self } /// Set whether allow to downgrade protocol - pub fn protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self { + pub fn with_protocol_downgrade_allowed(mut self, protocol_downgrade_allowed: bool) -> Self { self.protocol_downgrade_allowed = protocol_downgrade_allowed; self } diff --git a/rust/tests/command_restore.rs b/rust/tests/command_restore.rs index 4902d85a28..7a54f7e3a3 100644 --- a/rust/tests/command_restore.rs +++ b/rust/tests/command_restore.rs @@ -194,7 +194,7 @@ async fn test_restore_allow_file_missing() -> Result<(), Box> { let result = DeltaOps(context.table) .restore() - .ignore_missing_files(true) + .with_ignore_missing_files(true) .with_version_to_restore(1) .await; assert!(result.is_ok()); From 4750b03b18b62e5b2e512683c402a065c7f36547 Mon Sep 17 00:00:00 2001 From: loleek Date: Tue, 11 Jul 2023 21:37:34 +0800 Subject: [PATCH 7/8] feat: add restore command in python binding --- python/deltalake/table.py | 26 ++++++++++++++++---------- python/src/lib.rs | 26 ++++++++++++++------------ python/tests/test_restore.py | 31 +++++++++++++++++++++++++++++-- 3 files changed, 59 insertions(+), 24 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 896485b2ef..875c346d9e 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -2,6 +2,7 @@ import operator import warnings from dataclasses import dataclass +from datetime import datetime from functools import reduce from pathlib import Path from typing import ( @@ -461,26 +462,31 @@ def pyarrow_schema(self) -> pyarrow.Schema: def restore( self, - version_to_restore: Optional[int] = None, - datetime_to_restore: Optional[str] = None, + target: Union[int, datetime, str], + *, ignore_missing_files: bool = False, 
protocol_downgrade_allowed: bool = False, ) -> Dict[str, Any]: """ Run the Restore command on the Delta Table: restore table to a given version or datetime. - :param version_to_restore: the expected version will restore. - :param datetime_to_restore: the expected datetime will restore. + :param target: the version to restore to, given as an int version number, an RFC 3339 datetime str, or a datetime object. :param ignore_missing_files: whether the operation carry on when some data files missing. :param protocol_downgrade_allowed: whether the operation when protocol version upgraded. :return: the metrics from restore. """ - metrics = self._table.restore( - version_to_restore, - datetime_to_restore, - ignore_missing_files, - protocol_downgrade_allowed, - ) + if isinstance(target, datetime): + metrics = self._table.restore( + target.isoformat(), + ignore_missing_files=ignore_missing_files, + protocol_downgrade_allowed=protocol_downgrade_allowed, + ) + else: + metrics = self._table.restore( + target, + ignore_missing_files=ignore_missing_files, + protocol_downgrade_allowed=protocol_downgrade_allowed, + ) return json.loads(metrics) def to_pyarrow_dataset( diff --git a/python/src/lib.rs b/python/src/lib.rs index 829c26a5a5..568e2cab0d 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -320,24 +320,26 @@ impl RawDeltaTable { } // Run the restore command on the Delta Table: restore table to a given version or datetime - #[pyo3(signature = (version_to_restore = None, datetime_to_restore = None, ignore_missing_files = false, protocol_downgrade_allowed = false))] + #[pyo3(signature = (target, *, ignore_missing_files = false, protocol_downgrade_allowed = false))] pub fn restore( &mut self, - version_to_restore: Option, - datetime_to_restore: Option<&str>, + target: Option<&PyAny>, ignore_missing_files: bool, protocol_downgrade_allowed: bool, ) -> PyResult { let mut cmd = RestoreBuilder::new(self._table.object_store(), self._table.state.clone()); - if let Some(version) = version_to_restore { - 
cmd = cmd.with_version_to_restore(version) - } - if let Some(ds) = datetime_to_restore { - let datetime = - DateTime::::from(DateTime::::parse_from_rfc3339(ds).map_err( - |err| PyValueError::new_err(format!("Failed to parse datetime string: {err}")), - )?); - cmd = cmd.with_datetime_to_restore(datetime) + if let Some(val) = target { + if let Ok(version) = val.extract::() { + cmd = cmd.with_version_to_restore(version) + } + if let Ok(ds) = val.extract::<&str>() { + let datetime = DateTime::::from( + DateTime::::parse_from_rfc3339(ds).map_err(|err| { + PyValueError::new_err(format!("Failed to parse datetime string: {err}")) + })?, + ); + cmd = cmd.with_datetime_to_restore(datetime) + } } cmd = cmd.with_ignore_missing_files(ignore_missing_files); cmd = cmd.with_protocol_downgrade_allowed(protocol_downgrade_allowed); diff --git a/python/tests/test_restore.py b/python/tests/test_restore.py index a4be733cab..d877b97c40 100644 --- a/python/tests/test_restore.py +++ b/python/tests/test_restore.py @@ -1,3 +1,4 @@ +import datetime import pathlib import pyarrow as pa @@ -23,7 +24,30 @@ def test_restore_with_version( dt = DeltaTable(table_path) old_version = dt.version() - dt.restore(version_to_restore=1) + dt.restore(1) + last_action = dt.history(1)[0] + assert last_action["operation"] == "RESTORE" + assert dt.version() == old_version + 1 + + +@pytest.mark.parametrize("use_relative", [True, False]) +def test_restore_with_datetime_str( + tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool +): + if use_relative: + monkeypatch.chdir(tmp_path) # Make tmp_path the working directory + (tmp_path / "path/to/table").mkdir(parents=True) + table_path = "./path/to/table" + else: + table_path = str(tmp_path) + + write_deltalake(table_path, sample_data, mode="append") + write_deltalake(table_path, sample_data, mode="append") + write_deltalake(table_path, sample_data, mode="append") + + dt = DeltaTable(table_path) + old_version = dt.version() + 
dt.restore("2020-05-01T00:47:31-07:00") last_action = dt.history(1)[0] assert last_action["operation"] == "RESTORE" assert dt.version() == old_version + 1 @@ -46,7 +70,10 @@ def test_restore_with_datetime( dt = DeltaTable(table_path) old_version = dt.version() - dt.restore(datetime_to_restore="2020-05-01T00:47:31-07:00") + date = datetime.datetime.strptime( + "2023-04-26T21:23:32+08:00", "%Y-%m-%dT%H:%M:%S%z" + ) + dt.restore(date) last_action = dt.history(1)[0] assert last_action["operation"] == "RESTORE" assert dt.version() == old_version + 1 From 3d659a852a10446a09b4f7ed74af20f1102eccd9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 26 Jul 2023 20:06:45 -0700 Subject: [PATCH 8/8] fix docs build warnings --- python/docs/source/conf.py | 1 + python/docs/source/usage.rst | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index 692dc758fe..72a6cd929d 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -65,6 +65,7 @@ def get_release_version() -> str: ("py:class", "pandas.DataFrame"), ("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"), ("py:class", "pathlib.Path"), + ("py:class", "datetime.datetime"), ] # Add any paths that contain templates here, relative to this directory. diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index 6762c1e787..5dc5a0959e 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -513,10 +513,12 @@ exist, making this operation idempotent. Restoring tables ~~~~~~~~~~~~~~~~ +.. py:currentmodule:: deltalake.table + Restoring a table will restore delta table to a specified version or datetime. This operation compares the current state of the delta table with the state to be restored. And add those missing files into the AddFile actions and add redundant files into -RemoveFile actions.Then commit into a new version. +RemoveFile actions. Then commit into a new version. 
Use :meth:`DeltaTable.restore` to perform the restore operation. Note that if any other