From 1fa8d6d18bd2962986271c781d640a569266c1f0 Mon Sep 17 00:00:00 2001 From: Robert Pack <42610831+roeap@users.noreply.github.com> Date: Sun, 19 Jun 2022 22:51:24 +0200 Subject: [PATCH] refactor: move version field to `DeltaTableState` (#649) * refactor: move version field to table state * docs: add comments explaining how treats the version field --- python/src/lib.rs | 2 +- ruby/src/lib.rs | 2 +- rust/src/checkpoints.rs | 8 ++-- rust/src/delta.rs | 49 ++++++++++++------------ rust/src/operations/create.rs | 4 +- rust/src/operations/mod.rs | 10 ++--- rust/src/operations/write.rs | 14 +++---- rust/src/optimize.rs | 2 +- rust/src/table_state.rs | 30 +++++++++++++-- rust/tests/adls_gen2_table_test.rs | 8 ++-- rust/tests/checkpoint_writer_test.rs | 4 +- rust/tests/concurrent_writes_test.rs | 2 +- rust/tests/gcs_test.rs | 2 +- rust/tests/optimize_test.rs | 28 +++++++------- rust/tests/read_delta_partitions_test.rs | 2 +- rust/tests/read_delta_test.rs | 26 ++++++------- rust/tests/read_simple_table_test.rs | 24 ++++++------ rust/tests/s3_test.rs | 8 ++-- rust/tests/simple_commit_test.rs | 20 +++++----- 19 files changed, 134 insertions(+), 111 deletions(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index 8066ebcd0e..90391731e3 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -137,7 +137,7 @@ impl RawDeltaTable { } pub fn version(&self) -> PyResult { - Ok(self._table.version) + Ok(self._table.version()) } pub fn metadata(&self) -> PyResult { diff --git a/ruby/src/lib.rs b/ruby/src/lib.rs index 18f077f9e4..2323314b13 100644 --- a/ruby/src/lib.rs +++ b/ruby/src/lib.rs @@ -33,7 +33,7 @@ impl TableData { } fn version(&self) -> i64 { - self.actual.version + self.actual.version() } fn files(&self) -> Vec { diff --git a/rust/src/checkpoints.rs b/rust/src/checkpoints.rs index b8f774b4f8..c708eaa9ef 100644 --- a/rust/src/checkpoints.rs +++ b/rust/src/checkpoints.rs @@ -86,10 +86,10 @@ impl From for ArrowError { } } -/// Creates checkpoint at `table.version` for given `table`. +/// Creates checkpoint at current table version pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> { create_checkpoint_for( - table.version, + table.version(), table.get_state(), table.storage.as_ref(), &table.table_uri, @@ -105,7 +105,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result= 0 && enable_expired_log_cleanup { + if table.version() >= 0 && enable_expired_log_cleanup { let deleted_log_num = cleanup_metadata(&table).await?; debug!("Deleted {:?} log files.", deleted_log_num); } diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 291f51baf4..9cfe0b2eb3 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -34,7 +34,7 @@ use crate::delta_config::DeltaConfigError; #[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)] pub struct CheckPoint { /// Delta table version - version: DeltaDataTypeVersion, // 20 digits decimals + pub(crate) version: DeltaDataTypeVersion, // 20 digits decimals size: DeltaDataTypeLong, parts: Option, // 10 digits decimals } @@ -583,15 +583,12 @@ pub enum PeekCommit { /// In memory representation of a Delta Table pub struct DeltaTable { - /// The version of the table as of the most recent loaded Delta log entry. - pub version: DeltaDataTypeVersion, + /// The state of the table as of the most recent loaded Delta log entry. + pub state: DeltaTableState, /// The URI the DeltaTable was loaded from. pub table_uri: String, /// the load options used during load pub config: DeltaTableConfig, - - state: DeltaTableState, - // metadata // application_transactions pub(crate) storage: Box, @@ -795,11 +792,15 @@ impl DeltaTable { Ok(version) } + /// Currently loaded evrsion of the table + pub fn version(&self) -> DeltaDataTypeVersion { + self.state.version + } + /// Load DeltaTable with data from latest checkpoint pub async fn load(&mut self) -> Result<(), DeltaTableError> { self.last_check_point = None; - self.version = -1; - self.state = DeltaTableState::default(); + self.state = DeltaTableState::with_version(-1); self.update().await } @@ -832,14 +833,16 @@ impl DeltaTable { new_version: DeltaDataTypeVersion, actions: Vec, ) -> Result<(), DeltaTableError> { - if self.version + 1 != new_version { - return Err(DeltaTableError::VersionMismatch(new_version, self.version)); + if self.version() + 1 != new_version { + return Err(DeltaTableError::VersionMismatch( + new_version, + self.version(), + )); } - let s = DeltaTableState::from_actions(actions)?; + let s = DeltaTableState::from_actions(actions, new_version)?; self.state .merge(s, self.config.require_tombstones, self.config.require_files); - self.version = new_version; Ok(()) } @@ -854,7 +857,7 @@ impl DeltaTable { } else { self.last_check_point = Some(last_check_point); self.restore_checkpoint(last_check_point).await?; - self.version = last_check_point.version; + self.state.version = last_check_point.version; self.update_incremental().await } } @@ -866,11 +869,11 @@ impl DeltaTable { /// Updates the DeltaTable to the latest version by incrementally applying newer versions. /// It assumes that the table is already updated to the current version `self.version`. pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> { - while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version).await? { + while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version()).await? { self.apply_actions(version, actions)?; } - if self.version == -1 { + if self.version() == -1 { let err = format!( "No snapshot or version 0 found, perhaps {} is an empty dir?", self.table_uri @@ -897,7 +900,6 @@ impl DeltaTable { return Err(DeltaTableError::from(e)); } } - self.version = version; let mut next_version; // 1. find latest checkpoint below version @@ -908,13 +910,13 @@ impl DeltaTable { } None => { // no checkpoint found, clear table state and start from the beginning - self.state = DeltaTableState::default(); + self.state = DeltaTableState::with_version(0); next_version = 0; } } // 2. apply all logs starting from checkpoint - while next_version <= self.version { + while next_version <= version { self.apply_log(next_version).await?; next_version += 1; } @@ -951,7 +953,7 @@ impl DeltaTable { limit: Option, ) -> Result>, DeltaTableError> { let mut version = match limit { - Some(l) => max(self.version - l as i64 + 1, 0), + Some(l) => max(self.version() - l as i64 + 1, 0), None => self.get_earliest_delta_log_version().await?, }; let mut commit_infos_list = vec![]; @@ -1309,8 +1311,7 @@ impl DeltaTable { let table_uri = storage_backend.trim_path(table_uri); let log_uri_normalized = storage_backend.join_path(&table_uri, "_delta_log"); Ok(Self { - version: -1, - state: DeltaTableState::default(), + state: DeltaTableState::with_version(-1), storage: storage_backend, table_uri, config, @@ -1412,7 +1413,7 @@ impl DeltaTable { impl fmt::Display for DeltaTable { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { writeln!(f, "DeltaTable({})", self.table_uri)?; - writeln!(f, "\tversion: {}", self.version)?; + writeln!(f, "\tversion: {}", self.version())?; match self.state.current_metadata() { Some(metadata) => { writeln!(f, "\tmetadata: {}", metadata)?; @@ -1599,7 +1600,7 @@ impl<'a> DeltaTransaction<'a> { loop { self.delta_table.update().await?; - let version = self.delta_table.version + 1; + let version = self.delta_table.version() + 1; match self .delta_table @@ -1793,7 +1794,7 @@ mod tests { // Validation // assert DeltaTable version is now 0 and no data files have been added - assert_eq!(dt.version, 0); + assert_eq!(dt.version(), 0); assert_eq!(dt.state.files().len(), 0); // assert new _delta_log file created in tempDir diff --git a/rust/src/operations/create.rs b/rust/src/operations/create.rs index 97feb1a896..7a1d42a92c 100644 --- a/rust/src/operations/create.rs +++ b/rust/src/operations/create.rs @@ -203,7 +203,7 @@ mod tests { assert!(log_path.exists()); let mut table = open_table(&table_uri).await.unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); // Check we can create an existing table with ignore let ts1 = table.get_version_timestamp(0).await.unwrap(); @@ -211,7 +211,7 @@ mod tests { let _result = child.wait().unwrap(); let _ = collect(transaction, task_ctx.clone()).await.unwrap(); let mut table = open_table(&table_uri).await.unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); let ts2 = table.get_version_timestamp(0).await.unwrap(); assert_eq!(ts1, ts2); diff --git a/rust/src/operations/mod.rs b/rust/src/operations/mod.rs index ccc4ac1da2..a87b038728 100644 --- a/rust/src/operations/mod.rs +++ b/rust/src/operations/mod.rs @@ -122,7 +122,7 @@ impl DeltaCommands { ) -> DeltaCommandResult<()> { let transaction = Arc::new(DeltaTransactionPlan::new( self.table.table_uri.clone(), - self.table.version, + self.table.version(), plan, operation, None, @@ -259,7 +259,7 @@ mod tests { .unwrap(); let table = open_table(&table_uri).await.unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); let res = commands.create(metadata, SaveMode::ErrorIfExists).await; assert!(res.is_err()) @@ -270,7 +270,7 @@ mod tests { let batch = get_record_batch(None, false); let partition_cols = vec!["modified".to_string()]; let mut table = create_initialized_table(&partition_cols).await; - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); let mut commands = DeltaCommands::try_from_uri(table.table_uri.to_string()) .await @@ -286,7 +286,7 @@ mod tests { .unwrap(); table.update().await.unwrap(); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); let files = table.get_file_uris(); assert_eq!(files.collect::>().len(), 2) @@ -311,7 +311,7 @@ mod tests { .unwrap(); let table = open_table(&table_uri).await.unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); let files = table.get_file_uris(); assert_eq!(files.collect::>().len(), 2) diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs index f1de0530e7..da362bef9f 100644 --- a/rust/src/operations/write.rs +++ b/rust/src/operations/write.rs @@ -423,7 +423,7 @@ mod tests { async fn test_append_data() { let partition_cols = vec!["modified".to_string()]; let mut table = create_initialized_table(&partition_cols).await; - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); let transaction = get_transaction(table.table_uri.clone(), 0, SaveMode::Append); let session_ctx = SessionContext::new(); @@ -434,20 +434,20 @@ mod tests { .unwrap(); table.update().await.unwrap(); assert_eq!(table.get_file_uris().collect::>().len(), 2); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); let transaction = get_transaction(table.table_uri.clone(), 1, SaveMode::Append); let _ = collect(transaction.clone(), task_ctx).await.unwrap(); table.update().await.unwrap(); assert_eq!(table.get_file_uris().collect::>().len(), 4); - assert_eq!(table.version, 2); + assert_eq!(table.version(), 2); } #[tokio::test] async fn test_overwrite_data() { let partition_cols = vec!["modified".to_string()]; let mut table = create_initialized_table(&partition_cols).await; - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); let transaction = get_transaction(table.table_uri.clone(), 0, SaveMode::Overwrite); let session_ctx = SessionContext::new(); @@ -458,13 +458,13 @@ mod tests { .unwrap(); table.update().await.unwrap(); assert_eq!(table.get_file_uris().collect::>().len(), 2); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); let transaction = get_transaction(table.table_uri.clone(), 1, SaveMode::Overwrite); let _ = collect(transaction.clone(), task_ctx).await.unwrap(); table.update().await.unwrap(); assert_eq!(table.get_file_uris().collect::>().len(), 2); - assert_eq!(table.version, 2); + assert_eq!(table.version(), 2); } #[tokio::test] @@ -487,7 +487,7 @@ mod tests { // THe table should be created on write and thus have version 0 let table = open_table(table_path.to_str().unwrap()).await.unwrap(); assert_eq!(table.get_file_uris().collect::>().len(), 2); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); } fn get_transaction( diff --git a/rust/src/optimize.rs b/rust/src/optimize.rs index 37f1bf8a16..ab07507b2a 100644 --- a/rust/src/optimize.rs +++ b/rust/src/optimize.rs @@ -284,7 +284,7 @@ impl MergePlan { // optimized partition was updated then abort the commit. Requires (#593). if !actions.is_empty() { let mut metadata = Map::new(); - metadata.insert("readVersion".to_owned(), table.version.into()); + metadata.insert("readVersion".to_owned(), table.version().into()); let maybe_map_metrics = serde_json::to_value(metrics.clone()); if let Ok(map) = maybe_map_metrics { metadata.insert("operationMetrics".to_owned(), map); diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs index 3a6d2f82d9..93f378d526 100644 --- a/rust/src/table_state.rs +++ b/rust/src/table_state.rs @@ -21,6 +21,7 @@ use crate::delta_config; /// State snapshot currently held by the Delta Table instance. #[derive(Default, Debug, Clone)] pub struct DeltaTableState { + pub version: DeltaDataTypeVersion, // A remove action should remain in the state of the table as a tombstone until it has expired. // A tombstone expires when the creation timestamp of the delta file exceeds the expiration tombstones: HashSet, @@ -36,6 +37,12 @@ pub struct DeltaTableState { } impl DeltaTableState { + pub fn with_version(version: DeltaDataTypeVersion) -> Self { + Self { + version, + ..Self::default() + } + } /// Construct a delta table state object from commit version. pub async fn from_commit( table: &DeltaTable, @@ -45,7 +52,7 @@ impl DeltaTableState { let commit_log_bytes = table.storage.get_obj(&commit_uri).await?; let reader = BufReader::new(Cursor::new(commit_log_bytes)); - let mut new_state = DeltaTableState::default(); + let mut new_state = DeltaTableState::with_version(version); for line in reader.lines() { let action: action::Action = serde_json::from_str(line?.as_str())?; new_state.process_action( @@ -59,8 +66,11 @@ impl DeltaTableState { } /// Construct a delta table state object from a list of actions - pub fn from_actions(actions: Vec) -> Result { - let mut new_state = DeltaTableState::default(); + pub fn from_actions( + actions: Vec, + version: DeltaDataTypeVersion, + ) -> Result { + let mut new_state = DeltaTableState::with_version(version); for action in actions { new_state.process_action(action, true, true)?; } @@ -74,7 +84,7 @@ impl DeltaTableState { ) -> Result { let checkpoint_data_paths = table.get_checkpoint_data_paths(check_point); // process actions from checkpoint - let mut new_state = DeltaTableState::default(); + let mut new_state = DeltaTableState::with_version(check_point.version); for f in &checkpoint_data_paths { let obj = table.storage.get_obj(f).await?; @@ -159,6 +169,13 @@ impl DeltaTableState { } /// Merges new state information into our state + /// + /// The DeltaTableState also carries the version information for the given state, + /// as there is a one-to-one match between a table state and a version. In merge/update + /// scenarios we cannot infer the intended / correct version number. By default this + /// function will update the tracked version if the version on `new_state` is larger then the + /// currently set version however it is up to the caller to update the `version` field according + /// to the version the merged state represents. pub fn merge( &mut self, mut new_state: DeltaTableState, @@ -211,6 +228,10 @@ impl DeltaTableState { if !new_state.commit_infos.is_empty() { self.commit_infos.append(&mut new_state.commit_infos); } + + if self.version < new_state.version { + self.version = new_state.version + } } /// Process given action by updating current state. @@ -276,6 +297,7 @@ mod tests { app_transaction_version.insert("xyz".to_string(), 1); let mut state = DeltaTableState { + version: -1, files: vec![], commit_infos: vec![], tombstones: HashSet::new(), diff --git a/rust/tests/adls_gen2_table_test.rs b/rust/tests/adls_gen2_table_test.rs index 20a6876f00..531ce0b83d 100644 --- a/rust/tests/adls_gen2_table_test.rs +++ b/rust/tests/adls_gen2_table_test.rs @@ -33,7 +33,7 @@ mod adls_gen2_table { .await .unwrap(); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -87,7 +87,7 @@ mod adls_gen2_table { let table = builder.load().await.unwrap(); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -148,7 +148,7 @@ mod adls_gen2_table { .unwrap(); // Assert 1 - assert_eq!(0, dt.version); + assert_eq!(0, dt.version()); assert_eq!(1, dt.get_min_reader_version()); assert_eq!(2, dt.get_min_writer_version()); assert_eq!(0, dt.get_files().len()); @@ -161,7 +161,7 @@ mod adls_gen2_table { // Assert 2 assert_eq!(1, version); - assert_eq!(version, dt.version); + assert_eq!(version, dt.version()); assert_eq!(2, dt.get_files().len()); // Cleanup diff --git a/rust/tests/checkpoint_writer_test.rs b/rust/tests/checkpoint_writer_test.rs index 406eb9fa45..a8cb108d8a 100644 --- a/rust/tests/checkpoint_writer_test.rs +++ b/rust/tests/checkpoint_writer_test.rs @@ -135,7 +135,7 @@ mod delete_expired_delta_log_in_checkpoint { checkpoints::create_checkpoint_from_table_uri_and_cleanup( &table.table_uri, - table.version, + table.version(), None, ) .await @@ -178,7 +178,7 @@ mod delete_expired_delta_log_in_checkpoint { checkpoints::create_checkpoint_from_table_uri_and_cleanup( &table.table_uri, - table.version, + table.version(), None, ) .await diff --git a/rust/tests/concurrent_writes_test.rs b/rust/tests/concurrent_writes_test.rs index 9f592d79b7..242e785521 100644 --- a/rust/tests/concurrent_writes_test.rs +++ b/rust/tests/concurrent_writes_test.rs @@ -87,7 +87,7 @@ async fn concurrent_writes_azure() { .await .unwrap(); - assert_eq!(0, dt.version); + assert_eq!(0, dt.version()); assert_eq!(1, dt.get_min_reader_version()); assert_eq!(2, dt.get_min_writer_version()); assert_eq!(0, dt.get_files().len()); diff --git a/rust/tests/gcs_test.rs b/rust/tests/gcs_test.rs index dd512eda45..a01a83b33b 100644 --- a/rust/tests/gcs_test.rs +++ b/rust/tests/gcs_test.rs @@ -18,7 +18,7 @@ mod gcs { let table = deltalake::open_table(format!("gs://{}/simple_table", bucket).as_str()) .await .unwrap(); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( diff --git a/rust/tests/optimize_test.rs b/rust/tests/optimize_test.rs index c7e9684f18..4ce9734eb0 100644 --- a/rust/tests/optimize_test.rs +++ b/rust/tests/optimize_test.rs @@ -196,13 +196,13 @@ mod optimize { ) .await?; - let version = dt.version; + let version = dt.version(); assert_eq!(dt.get_active_add_actions().len(), 5); let optimize = Optimize::default().target_size(2_000_000); let metrics = optimize.execute(&mut dt).await?; - assert_eq!(version + 1, dt.version); + assert_eq!(version + 1, dt.version()); assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 4); assert_eq!(metrics.total_considered_files, 5); @@ -253,14 +253,14 @@ mod optimize { ) .await?; - let version = dt.version; + let version = dt.version(); let mut filter = vec![]; filter.push(PartitionFilter::try_from(("date", "=", "2022-05-22"))?); let optimize = Optimize::default().filter(&filter); let metrics = optimize.execute(&mut dt).await?; - assert_eq!(version + 1, dt.version); + assert_eq!(version + 1, dt.version()); assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); assert_eq!(dt.get_active_add_actions().len(), 3); @@ -290,7 +290,7 @@ mod optimize { ) .await?; - let version = dt.version; + let version = dt.version(); //create the merge plan, remove a file, and execute the plan. let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; @@ -320,7 +320,7 @@ mod optimize { let maybe_metrics = plan.execute(&mut dt).await; assert!(maybe_metrics.is_err()); - assert_eq!(dt.version, version + 1); + assert_eq!(dt.version(), version + 1); Ok(()) } @@ -345,7 +345,7 @@ mod optimize { ) .await?; - let version = dt.version; + let version = dt.version(); //create the merge plan, remove a file, and execute the plan. let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; @@ -364,7 +364,7 @@ mod optimize { let metrics = plan.execute(&mut dt).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); - assert_eq!(dt.version, version + 2); + assert_eq!(dt.version(), version + 2); Ok(()) } @@ -397,7 +397,7 @@ mod optimize { ) .await?; - let version = dt.version; + let version = dt.version(); let mut filter = vec![]; filter.push(PartitionFilter::try_from(("date", "=", "2022-05-22"))?); @@ -406,13 +406,13 @@ mod optimize { let metrics = optimize.execute(&mut dt).await?; assert_eq!(metrics.num_files_added, 1); assert_eq!(metrics.num_files_removed, 2); - assert_eq!(dt.version, version + 1); + assert_eq!(dt.version(), version + 1); let metrics = optimize.execute(&mut dt).await?; assert_eq!(metrics.num_files_added, 0); assert_eq!(metrics.num_files_removed, 0); - assert_eq!(dt.version, version + 1); + assert_eq!(dt.version(), version + 1); Ok(()) } @@ -431,7 +431,7 @@ mod optimize { ) .await?; - let version = dt.version; + let version = dt.version(); let optimize = Optimize::default().target_size(10_000_000); let metrics = optimize.execute(&mut dt).await?; @@ -454,7 +454,7 @@ mod optimize { expected.files_removed = expected_metric_details.clone(); assert_eq!(expected, metrics); - assert_eq!(version, dt.version); + assert_eq!(version, dt.version()); Ok(()) } @@ -479,7 +479,7 @@ mod optimize { ) .await?; - let version = dt.version; + let version = dt.version(); let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; diff --git a/rust/tests/read_delta_partitions_test.rs b/rust/tests/read_delta_partitions_test.rs index 3afc2652cb..f2d2fae679 100644 --- a/rust/tests/read_delta_partitions_test.rs +++ b/rust/tests/read_delta_partitions_test.rs @@ -165,5 +165,5 @@ async fn read_null_partitions_from_checkpoint() { // verify that table loads from checkpoint and handles null partitions let table = deltalake::open_table(&table.table_uri).await.unwrap(); - assert_eq!(table.version, 2); + assert_eq!(table.version(), 2); } diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index 741259dafc..cfb017508a 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -17,7 +17,7 @@ async fn read_delta_2_0_table_without_version() { let table = deltalake::open_table("./tests/data/delta-0.2.0") .await .unwrap(); - assert_eq!(table.version, 3); + assert_eq!(table.version(), 3); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -103,7 +103,7 @@ async fn read_delta_table_with_ignoring_files_on_apply_log() { .await .unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); assert!(table.get_files().is_empty(), "files should be empty"); assert!( table.get_tombstones().next().is_none(), @@ -111,7 +111,7 @@ async fn read_delta_table_with_ignoring_files_on_apply_log() { ); table.update().await.unwrap(); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); assert!(table.get_files().is_empty(), "files should be empty"); assert!( table.get_tombstones().next().is_none(), @@ -124,7 +124,7 @@ async fn read_delta_2_0_table_with_version() { let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0) .await .unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -138,7 +138,7 @@ async fn read_delta_2_0_table_with_version() { table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 2) .await .unwrap(); - assert_eq!(table.version, 2); + assert_eq!(table.version(), 2); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -152,7 +152,7 @@ async fn read_delta_2_0_table_with_version() { table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 3) .await .unwrap(); - assert_eq!(table.version, 3); + assert_eq!(table.version(), 3); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -170,7 +170,7 @@ async fn read_delta_8_0_table_without_version() { let table = deltalake::open_table("./tests/data/delta-0.8.0") .await .unwrap(); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -215,7 +215,7 @@ async fn read_delta_8_0_table_with_load_version() { let mut table = deltalake::open_table("./tests/data/delta-0.8.0") .await .unwrap(); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -226,7 +226,7 @@ async fn read_delta_8_0_table_with_load_version() { ] ); table.load_version(0).await.unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -517,10 +517,10 @@ async fn test_table_history() { async fn test_poll_table_commits() { let path = "./tests/data/simple_table_with_checkpoint"; let mut table = deltalake::open_table_with_version(path, 9).await.unwrap(); - let peek = table.peek_next_commit(table.version).await.unwrap(); + let peek = table.peek_next_commit(table.version()).await.unwrap(); let is_new = if let PeekCommit::New(version, actions) = peek { - assert_eq!(table.version, 9); + assert_eq!(table.version(), 9); assert!(!table .get_files_iter() .any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet")); @@ -530,7 +530,7 @@ async fn test_poll_table_commits() { table.apply_actions(version, actions).unwrap(); - assert_eq!(table.version, 10); + assert_eq!(table.version(), 10); assert!(table .get_files_iter() .any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet")); @@ -541,7 +541,7 @@ async fn test_poll_table_commits() { }; assert!(is_new); - let peek = table.peek_next_commit(table.version).await.unwrap(); + let peek = table.peek_next_commit(table.version()).await.unwrap(); let is_up_to_date = match peek { PeekCommit::UpToDate => true, _ => false, diff --git a/rust/tests/read_simple_table_test.rs b/rust/tests/read_simple_table_test.rs index e4b1df5251..117a0a934c 100644 --- a/rust/tests/read_simple_table_test.rs +++ b/rust/tests/read_simple_table_test.rs @@ -11,7 +11,7 @@ async fn read_simple_table() { let table = deltalake::open_table("./tests/data/simple_table") .await .unwrap(); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); let mut files = table.get_files(); @@ -68,7 +68,7 @@ async fn read_simple_table_with_version() { let table = deltalake::open_table_with_version("./tests/data/simple_table", 0) .await .unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); let mut files = table.get_files(); @@ -88,7 +88,7 @@ async fn read_simple_table_with_version() { let table = deltalake::open_table_with_version("./tests/data/simple_table", 2) .await .unwrap(); - assert_eq!(table.version, 2); + assert_eq!(table.version(), 2); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); let mut files = table.get_files(); @@ -108,7 +108,7 @@ async fn read_simple_table_with_version() { let table = deltalake::open_table_with_version("./tests/data/simple_table", 3) .await .unwrap(); - assert_eq!(table.version, 3); + assert_eq!(table.version(), 3); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); let mut files = table.get_files(); @@ -151,40 +151,40 @@ async fn time_travel_by_ds() { deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-01T00:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); table = deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-02T22:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); table = deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-02T23:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 1); + assert_eq!(table.version(), 1); table = deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-03T22:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 2); + assert_eq!(table.version(), 2); table = deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-04T22:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 3); + assert_eq!(table.version(), 3); table = deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-05T21:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 3); + assert_eq!(table.version(), 3); table = deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-05T22:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); table = deltalake::open_table_with_ds("./tests/data/simple_table", "2020-05-25T22:47:31-07:00") .await .unwrap(); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); } diff --git a/rust/tests/s3_test.rs b/rust/tests/s3_test.rs index 1c216e2bb3..67956602cb 100644 --- a/rust/tests/s3_test.rs +++ b/rust/tests/s3_test.rs @@ -40,7 +40,7 @@ mod s3 { table.load().await.unwrap(); println!("{}", table); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -71,7 +71,7 @@ mod s3 { .await .unwrap(); println!("{}", table); - assert_eq!(table.version, 3); + assert_eq!(table.version(), 3); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); assert_eq!( @@ -101,7 +101,7 @@ mod s3 { setup(); let table = deltalake::open_table("s3://deltars/simple/").await.unwrap(); println!("{}", table); - assert_eq!(table.version, 4); + assert_eq!(table.version(), 4); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); } @@ -115,7 +115,7 @@ mod s3 { .await .unwrap(); println!("{}", table); - assert_eq!(table.version, 0); + assert_eq!(table.version(), 0); assert_eq!(table.get_min_writer_version(), 2); assert_eq!(table.get_min_reader_version(), 1); } diff --git a/rust/tests/simple_commit_test.rs b/rust/tests/simple_commit_test.rs index 94c07db47a..67f63dd530 100644 --- a/rust/tests/simple_commit_test.rs +++ b/rust/tests/simple_commit_test.rs @@ -78,7 +78,7 @@ mod simple_commit_fs { let table_path = "./tests/data/simple_commit"; let mut table = deltalake::open_table(table_path).await.unwrap(); - assert_eq!(0, table.version); + assert_eq!(0, table.version()); assert_eq!(0, table.get_files().len()); let mut tx1 = table.create_transaction(None); @@ -87,7 +87,7 @@ mod simple_commit_fs { let result = table.try_commit_transaction(&commit, 1).await.unwrap(); assert_eq!(1, result); - assert_eq!(1, table.version); + assert_eq!(1, table.version()); assert_eq!(2, table.get_files().len()); } @@ -99,7 +99,7 @@ mod simple_commit_fs { let table_path = "./tests/data/simple_commit"; let mut table = deltalake::open_table(table_path).await.unwrap(); - assert_eq!(0, table.version); + assert_eq!(0, table.version()); assert_eq!(0, table.get_files().len()); let mut tx1 = table.create_transaction(None); @@ -123,7 +123,7 @@ mod simple_commit_fs { } assert!(result.is_err()); - assert_eq!(1, table.version); + assert_eq!(1, table.version()); assert_eq!(2, table.get_files().len()); } @@ -137,7 +137,7 @@ mod simple_commit_fs { let table_path = "./tests/data/simple_commit"; let mut table = deltalake::open_table(table_path).await.unwrap(); - assert_eq!(0, table.version); + assert_eq!(0, table.version()); assert_eq!(0, table.get_files().len()); let mut attempt = 0; @@ -150,7 +150,7 @@ mod simple_commit_fs { loop { table.update().await.unwrap(); - let version = table.version + 1; + let version = table.version() + 1; match table .try_commit_transaction(&prepared_commit, version) .await @@ -168,7 +168,7 @@ mod simple_commit_fs { } assert_eq!(0, attempt); - assert_eq!(1, table.version); + assert_eq!(1, table.version()); assert_eq!(2, table.get_files().len()); } @@ -183,7 +183,7 @@ mod simple_commit_fs { async fn test_two_commits(table_path: &str) -> Result<(), DeltaTableError> { let mut table = deltalake::open_table(table_path).await?; - assert_eq!(0, table.version); + assert_eq!(0, table.version()); assert_eq!(0, table.get_files().len()); let mut tx1 = table.create_transaction(None); @@ -191,7 +191,7 @@ async fn test_two_commits(table_path: &str) -> Result<(), DeltaTableError> { let version = tx1.commit(None, None).await?; assert_eq!(1, version); - assert_eq!(version, table.version); + assert_eq!(version, table.version()); assert_eq!(2, table.get_files().len()); let mut tx2 = table.create_transaction(None); @@ -199,7 +199,7 @@ async fn test_two_commits(table_path: &str) -> Result<(), DeltaTableError> { let version = tx2.commit(None, None).await.unwrap(); assert_eq!(2, version); - assert_eq!(version, table.version); + assert_eq!(version, table.version()); assert_eq!(4, table.get_files().len()); Ok(()) }