
Batch-apply remove actions in tombstone handling #444

Merged (3 commits into delta-io:main) on Sep 24, 2021

Conversation

dispanser (Contributor)

Improve tombstone handling by applying the remove actions only as a batch at the end of a load or incremental update operation.

For checkpoint loading, we skip interpretation of the remove actions entirely: according to the Delta spec, a remove must appear in a later revision than the associated add, so the two can never end up in the same checkpoint.

The PR does not add any new tests, as we are only changing implementation details. I think the proper handling of tombstones is demonstrated by read_simple_table(), which makes sure that tombstones are applied correctly when loading a table with remove operations.
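For illustration, the idea might look roughly like this in simplified form (a minimal sketch with hypothetical types and names, not the crate's actual code):

use std::collections::HashSet;

enum Action {
    Add { path: String },
    Remove { path: String },
}

/// Replay one commit, deferring tombstone application to a single
/// retain pass at the end instead of scanning the file list once
/// per remove action.
fn apply_commit(files: &mut Vec<String>, tombstones: &mut Vec<String>, actions: Vec<Action>) {
    let mut removed: HashSet<String> = HashSet::new();
    for action in actions {
        match action {
            Action::Add { path } => files.push(path),
            Action::Remove { path } => {
                removed.insert(path.clone());
                tombstones.push(path);
            }
        }
    }
    if !removed.is_empty() {
        files.retain(|path| !removed.contains(path));
    }
}

/// Checkpoint replay records tombstones but never needs to prune the
/// file list: per the Delta spec, a remove lands in a later version
/// than its add, so one checkpoint never pairs an add with its remove.
fn apply_checkpoint(files: &mut Vec<String>, tombstones: &mut Vec<String>, actions: Vec<Action>) {
    for action in actions {
        match action {
            Action::Add { path } => files.push(path),
            Action::Remove { path } => tombstones.push(path),
        }
    }
}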

@dispanser (Contributor, Author)

My initial idea was to capture the changes from an update operation in a struct DeltaChangeSet, but that struct ended up with exactly the same fields as DeltaTableState, because all of those fields can be modified by actions. I then dropped the struct, which has the nice advantage that all code paths stay the same, with the exception of apply_log_from_bufread(), which happens to be the only place that requires aligning add and remove actions.
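A rough sketch of the resulting flow, with hypothetical, simplified names (the real code parses JSON action lines and tracks more state):

use std::collections::HashSet;

#[derive(Default)]
struct DeltaTableState {
    files: Vec<String>,      // paths of active data files
    tombstones: Vec<String>, // paths of removed files
}

enum Action {
    Add(String),
    Remove(String),
}

/// Collect one version's actions into a fresh state; no add/remove
/// alignment is needed at this point.
fn process_action(state: &mut DeltaTableState, action: Action) {
    match action {
        Action::Add(path) => state.files.push(path),
        Action::Remove(path) => state.tombstones.push(path),
    }
}

impl DeltaTableState {
    /// The single place where removes must be reconciled with adds:
    /// folding the freshly replayed per-version state into `self`.
    fn merge(&mut self, mut new_state: DeltaTableState) {
        let new_removals: HashSet<&str> =
            new_state.tombstones.iter().map(String::as_str).collect();
        self.files.retain(|p| !new_removals.contains(p.as_str()));
        self.files.append(&mut new_state.files);
        self.tombstones.append(&mut new_state.tombstones);
    }

    /// Simplified stand-in for apply_log_from_bufread().
    fn apply_version(&mut self, actions: impl IntoIterator<Item = Action>) {
        let mut new_state = DeltaTableState::default();
        for action in actions {
            process_action(&mut new_state, action);
        }
        self.merge(new_state);
    }
}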

@dispanser changed the title from "Closes #442." to "Batch-apply remove actions in tombstone handling" on Sep 22, 2021
@houqp (Member) left a comment

I actually like your initial idea of introducing a change capture struct better ;) But the modification I would make is to dedicate that struct to batch operations. Today we only need to batch file-list removes, but there may be others in the future.

So I am thinking of something like this:

#[derive(Default)]
struct DeltaBatchChangeSet {
    remove_set: HashSet<String>,
}

fn process_action(
    state: &mut DeltaTableState,
    batch_change: &mut DeltaBatchChangeSet,
    action: Action,
) -> Result<(), ApplyLogError> {
    match action {
        Action::remove(v) => {
            // Record the path before the action is moved into the tombstones.
            batch_change.remove_set.insert(v.path.clone());
            state.tombstones.push(v);
        }
        // ... the other action variants, handled as before ...
        _ => {}
    }
    Ok(())
}

fn apply_logs_in_batch(
    state: &mut DeltaTableState,
    actions: impl IntoIterator<Item = Result<Action, ApplyLogError>>,
) -> Result<(), ApplyLogError> {
    let mut batch_change = DeltaBatchChangeSet::default();
    for action in actions {
        process_action(state, &mut batch_change, action?)?;
    }
    if !batch_change.remove_set.is_empty() {
        state
            .files
            .retain(|a| !batch_change.remove_set.contains(a.path.as_str()));
    }
    Ok(())
}

fn apply_log_from_bufread(
    table: &mut DeltaTable,
    reader: impl BufRead,
) -> Result<(), ApplyLogError> {
    apply_logs_in_batch(
        &mut table.state,
        reader
            .lines()
            .map(|line| Ok(serde_json::from_str(line?.as_str())?)),
    )
}

This way, we won't need to check and reassign the other fields in the merge method. We also won't need to go through the tombstones a second time to build the hash set.

What do you think?

@dispanser (Contributor, Author)

> So I am thinking of something like this: [the code sketch quoted above]

The only disadvantage I can see here is that retain is called on a larger set, because even the add actions from the current update are considered as remove candidates. On the other hand, this potentially handles moving forward multiple versions at once (calling retain only once).

> This way, we won't need to check and reassign the other fields in the merge method. We also won't need to go through the tombstones a second time to build the hash set.

Thanks for the great idea! The one thing I didn't like about the approach I've taken is indeed that it forces me to handle all fields, even though only one of them is actually relevant for the merge operation.

> What do you think?

I'll give it a go this evening. Thanks for the suggestion.

@dispanser (Contributor, Author)

In that idea, we pass batch_change into process_action regardless of whether it is needed. I wonder if we should pass in an Option<DeltaBatchChangeSet> instead, so we don't have to build a potentially large hash set for checkpoint loading.
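For instance (a hedged sketch with hypothetical names, building on the suggestion above):

use std::collections::HashSet;

struct Remove {
    path: String,
}

enum Action {
    Remove(Remove),
    // ... other variants elided ...
}

#[derive(Default)]
struct DeltaBatchChangeSet {
    remove_set: HashSet<String>,
}

#[derive(Default)]
struct DeltaTableState {
    tombstones: Vec<Remove>,
}

/// `batch_change` is `None` on the checkpoint path, so the potentially
/// large remove set is never built when it would never be consulted.
fn process_action(
    state: &mut DeltaTableState,
    batch_change: Option<&mut DeltaBatchChangeSet>,
    action: Action,
) {
    match action {
        Action::Remove(r) => {
            if let Some(batch) = batch_change {
                batch.remove_set.insert(r.path.clone());
            }
            state.tombstones.push(r);
        }
    }
}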

@houqp (Member) commented Sep 23, 2021

> The only disadvantage I can see here is that retain is called on a larger set, because even the add actions from the current update are considered as remove candidates. On the other hand, this potentially handles moving forward multiple versions at once (calling retain only once).

This is a good point. I think this optimization is relevant for applying actions from checkpoints; for incremental updates, I think we do still need to consider add actions from the current batch.

Another observation: even though we have to go through the newly added files in the retain call, we avoid the self.files.append(&mut new_state.files) call, so the number of times we iterate over the newly added batch is the same as in the current approach?

@dispanser (Contributor, Author)

I'm a bit torn here. While your proposal reduces the code size and removes the unnecessary handling of unrelated fields in the change set, making it very clear that (and why) we handle tombstones separately, the initial PR has the advantage that process_action just "collects some actions" and introduces the notion of merging. The latter allows for a trivial way to do the checkpoint ingestion concurrently (at per-file granularity), which would have been one of the next things I planned to raise an issue for ;).

@houqp (Member) commented Sep 23, 2021

> The latter allows for a trivial way to do the checkpoint ingestion concurrently (at per-file granularity), which would have been one of the next things I planned to raise an issue for ;).

Yeah, this is a good point. We should definitely design for concurrent apply, so it looks like the state merge code is unavoidable. In that case, the BatchChangeSet design becomes an optimization detail. In other words, we can use it together with the state merge later if going through the tombstones a second time to build the hash set becomes a performance issue.

In short, 👍 for the current approach :)

houqp previously approved these changes on Sep 24, 2021
@houqp (Member) left a comment

Left two minor questions; the rest looks good to me 👍 Really nice performance improvement.

// previous state files must be filtered for removals;
// new state files are appended without looking.
self.files
    .retain(|a| !new_removals.contains(a.path.as_str()));
@houqp (Member)

@dispanser we need to apply the retain to the new file list in new_state too, if it is constructed from incremental updates instead of checkpoints, right?

@dispanser (Contributor, Author) commented Sep 24, 2021

Interesting question. In the current implementation, merge is called by apply_log_from_bufread, which is in turn called from apply_log for each version separately. So as long as we assume that no add + remove for the same file happens within the same version, we're safe.

We may be better off doing that "merge" once for multiple commits, and in that case the current implementation becomes wrong.

I think I'll change it as you suggest, because the current implementation makes a somewhat hidden assumption about how it's called, and a generic merge operation should handle all scenarios.
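For illustration, the adjusted merge might look roughly like this (a sketch reusing the hypothetical simplified types from the earlier sketches, not the PR's exact code):

use std::collections::HashSet;

#[derive(Default)]
struct DeltaTableState {
    files: Vec<String>,
    tombstones: Vec<String>,
}

impl DeltaTableState {
    /// Append first, then filter: the retain pass now also covers the
    /// incoming files, so a `new_state` spanning several versions
    /// (a file added in one and removed in a later one) merges
    /// correctly, without hidden assumptions about the caller.
    fn merge(&mut self, mut new_state: DeltaTableState) {
        self.files.append(&mut new_state.files);
        let new_removals: HashSet<&str> =
            new_state.tombstones.iter().map(String::as_str).collect();
        self.files.retain(|p| !new_removals.contains(p.as_str()));
        self.tombstones.append(&mut new_state.tombstones);
    }
}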

@houqp (Member) commented Sep 24, 2021
Oh yeah, you are right, this is not a problem for the current PR then 👍 We can revisit this when we start doing parallel table load. But if you want to make the change in this PR, then it's even better :)

@dispanser (Contributor, Author)

I changed it to do the append first, which makes the code shorter because it allows me to remove the comments explaining the peculiarities of the implementation :-).

One final thought on this topic: we could argue that a DeltaTableState must be consistent / self-contained according to some rules, i.e. that removals for its own files are already resolved. In that case we wouldn't need to filter the incoming files at all.

@houqp (Member)
Yeah, I think that's a good idea. It also avoids having to go through the files field twice, once for append and once for retain.

rust/src/delta.rs: review thread resolved
@houqp self-requested a review on September 24, 2021
@houqp dismissed their stale review on September 24, 2021: "wait for response on comments"
@houqp requested review from xianwill, fvaleye and mosyp on September 24, 2021
@houqp merged commit 68061a2 into delta-io:main on Sep 24, 2021
@houqp (Member) commented Sep 24, 2021

The beginning of exciting optimizations :) Thanks @dispanser .

@dispanser deleted the improve-tombstone-performance branch on September 25, 2021