diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 8e4da6337d..c4b1473ce4 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -426,6 +426,46 @@ impl DeltaTableState { pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> { self.current_metadata.as_ref() } + + /// merges new state information into our state + pub fn merge(&mut self, mut new_state: DeltaTableState) { + self.files.append(&mut new_state.files); + + if !new_state.tombstones.is_empty() { + let new_removals: HashSet<&str> = new_state + .tombstones + .iter() + .map(|s| s.path.as_str()) + .collect(); + + self.files + .retain(|a| !new_removals.contains(a.path.as_str())); + } + self.tombstones.append(&mut new_state.tombstones); + if new_state.min_reader_version > 0 { + self.min_reader_version = new_state.min_reader_version; + self.min_writer_version = new_state.min_writer_version; + } + + if new_state.current_metadata.is_some() { + self.tombstone_retention_millis = new_state.tombstone_retention_millis; + self.current_metadata = new_state.current_metadata.take(); + } + + new_state + .app_transaction_version + .drain() + .for_each(|(app_id, version)| { + *self + .app_transaction_version + .entry(app_id) + .or_insert(version) = version + }); + + if !new_state.commit_infos.is_empty() { + self.commit_infos.append(&mut new_state.commit_infos); + } + } } #[inline] @@ -565,11 +605,14 @@ impl DeltaTable { &mut self, reader: BufReader, ) -> Result<(), ApplyLogError> { + let mut new_state = DeltaTableState::default(); for line in reader.lines() { let action: Action = serde_json::from_str(line?.as_str())?; - process_action(&mut self.state, action)?; + process_action(&mut new_state, action)?; } + self.state.merge(new_state); + Ok(()) } @@ -1474,10 +1517,6 @@ fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), App } Action::remove(v) => { let v = v.path_decoded()?; - let index = { state.files.iter().position(|a| *a.path == v.path) }; - if let Some(index) = index { - state.files.swap_remove(index); - } state.tombstones.push(v); } Action::protocol(v) => {