Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch-apply remove actions in tombstone handling #444

Merged
merged 3 commits into from
Sep 24, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,49 @@ impl DeltaTableState {
pub fn current_metadata(&self) -> Option<&DeltaTableMetaData> {
self.current_metadata.as_ref()
}

/// merges new state information into our state
pub fn merge(&mut self, new_state: &mut DeltaTableState) {
// build a lookup table of all remove actions

if !new_state.tombstones.is_empty() {
let new_removals: HashSet<&str> = new_state
.tombstones
.iter()
.map(|s| s.path.as_str())
.collect();

// previous state files must filtered for removals,
// new state files are appended without looking.
self.files
.retain(|a| !new_removals.contains(a.path.as_str()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dispanser we need to apply retain for new file list in new_state too if it is constructed from incremental updates instead of checkpoints right?

Copy link
Contributor Author

@dispanser dispanser Sep 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting question. In the current impleentation, merge is called by apply_log_from_bufread, which is in turn called from apply_log for each version separately. So if we assume that no add + remove for the same file happens in the same version, we're safe.

We may be better of doing that "merge" once for multiple commits, and in that case the curent implementation becomes wrong.

I think I'll change it as you suggest because the way it is currently implemented makes a bit of a hidden assumption about how it's called, and the generic merge operation should handle all scenarios.

Copy link
Member

@houqp houqp Sep 24, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah, you are right, this is not a problem for the current PR then 👍 we can revisit this when we start doing parallel table load. but if you want to make a change in this PR, then it's even better :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to do the append first, which makes the code shorter because it allows me to remove the comments explaining the pecularities of the implementation :-).

One final thought wrt to this topic: we could argue that a DeltaTableState must be consistent / self-contained according to some rules, i.e. that "removals for its own files are already resolved". In that case we wouldn't need that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, I think that's a good idea. It also avoids having to go through the files field twice, once for append, once for retain.

}
self.files.append(&mut new_state.files);
self.tombstones.append(&mut new_state.tombstones);
if new_state.min_reader_version > 0 {
self.min_reader_version = new_state.min_reader_version;
self.min_writer_version = new_state.min_writer_version;
}

if new_state.current_metadata.is_some() {
self.tombstone_retention_millis = new_state.tombstone_retention_millis;
self.current_metadata = new_state.current_metadata.take();
}

new_state
.app_transaction_version
.drain()
houqp marked this conversation as resolved.
Show resolved Hide resolved
.for_each(|(app_id, version)| {
*self
.app_transaction_version
.entry(app_id)
.or_insert(version) = version
});

if !new_state.commit_infos.is_empty() {
self.commit_infos.append(&mut new_state.commit_infos);
}
}
}

#[inline]
Expand Down Expand Up @@ -565,11 +608,14 @@ impl DeltaTable {
&mut self,
reader: BufReader<R>,
) -> Result<(), ApplyLogError> {
let mut new_state = DeltaTableState::default();
for line in reader.lines() {
let action: Action = serde_json::from_str(line?.as_str())?;
process_action(&mut self.state, action)?;
process_action(&mut new_state, action)?;
}

self.state.merge(&mut new_state);

Ok(())
}

Expand Down Expand Up @@ -1474,10 +1520,6 @@ fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), App
}
Action::remove(v) => {
let v = v.path_decoded()?;
let index = { state.files.iter().position(|a| *a.path == v.path) };
if let Some(index) = index {
state.files.swap_remove(index);
}
state.tombstones.push(v);
}
Action::protocol(v) => {
Expand Down