diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 53eff5d255..18e6da8794 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -633,24 +633,10 @@ impl DeltaTable { /// Load DeltaTable with data from latest checkpoint pub async fn load(&mut self) -> Result<(), DeltaTableError> { - match self.get_last_checkpoint().await { - Ok(last_check_point) => { - self.last_check_point = Some(last_check_point); - self.restore_checkpoint(last_check_point).await?; - self.version = last_check_point.version + 1; - } - Err(LoadCheckpointError::NotFound) => { - // no checkpoint, start with version 0 - self.version = 0; - } - Err(e) => { - return Err(DeltaTableError::LoadCheckpoint { source: e }); - } - } - - self.apply_logs_from_current_version().await?; - - Ok(()) + self.last_check_point = None; + self.version = -1; + self.state = DeltaTableState::default(); + self.update().await } /// Updates the DeltaTable to the most recent state committed to the transaction log by @@ -658,33 +644,25 @@ impl DeltaTable { pub async fn update(&mut self) -> Result<(), DeltaTableError> { match self.get_last_checkpoint().await { Ok(last_check_point) => { - if self.last_check_point != Some(last_check_point) { + if Some(last_check_point) == self.last_check_point { + self.update_incremental().await + } else { self.last_check_point = Some(last_check_point); self.restore_checkpoint(last_check_point).await?; - self.version = last_check_point.version + 1; + self.version = last_check_point.version; + self.update_incremental().await } } - Err(LoadCheckpointError::NotFound) => { - self.version += 1; - } - Err(e) => { - return Err(DeltaTableError::LoadCheckpoint { source: e }); - } + Err(LoadCheckpointError::NotFound) => self.update_incremental().await, + Err(e) => Err(DeltaTableError::LoadCheckpoint { source: e }), } - - self.apply_logs_from_current_version().await?; - - Ok(()) } /// Updates the DeltaTable to the latest version by incrementally applying newer versions. + /// It assumes that the table is already updated to the current version `self.version`. pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> { self.version += 1; - self.apply_logs_from_current_version().await - } - async fn apply_logs_from_current_version(&mut self) -> Result<(), DeltaTableError> { - // replay logs after checkpoint loop { match self.apply_log(self.version).await { Ok(_) => { @@ -706,12 +684,10 @@ impl DeltaTable { return Err(DeltaTableError::from(e)); } } - break; + return Ok(()); } } } - - Ok(()) } /// Loads the DeltaTable state for the given version. diff --git a/rust/tests/read_delta_test.rs b/rust/tests/read_delta_test.rs index fb89a8979c..8cc6ad955e 100644 --- a/rust/tests/read_delta_test.rs +++ b/rust/tests/read_delta_test.rs @@ -40,6 +40,9 @@ async fn read_delta_table_with_update() { let path = "./tests/data/simple_table_with_checkpoint/"; let table_newest_version = deltalake::open_table(path).await.unwrap(); let mut table_to_update = deltalake::open_table_with_version(path, 0).await.unwrap(); + // calling update several times should not produce any duplicates + table_to_update.update().await.unwrap(); + table_to_update.update().await.unwrap(); table_to_update.update().await.unwrap(); assert_eq!(