Skip to content

Commit

Permalink
Use update_incremental in update (#398)
Browse files Browse the repository at this point in the history
  • Loading branch information
mosyp authored Aug 24, 2021
1 parent 2928272 commit e7e549d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 37 deletions.
50 changes: 13 additions & 37 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,58 +633,36 @@ impl DeltaTable {

/// Load DeltaTable with data from latest checkpoint
pub async fn load(&mut self) -> Result<(), DeltaTableError> {
match self.get_last_checkpoint().await {
Ok(last_check_point) => {
self.last_check_point = Some(last_check_point);
self.restore_checkpoint(last_check_point).await?;
self.version = last_check_point.version + 1;
}
Err(LoadCheckpointError::NotFound) => {
// no checkpoint, start with version 0
self.version = 0;
}
Err(e) => {
return Err(DeltaTableError::LoadCheckpoint { source: e });
}
}

self.apply_logs_from_current_version().await?;

Ok(())
self.last_check_point = None;
self.version = -1;
self.state = DeltaTableState::default();
self.update().await
}

/// Updates the DeltaTable to the most recent state committed to the transaction log by
/// loading the last checkpoint and incrementally applying each version since.
pub async fn update(&mut self) -> Result<(), DeltaTableError> {
match self.get_last_checkpoint().await {
Ok(last_check_point) => {
if self.last_check_point != Some(last_check_point) {
if Some(last_check_point) == self.last_check_point {
self.update_incremental().await
} else {
self.last_check_point = Some(last_check_point);
self.restore_checkpoint(last_check_point).await?;
self.version = last_check_point.version + 1;
self.version = last_check_point.version;
self.update_incremental().await
}
}
Err(LoadCheckpointError::NotFound) => {
self.version += 1;
}
Err(e) => {
return Err(DeltaTableError::LoadCheckpoint { source: e });
}
Err(LoadCheckpointError::NotFound) => self.update_incremental().await,
Err(e) => Err(DeltaTableError::LoadCheckpoint { source: e }),
}

self.apply_logs_from_current_version().await?;

Ok(())
}

/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
/// It assumes that the table is already updated to the current version `self.version`.
pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
self.version += 1;
self.apply_logs_from_current_version().await
}

async fn apply_logs_from_current_version(&mut self) -> Result<(), DeltaTableError> {
// replay logs after checkpoint
loop {
match self.apply_log(self.version).await {
Ok(_) => {
Expand All @@ -706,12 +684,10 @@ impl DeltaTable {
return Err(DeltaTableError::from(e));
}
}
break;
return Ok(());
}
}
}

Ok(())
}

/// Loads the DeltaTable state for the given version.
Expand Down
3 changes: 3 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ async fn read_delta_table_with_update() {
let path = "./tests/data/simple_table_with_checkpoint/";
let table_newest_version = deltalake::open_table(path).await.unwrap();
let mut table_to_update = deltalake::open_table_with_version(path, 0).await.unwrap();
// calling update several times should not produce any duplicates
table_to_update.update().await.unwrap();
table_to_update.update().await.unwrap();
table_to_update.update().await.unwrap();

assert_eq!(
Expand Down

0 comments on commit e7e549d

Please sign in to comment.