From 2a9132c0450f3e626312d06dd5235a8e5fdbcf99 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Wed, 22 Sep 2021 13:57:55 +0200 Subject: [PATCH 1/3] Closes #442. Improve tombstone handling by only applying the remove actions as a batch at the end of a load or incremental update operation. For checkpoint loading, we entirely skip interpretation of the remove actions because according to the delta spec, a remove must be in a later revision than the associated add, so they wouldn't be both in the same checkpoint. --- rust/src/delta.rs | 52 ++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 8e4da6337d..1c27d39f5b 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -426,6 +426,49 @@ impl DeltaTableState { pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> { self.current_metadata.as_ref() } + + /// merges new state information into our state + pub fn merge(&mut self, new_state: &mut DeltaTableState) -> () { + // build a lookup table of all remove actions + + if !new_state.tombstones.is_empty() { + let new_removals: HashSet<&str> = new_state + .tombstones + .iter() + .map(|s| s.path.as_str()) + .collect(); + + // previous state files must filtered for removals, + // new state files are appended without looking. + self.files + .retain(|a| !new_removals.contains(a.path.as_str())); + } + self.files.append(&mut new_state.files); + self.tombstones.append(&mut new_state.tombstones); + if new_state.min_reader_version > 0 { + self.min_reader_version = new_state.min_reader_version; + self.min_writer_version = new_state.min_writer_version; + } + + if new_state.current_metadata.is_some() { + self.tombstone_retention_millis = new_state.tombstone_retention_millis; + self.current_metadata = new_state.current_metadata.take(); + } + + new_state + .app_transaction_version + .drain() + .for_each(|(app_id, version)| { + *self + .app_transaction_version + .entry(app_id) + .or_insert(version) = version + }); + + if !new_state.commit_infos.is_empty() { + self.commit_infos.append(&mut new_state.commit_infos); + } + } } #[inline] @@ -565,11 +608,14 @@ impl DeltaTable { &mut self, reader: BufReader, ) -> Result<(), ApplyLogError> { + let mut new_state = DeltaTableState::default(); for line in reader.lines() { let action: Action = serde_json::from_str(line?.as_str())?; - process_action(&mut self.state, action)?; + process_action(&mut new_state, action)?; } + self.state.merge(&mut new_state); + Ok(()) } @@ -1474,10 +1520,6 @@ fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), App } Action::remove(v) => { let v = v.path_decoded()?; - let index = { state.files.iter().position(|a| *a.path == v.path) }; - if let Some(index) = index { - state.files.swap_remove(index); - } state.tombstones.push(v); } Action::protocol(v) => { From 62a54e7aa1161e0d3c26902e022460d5608c6e63 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Wed, 22 Sep 2021 14:13:35 +0200 Subject: [PATCH 2/3] Clippy warnings. --- rust/src/delta.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 1c27d39f5b..6c160080bd 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -428,7 +428,7 @@ impl DeltaTableState { } /// merges new state information into our state - pub fn merge(&mut self, new_state: &mut DeltaTableState) -> () { + pub fn merge(&mut self, new_state: &mut DeltaTableState) { // build a lookup table of all remove actions if !new_state.tombstones.is_empty() { From b7ed38723c32b1c23b3e85036c948fd29ee36543 Mon Sep 17 00:00:00 2001 From: Thomas Peiselt Date: Fri, 24 Sep 2021 07:29:52 +0200 Subject: [PATCH 3/3] Integrating review comments: ownership and retain logic. --- rust/src/delta.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 6c160080bd..c4b1473ce4 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -428,8 +428,8 @@ impl DeltaTableState { } /// merges new state information into our state - pub fn merge(&mut self, new_state: &mut DeltaTableState) { - // build a lookup table of all remove actions + pub fn merge(&mut self, mut new_state: DeltaTableState) { + self.files.append(&mut new_state.files); if !new_state.tombstones.is_empty() { let new_removals: HashSet<&str> = new_state @@ -438,12 +438,9 @@ impl DeltaTableState { .map(|s| s.path.as_str()) .collect(); - // previous state files must filtered for removals, - // new state files are appended without looking. self.files .retain(|a| !new_removals.contains(a.path.as_str())); } - self.files.append(&mut new_state.files); self.tombstones.append(&mut new_state.tombstones); if new_state.min_reader_version > 0 { self.min_reader_version = new_state.min_reader_version; @@ -614,7 +611,7 @@ impl DeltaTable { process_action(&mut new_state, action)?; } - self.state.merge(&mut new_state); + self.state.merge(new_state); Ok(()) }